Make pipelines not tied directly to feeeds

Allows using the same pipeline for multiple different feeds
This commit is contained in:
Shadowfacts 2019-11-08 22:27:46 -05:00
parent f84d849432
commit 0d0c749b68
Signed by: shadowfacts
GPG Key ID: 94A5AB95422746E5
18 changed files with 377 additions and 226 deletions

View File

@ -23,24 +23,27 @@ defmodule Frenzy.CreateItemTask do
content: entry.content
}
feed = Repo.preload(feed, :pipeline_stages)
feed = Repo.preload(feed, :pipeline)
result =
feed.pipeline_stages
|> Enum.sort_by(& &1.index)
|> Enum.reduce({:ok, item_params}, fn
stage, {:ok, item_params} ->
apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
stage.options,
item_params
])
if feed.pipeline do
feed.pipeline.stages
|> Enum.reduce({:ok, item_params}, fn
stage, {:ok, item_params} ->
apply(String.to_existing_atom("Elixir." <> stage["module_name"]), :apply, [
stage["options"],
item_params
])
_stage, :tombstone ->
:tombstone
_stage, :tombstone ->
:tombstone
_stage, {:error, _error} = error ->
error
end)
_stage, {:error, _error} = error ->
error
end)
else
{:ok, item_params}
end
case result do
{:err, error} ->

View File

@ -36,9 +36,9 @@ defmodule Frenzy.Feed do
field :title, :string
belongs_to :group, Frenzy.Group
belongs_to :pipeline, Frenzy.Pipeline
has_many :items, Frenzy.Item, on_delete: :delete_all
has_many :pipeline_stages, Frenzy.PipelineStage, on_delete: :delete_all
timestamps()
end
@ -51,8 +51,8 @@ defmodule Frenzy.Feed do
site_url: String.t() | nil,
title: String.t() | nil,
group: Frenzy.Group.t() | Ecto.Association.NotLoaded.t(),
pipeline: Frenzy.Pipeline.t() | nil | Ecto.Association.NotLoaded.t(),
items: [Frenzy.Item.t()] | Ecto.Association.NotLoaded.t(),
pipeline_stages: [Frenzy.PipelineStage.t()] | Ecto.Association.NotLoaded.t(),
inserted_at: NaiveDateTime.t(),
updated_at: NaiveDateTime.t()
}
@ -64,7 +64,8 @@ defmodule Frenzy.Feed do
:title,
:feed_url,
:site_url,
:last_updated
:last_updated,
:pipeline_id
])
|> validate_required([:feed_url])
end

29
lib/frenzy/pipeline.ex Normal file
View File

@ -0,0 +1,29 @@
defmodule Frenzy.Pipeline do
use Ecto.Schema
import Ecto.Changeset
schema "pipelines" do
field :name, :string
field :stages, {:array, :map}
belongs_to :user, Frenzy.User
has_many :feeds, Frenzy.Feed
timestamps()
end
@type t() :: %__MODULE__{
__meta__: Ecto.Schema.Metadata.t(),
id: integer() | nil,
name: String.t(),
stages: [map()],
user: Frenzy.User.t() | Ecto.Association.NotLoaded.t(),
feeds: [Frenzy.Feed.t()] | Ecto.Association.NotLoaded.t()
}
def changeset(pipeline, attrs) do
pipeline
|> cast(attrs, [:name, :stages, :user_id])
|> validate_required([:name, :stages, :user_id])
end
end

View File

@ -1,31 +0,0 @@
defmodule Frenzy.PipelineStage do
use Ecto.Schema
import Ecto.Changeset
schema "pipeline_stages" do
field :index, :integer
field :module_name, :string
field :options, :map
belongs_to :feed, Frenzy.Feed
timestamps()
end
@type t() :: %__MODULE__{
__meta__: Ecto.Schema.Metadata.t(),
id: integer() | nil,
index: integer(),
module_name: String.t(),
options: map(),
feed: Frenzy.Feed.t() | Ecto.Association.NotLoaded.t(),
inserted_at: NaiveDateTime.t(),
updated_at: NaiveDateTime.t()
}
def changeset(stage, attrs) do
stage
|> cast(attrs, [:index, :module_name, :options])
|> validate_required([:index, :module_name])
end
end

View File

@ -1,6 +1,6 @@
defmodule FrenzyWeb.FeedController do
use FrenzyWeb, :controller
alias Frenzy.{Repo, Group, Feed, Item}
alias Frenzy.{Repo, Group, Feed, Item, Pipeline}
alias FrenzyWeb.Router.Helpers, as: Routes
alias FrenzyWeb.Endpoint
import Ecto.Query
@ -67,12 +67,17 @@ defmodule FrenzyWeb.FeedController do
end
def edit(conn, _params) do
feed = conn.assigns[:feed] |> Repo.preload(:pipeline_stages)
stages = Enum.sort_by(feed.pipeline_stages, fn stage -> stage.index end)
feed = conn.assigns[:feed]
edit_changeset = Feed.changeset(feed, %{})
pipelines =
Repo.all(Pipeline)
|> Enum.map(fn pipeline -> {pipeline.name, pipeline.id} end)
render(conn, "edit.html", %{
feed: feed,
stages: stages
changeset: edit_changeset,
pipelines: [{"No Pipeline", nil} | pipelines]
})
end

View File

@ -1,21 +1,20 @@
defmodule FrenzyWeb.PipelineController do
use FrenzyWeb, :controller
alias Frenzy.{Repo, Feed, PipelineStage}
alias Frenzy.{Repo, Pipeline}
alias FrenzyWeb.Router.Helpers, as: Routes
alias FrenzyWeb.Endpoint
import Ecto.Query
plug :user_owns_feed
plug :user_owns_stage
plug :user_owns_pipeline
defp user_owns_feed(%Plug.Conn{path_params: %{"feed_id" => feed_id}} = conn, _opts) do
defp user_owns_pipeline(%Plug.Conn{path_params: %{"id" => pipeline_id}} = conn, _opts) do
user = conn.assigns[:user]
feed = Repo.get(Feed, feed_id) |> Repo.preload(:pipeline_stages)
pipeline = Repo.get(Pipeline, pipeline_id)
if Enum.any?(user.groups, fn g -> g.id == feed.group_id end) do
if pipeline.user_id == user.id do
conn
|> assign(:feed, feed)
|> assign(:pipeline, pipeline)
else
conn
|> put_flash(:error, "You do not have permission to access that resource.")
@ -24,133 +23,137 @@ defmodule FrenzyWeb.PipelineController do
end
end
defp user_owns_feed(conn, _opts), do: conn
defp user_owns_pipeline(conn, _opts), do: conn
defp user_owns_stage(%Plug.Conn{path_params: %{"stage_id" => stage_id}} = conn, _opts) do
feed = conn.assigns[:feed]
def index(conn, _params) do
user = conn.assigns[:user]
stage = Repo.get(PipelineStage, stage_id)
pipelines = Repo.all(from p in Pipeline, where: p.user_id == ^user.id, preload: [:feeds])
if stage.feed_id == feed.id do
conn
|> assign(:stage, stage)
else
conn
|> put_flash(:error, "You do not have permission to access that resource.")
|> redirect(to: Routes.group_path(Endpoint, :index))
|> halt()
end
end
defp user_owns_stage(conn, _opts), do: conn
def edit(conn, %{"stage_id" => stage_id}) do
feed = conn.assigns[:feed]
stage = conn.assigns[:stage]
{:ok, options_json} = Jason.encode(stage.options, pretty: true)
changeset =
PipelineStage.changeset(stage, %{
options: options_json
})
render(conn, "edit.html", %{
feed: feed,
stage: stage,
changeset: changeset
})
end
def update(conn, %{"pipeline_stage" => %{"options" => options_json}}) do
feed = conn.assigns[:feed]
stage = conn.assigns[:stage]
with {:ok, options} <- Jason.decode(options_json),
{:ok, options} <-
apply(String.to_existing_atom("Elixir." <> stage.module_name), :validate_opts, [
options
]) do
changeset = PipelineStage.changeset(stage, %{options: options})
{:ok, _stage} = Repo.update(changeset)
conn
|> put_flash(:info, "Pipeline Stage updated")
|> redirect(to: Routes.feed_path(Endpoint, :edit, feed.id))
else
result ->
error_changeset = PipelineStage.changeset(stage, %{options: options_json})
conn
|> put_flash(:error, "Unable to update pipeline stage: #{inspect(result)}")
|> render("edit.html", %{
feed: feed,
stage: stage,
changeset: error_changeset
})
end
render(conn, "index.html", %{pipelines: pipelines})
end
def new(conn, _params) do
feed = conn.assigns[:feed]
changeset =
PipelineStage.changeset(%PipelineStage{}, %{
index: feed.pipeline_stages |> Enum.count(),
options: "{}"
Pipeline.changeset(%Pipeline{}, %{
name: "",
stages: "[\n]"
})
render(conn, "new.html", %{
feed: feed,
changeset: changeset
render(conn, "new.html", %{changeset: changeset})
end
def create(conn, %{"pipeline" => %{"name" => name, "stages" => stages_json}}) do
user = conn.assigns[:user]
with {:ok, stages} <- Jason.decode(stages_json),
{:ok, stages} <- validate_pipeline(stages) do
changeset = Pipeline.changeset(%Pipeline{}, %{user_id: user.id, name: name, stages: stages})
{:ok, _pipeline} = Repo.insert(changeset)
conn
|> put_flash(:info, "Pipeline created")
|> redirect(to: Routes.pipeline_path(Endpoint, :index))
else
{:error, reason} ->
error_changeset = Pipeline.changeset(%Pipeline{}, %{name: name, stages: stages_json})
conn
|> put_flash(:error, "Unable to create pipeline: #{reason}")
|> render("new.html", %{changeset: error_changeset})
end
end
def show(conn, _params) do
pipeline = conn.assigns[:pipeline]
render(conn, "show.html", %{pipeline: pipeline})
end
def delete(conn, _params) do
conn.assigns[:pipeline]
|> Repo.delete()
redirect(conn, to: Routes.pipeline_path(Endpoint, :index))
end
def edit(conn, _params) do
pipeline = conn.assigns[:pipeline]
{:ok, stages_json} = Jason.encode(pipeline.stages, pretty: true)
render(conn, "edit.html", %{
pipeline: pipeline,
name: pipeline.name,
stages_json: stages_json
})
end
def create(conn, %{
"pipeline_stage" =>
%{
"index" => index,
"module_name" => module_name,
"options" => options_json
} = params
}) do
feed = conn.assigns[:feed]
def update(conn, %{"pipeline" => %{"name" => name, "stages" => stages_json}}) do
pipeline = conn.assigns[:pipeline]
with {index, _} <- Integer.parse(index),
module_atom <- String.to_existing_atom("Elixir." <> module_name),
{:ok, options} <- Jason.decode(options_json),
{:ok, options} <- apply(module_atom, :validate_opts, [options]) do
changeset =
Ecto.build_assoc(feed, :pipeline_stages, %{
index: index,
module_name: module_name,
options: options
})
with {:ok, stages} <- Jason.decode(stages_json),
{:ok, stages} <- validate_pipeline(stages) do
changeset = Pipeline.changeset(%Pipeline{}, %{name: name, stages: stages})
{:ok, _stage} = Repo.insert(changeset)
{:ok, _pipeline} = Repo.update(changeset)
conn
|> put_flash(:info, "Pipeline Stage created")
|> redirect(to: Routes.feed_path(Endpoint, :edit, feed.id))
|> put_flash(:info, "Pipeline edited")
|> redirect(to: Routes.pipeline_path(Endpoint, :show, pipeline.id))
else
result ->
error_changeset = PipelineStage.changeset(%PipelineStage{}, params)
{:error, reason} ->
conn
|> put_flash(:error, "Unable to create pipeline stage: #{inspect(result)}")
|> render("new.html", %{
feed: feed,
changeset: error_changeset
|> put_flash(:error, "Unable to edit pipeline: #{reason}")
|> render("edit.html", %{
pipeline: pipeline,
name: name,
stages_json: stages_json
})
end
end
def delete(conn, _params) do
feed = conn.assigns[:feed]
stage = conn.assigns[:stage]
{:ok, _stage} = Repo.delete(stage)
defp validate_pipeline(stages) do
stages
|> Enum.with_index()
|> Enum.reduce_while({:ok, []}, fn {stage, index}, {:ok, new_stages} ->
case validate_stage(stage) do
{:ok, stage} ->
{:cont, {:ok, new_stages ++ [stage]}}
conn
|> put_flash(:info, "Pipeline Stage deleted")
|> redirect(to: Routes.feed_path(Endpoint, :edit, feed.id))
{:error, reason} ->
{:error, "invalid stage at #{index}: #{reason}"}
end
end)
end
defp validate_stage(%{"module_name" => module_name, "options" => options}) do
with true <- module_exists(module_name),
{:ok, options} <-
apply(String.to_existing_atom("Elixir." <> module_name), :validate_opts, [
options
]) do
{:ok, %{"module_name" => module_name, "options" => options}}
else
false ->
{:error, "module #{module_name} does not exist"}
{:error, reason} ->
{:error, "invalid options for #{module_name}: #{reason}"}
end
end
defp validate_stage(_),
do: {:error, "pipeline stage must be a map with module_name and options"}
defp module_exists(module_name) do
try do
String.to_existing_atom("Elixir." <> module_name)
true
rescue
ArgumentError ->
false
end
end
end

View File

@ -52,13 +52,7 @@ defmodule FrenzyWeb.Router do
resources "/feeds", FeedController, except: [:index, :new]
post "/feeds/:id/refresh", FeedController, :refresh
# resources "/pipelines", PipelineController, only: [:edit, :update, :new, :create]
get "/feeds/:feed_id/pipelines/:stage_id/edit", PipelineController, :edit
put "/feeds/:feed_id/pipelines/:stage_id/update", PipelineController, :update
delete "/feeds/:feed_id/pipelines/:stage_id/delete", PipelineController, :delete
get "/feeds/:feed_id/pipelines/new", PipelineController, :new
post "/feeds/:feed_id/pipelines/create", PipelineController, :create
resources "/pipelines", PipelineController
resources "/items", ItemController, only: [:show]
post "/items/:id/read", ItemController, :read

View File

@ -1,29 +1,22 @@
<h2>Feed Pipeline</h2>
<table class="table table-striped mt-4">
<thead>
<tr>
<th>Module</th>
<th></th>
</tr>
</thead>
<tbody>
<%= for stage <- @stages do %>
<tr>
<td>
<code><%= stage.module_name %></code>
</td>
<td>
<a href="<%= Routes.pipeline_path(@conn, :edit, @feed.id, stage.id) %>" class="btn btn-primary btn-sm">
Edit
</a>
<%= form_for @conn, Routes.pipeline_path(@conn, :delete, @feed.id, stage.id), [method: :delete, style: "display: inline-block;"], fn f -> %>
<%= submit "Delete", class: "btn btn-danger btn-sm" %>
<% end %>
</td>
</tr>
<% end %>
</tbody>
</table>
<a href="<%= Routes.pipeline_path(@conn, :new, @feed.id) %>" class="btn btn-primary">Add Stage</a>
<%= form_for @changeset, Routes.feed_path(@conn, :update, @feed.id), fn f -> %>
<div class="form-group row">
<label class="col-sm-2 col-form-label" for="pipeline_id">Pipeline ID</label>
<div class="col-sm-10">
<%= select f, :pipeline_id, @pipelines, class: "custom-select" %>
</div>
</div>
<%= if @feed.pipeline_id do %>
<div class="form-group row">
<div class="col-sm-10">
<a href="<%= Routes.pipeline_path(@conn, :show, @feed.pipeline_id) %>" class="col-sm-2">View Pipeline</a>
</div>
</div>
<% end %>
<div class="form-group row">
<div class="col-sm-10">
<%= submit "Update Feed", class: "btn btn-primary" %>
</div>
</div>
<% end %>

View File

@ -1,3 +1,4 @@
<h1>Groups</h1>
<a href="<%= Routes.group_path(@conn, :new) %>" class="btn btn-primary">Add Group</a>
<table class="table table-striped mt-4">

View File

@ -15,11 +15,18 @@
<nav rol="navigation" class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container">
<a href="/" class="navbar-brand">Frenzy</a>
<button class="navbar-toggler" type="button" data-toggle="collapse" data-target="#navbarContent" aria-controls="navbarContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="" id="navbarContent">
<div class="collapse navbar-collapse" id="navbarContent">
<ul class="navbar-nav mr-auto">
<%= unless is_nil(@conn.assigns[:user]) do %>
<li class="nav-item"><a class="nav-link" href="<%= Routes.group_path(@conn, :index) %>">Groups</a></li>
<li class="nav-item"><a class="nav-link" href="<%= Routes.pipeline_path(@conn, :index) %>">Pipelines</a></li>
<% end %>
</ul>
<ul class="navbar-nav flex-row ml-md-auto">
<ul class="navbar-nav ml-md-auto">
<%= unless is_nil(@conn.assigns[:user]) do %>
<li class="nav-item"><a href="<%= Routes.account_path(@conn, :show) %>" class="nav-link">Account</a></li>
<li class="nav-item"><a href="<%= Routes.login_path(@conn, :logout) %>" class="nav-link">Log Out</a></li>
@ -29,7 +36,7 @@
</div>
</nav>
</header>
<main role="main" class="main mt-4">
<main role="main" class="main mt-2">
<div class="container">
<%= if get_flash(@conn, :info) do %>
<p class="alert alert-primary" role="alert"><%= get_flash(@conn, :info) %></p>

View File

@ -1,9 +1,16 @@
<h2>Pipeline Stage</h2>
<h3>Feed: <%= @feed.title %></h3>
<h3>Module: <code><%= @stage.module_name %></code></h3>
<%= form_for @changeset, Routes.pipeline_path(@conn, :update, @feed.id, @stage.id), fn f -> %>
<%= textarea f, :options, class: "form-control", rows: 15, style: "font-family: monospace;" %>
<%= submit "Update", class: "btn btn-primary mt-2" %>
<%= form_tag Routes.pipeline_path(@conn, :update, @pipeline.id), method: :put do %>
<div class="form-group row">
<label class="col-sm-2 col-form-label" for="name">Name</label>
<div class="col-sm-10">
<%= text_input :pipeline, :name, value: @name, placeholder: "My New Pipeline", class: "form-control" %>
</div>
</div>
<div class="form-group row">
<%= textarea :pipeline, :stages, value: @stages_json, class: "form-control", rows: 15, style: "font-family: monospace;" %>
</div>
<div class="form-group row">
<div class="col-sm-10">
<%= submit "Edit Pipeline", class: "btn btn-primary" %>
</div>
</div>
<% end %>

View File

@ -0,0 +1,27 @@
<h1>Pipelines</h1>
<a href="<%= Routes.pipeline_path(@conn, :new) %>" class="btn btn-primary">Add Pipeline</a>
<table class="table table-striped mt-4">
<thead>
<tr>
<th>Name</th>
<th>Stage Count</th>
<th>Feed Count</th>
</tr>
</thead>
<tbody>
<%= for pipeline <- @pipelines do %>
<tr>
<td>
<a href="/pipelines/<%= pipeline.id %>"><%= pipeline.name %></a>
</td>
<td>
<%= pipeline.stages |> Enum.count() %>
</td>
<td>
<%= pipeline.feeds |> Enum.count() %>
</td>
</tr>
<% end %>
</tbody>
</table>

View File

@ -1,27 +1,16 @@
<h2>Feed Pipeline</h2>
<%= form_for @changeset, Routes.pipeline_path(@conn, :create, @feed.id), fn f -> %>
<%= form_for @changeset, Routes.pipeline_path(@conn, :create), fn form -> %>
<div class="form-group row">
<label class="col-sm-2 col-form-label" for="module_name">Module Name</label>
<label class="col-sm-2 col-form-label" for="name">Name</label>
<div class="col-sm-10">
<%= text_input f, :module_name, placeholder: "Frenzy.Pipeline.FilterStage", class: "form-control" %>
<%= text_input form, :name, placeholder: "My New Pipeline", class: "form-control" %>
</div>
</div>
<div class="form-group row">
<label class="col-sm-2 col-form-label" for="index">Index</label>
<div class="col-sm-10">
<%= number_input f, :index, class: "form-control" %>
</div>
</div>
<div class="form-group row">
<label class="col-sm-2 col-form-label" for="options">Options</label>
<div class="col-sm-10">
<%= textarea f, :options, class: "form-control", rows: 15, style: "font-family: monospace;" %>
</div>
<%= textarea form, :stages, class: "form-control", rows: 15, style: "font-family: monospace;" %>
</div>
<div class="form-group row">
<div class="col-sm-10">
<%= submit "Create Stage", class: "btn btn-primary" %>
<%= submit "Create Pipeline", class: "btn btn-primary" %>
</div>
</div>
<% end %>

View File

@ -0,0 +1,29 @@
<h1><%= @pipeline.name %></h1>
<h2>Stages: <%= @pipeline.stages |> Enum.count() %></h2>
<a href="<%= Routes.pipeline_path(@conn, :edit, @pipeline.id) %>" class="btn btn-primary">Edit Pipeline</a>
<%= form_tag Routes.pipeline_path(@conn, :delete, @pipeline.id), method: :delete do %>
<%= submit "Delete Pipeline", class: "btn btn-danger mt-2" %>
<% end %>
<table class="table table-striped mt-4">
<thead>
<tr>
<th>Module Name</th>
<th>Options</th>
</tr>
</thead>
<tbody>
<%= for stage <- @pipeline.stages do %>
<tr>
<td>
<code><%= stage["module_name"] %></code>
</td>
<td>
<% {:ok, result} = Jason.encode(stage["options"], pretty: true) %>
<pre><%= result %></pre>
</td>
</tr>
<% end %>
</tbody>
</table>

View File

@ -0,0 +1,14 @@
defmodule Frenzy.Repo.Migrations.CreatePipelines do
use Ecto.Migration
def change do
create table(:pipelines) do
add :name, :string
add :stages, :jsonb, null: false, default: "[]"
add :user_id, references(:users)
timestamps()
end
end
end

View File

@ -0,0 +1,9 @@
defmodule Frenzy.Repo.Migrations.FeedsAddPipelineId do
use Ecto.Migration
def change do
alter table(:feeds) do
add :pipeline_id, references(:pipelines, on_delete: :nilify_all)
end
end
end

View File

@ -0,0 +1,64 @@
defmodule Frenzy.Repo.Migrations.FeedsConvertPipelineStagesToPipelines do
use Ecto.Migration
alias Frenzy.Repo
import Ecto.Query
def up do
pipeline_entries =
Repo.all(
from(stage in "pipeline_stages",
join: feed in "feeds",
on: stage.feed_id == feed.id,
join: group in "groups",
on: feed.group_id == group.id,
select: %{
index: stage.index,
module_name: stage.module_name,
options: stage.options,
feed_id: feed.id,
feed_title: feed.title,
user_id: group.user_id
}
)
)
|> Enum.group_by(fn data -> data.feed_id end)
|> Enum.map(fn {feed_id, [first | _] = pipeline_data} ->
feed_title = first.feed_title
user_id = first.user_id
stages =
pipeline_data
|> Enum.sort_by(& &1.index)
|> Enum.map(fn data ->
%{
module_name: data.module_name,
options: data.options
}
end)
{feed_id,
[
name:
if(feed_title,
do: "#{feed_title} Pipeline",
else: "Unknown Feed Pipeline (feed ID #{feed_id})"
),
stages: stages,
user_id: user_id,
inserted_at: NaiveDateTime.utc_now(),
updated_at: NaiveDateTime.utc_now()
]}
end)
|> Enum.each(fn {feed_id, pipeline_entry} ->
{_, [%{id: pipeline_id}]} =
Repo.insert_all("pipelines", [pipeline_entry], returning: [:id])
feed_query = from(f in "feeds", where: f.id == ^feed_id)
Repo.update_all(feed_query, set: [pipeline_id: pipeline_id])
end)
end
def down do
# we don't have a down migration for this, since we're not modifying any schemas, we're just converting the existing data from the pipeline_stages table to the new pipelines table and updating the corresponding feeds
end
end

View File

@ -0,0 +1,7 @@
defmodule Frenzy.Repo.Migrations.RemovePipelineStages do
use Ecto.Migration
def change do
drop table(:pipeline_stages)
end
end