From d8139c6ce0499f7e80314fc99584a9ab937b72e9 Mon Sep 17 00:00:00 2001
From: Shadowfacts
Date: Thu, 31 Oct 2019 22:21:17 -0400
Subject: [PATCH] Make item creation multi-threaded

---
Review notes (this area is ignored when the patch is applied):

* Moves item creation out of Frenzy.UpdateFeeds into the new
  Frenzy.CreateItemTask module; one task is started per feed entry whose
  guid is not already among the feed's items.
* Each task sets the :item_task_id Logger metadata key, which the console
  logger config below now prints.
* Changed during review, on added lines of create_item_task.ex only:
  - `case result` matches `{:error, error}` instead of `{:err, error}`;
    the pipeline reduce only yields {:ok, _}, :tombstone or {:error, _},
    so the old clause could never match and a stage error raised
    CaseClauseError instead of being logged.
  - insert failures log `inspect(changeset)` instead of passing the
    changeset struct straight to Logger.error/1.
  Line counts are unchanged, so the hunk headers and diffstat still hold;
  the `index` blob id of the new file refers to the pre-review content.

 config/config.exs                             |   2 +-
 lib/frenzy/create_item_task.ex                | 108 ++++++++++++++++++
 lib/frenzy/update_feeds.ex                    |  94 +--------------
 lib/frenzy_web/controllers/feed_controller.ex |   5 +-
 4 files changed, 118 insertions(+), 91 deletions(-)
 create mode 100644 lib/frenzy/create_item_task.ex

diff --git a/config/config.exs b/config/config.exs
index 1f3ab27..9e2662b 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -20,7 +20,7 @@ config :frenzy, FrenzyWeb.Endpoint,
 # Configures Elixir's Logger
 config :logger, :console,
   format: "$time $metadata[$level] $message\n",
-  metadata: [:request_id]
+  metadata: [:request_id, :item_task_id]
 
 # Use Jason for JSON parsing in Phoenix
 config :phoenix, :json_library, Jason
diff --git a/lib/frenzy/create_item_task.ex b/lib/frenzy/create_item_task.ex
new file mode 100644
index 0000000..d990f70
--- /dev/null
+++ b/lib/frenzy/create_item_task.ex
@@ -0,0 +1,108 @@
+defmodule Frenzy.CreateItemTask do
+  require Logger
+  use Task
+  alias Frenzy.Repo
+
+  def start_link(feed, entry) do
+    Task.start_link(__MODULE__, :run, [feed, entry])
+  end
+
+  def run(feed, entry) do
+    Logger.metadata(item_task_id: generate_task_id())
+
+    url = get_real_url(entry)
+
+    Logger.debug("Creating item for #{url}")
+
+    item_params = %{
+      guid: entry.guid,
+      title: entry.title,
+      url: url,
+      date: entry.date |> Timex.Timezone.convert(:utc) |> DateTime.truncate(:second),
+      creator: "",
+      content: entry.content
+    }
+
+    feed = Repo.preload(feed, :pipeline_stages)
+
+    result =
+      feed.pipeline_stages
+      |> Enum.sort_by(& &1.index)
+      |> Enum.reduce({:ok, item_params}, fn
+        stage, {:ok, item_params} ->
+          apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
+            stage.options,
+            item_params
+          ])
+
+        _stage, :tombstone ->
+          :tombstone
+
+        _stage, {:error, _error} = error ->
+          error
+      end)
+
+    case result do
+      {:error, error} ->
+        Logger.error(error)
+
+      {:ok, item_params} ->
+        changeset = Ecto.build_assoc(feed, :items, item_params)
+
+        case Repo.insert(changeset) do
+          {:ok, item} ->
+            item
+
+          {:error, changeset} ->
+            Logger.error("Error inserting item #{entry.guid}")
+            Logger.error(inspect(changeset))
+        end
+
+      :tombstone ->
+        changeset =
+          Ecto.build_assoc(feed, :items, %{
+            guid: item_params.guid,
+            tombstone: true
+          })
+
+        case Repo.insert(changeset) do
+          {:ok, item} ->
+            item
+
+          {:error, changeset} ->
+            Logger.error("Error inserting tombstone for #{entry.guid}")
+            Logger.error(inspect(changeset))
+        end
+    end
+  end
+
+  defp get_real_url(entry) do
+    links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)
+
+    case Enum.find(links, fn {_, rel} -> rel == "related" end) do
+      nil ->
+        case Enum.find(links, fn {_, rel} -> rel == "alternate" end) do
+          nil ->
+            [{href, _} | _] = links
+            href
+
+          {href, _} ->
+            href
+        end
+
+      {href, _} ->
+        href
+    end
+  end
+
+  # from https://github.com/elixir-plug/plug/blob/v1.8.3/lib/plug/request_id.ex#L60
+  defp generate_task_id() do
+    binary = <<
+      System.system_time(:nanosecond)::64,
+      :erlang.phash2({node(), self()}, 16_777_216)::24,
+      :erlang.unique_integer()::32
+    >>
+
+    Base.url_encode64(binary)
+  end
+end
diff --git a/lib/frenzy/update_feeds.ex b/lib/frenzy/update_feeds.ex
index 96d71f8..4afb82f 100644
--- a/lib/frenzy/update_feeds.ex
+++ b/lib/frenzy/update_feeds.ex
@@ -1,6 +1,6 @@
 defmodule Frenzy.UpdateFeeds do
   use GenServer
-  alias Frenzy.{Repo, Feed, Item}
+  alias Frenzy.{Repo, Feed, Item, CreateItemTask}
   import Ecto.Query
   require Logger
 
@@ -123,97 +123,13 @@ defmodule Frenzy.UpdateFeeds do
 
     feed = Repo.preload(feed, [:items])
 
-    Enum.map(rss.items, fn entry ->
+    Enum.each(rss.items, fn entry ->
       # todo: use Repo.exists for this
       if !Enum.any?(feed.items, fn item -> item.guid == entry.guid end) do
-        create_item(feed, entry)
+        CreateItemTask.start_link(feed, entry)
+        # Task.start_link(__MODULE__, :create_item, [feed, entry])
+        # spawn(__MODULE__, :create_item, [feed, entry])
       end
     end)
   end
-
-  defp create_item(feed, entry) do
-    url = get_real_url(entry)
-
-    Logger.debug("Creating item for #{url}")
-
-    item_params = %{
-      guid: entry.guid,
-      title: entry.title,
-      url: url,
-      date: entry.date |> Timex.Timezone.convert(:utc) |> DateTime.truncate(:second),
-      creator: "",
-      content: entry.content
-    }
-
-    feed = Repo.preload(feed, :pipeline_stages)
-
-    result =
-      feed.pipeline_stages
-      |> Enum.sort_by(& &1.index)
-      |> Enum.reduce({:ok, item_params}, fn
-        stage, {:ok, item_params} ->
-          apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
-            stage.options,
-            item_params
-          ])
-
-        _stage, :tombstone ->
-          :tombstone
-
-        _stage, {:error, _error} = error ->
-          error
-      end)
-
-    case result do
-      {:err, error} ->
-        Logger.error(error)
-
-      {:ok, item_params} ->
-        changeset = Ecto.build_assoc(feed, :items, item_params)
-
-        case Repo.insert(changeset) do
-          {:ok, item} ->
-            item
-
-          {:error, changeset} ->
-            Logger.error("Error inserting item #{entry.guid}")
-            Logger.error(changeset)
-        end
-
-      :tombstone ->
-        changeset =
-          Ecto.build_assoc(feed, :items, %{
-            guid: item_params.guid,
-            tombstone: true
-          })
-
-        case Repo.insert(changeset) do
-          {:ok, item} ->
-            item
-
-          {:error, changeset} ->
-            Logger.error("Error inserting tombstone for #{entry.guid}")
-            Logger.error(changeset)
-        end
-    end
-  end
-
-  defp get_real_url(entry) do
-    links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)
-
-    case Enum.find(links, fn {_, rel} -> rel == "related" end) do
-      nil ->
-        case Enum.find(links, fn {_, rel} -> rel == "alternate" end) do
-          nil ->
-            [{href, _} | _] = links
-            href
-
-          {href, _} ->
-            href
-        end
-
-      {href, _} ->
-        href
-    end
-  end
 end
diff --git a/lib/frenzy_web/controllers/feed_controller.ex b/lib/frenzy_web/controllers/feed_controller.ex
index afdf5b4..679db27 100644
--- a/lib/frenzy_web/controllers/feed_controller.ex
+++ b/lib/frenzy_web/controllers/feed_controller.ex
@@ -106,6 +106,9 @@ defmodule FrenzyWeb.FeedController do
   def refresh(conn, _params) do
     feed = conn.assigns[:feed]
     feed = Frenzy.UpdateFeeds.refresh(Frenzy.UpdateFeeds, feed)
-    redirect(conn, to: Routes.feed_path(Endpoint, :show, feed.id))
+
+    conn
+    |> put_flash(:info, "Refreshing feed. Wait a moment before reloading...")
+    |> redirect(to: Routes.feed_path(Endpoint, :show, feed.id))
   end
 end