Start pipeline system

This commit is contained in:
Shadowfacts 2019-07-08 22:41:18 -04:00
parent 24c942c5d2
commit 0a1909dbc4
Signed by: shadowfacts
GPG Key ID: 94A5AB95422746E5
8 changed files with 208 additions and 100 deletions

View File

@ -41,6 +41,7 @@ defmodule Frenzy.Feed do
has_many :items, Frenzy.Item, on_delete: :delete_all
has_one :filter, Frenzy.Filter, on_delete: :delete_all
has_many :pipeline_stages, Frenzy.PipelineStage, on_delete: :delete_all
timestamps()
end
@ -48,7 +49,14 @@ defmodule Frenzy.Feed do
@doc false
def changeset(feed, attrs) do
feed
|> cast(attrs, [:title, :feed_url, :site_url, :last_updated, :filter_enabled, :scrape_remote_content])
|> cast(attrs, [
:title,
:feed_url,
:site_url,
:last_updated,
:filter_enabled,
:scrape_remote_content
])
|> cast_assoc(:filter, required: true)
|> validate_required([:feed_url, :filter])
end

View File

@ -1,33 +0,0 @@
defmodule Frenzy.FilterEngine do
  @moduledoc """
  Scores feed items against a filter's rules to decide whether the item
  matches the filter.
  """

  @doc """
  Returns `true` when the summed weights of all rules that match `item`
  reach the filter's `score` threshold.
  """
  def matches?(item, filter) do
    score =
      filter.rules
      |> Enum.map(fn rule -> score(item, rule) end)
      |> Enum.sum()

    score >= filter.score
  end

  @doc """
  Returns the rule's `weight` when the item's property satisfies the rule,
  otherwise `0`.
  """
  def score(item, rule) do
    prop_value = get_property(item, rule.property)

    if matches(prop_value, rule.mode, rule.param) do
      rule.weight
    else
      0
    end
  end

  def matches(value, "contains_string", param) do
    String.contains?(value, param)
  end

  def matches(value, "matches_regex", param) do
    # Regex.compile/1 returns {:ok, regex} | {:error, reason}; compile!/1
    # yields the regex itself (raising on an invalid pattern), which is
    # what String.match?/2 requires.
    regex = Regex.compile!(param)
    String.match?(value, regex)
  end

  def get_property(item, "url"), do: item.url
  def get_property(item, "title"), do: item.title
  def get_property(item, "author"), do: item.author
end

View File

@ -0,0 +1,62 @@
defmodule Frenzy.Pipeline.FilterStage do
  @moduledoc """
  Pipeline stage that scores an item against a set of weighted rules and
  either keeps it (`{:ok, item_params}`) or drops it (`:tombstone`).
  """

  require Logger
  alias Frenzy.Pipeline.Stage
  @behaviour Stage

  @impl Stage
  def apply(%{"mode" => mode, "score" => score, "rules" => rules}, item_params)
      when is_binary(mode) and is_integer(score) and is_list(rules) do
    item_score =
      rules
      |> Enum.map(fn rule -> test(rule, item_params) end)
      |> Enum.sum()

    matches = item_score >= score

    # "accept" mode keeps matching items; "reject" mode keeps non-matching
    # ones. Everything else becomes a tombstone.
    case {mode, matches} do
      {"accept", true} ->
        {:ok, item_params}

      {"reject", false} ->
        {:ok, item_params}

      _ ->
        Logger.debug("Skipping item #{item_params.url} due to feed filter")
        :tombstone
    end
  end

  # Fallback for malformed options: log and pass the item through untouched.
  @impl Stage
  def apply(opts, item_params) do
    # inspect/1 is required here: interpolating a map directly raises
    # Protocol.UndefinedError (String.Chars is not implemented for maps).
    Logger.warn("Received invalid filter opts: #{inspect(opts)}")
    {:ok, item_params}
  end

  # Returns the rule's weight when the item's property satisfies the rule,
  # otherwise 0 (including when the property is missing or non-string).
  defp test(
         %{"mode" => mode, "property" => property, "param" => param, "weight" => weight},
         item_params
       ) do
    with prop_value <- get_property(item_params, property),
         true <- is_binary(prop_value),
         true <- matches(prop_value, mode, param) do
      weight
    else
      _ ->
        0
    end
  end

  def matches(value, "contains_string", param) do
    String.contains?(value, param)
  end

  def matches(value, "matches_regex", param) do
    # Regex.compile/1 returns {:ok, regex} | {:error, reason}; compile!/1
    # yields the regex itself, which String.match?/2 requires.
    regex = Regex.compile!(param)
    String.match?(value, regex)
  end

  defp get_property(item_params, "url"), do: item_params.url
  defp get_property(item_params, "title"), do: item_params.title
  defp get_property(item_params, "author"), do: item_params.author
  defp get_property(_item_params, _property), do: {:error, "invalid property"}
end

View File

@ -0,0 +1,58 @@
defmodule Frenzy.Pipeline.ScrapeStage do
  @moduledoc """
  Pipeline stage that replaces an item's content with the readable article
  HTML scraped from the item's URL. Scraping is best-effort: on failure the
  item passes through unchanged.
  """

  require Logger
  alias Frenzy.Pipeline.Stage
  @behaviour Stage

  @impl Stage
  def apply(_opts, %{url: url} = item_params) do
    case get_article_content(url) do
      {:ok, content} ->
        {:ok, %{item_params | content: content}}

      {:error, reason} ->
        Logger.warn("Unable to get article content: #{reason}")
        # Must stay wrapped in {:ok, _} to satisfy the Stage contract;
        # returning bare item_params would break callers that match on
        # {:ok, item_params}.
        {:ok, item_params}
    end
  end

  # Fetches `url` and extracts readable article HTML.
  # Returns {:ok, html} | {:error, reason}.
  defp get_article_content(url) when is_binary(url) and url != "" do
    Logger.debug("Getting article from #{url}")

    url
    |> HTTPoison.get()
    |> case do
      {:ok, response} ->
        handle_response(url, response)

      {:error, %HTTPoison.Error{reason: reason}} ->
        # reason may be an atom or a tuple; inspect/1 handles both.
        {:error, "HTTPoison error: #{inspect(reason)}"}
    end
  end

  defp get_article_content(_url), do: {:error, "URL must be a non-empty string"}

  defp handle_response(_url, %HTTPoison.Response{status_code: 200, body: body}) do
    article = Readability.article(body)
    {:ok, Readability.readable_html(article)}
  end

  defp handle_response(_url, %HTTPoison.Response{status_code: 404}) do
    {:error, "404 not found"}
  end

  defp handle_response(url, %HTTPoison.Response{status_code: status_code, headers: headers})
       when status_code in [301, 302] do
    # Header names are matched case-insensitively (servers may send
    # "location"), and a missing Location header returns an error instead
    # of raising a MatchError.
    case Enum.find(headers, fn {name, _value} -> String.downcase(name) == "location" end) do
      {_name, new_url} ->
        Logger.debug("Got #{status_code} redirect from #{url} to #{new_url}")
        get_article_content(new_url)

      nil ->
        {:error, "#{status_code} redirect without Location header"}
    end
  end

  defp handle_response(_url, %HTTPoison.Response{status_code: 403}) do
    {:error, "403 Forbidden"}
  end

  # Catch-all for any other status code.
  defp handle_response(_url, response) do
    {:error, "No handler for response #{inspect(response)}"}
  end
end

View File

@ -0,0 +1,3 @@
defmodule Frenzy.Pipeline.Stage do
  @moduledoc """
  Behaviour implemented by feed item processing pipeline stages.
  """

  @doc """
  Processes one feed item.

  Receives the stage's options map and the item's params map. Returns
  `{:ok, item_params}` to pass the (possibly modified) item to the next
  stage, `:tombstone` to drop the item, or `{:error, reason}` to abort.
  """
  # `map()` is the builtin type; `Map.t()` does not exist (the Map module
  # declares no t/0), so specs using it fail Dialyzer.
  @callback apply(map(), map()) :: {:ok, map()} | :tombstone | {:error, String.t()}
end

View File

@ -0,0 +1,20 @@
defmodule Frenzy.PipelineStage do
  @moduledoc """
  Ecto schema for one stage of a feed's item-processing pipeline.

  Stages belong to a feed and are executed in ascending `index` order;
  `module_name` names the module (without the `Elixir.` prefix) that
  implements the stage, and `options` is passed to its `apply/2` callback.
  """
  use Ecto.Schema
  import Ecto.Changeset

  schema "pipeline_stages" do
    # Position within the feed's pipeline; stages run in ascending order.
    field :index, :integer
    # Module implementing Frenzy.Pipeline.Stage, stored without the
    # "Elixir." prefix.
    field :module_name, :string
    # Stage-specific options map handed to the module's apply/2.
    field :options, :map
    belongs_to :feed, Frenzy.Feed
    timestamps()
  end

  @doc false
  def changeset(stage, attrs) do
    stage
    |> cast(attrs, [:index, :module_name, :options])
    |> validate_required([:index, :module_name])
  end
end

View File

@ -123,59 +123,66 @@ defmodule Frenzy.UpdateFeeds do
Logger.debug("Creating item for #{url}")
content =
if feed.scrape_remote_content do
case get_article_content(url) do
{:ok, content} ->
content
{:err, reason} ->
Logger.warn("Unable to fetch article for #{url}: #{inspect(reason)}")
entry.description
end
else
entry.description
end
item_params = %{
guid: entry.id,
title: entry.title,
url: url,
date: parse_date(entry.published_at),
creator: "",
content: content
content: entry.description
}
feed = Repo.preload(feed, :pipeline_stages)
result =
if feed.filter_enabled do
case {feed.filter.mode, FilterEngine.matches?(item_params, feed.filter)} do
{"accept", true} ->
:store
feed.pipeline_stages
|> Enum.sort_by(& &1.index)
|> Enum.reduce({:ok, item_params}, fn
stage, {:ok, item_params} ->
apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
stage.options,
item_params
])
{"reject", false} ->
:store
_stage, :tombstone ->
:tombstone
_ ->
Logger.debug("Skipping item #{url} due to feed filter")
:tombstone
_stage, {:error, _error} = error ->
error
end)
case result do
{:err, error} ->
Logger.error(error)
{:ok, item_params} ->
changeset = Ecto.build_assoc(feed, :items, item_params)
case Repo.insert(changeset) do
{:ok, item} ->
item
{:error, changeset} ->
Logger.error("Error inserting item #{entry.guid}")
Logger.error(changeset)
end
else
:store
end
changeset =
case result do
:store ->
Ecto.build_assoc(feed, :items, item_params)
:tombstone ->
:tombstone ->
changeset =
Ecto.build_assoc(feed, :items, %{
guid: entry.id,
tombstone: true
})
end
Repo.insert(changeset)
case Repo.insert(changeset) do
{:ok, item} ->
item
{:error, changeset} ->
Logger.error("Error inserting tombstone for #{entry.guid}")
Logger.error(changeset)
end
end
end
defp parse_date(str) do
@ -203,35 +210,4 @@ defmodule Frenzy.UpdateFeeds do
link.href
end
end
defp get_article_content(url) when is_binary(url) and url != "" do
Logger.debug("Getting article from #{url}")
case HTTPoison.get(url) do
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
article = Readability.article(body)
{:ok, Readability.readable_html(article)}
{:ok, %HTTPoison.Response{status_code: 404}} ->
{:err, "404 not found"}
{:ok, %HTTPoison.Response{status_code: status_code, headers: headers}}
when status_code in [301, 302] ->
{"Location", new_url} =
Enum.find(headers, fn {name, _value} ->
name == "Location"
end)
Logger.debug("Got 301 redirect from #{url} to #{new_url}")
get_article_content(new_url)
{:ok, %HTTPoison.Response{status_code: 403}} ->
{:err, "403 Forbidden"}
{:error, %HTTPoison.Error{reason: reason}} ->
{:err, reason}
end
end
defp get_article_content(_url), do: {:err, "URL must be a non-empty string"}
end

View File

@ -0,0 +1,14 @@
defmodule Frenzy.Repo.Migrations.CreatePipelineStages do
  @moduledoc false
  use Ecto.Migration

  # Creates the pipeline_stages table backing the Frenzy.PipelineStage
  # schema.
  def change do
    create table(:pipeline_stages) do
      add :index, :integer
      add :module_name, :string
      # JSONB options blob; NOT NULL with an empty-object default so a
      # stage always has a readable options map.
      add :options, :jsonb, null: false, default: "{}"
      # Deleting a feed cascades to its pipeline stages.
      add :feed_id, references(:feeds, on_delete: :delete_all)
      timestamps()
    end
  end
end