Compare commits

..

3 Commits

5 changed files with 201 additions and 123 deletions

View File

@ -20,7 +20,7 @@ config :frenzy, FrenzyWeb.Endpoint,
# Configures Elixir's Logger # Configures Elixir's Logger
config :logger, :console, config :logger, :console,
format: "$time $metadata[$level] $message\n", format: "$time $metadata[$level] $message\n",
metadata: [:request_id] metadata: [:request_id, :item_task_id]
# Use Jason for JSON parsing in Phoenix # Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason config :phoenix, :json_library, Jason

View File

@ -0,0 +1,108 @@
defmodule Frenzy.CreateItemTask do
require Logger
use Task
alias Frenzy.Repo
def start_link(feed, entry) do
Task.start_link(__MODULE__, :run, [feed, entry])
end
def run(feed, entry) do
Logger.metadata(item_task_id: generate_task_id())
url = get_real_url(entry)
Logger.debug("Creating item for #{url}")
item_params = %{
guid: entry.guid,
title: entry.title,
url: url,
date: entry.date |> Timex.Timezone.convert(:utc) |> DateTime.truncate(:second),
creator: "",
content: entry.content
}
feed = Repo.preload(feed, :pipeline_stages)
result =
feed.pipeline_stages
|> Enum.sort_by(& &1.index)
|> Enum.reduce({:ok, item_params}, fn
stage, {:ok, item_params} ->
apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
stage.options,
item_params
])
_stage, :tombstone ->
:tombstone
_stage, {:error, _error} = error ->
error
end)
case result do
{:err, error} ->
Logger.error(error)
{:ok, item_params} ->
changeset = Ecto.build_assoc(feed, :items, item_params)
case Repo.insert(changeset) do
{:ok, item} ->
item
{:error, changeset} ->
Logger.error("Error inserting item #{entry.guid}")
Logger.error(changeset)
end
:tombstone ->
changeset =
Ecto.build_assoc(feed, :items, %{
guid: item_params.guid,
tombstone: true
})
case Repo.insert(changeset) do
{:ok, item} ->
item
{:error, changeset} ->
Logger.error("Error inserting tombstone for #{entry.guid}")
Logger.error(changeset)
end
end
end
defp get_real_url(entry) do
links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)
case Enum.find(links, fn {_, rel} -> rel == "related" end) do
nil ->
case Enum.find(links, fn {_, rel} -> rel == "alternate" end) do
nil ->
[{href, _} | _] = links
href
{href, _} ->
href
end
{href, _} ->
href
end
end
# from https://github.com/elixir-plug/plug/blob/v1.8.3/lib/plug/request_id.ex#L60
defp generate_task_id() do
binary = <<
System.system_time(:nanosecond)::64,
:erlang.phash2({node(), self()}, 16_777_216)::24,
:erlang.unique_integer()::32
>>
Base.url_encode64(binary)
end
end

View File

@ -5,7 +5,7 @@ defmodule Frenzy.Pipeline.ScrapeStage do
@impl Stage @impl Stage
def apply(opts, %{url: url} = item_params) do def apply(opts, %{url: url} = item_params) do
case get_article_content(url, opts["extractor"]) do case get_article_content(url, opts) do
{:ok, content} -> {:ok, content} ->
{:ok, %{item_params | content: content}} {:ok, %{item_params | content: content}}
@ -18,24 +18,36 @@ defmodule Frenzy.Pipeline.ScrapeStage do
@impl Stage @impl Stage
def validate_opts(opts) when is_map(opts) do def validate_opts(opts) when is_map(opts) do
# todo: figure out why this errors when an empty map is provided # todo: figure out why this errors when an empty map is provided
case opts["extractor"] do opts =
case opts["extractor"] do
nil ->
{:ok, %{opts | extractor: "builtin"}}
extractor when not is_binary(extractor) ->
{:error, "extractor must be a string"}
"builtin" ->
{:ok, opts}
extractor ->
try do
String.to_existing_atom("Elixir." <> extractor)
{:ok, opts}
rescue
ArgumentError ->
{:error, "extractor must be \"builtin\" or a module that exists"}
end
end
case opts["convert_to_data_uris"] do
nil -> nil ->
{:ok, %{opts | extractor: "builtin"}} {:ok, %{opts | convert_to_data_uris: true}}
extractor when not is_binary(extractor) -> value when is_boolean(value) ->
{:error, "extractor must be a string"}
"builtin" ->
{:ok, opts} {:ok, opts}
extractor -> _ ->
try do {:error, "convert_to_data_uris must be a boolean"}
String.to_existing_atom("Elixir." <> extractor)
{:ok, opts}
rescue
ArgumentError ->
{:error, "extractor must be \"builtin\" or a module that exists"}
end
end end
end end
@ -43,14 +55,14 @@ defmodule Frenzy.Pipeline.ScrapeStage do
def validate_opts(_), do: {:error, "options must be a map"} def validate_opts(_), do: {:error, "options must be a map"}
@spec get_article_content(String.t(), String.t()) :: {:ok, String.t()} | {:error, String.t()} @spec get_article_content(String.t(), String.t()) :: {:ok, String.t()} | {:error, String.t()}
defp get_article_content(url, extractor) when is_binary(url) and url != "" do defp get_article_content(url, opts) when is_binary(url) and url != "" do
Logger.debug("Getting article from #{url}") Logger.debug("Getting article from #{url}")
url url
|> HTTPoison.get() |> HTTPoison.get()
|> case do |> case do
{:ok, response} -> {:ok, response} ->
handle_response(url, response, extractor) handle_response(url, response, opts)
{:error, %HTTPoison.Error{reason: reason}} -> {:error, %HTTPoison.Error{reason: reason}} ->
{:error, "HTTPoison error: #{reason}"} {:error, "HTTPoison error: #{reason}"}
@ -61,8 +73,8 @@ defmodule Frenzy.Pipeline.ScrapeStage do
@spec handle_response(String.t(), HTTPoison.Response.t(), String.t()) :: @spec handle_response(String.t(), HTTPoison.Response.t(), String.t()) ::
{:ok, String.t()} | {:error, String.t()} {:ok, String.t()} | {:error, String.t()}
defp handle_response(url, %HTTPoison.Response{status_code: 200, body: body}, extractor) do defp handle_response(url, %HTTPoison.Response{status_code: 200, body: body}, opts) do
case extractor do case opts["extractor"] do
"builtin" -> "builtin" ->
{:ok, Readability.article(body)} {:ok, Readability.article(body)}
@ -72,9 +84,15 @@ defmodule Frenzy.Pipeline.ScrapeStage do
end end
|> case do |> case do
{:ok, html} -> {:ok, html} ->
html = Floki.map(html, rewrite_image_urls(URI.parse(url))) convert_to_data_uris =
case opts["convert_to_data_uris"] do
nil -> true
value -> value
end
case extractor do html = Floki.map(html, rewrite_image_urls(convert_to_data_uris, URI.parse(url)))
case opts["extractor"] do
"builtin" -> "builtin" ->
{:ok, Readability.readable_html(html)} {:ok, Readability.readable_html(html)}
@ -120,21 +138,54 @@ defmodule Frenzy.Pipeline.ScrapeStage do
# Generates a helper function for the article with the given URI that takes an HTML element and, # Generates a helper function for the article with the given URI that takes an HTML element and,
# if it's an <img> element whose src attribute does not have a hostname, adds the hostname and # if it's an <img> element whose src attribute does not have a hostname, adds the hostname and
# scheme to the element. # scheme to the element.
defp rewrite_image_urls(%URI{host: host, scheme: scheme}) do defp rewrite_image_urls(convert_to_data_uris, %URI{host: host, scheme: scheme}) do
fn fn
{"img", [{"src", src} | attrs]} = elem -> {"img", attrs} ->
case URI.parse(src) do new_attrs =
%URI{host: nil, path: path} -> Enum.map(attrs, fn
new_src = URI.to_string(%URI{path: path, host: host, scheme: scheme}) {"src", src} ->
case URI.parse(src) do
%URI{host: nil, path: path} ->
new_src =
URI.to_string(%URI{path: path, host: host, scheme: scheme})
|> image_to_data_uri(convert_to_data_uris)
{"img", [{"src", new_src} | attrs]} {"src", new_src}
_ -> _ ->
elem {"src", image_to_data_uri(convert_to_data_uris, src)}
end end
attr ->
attr
end)
{"img", new_attrs}
elem -> elem ->
elem elem
end end
end end
@content_type_allowlist ["image/jpeg", "image/png", "image/heic", "image/heif", "image/tiff"]
# convert images to data URIs so that they're stored by clients as part of the body
defp image_to_data_uri(true, src) do
case HTTPoison.get(src) do
{:ok, %HTTPoison.Response{status_code: 200, body: body, headers: headers}} ->
{"Content-Type", content_type} =
Enum.find(headers, fn {header, _value} -> header == "Content-Type" end)
if content_type in @content_type_allowlist do
"data:#{content_type};base64,#{Base.encode64(body)}"
else
src
end
_ ->
src
end
end
defp image_to_data_uri(false, src), do: src
end end

View File

@ -1,6 +1,6 @@
defmodule Frenzy.UpdateFeeds do defmodule Frenzy.UpdateFeeds do
use GenServer use GenServer
alias Frenzy.{Repo, Feed, Item} alias Frenzy.{Repo, Feed, Item, CreateItemTask}
import Ecto.Query import Ecto.Query
require Logger require Logger
@ -31,8 +31,8 @@ defmodule Frenzy.UpdateFeeds do
end end
defp schedule_update() do defp schedule_update() do
# 15 minutes # 30 minutes
Process.send_after(self(), :update_feeds, 15 * 60 * 1000) Process.send_after(self(), :update_feeds, 30 * 60 * 1000)
# 1 minutes # 1 minutes
# Process.send_after(self(), :update_feeds, 60 * 1000) # Process.send_after(self(), :update_feeds, 60 * 1000)
end end
@ -123,97 +123,13 @@ defmodule Frenzy.UpdateFeeds do
feed = Repo.preload(feed, [:items]) feed = Repo.preload(feed, [:items])
Enum.map(rss.items, fn entry -> Enum.each(rss.items, fn entry ->
# todo: use Repo.exists for this # todo: use Repo.exists for this
if !Enum.any?(feed.items, fn item -> item.guid == entry.guid end) do if !Enum.any?(feed.items, fn item -> item.guid == entry.guid end) do
create_item(feed, entry) CreateItemTask.start_link(feed, entry)
# Task.start_link(__MODULE__, :create_item, [feed, entry])
# spawn(__MODULE__, :create_item, [feed, entry])
end end
end) end)
end end
defp create_item(feed, entry) do
url = get_real_url(entry)
Logger.debug("Creating item for #{url}")
item_params = %{
guid: entry.guid,
title: entry.title,
url: url,
date: entry.date |> Timex.Timezone.convert(:utc) |> DateTime.truncate(:second),
creator: "",
content: entry.content
}
feed = Repo.preload(feed, :pipeline_stages)
result =
feed.pipeline_stages
|> Enum.sort_by(& &1.index)
|> Enum.reduce({:ok, item_params}, fn
stage, {:ok, item_params} ->
apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
stage.options,
item_params
])
_stage, :tombstone ->
:tombstone
_stage, {:error, _error} = error ->
error
end)
case result do
{:err, error} ->
Logger.error(error)
{:ok, item_params} ->
changeset = Ecto.build_assoc(feed, :items, item_params)
case Repo.insert(changeset) do
{:ok, item} ->
item
{:error, changeset} ->
Logger.error("Error inserting item #{entry.guid}")
Logger.error(changeset)
end
:tombstone ->
changeset =
Ecto.build_assoc(feed, :items, %{
guid: item_params.guid,
tombstone: true
})
case Repo.insert(changeset) do
{:ok, item} ->
item
{:error, changeset} ->
Logger.error("Error inserting tombstone for #{entry.guid}")
Logger.error(changeset)
end
end
end
defp get_real_url(entry) do
links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)
case Enum.find(links, fn {_, rel} -> rel == "related" end) do
nil ->
case Enum.find(links, fn {_, rel} -> rel == "alternate" end) do
nil ->
[{href, _} | _] = links
href
{href, _} ->
href
end
{href, _} ->
href
end
end
end end

View File

@ -106,6 +106,9 @@ defmodule FrenzyWeb.FeedController do
def refresh(conn, _params) do def refresh(conn, _params) do
feed = conn.assigns[:feed] feed = conn.assigns[:feed]
feed = Frenzy.UpdateFeeds.refresh(Frenzy.UpdateFeeds, feed) feed = Frenzy.UpdateFeeds.refresh(Frenzy.UpdateFeeds, feed)
redirect(conn, to: Routes.feed_path(Endpoint, :show, feed.id))
conn
|> put_flash(:info, "Refreshing feed. Wait a moment before reloading...")
|> redirect(to: Routes.feed_path(Endpoint, :show, feed.id))
end end
end end