frenzy/lib/frenzy/task/create_item.ex

150 lines
3.8 KiB
Elixir

defmodule Frenzy.Task.CreateItem do
  @moduledoc """
  Task that turns a single parsed feed entry into a `Frenzy.Item` row.

  The entry is normalized into item params, passed through the stages of the
  feed's pipeline (when the feed has one), and inserted. A stage can return
  `:tombstone`, in which case only a tombstone item carrying the entry's guid
  is inserted.
  """

  use Task
  require Logger
  alias Frenzy.{Repo, Item}

  @doc """
  Starts a task, linked to the caller, that runs `run/2` with `feed` and `entry`.
  """
  @spec start_link(Frenzy.Feed.t(), FeedParser.Item.t()) :: {:ok, pid()}
  def start_link(feed, entry) do
    Task.start_link(__MODULE__, :run, [feed, entry])
  end

  @doc """
  Builds, filters and inserts the item for `entry` belonging to `feed`.

  Pipeline errors and insert errors are logged (and reported to Sentry when
  `Frenzy.sentry_enabled?/0` is true) instead of being returned. An insert
  rejected by the `items_feed_guid_index` constraint is treated as a duplicate
  and only logs a warning. Always returns `:ok`.
  """
  @spec run(Frenzy.Feed.t(), FeedParser.Item.t()) :: :ok
  def run(feed, entry) do
    Logger.metadata(item_task_id: generate_task_id())

    url = get_real_url(entry)
    Logger.debug("Creating item for #{url}")

    item_params = %{
      # fallback to url if guid isn't present
      guid: entry.guid || url,
      title: entry.title,
      url: url,
      date: to_utc_date(entry.date),
      creator: entry.creator,
      content: entry.content,
      # we assume text/html in the feed itself, other stages may alter this
      content_type: "text/html"
    }

    feed = Repo.preload(feed, :pipeline)

    feed
    |> run_pipeline(item_params)
    |> build_changeset(feed, item_params)
    |> insert_item(feed, item_params)

    :ok
  end

  # Converts the entry's date to UTC with second precision. A missing date, or
  # one Timex reports it cannot convert, falls back to the current time.
  defp to_utc_date(nil), do: now_truncated()

  defp to_utc_date(date) do
    case Timex.Timezone.convert(date, :utc) do
      {:error, reason} ->
        # inspect/1: the reason may be a tuple, which cannot be interpolated
        Logger.debug("Couldn't convert date '#{date}' to UTC: #{inspect(reason)}")
        now_truncated()

      utc_date ->
        DateTime.truncate(utc_date, :second)
    end
  end

  defp now_truncated, do: Timex.now() |> DateTime.truncate(:second)

  # Feeds without a pipeline pass the params through unchanged.
  defp run_pipeline(%{pipeline: nil}, item_params), do: {:ok, item_params}

  # Applies each stage's `apply/2` (stage module resolved from its
  # "module_name") in order. A stage returning anything other than
  # `{:ok, params}` (i.e. `:tombstone` or `{:error, reason}`) halts the
  # pipeline, and that value becomes the result.
  defp run_pipeline(%{pipeline: pipeline}, item_params) do
    Enum.reduce_while(pipeline.stages, {:ok, item_params}, fn stage, {:ok, params} ->
      module = String.to_existing_atom("Elixir." <> stage["module_name"])

      case apply(module, :apply, [stage["options"], params]) do
        {:ok, _params} = ok -> {:cont, ok}
        halted -> {:halt, halted}
      end
    end)
  end

  # Turns the pipeline result into an insertable changeset, or nil when the
  # pipeline failed (nothing should be inserted in that case).
  defp build_changeset({:ok, params}, feed, _item_params) do
    Item.changeset(%Item{}, Map.put(params, :feed_id, feed.id))
  end

  defp build_changeset(:tombstone, feed, item_params) do
    # feed_id is included so the tombstone belongs to its feed like any other
    # item, letting the feed/guid constraint check in insert_item/3 apply to it.
    Item.changeset(%Item{}, %{
      guid: item_params.guid,
      feed_id: feed.id,
      tombstone: true
    })
  end

  defp build_changeset({:error, error}, feed, _item_params) do
    # Only reachable when the feed has a pipeline (run_pipeline/2 never errors
    # without one), so feed.pipeline is non-nil here.
    message = "Error evaluating pipeline '#{feed.pipeline.name}': #{inspect(error)}"
    Logger.error(message)

    if Frenzy.sentry_enabled?() do
      Sentry.capture_message(message,
        extra: %{feed_id: feed.id, pipeline_id: feed.pipeline.id}
      )
    end

    nil
  end

  # Inserts the changeset, logging duplicates as warnings and anything else as
  # errors. Returns the inserted item, or nil/the logging result otherwise.
  defp insert_item(nil, _feed, _item_params), do: nil

  defp insert_item(changeset, feed, item_params) do
    case Repo.insert(changeset) do
      {:ok, item} ->
        item

      {:error, changeset} ->
        if duplicate_item?(changeset) do
          Logger.warn("Did not insert duplicate item for #{item_params.guid}")
        else
          Logger.error("Error inserting item #{item_params.guid}")
          # inspect/1: a keyword list is not chardata for Logger
          Logger.error(inspect(changeset.errors))

          if Frenzy.sentry_enabled?() do
            Sentry.capture_message("Error inserting item '#{item_params.guid}'",
              extra: %{feed_id: feed.id, errors: changeset.errors}
            )
          end
        end
    end
  end

  # True when the only error is the feed_id error raised by the
  # items_feed_guid_index constraint.
  defp duplicate_item?(%{errors: [feed_id: {_message, opts}]}) do
    {:constraint_name, "items_feed_guid_index"} in opts
  end

  defp duplicate_item?(_changeset), do: false

  # Picks the entry's link: a "related" link if present, otherwise an
  # "alternate" link, otherwise the first remaining link; "shorturl" links are
  # ignored. Returns nil when no link is left, instead of crashing the task
  # (which is linked to its caller). NOTE(review): a nil url is then left for
  # Item.changeset to accept or reject — confirm that is the desired outcome.
  defp get_real_url(entry) do
    links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)

    link =
      Enum.find(links, &match?({_, "related"}, &1)) ||
        Enum.find(links, &match?({_, "alternate"}, &1)) ||
        List.first(links)

    case link do
      {href, _rel} -> href
      nil -> nil
    end
  end

  # Unique-ish id attached to log metadata so all log lines from one task can
  # be correlated.
  # from https://github.com/elixir-plug/plug/blob/v1.8.3/lib/plug/request_id.ex#L60
  defp generate_task_id() do
    binary = <<
      System.system_time(:nanosecond)::64,
      :erlang.phash2({node(), self()}, 16_777_216)::24,
      :erlang.unique_integer()::32
    >>

    Base.url_encode64(binary)
  end
end