defmodule Frenzy.UpdateFeeds do
  @moduledoc """
  GenServer that periodically refreshes feeds and prunes old items.

  Shortly after start-up, and then on a fixed interval, the server selects every
  feed that has never been refreshed or whose `refresh_frequency` has elapsed,
  fetches it over HTTP(S) or Gemini, parses it, and creates any items that do
  not exist yet. After each periodic run, old items are converted to tombstones.

  Failed network requests are retried with exponential backoff by sending the
  server an `{:update_feed, feed_id, retry_count}` message.
  """

  use GenServer
  alias Frenzy.{Network, Repo, Feed, Item}
  alias Frenzy.Task.{CreateItem, FetchFavicon}
  import Ecto.Query
  require Logger

  # Delay before the first periodic update after the server starts.
  @initial_delay_ms 5 * 1000

  # Interval between periodic updates (30 minutes).
  # For local debugging a 1 minute interval (60 * 1000) can be handy.
  @update_interval_ms 30 * 60 * 1000

  # Number of retries scheduled for a failed fetch before the failure is reported.
  @max_retries 5

  ## Client API

  @doc """
  Starts the updater process.

  The argument is passed to `GenServer.start_link/3` as the *options* list
  (e.g. `[name: ...]`); the server's own state is always `:ok`.
  """
  @spec start_link(GenServer.options()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Asynchronously refreshes a single `feed` in the server identified by `pid`.
  """
  @spec refresh(GenServer.server(), struct()) :: :ok
  def refresh(pid, feed) do
    GenServer.cast(pid, {:refresh, feed})
  end

  @doc """
  Updates every feed immediately, regardless of when it was last refreshed.

  Runs in the calling process rather than in the server.
  """
  @spec force_update_feeds() :: :ok
  def force_update_feeds() do
    feeds = Repo.all(Feed)

    Logger.info("Force updating #{length(feeds)} feeds")

    do_update_feeds(feeds)
  end

  ## Server callbacks

  @impl true
  def init(state) do
    Process.send_after(self(), :update_feeds, @initial_delay_ms)
    {:ok, state}
  end

  @impl true
  def handle_cast({:refresh, feed}, state) do
    update_feed(feed)
    {:noreply, state}
  end

  # Retry of a previously failed fetch. The feed is re-read from the database
  # by id, so it may have been deleted in the meantime.
  @impl true
  def handle_info({:update_feed, feed_id, retry_count}, state) do
    case Repo.get(Feed, feed_id) do
      nil ->
        Logger.warning("Skipping retry for feed #{inspect(feed_id)}: feed no longer exists")

      feed ->
        update_feed(feed, retry_count)
    end

    {:noreply, state}
  end

  # Periodic tick: update all due feeds, then schedule the next tick.
  def handle_info(:update_feeds, state) do
    update_feeds()
    schedule_update()
    {:noreply, state}
  end

  # Workaround for the otherwise unhandled
  # {:ssl_closed, {:sslsocket, {:gen_tcp, ...}}} message that arrives when the
  # Gemini module is used.
  def handle_info({:ssl_closed, _}, state), do: {:noreply, state}

  ## Scheduling and batch updates

  defp schedule_update() do
    Process.send_after(self(), :update_feeds, @update_interval_ms)
  end

  # Marks every due feed as refreshed "now" in a single UPDATE that returns the
  # affected rows, then updates each of them and prunes old items.
  defp update_feeds() do
    {count, feeds} =
      Feed
      |> where(
        [f],
        is_nil(f.last_refreshed_at) or
          f.last_refreshed_at <= from_now(-1 * f.refresh_frequency, "second")
      )
      |> select([f], f)
      |> Repo.update_all(set: [last_refreshed_at: DateTime.utc_now()])

    Logger.info("Updating #{count} feeds")

    do_update_feeds(feeds)

    prune_old_items()
  end

  # Updates each feed in turn. An exception raised while updating one feed is
  # logged (and reported to Sentry when enabled) so the remaining feeds are
  # still processed.
  defp do_update_feeds(feeds) do
    Enum.each(feeds, fn feed ->
      try do
        update_feed(feed)
      rescue
        error ->
          Logger.warning(
            "Encountered error updating feed #{feed.id} #{feed.feed_url}: #{inspect(error)}"
          )

          if Frenzy.sentry_enabled?() do
            Sentry.capture_exception(error,
              stacktrace: __STACKTRACE__,
              extra: %{feed_id: feed.id, feed_url: feed.feed_url}
            )
          end
      end
    end)
  end

  # Converts old items into tombstones: read items whose read date is at least
  # a week old and unread items inserted at least two weeks ago. The tombstone
  # keeps the row but clears its content fields.
  defp prune_old_items() do
    {count, _} =
      from(i in Item,
        where: not i.tombstone,
        # todo: these time intervals should be configurable by the admin
        where:
          (i.read and i.read_date <= from_now(-1, "week")) or
            (not i.read and i.inserted_at <= from_now(-2, "week")),
        update: [
          set: [tombstone: true, content: nil, creator: nil, date: nil, url: nil, title: nil]
        ]
      )
      |> Repo.update_all([])

    Logger.info("Converted #{count} read items to tombstones")
  end

  ## Single-feed update

  # Dispatches on the URL scheme of the feed. `retry_count` is the number of
  # retries already attempted for this fetch.
  defp update_feed(feed, retry_count \\ 0) do
    Logger.debug("Updating #{feed.feed_url}")

    case URI.parse(feed.feed_url) do
      %URI{scheme: "gemini"} = uri ->
        update_feed_gemini(feed, uri, retry_count)

      %URI{scheme: scheme} when scheme in ["http", "https"] ->
        update_feed_http(feed, retry_count)

      %URI{scheme: scheme} ->
        Logger.warning("Unhandled scheme for feed: #{scheme}")
    end
  end

  defp update_feed_http(feed, retry_count) do
    case Network.http_get(feed.feed_url) do
      {:ok, %Tesla.Env{status: 200, body: body, headers: headers}} ->
        case content_type_from_headers(headers) do
          {:ok, content_type} ->
            do_update_feed(feed, content_type, body)

          :error ->
            # A missing header used to raise a MatchError, which crashed the
            # server when reached through a cast or a retry message.
            Logger.error("Couldn't load feed #{feed.feed_url}: missing Content-Type header")

            if Frenzy.sentry_enabled?() do
              Sentry.capture_message(
                "Missing Content-Type header when loading feed '#{feed.feed_url}'",
                extra: %{feed_id: feed.id}
              )
            end
        end

      {:ok, %Tesla.Env{status: status}} ->
        Logger.error("Couldn't load feed #{feed.feed_url}: HTTP #{status}")

        if Frenzy.sentry_enabled?() do
          Sentry.capture_message("Got HTTP #{status} when loading feed '#{feed.feed_url}'",
            extra: %{feed_id: feed.id}
          )
        end

      {:error, reason} ->
        retry_or_report(feed, retry_count, reason, "HTTP")
    end
  end

  # Extracts the bare media type from the Content-Type header.
  #
  # Returns `:error` when the header is absent. When it is present, returns
  # `{:ok, type}` where `type` is the first `;`-separated part that is not a
  # `key=value` parameter, or `nil` if there is no such part.
  defp content_type_from_headers(headers) do
    case Enum.find(headers, fn {k, _v} -> String.downcase(k) == "content-type" end) do
      {_, value} ->
        media_type =
          value
          |> String.split(";")
          |> Enum.map(&String.trim/1)
          |> Enum.find(fn s -> !String.contains?(s, "=") end)

        {:ok, media_type}

      nil ->
        :error
    end
  end

  defp update_feed_gemini(feed, feed_uri, retry_count) do
    case Network.gemini_request(feed_uri) do
      {:ok, %Gemini.Response{meta: content_type, body: body}} ->
        do_update_feed(feed, content_type, body)

      {:error, reason} ->
        retry_or_report(feed, retry_count, reason, "Gemini")
    end
  end

  # Schedules another attempt with exponential backoff (1s, 4s, 16s, ...) while
  # retries remain; otherwise logs the failure and reports it to Sentry.
  #
  # The retry message carries the feed *id*, because the matching `handle_info/2`
  # clause looks the feed up with `Repo.get/2`.
  #
  # NOTE(review): the message is sent to `self()`. When the update was started
  # via `force_update_feeds/0` from another process, the retry is delivered to
  # that caller rather than to this server - confirm whether that is intended.
  defp retry_or_report(feed, retry_count, reason, protocol) do
    if retry_count < @max_retries do
      Process.send_after(
        self(),
        {:update_feed, feed.id, retry_count + 1},
        trunc(:math.pow(4, retry_count)) * 1000
      )
    else
      Logger.error("Couldn't load feed #{feed.feed_url}: #{inspect(reason)}")

      if Frenzy.sentry_enabled?() do
        Sentry.capture_message("Error loading #{protocol} feed: #{inspect(reason)}",
          extra: %{feed_id: feed.id, feed_url: feed.feed_url}
        )
      end
    end
  end

  # Parses the fetched data and applies it to the feed. An empty response
  # (`{:error, :no_data}`) is silently ignored; any other parse error is logged
  # and reported.
  defp do_update_feed(feed, content_type, data) do
    case FeedParser.parse(data, content_type) do
      {:ok, rss} ->
        update_feed_from_rss(feed, rss)

      {:error, :no_data} ->
        :ok

      {:error, reason} ->
        Logger.error("Unable to parse feed at '#{feed.feed_url}': #{inspect(reason)}")

        if Frenzy.sentry_enabled?() do
          Sentry.capture_message("Unable to parse feed: #{inspect(reason)}",
            extra: %{feed_id: feed.id, feed_url: feed.feed_url}
          )
        end
    end
  end

  # Persists the feed metadata from the parsed feed, fetches a favicon if the
  # feed has none, and starts item creation for every entry that is not stored
  # yet.
  defp update_feed_from_rss(feed, %FeedParser.Feed{} = rss) do
    changeset =
      Feed.changeset(feed, %{
        title: rss.title,
        site_url: rss.site_url,
        # Fall back to "now" when the feed does not state when it was updated.
        last_updated: (rss.last_updated || DateTime.utc_now()) |> Timex.Timezone.convert(:utc)
      })

    {:ok, feed} = Repo.update(changeset)

    if is_nil(feed.favicon) do
      FetchFavicon.run(feed)
    end

    Enum.each(rss.items, fn entry ->
      {guid, _} = CreateItem.real_guid_and_url(entry)

      if not Item.exists?(feed.id, guid) do
        CreateItem.start_link(feed, entry)
      end
    end)
  end
end