From 1bf127d39bc44e1e67f6f83d858a17b6fc7b1825 Mon Sep 17 00:00:00 2001 From: Shadowfacts Date: Mon, 8 Jul 2019 22:41:18 -0400 Subject: [PATCH] Start pipeline system --- lib/frenzy/feed.ex | 10 +- lib/frenzy/filter_engine.ex | 33 ------ lib/frenzy/pipeline/filter_stage.ex | 61 ++++++++++ lib/frenzy/pipeline/scrape_stage.ex | 58 ++++++++++ lib/frenzy/pipeline/stage.ex | 3 + lib/frenzy/pipeline_stage.ex | 20 ++++ lib/frenzy/update_feeds.ex | 108 +++++++----------- .../20190708234319_create_pipeline_stages.exs | 14 +++ 8 files changed, 207 insertions(+), 100 deletions(-) delete mode 100644 lib/frenzy/filter_engine.ex create mode 100644 lib/frenzy/pipeline/filter_stage.ex create mode 100644 lib/frenzy/pipeline/scrape_stage.ex create mode 100644 lib/frenzy/pipeline/stage.ex create mode 100644 lib/frenzy/pipeline_stage.ex create mode 100644 priv/repo/migrations/20190708234319_create_pipeline_stages.exs diff --git a/lib/frenzy/feed.ex b/lib/frenzy/feed.ex index e899adb..d4f3c7f 100644 --- a/lib/frenzy/feed.ex +++ b/lib/frenzy/feed.ex @@ -41,6 +41,7 @@ defmodule Frenzy.Feed do has_many :items, Frenzy.Item, on_delete: :delete_all has_one :filter, Frenzy.Filter, on_delete: :delete_all + has_many :pipeline_stages, Frenzy.PipelineStage, on_delete: :delete_all timestamps() end @@ -48,7 +49,14 @@ defmodule Frenzy.Feed do @doc false def changeset(feed, attrs) do feed - |> cast(attrs, [:title, :feed_url, :site_url, :last_updated, :filter_enabled, :scrape_remote_content]) + |> cast(attrs, [ + :title, + :feed_url, + :site_url, + :last_updated, + :filter_enabled, + :scrape_remote_content + ]) |> cast_assoc(:filter, required: true) |> validate_required([:feed_url, :filter]) end diff --git a/lib/frenzy/filter_engine.ex b/lib/frenzy/filter_engine.ex deleted file mode 100644 index ffcc700..0000000 --- a/lib/frenzy/filter_engine.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Frenzy.FilterEngine do - def matches?(item, filter) do - score = - filter.rules - |> Enum.map(fn rule -> score(item, rule) end) - |> Enum.sum() - - score >= filter.score - end - - def score(item, rule) do - prop_value = get_property(item, rule.property) - - if matches(prop_value, rule.mode, rule.param) do - rule.weight - else - 0 - end - end - - def matches(value, "contains_string", param) do - String.contains?(value, param) - end - - def matches(value, "matches_regex", param) do - regex = Regex.compile(param) - String.match?(value, regex) - end - - def get_property(item, "url"), do: item.url - def get_property(item, "title"), do: item.title - def get_property(item, "author"), do: item.author -end diff --git a/lib/frenzy/pipeline/filter_stage.ex b/lib/frenzy/pipeline/filter_stage.ex new file mode 100644 index 0000000..292cc0b --- /dev/null +++ b/lib/frenzy/pipeline/filter_stage.ex @@ -0,0 +1,61 @@ +defmodule Frenzy.Pipeline.FilterStage do + require Logger + alias Frenzy.Pipeline.Stage + @behaviour Stage + + @impl Stage + def apply(%{"mode" => mode, "score" => score, "rules" => rules}, item_params) + when is_binary(mode) and is_integer(score) and is_list(rules) do + item_score = + rules + |> Enum.map(fn rule -> test(rule, item_params) end) + |> Enum.sum() + + matches = item_score >= score + + case {mode, matches} do + {"accept", true} -> + {:ok, item_params} + + {"reject", false} -> + {:ok, item_params} + + _ -> + Logger.debug("Skipping item #{item_params.url} due to feed filter") + :tombstone + end + end + + def matches?(opts, item_params) do + Logger.warn("Received invalid filter opts: #{opts}") + false + end + + defp test( + %{"mode" => mode, "property" => property, "param" => param, "weight" => weight}, + item_params + ) do + with prop_value <- get_property(item_params, property), + true <- is_binary(prop_value), + true <- matches(prop_value, mode, param) do + weight + else + _ -> + 0 + end + end + + def matches(value, "contains_string", param) do + String.contains?(value, param) + end + + def matches(value, "matches_regex", param) do + regex = Regex.compile(param) + String.match?(value, regex) + end + + defp get_property(item_params, "url"), do: item_params.url + defp get_property(item_params, "title"), do: item_params.title + defp get_property(item_params, "author"), do: item_params.author + defp get_property(_item_params, _property), do: {:error, "invalid property"} +end diff --git a/lib/frenzy/pipeline/scrape_stage.ex b/lib/frenzy/pipeline/scrape_stage.ex new file mode 100644 index 0000000..d29d833 --- /dev/null +++ b/lib/frenzy/pipeline/scrape_stage.ex @@ -0,0 +1,58 @@ +defmodule Frenzy.Pipeline.ScrapeStage do + require Logger + alias Frenzy.Pipeline.Stage + @behaviour Stage + + @impl Stage + def apply(_opts, %{url: url} = item_params) do + case get_article_content(url) do + {:ok, content} -> + {:ok, %{item_params | content: content}} + + {:error, reason} -> + Logger.warn("Unable to get article content: #{reason}") + item_params + end + end + + defp get_article_content(url) when is_binary(url) and url != "" do + Logger.debug("Getting article from #{url}") + + url + |> HTTPoison.get() + |> case do + {:ok, response} -> + handle_response(url, response) + + {:error, %HTTPoison.Error{reason: reason}} -> + {:error, "HTTPoison error: #{reason}"} + end + end + + defp get_article_content(_url), do: {:error, "URL must be a non-empty string"} + + defp handle_response(_url, %HTTPoison.Response{status_code: 200, body: body}) do + article = Readability.article(body) + {:ok, Readability.readable_html(article)} + end + + defp handle_response(_url, %HTTPoison.Response{status_code: 404}) do + {:error, "404 not found"} + end + + defp handle_response(url, %HTTPoison.Response{status_code: status_code, headers: headers}) + when status_code in [301, 302] do + {"Location", new_url} = Enum.find(headers, fn {name, _value} -> name == "Location" end) + + Logger.debug("Got 301 redirect from #{url} to #{new_url}") + get_article_content(new_url) + end + + defp handle_response(_url, %HTTPoison.Response{status_code: 403}) do + {:error, "403 Forbidden"} + end + + defp handle_response(_url, %HTTPoison.Response{status_code: status_code} = response) do + {:error, "No handler for response #{inspect(response)}"} + end +end diff --git a/lib/frenzy/pipeline/stage.ex b/lib/frenzy/pipeline/stage.ex new file mode 100644 index 0000000..e1208a0 --- /dev/null +++ b/lib/frenzy/pipeline/stage.ex @@ -0,0 +1,3 @@ +defmodule Frenzy.Pipeline.Stage do + @callback apply(Map.t(), Map.t()) :: {:ok, Map.t()} | :tombstone | {:error, String.t()} +end diff --git a/lib/frenzy/pipeline_stage.ex b/lib/frenzy/pipeline_stage.ex new file mode 100644 index 0000000..483d8eb --- /dev/null +++ b/lib/frenzy/pipeline_stage.ex @@ -0,0 +1,20 @@ +defmodule Frenzy.PipelineStage do + use Ecto.Schema + import Ecto.Changeset + + schema "pipeline_stages" do + field :index, :integer + field :module_name, :string + field :options, :map + + belongs_to :feed, Frenzy.Feed + + timestamps() + end + + def changeset(stage, attrs) do + stage + |> cast(attrs, [:index, :module_name, :options]) + |> validate_required([:index, :module_name]) + end +end diff --git a/lib/frenzy/update_feeds.ex b/lib/frenzy/update_feeds.ex index d519442..abb802e 100644 --- a/lib/frenzy/update_feeds.ex +++ b/lib/frenzy/update_feeds.ex @@ -123,59 +123,66 @@ defmodule Frenzy.UpdateFeeds do Logger.debug("Creating item for #{url}") - content = - if feed.scrape_remote_content do - case get_article_content(url) do - {:ok, content} -> - content - - {:err, reason} -> - Logger.warn("Unable to fetch article for #{url}: #{inspect(reason)}") - entry.description - end - else - entry.description - end - item_params = %{ guid: entry.id, title: entry.title, url: url, date: parse_date(entry.published_at), creator: "", - content: content + content: entry.description } + feed = Repo.preload(feed, :pipeline_stages) + result = - if feed.filter_enabled do - case {feed.filter.mode, FilterEngine.matches?(item_params, feed.filter)} do - {"accept", true} -> - :store + feed.pipeline_stages + |> Enum.sort_by(& &1.index) + |> Enum.reduce({:ok, item_params}, fn + stage, {:ok, item_params} -> + apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [ + stage.options, + item_params + ]) - {"reject", false} -> - :store + _stage, :tombstone -> + :tombstone - _ -> - Logger.debug("Skipping item #{url} due to feed filter") - :tombstone + _stage, {:error, _error} = error -> + error + end) + + case result do + {:err, error} -> + Logger.error(error) + + {:ok, item_params} -> + changeset = Ecto.build_assoc(feed, :items, item_params) + + case Repo.insert(changeset) do + {:ok, item} -> + item + + {:error, changeset} -> + Logger.error("Error inserting item #{entry.guid}") + Logger.error(changeset) end - else - :store - end - changeset = - case result do - :store -> - Ecto.build_assoc(feed, :items, item_params) - - :tombstone -> + :tombstone -> + changeset = Ecto.build_assoc(feed, :items, %{ guid: entry.id, tombstone: true }) - end - Repo.insert(changeset) + case Repo.insert(changeset) do + {:ok, item} -> + item + + {:error, changeset} -> + Logger.error("Error inserting tombstone for #{entry.guid}") + Logger.error(changeset) + end + end end defp parse_date(str) do @@ -203,35 +210,4 @@ defmodule Frenzy.UpdateFeeds do link.href end end - - defp get_article_content(url) when is_binary(url) and url != "" do - Logger.debug("Getting article from #{url}") - - case HTTPoison.get(url) do - {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> - article = Readability.article(body) - {:ok, Readability.readable_html(article)} - - {:ok, %HTTPoison.Response{status_code: 404}} -> - {:err, "404 not found"} - - {:ok, %HTTPoison.Response{status_code: status_code, headers: headers}} - when status_code in [301, 302] -> - {"Location", new_url} = - Enum.find(headers, fn {name, _value} -> - name == "Location" - end) - - Logger.debug("Got 301 redirect from #{url} to #{new_url}") - get_article_content(new_url) - - {:ok, %HTTPoison.Response{status_code: 403}} -> - {:err, "403 Forbidden"} - - {:error, %HTTPoison.Error{reason: reason}} -> - {:err, reason} - end - end - - defp get_article_content(_url), do: {:err, "URL must be a non-empty string"} end diff --git a/priv/repo/migrations/20190708234319_create_pipeline_stages.exs b/priv/repo/migrations/20190708234319_create_pipeline_stages.exs new file mode 100644 index 0000000..ecef738 --- /dev/null +++ b/priv/repo/migrations/20190708234319_create_pipeline_stages.exs @@ -0,0 +1,14 @@ +defmodule Frenzy.Repo.Migrations.CreatePipelineStages do + use Ecto.Migration + + def change do + create table(:pipeline_stages) do + add :index, :integer + add :module_name, :string + add :options, :jsonb, null: false, default: "{}" + add :feed_id, references(:feeds, on_delete: :delete_all) + + timestamps() + end + end +end