Make item creation multi-threaded
This commit is contained in:
parent
3bc37952d1
commit
d8139c6ce0
|
@ -20,7 +20,7 @@ config :frenzy, FrenzyWeb.Endpoint,
|
|||
# Configures Elixir's Logger
|
||||
config :logger, :console,
|
||||
format: "$time $metadata[$level] $message\n",
|
||||
metadata: [:request_id]
|
||||
metadata: [:request_id, :item_task_id]
|
||||
|
||||
# Use Jason for JSON parsing in Phoenix
|
||||
config :phoenix, :json_library, Jason
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
defmodule Frenzy.CreateItemTask do
|
||||
require Logger
|
||||
use Task
|
||||
alias Frenzy.Repo
|
||||
|
||||
def start_link(feed, entry) do
|
||||
Task.start_link(__MODULE__, :run, [feed, entry])
|
||||
end
|
||||
|
||||
def run(feed, entry) do
|
||||
Logger.metadata(item_task_id: generate_task_id())
|
||||
|
||||
url = get_real_url(entry)
|
||||
|
||||
Logger.debug("Creating item for #{url}")
|
||||
|
||||
item_params = %{
|
||||
guid: entry.guid,
|
||||
title: entry.title,
|
||||
url: url,
|
||||
date: entry.date |> Timex.Timezone.convert(:utc) |> DateTime.truncate(:second),
|
||||
creator: "",
|
||||
content: entry.content
|
||||
}
|
||||
|
||||
feed = Repo.preload(feed, :pipeline_stages)
|
||||
|
||||
result =
|
||||
feed.pipeline_stages
|
||||
|> Enum.sort_by(& &1.index)
|
||||
|> Enum.reduce({:ok, item_params}, fn
|
||||
stage, {:ok, item_params} ->
|
||||
apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
|
||||
stage.options,
|
||||
item_params
|
||||
])
|
||||
|
||||
_stage, :tombstone ->
|
||||
:tombstone
|
||||
|
||||
_stage, {:error, _error} = error ->
|
||||
error
|
||||
end)
|
||||
|
||||
case result do
|
||||
{:err, error} ->
|
||||
Logger.error(error)
|
||||
|
||||
{:ok, item_params} ->
|
||||
changeset = Ecto.build_assoc(feed, :items, item_params)
|
||||
|
||||
case Repo.insert(changeset) do
|
||||
{:ok, item} ->
|
||||
item
|
||||
|
||||
{:error, changeset} ->
|
||||
Logger.error("Error inserting item #{entry.guid}")
|
||||
Logger.error(changeset)
|
||||
end
|
||||
|
||||
:tombstone ->
|
||||
changeset =
|
||||
Ecto.build_assoc(feed, :items, %{
|
||||
guid: item_params.guid,
|
||||
tombstone: true
|
||||
})
|
||||
|
||||
case Repo.insert(changeset) do
|
||||
{:ok, item} ->
|
||||
item
|
||||
|
||||
{:error, changeset} ->
|
||||
Logger.error("Error inserting tombstone for #{entry.guid}")
|
||||
Logger.error(changeset)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp get_real_url(entry) do
|
||||
links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)
|
||||
|
||||
case Enum.find(links, fn {_, rel} -> rel == "related" end) do
|
||||
nil ->
|
||||
case Enum.find(links, fn {_, rel} -> rel == "alternate" end) do
|
||||
nil ->
|
||||
[{href, _} | _] = links
|
||||
href
|
||||
|
||||
{href, _} ->
|
||||
href
|
||||
end
|
||||
|
||||
{href, _} ->
|
||||
href
|
||||
end
|
||||
end
|
||||
|
||||
# from https://github.com/elixir-plug/plug/blob/v1.8.3/lib/plug/request_id.ex#L60
|
||||
defp generate_task_id() do
|
||||
binary = <<
|
||||
System.system_time(:nanosecond)::64,
|
||||
:erlang.phash2({node(), self()}, 16_777_216)::24,
|
||||
:erlang.unique_integer()::32
|
||||
>>
|
||||
|
||||
Base.url_encode64(binary)
|
||||
end
|
||||
end
|
|
@ -1,6 +1,6 @@
|
|||
defmodule Frenzy.UpdateFeeds do
|
||||
use GenServer
|
||||
alias Frenzy.{Repo, Feed, Item}
|
||||
alias Frenzy.{Repo, Feed, Item, CreateItemTask}
|
||||
import Ecto.Query
|
||||
require Logger
|
||||
|
||||
|
@ -123,97 +123,13 @@ defmodule Frenzy.UpdateFeeds do
|
|||
|
||||
feed = Repo.preload(feed, [:items])
|
||||
|
||||
Enum.map(rss.items, fn entry ->
|
||||
Enum.each(rss.items, fn entry ->
|
||||
# todo: use Repo.exists for this
|
||||
if !Enum.any?(feed.items, fn item -> item.guid == entry.guid end) do
|
||||
create_item(feed, entry)
|
||||
CreateItemTask.start_link(feed, entry)
|
||||
# Task.start_link(__MODULE__, :create_item, [feed, entry])
|
||||
# spawn(__MODULE__, :create_item, [feed, entry])
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp create_item(feed, entry) do
|
||||
url = get_real_url(entry)
|
||||
|
||||
Logger.debug("Creating item for #{url}")
|
||||
|
||||
item_params = %{
|
||||
guid: entry.guid,
|
||||
title: entry.title,
|
||||
url: url,
|
||||
date: entry.date |> Timex.Timezone.convert(:utc) |> DateTime.truncate(:second),
|
||||
creator: "",
|
||||
content: entry.content
|
||||
}
|
||||
|
||||
feed = Repo.preload(feed, :pipeline_stages)
|
||||
|
||||
result =
|
||||
feed.pipeline_stages
|
||||
|> Enum.sort_by(& &1.index)
|
||||
|> Enum.reduce({:ok, item_params}, fn
|
||||
stage, {:ok, item_params} ->
|
||||
apply(String.to_existing_atom("Elixir." <> stage.module_name), :apply, [
|
||||
stage.options,
|
||||
item_params
|
||||
])
|
||||
|
||||
_stage, :tombstone ->
|
||||
:tombstone
|
||||
|
||||
_stage, {:error, _error} = error ->
|
||||
error
|
||||
end)
|
||||
|
||||
case result do
|
||||
{:err, error} ->
|
||||
Logger.error(error)
|
||||
|
||||
{:ok, item_params} ->
|
||||
changeset = Ecto.build_assoc(feed, :items, item_params)
|
||||
|
||||
case Repo.insert(changeset) do
|
||||
{:ok, item} ->
|
||||
item
|
||||
|
||||
{:error, changeset} ->
|
||||
Logger.error("Error inserting item #{entry.guid}")
|
||||
Logger.error(changeset)
|
||||
end
|
||||
|
||||
:tombstone ->
|
||||
changeset =
|
||||
Ecto.build_assoc(feed, :items, %{
|
||||
guid: item_params.guid,
|
||||
tombstone: true
|
||||
})
|
||||
|
||||
case Repo.insert(changeset) do
|
||||
{:ok, item} ->
|
||||
item
|
||||
|
||||
{:error, changeset} ->
|
||||
Logger.error("Error inserting tombstone for #{entry.guid}")
|
||||
Logger.error(changeset)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp get_real_url(entry) do
|
||||
links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)
|
||||
|
||||
case Enum.find(links, fn {_, rel} -> rel == "related" end) do
|
||||
nil ->
|
||||
case Enum.find(links, fn {_, rel} -> rel == "alternate" end) do
|
||||
nil ->
|
||||
[{href, _} | _] = links
|
||||
href
|
||||
|
||||
{href, _} ->
|
||||
href
|
||||
end
|
||||
|
||||
{href, _} ->
|
||||
href
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -106,6 +106,9 @@ defmodule FrenzyWeb.FeedController do
|
|||
def refresh(conn, _params) do
|
||||
feed = conn.assigns[:feed]
|
||||
feed = Frenzy.UpdateFeeds.refresh(Frenzy.UpdateFeeds, feed)
|
||||
redirect(conn, to: Routes.feed_path(Endpoint, :show, feed.id))
|
||||
|
||||
conn
|
||||
|> put_flash(:info, "Refreshing feed. Wait a moment before reloading...")
|
||||
|> redirect(to: Routes.feed_path(Endpoint, :show, feed.id))
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue