Start pipeline system
This commit is contained in:
parent
24c942c5d2
commit
d72e07cb46
|
@ -41,6 +41,7 @@ defmodule Frenzy.Feed do
|
||||||
|
|
||||||
has_many :items, Frenzy.Item, on_delete: :delete_all
|
has_many :items, Frenzy.Item, on_delete: :delete_all
|
||||||
has_one :filter, Frenzy.Filter, on_delete: :delete_all
|
has_one :filter, Frenzy.Filter, on_delete: :delete_all
|
||||||
|
has_many :pipeline_stages, Frenzy.PipelineStage, on_delete: :delete_all
|
||||||
|
|
||||||
timestamps()
|
timestamps()
|
||||||
end
|
end
|
||||||
|
@ -48,7 +49,14 @@ defmodule Frenzy.Feed do
|
||||||
@doc false
|
@doc false
|
||||||
def changeset(feed, attrs) do
|
def changeset(feed, attrs) do
|
||||||
feed
|
feed
|
||||||
|> cast(attrs, [:title, :feed_url, :site_url, :last_updated, :filter_enabled, :scrape_remote_content])
|
|> cast(attrs, [
|
||||||
|
:title,
|
||||||
|
:feed_url,
|
||||||
|
:site_url,
|
||||||
|
:last_updated,
|
||||||
|
:filter_enabled,
|
||||||
|
:scrape_remote_content
|
||||||
|
])
|
||||||
|> cast_assoc(:filter, required: true)
|
|> cast_assoc(:filter, required: true)
|
||||||
|> validate_required([:feed_url, :filter])
|
|> validate_required([:feed_url, :filter])
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,33 +0,0 @@
|
||||||
defmodule Frenzy.FilterEngine do
|
|
||||||
def matches?(item, filter) do
|
|
||||||
score =
|
|
||||||
filter.rules
|
|
||||||
|> Enum.map(fn rule -> score(item, rule) end)
|
|
||||||
|> Enum.sum()
|
|
||||||
|
|
||||||
score >= filter.score
|
|
||||||
end
|
|
||||||
|
|
||||||
def score(item, rule) do
|
|
||||||
prop_value = get_property(item, rule.property)
|
|
||||||
|
|
||||||
if matches(prop_value, rule.mode, rule.param) do
|
|
||||||
rule.weight
|
|
||||||
else
|
|
||||||
0
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def matches(value, "contains_string", param) do
|
|
||||||
String.contains?(value, param)
|
|
||||||
end
|
|
||||||
|
|
||||||
def matches(value, "matches_regex", param) do
|
|
||||||
regex = Regex.compile(param)
|
|
||||||
String.match?(value, regex)
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_property(item, "url"), do: item.url
|
|
||||||
def get_property(item, "title"), do: item.title
|
|
||||||
def get_property(item, "author"), do: item.author
|
|
||||||
end
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
defmodule Frenzy.Pipeline.FilterStage do
|
||||||
|
require Logger
|
||||||
|
alias Frenzy.Pipeline.Stage
|
||||||
|
@behaviour Stage
|
||||||
|
|
||||||
|
# @impl Stage
|
||||||
|
# def apply(opts, item_params) do
|
||||||
|
# matches = matches?(opts, item_params)
|
||||||
|
#
|
||||||
|
# end
|
||||||
|
|
||||||
|
@impl Stage
|
||||||
|
def apply(%{"mode" => mode, "score" => score, "rules" => rules}, item_params)
|
||||||
|
when is_binary(mode) and is_integer(score) and is_list(rules) do
|
||||||
|
item_score =
|
||||||
|
rules
|
||||||
|
|> Enum.map(fn rule -> test(rule, item_params) end)
|
||||||
|
|> Enum.sum()
|
||||||
|
|
||||||
|
matches = item_score >= score
|
||||||
|
|
||||||
|
case {mode, matches} do
|
||||||
|
{"accept", true} ->
|
||||||
|
{:ok, item_params}
|
||||||
|
|
||||||
|
{"reject", false} ->
|
||||||
|
{:ok, item_params}
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
Logger.debug("Skipping item #{item_params.url} due to feed filter")
|
||||||
|
:tombstone
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def matches?(opts, item_params) do
|
||||||
|
Logger.warn("Received invalid filter opts: #{opts}")
|
||||||
|
false
|
||||||
|
end
|
||||||
|
|
||||||
|
defp test(
|
||||||
|
%{"mode" => mode, "property" => property, "param" => param, "weight" => weight},
|
||||||
|
item_params
|
||||||
|
) do
|
||||||
|
with prop_value <- get_property(item_params, property),
|
||||||
|
true <- is_binary(prop_value),
|
||||||
|
true <- matches(prop_value, mode, param) do
|
||||||
|
weight
|
||||||
|
else
|
||||||
|
_ ->
|
||||||
|
0
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def matches(value, "contains_string", param) do
|
||||||
|
String.contains?(value, param)
|
||||||
|
end
|
||||||
|
|
||||||
|
def matches(value, "matches_regex", param) do
|
||||||
|
regex = Regex.compile(param)
|
||||||
|
String.match?(value, regex)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_property(item_params, "url"), do: item_params.url
|
||||||
|
defp get_property(item_params, "title"), do: item_params.title
|
||||||
|
defp get_property(item_params, "author"), do: item_params.author
|
||||||
|
defp get_property(_item_params, _property), do: {:error, "invalid property"}
|
||||||
|
end
|
|
@ -0,0 +1,58 @@
|
||||||
|
defmodule Frenzy.Pipeline.ScrapeStage do
|
||||||
|
require Logger
|
||||||
|
alias Frenzy.Pipeline.Stage
|
||||||
|
@behaviour Stage
|
||||||
|
|
||||||
|
@impl Stage
|
||||||
|
def apply(_opts, %{url: url} = item_params) do
|
||||||
|
case get_article_content(url) do
|
||||||
|
{:ok, content} ->
|
||||||
|
{:ok, %{item_params | content: content}}
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.warn("Unable to get article content: #{reason}")
|
||||||
|
item_params
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_article_content(url) when is_binary(url) and url != "" do
|
||||||
|
Logger.debug("Getting article from #{url}")
|
||||||
|
|
||||||
|
url
|
||||||
|
|> HTTPoison.get()
|
||||||
|
|> case do
|
||||||
|
{:ok, response} ->
|
||||||
|
handle_response(url, response)
|
||||||
|
|
||||||
|
{:error, %HTTPoison.Error{reason: reason}} ->
|
||||||
|
{:error, "HTTPoison error: #{reason}"}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp get_article_content(_url), do: {:error, "URL must be a non-empty string"}
|
||||||
|
|
||||||
|
defp handle_response(_url, %HTTPoison.Response{status_code: 200, body: body}) do
|
||||||
|
article = Readability.article(body)
|
||||||
|
{:ok, Readability.readable_html(article)}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_response(_url, %HTTPoison.Response{status_code: 404}) do
|
||||||
|
{:error, "404 not found"}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_response(url, %HTTPoison.Response{status_code: status_code, headers: headers})
|
||||||
|
when status_code in [301, 302] do
|
||||||
|
{"Location", new_url} = Enum.find(headers, fn {name, _value} -> name == "Location" end)
|
||||||
|
|
||||||
|
Logger.debug("Got 301 redirect from #{url} to #{new_url}")
|
||||||
|
get_article_content(new_url)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_response(_url, %HTTPoison.Response{status_code: 403}) do
|
||||||
|
{:error, "403 Forbidden"}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_response(_url, %HTTPoison.Response{status_code: status_code} = response) do
|
||||||
|
{:error, "No handler for response #{inspect(response)}"}
|
||||||
|
end
|
||||||
|
end
|
|
@ -0,0 +1,3 @@
|
||||||
|
defmodule Frenzy.Pipeline.Stage do
|
||||||
|
@callback apply(Map.t(), Map.t()) :: {:ok, Map.t()} | :tombstone | {:error, String.t()}
|
||||||
|
end
|
|
@ -0,0 +1,20 @@
|
||||||
|
defmodule Frenzy.PipelineStage do
|
||||||
|
use Ecto.Schema
|
||||||
|
import Ecto.Changeset
|
||||||
|
|
||||||
|
schema "pipeline_stages" do
|
||||||
|
field :index, :integer
|
||||||
|
field :module_name, :string
|
||||||
|
field :options, :map
|
||||||
|
|
||||||
|
belongs_to :feed, Frenzy.Feed
|
||||||
|
|
||||||
|
timestamps()
|
||||||
|
end
|
||||||
|
|
||||||
|
def changeset(stage, attrs) do
|
||||||
|
stage
|
||||||
|
|> cast(attrs, [:index, :module_name, :options])
|
||||||
|
|> validate_required([:index, :module_name])
|
||||||
|
end
|
||||||
|
end
|
|
@ -123,59 +123,66 @@ defmodule Frenzy.UpdateFeeds do
|
||||||
|
|
||||||
Logger.debug("Creating item for #{url}")
|
Logger.debug("Creating item for #{url}")
|
||||||
|
|
||||||
content =
|
|
||||||
if feed.scrape_remote_content do
|
|
||||||
case get_article_content(url) do
|
|
||||||
{:ok, content} ->
|
|
||||||
content
|
|
||||||
|
|
||||||
{:err, reason} ->
|
|
||||||
Logger.warn("Unable to fetch article for #{url}: #{inspect(reason)}")
|
|
||||||
entry.description
|
|
||||||
end
|
|
||||||
else
|
|
||||||
entry.description
|
|
||||||
end
|
|
||||||
|
|
||||||
item_params = %{
|
item_params = %{
|
||||||
guid: entry.id,
|
guid: entry.id,
|
||||||
title: entry.title,
|
title: entry.title,
|
||||||
url: url,
|
url: url,
|
||||||
date: parse_date(entry.published_at),
|
date: parse_date(entry.published_at),
|
||||||
creator: "",
|
creator: "",
|
||||||
content: content
|
content: entry.description
|
||||||
}
|
}
|
||||||
|
|
||||||
|
feed = Repo.preload(feed, :pipeline_stages)
|
||||||
|
|
||||||
result =
|
result =
|
||||||
if feed.filter_enabled do
|
feed.pipeline_stages
|
||||||
case {feed.filter.mode, FilterEngine.matches?(item_params, feed.filter)} do
|
|> Enum.sort_by(& &1.index)
|
||||||
{"accept", true} ->
|
|> Enum.reduce({:ok, item_params}, fn
|
||||||
:store
|
stage, {:ok, item_params} ->
|
||||||
|
apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
|
||||||
|
stage.options,
|
||||||
|
item_params
|
||||||
|
])
|
||||||
|
|
||||||
{"reject", false} ->
|
_stage, :tombstone ->
|
||||||
:store
|
:tombstone
|
||||||
|
|
||||||
_ ->
|
_stage, {:error, _error} = error ->
|
||||||
Logger.debug("Skipping item #{url} due to feed filter")
|
error
|
||||||
:tombstone
|
end)
|
||||||
|
|
||||||
|
case result do
|
||||||
|
{:err, error} ->
|
||||||
|
Logger.error(error)
|
||||||
|
|
||||||
|
{:ok, item_params} ->
|
||||||
|
changeset = Ecto.build_assoc(feed, :items, item_params)
|
||||||
|
|
||||||
|
case Repo.insert(changeset) do
|
||||||
|
{:ok, item} ->
|
||||||
|
item
|
||||||
|
|
||||||
|
{:error, changeset} ->
|
||||||
|
Logger.error("Error inserting item #{entry.guid}")
|
||||||
|
Logger.error(changeset)
|
||||||
end
|
end
|
||||||
else
|
|
||||||
:store
|
|
||||||
end
|
|
||||||
|
|
||||||
changeset =
|
:tombstone ->
|
||||||
case result do
|
changeset =
|
||||||
:store ->
|
|
||||||
Ecto.build_assoc(feed, :items, item_params)
|
|
||||||
|
|
||||||
:tombstone ->
|
|
||||||
Ecto.build_assoc(feed, :items, %{
|
Ecto.build_assoc(feed, :items, %{
|
||||||
guid: entry.id,
|
guid: entry.id,
|
||||||
tombstone: true
|
tombstone: true
|
||||||
})
|
})
|
||||||
end
|
|
||||||
|
|
||||||
Repo.insert(changeset)
|
case Repo.insert(changeset) do
|
||||||
|
{:ok, item} ->
|
||||||
|
item
|
||||||
|
|
||||||
|
{:error, changeset} ->
|
||||||
|
Logger.error("Error inserting tombstone for #{entry.guid}")
|
||||||
|
Logger.error(changeset)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp parse_date(str) do
|
defp parse_date(str) do
|
||||||
|
@ -203,35 +210,4 @@ defmodule Frenzy.UpdateFeeds do
|
||||||
link.href
|
link.href
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp get_article_content(url) when is_binary(url) and url != "" do
|
|
||||||
Logger.debug("Getting article from #{url}")
|
|
||||||
|
|
||||||
case HTTPoison.get(url) do
|
|
||||||
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
|
|
||||||
article = Readability.article(body)
|
|
||||||
{:ok, Readability.readable_html(article)}
|
|
||||||
|
|
||||||
{:ok, %HTTPoison.Response{status_code: 404}} ->
|
|
||||||
{:err, "404 not found"}
|
|
||||||
|
|
||||||
{:ok, %HTTPoison.Response{status_code: status_code, headers: headers}}
|
|
||||||
when status_code in [301, 302] ->
|
|
||||||
{"Location", new_url} =
|
|
||||||
Enum.find(headers, fn {name, _value} ->
|
|
||||||
name == "Location"
|
|
||||||
end)
|
|
||||||
|
|
||||||
Logger.debug("Got 301 redirect from #{url} to #{new_url}")
|
|
||||||
get_article_content(new_url)
|
|
||||||
|
|
||||||
{:ok, %HTTPoison.Response{status_code: 403}} ->
|
|
||||||
{:err, "403 Forbidden"}
|
|
||||||
|
|
||||||
{:error, %HTTPoison.Error{reason: reason}} ->
|
|
||||||
{:err, reason}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp get_article_content(_url), do: {:err, "URL must be a non-empty string"}
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
defmodule Frenzy.Repo.Migrations.CreatePipelineStages do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def change do
|
||||||
|
create table(:pipeline_stages) do
|
||||||
|
add :index, :integer
|
||||||
|
add :module_name, :string
|
||||||
|
add :options, :jsonb, null: false, default: "{}"
|
||||||
|
add :feed_id, references(:feeds, on_delete: :delete_all)
|
||||||
|
|
||||||
|
timestamps()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue