defmodule Frenzy.UpdateFeeds do
  @moduledoc """
  GenServer that periodically refreshes feeds and tombstones old items.

  On start, and then every 30 minutes, the server selects every feed that has
  never been refreshed or whose `refresh_frequency` (in seconds) has elapsed
  since `last_refreshed_at`, fetches it over HTTP(S) or Gemini, parses it, and
  starts a `Frenzy.Task.CreateItem` for each entry whose guid is not already
  stored for the feed. After each pass, old items are converted to tombstones.

  A single feed can also be refreshed on demand with `refresh/2`.
  """

  use GenServer
  alias Frenzy.{Network, Repo, Feed, Item}
  alias Frenzy.Task.{CreateItem, FetchFavicon}
  import Ecto.Query
  require Logger

  # Delay between scheduled refresh passes, in milliseconds (30 minutes).
  @update_interval 30 * 60 * 1000

  @doc """
  Starts the feed updater.

  Note that the argument is forwarded to `GenServer.start_link/3` as its
  options list (e.g. `[name: ...]`); the server's own state is always `:ok`.
  """
  def start_link(state) do
    GenServer.start_link(__MODULE__, :ok, state)
  end

  @doc """
  Asynchronously refreshes a single `feed` on the updater process `pid`.
  """
  def refresh(pid, feed) do
    GenServer.cast(pid, {:refresh, feed})
  end

  @impl true
  def init(state) do
    # Run one pass immediately on startup, then arm the periodic timer.
    update_feeds()
    schedule_update()
    {:ok, state}
  end

  @impl true
  def handle_cast({:refresh, feed}, state) do
    update_feed(feed)
    {:noreply, state}
  end

  @impl true
  def handle_info(:update_feeds, state) do
    update_feeds()
    schedule_update()
    {:noreply, state}
  end

  # Workaround for an otherwise unhandled
  # {:ssl_closed, {:sslsocket, {:gen_tcp, ...}}} message that shows up in this
  # process's mailbox when the Gemini module is used.
  def handle_info({:ssl_closed, _}, state), do: {:noreply, state}

  # Arms the timer that delivers the next :update_feeds message to this process.
  defp schedule_update() do
    Process.send_after(self(), :update_feeds, @update_interval)
  end

  # Refreshes every feed that is due and then prunes old items.
  #
  # Due feeds are selected and stamped with a new `last_refreshed_at` in a
  # single UPDATE, which also returns the affected rows.
  defp update_feeds() do
    {count, feeds} =
      Feed
      |> where(
        [f],
        is_nil(f.last_refreshed_at) or
          f.last_refreshed_at <= from_now(-1 * f.refresh_frequency, "second")
      )
      |> select([f], f)
      |> Repo.update_all(set: [last_refreshed_at: DateTime.utc_now()])

    Logger.info("Updating #{count} feeds")

    # Each feed is refreshed exactly once, and a failure in one feed must not
    # prevent the remaining feeds (or the pruning step) from running.
    Enum.each(feeds, fn feed ->
      try do
        update_feed(feed)
      rescue
        error ->
          Logger.warn(
            "Encountered error updating feed #{feed.id} #{feed.feed_url}: #{inspect(error)}"
          )
      end
    end)

    prune_old_items()
  end

  # Converts old items into tombstones: the row is kept but its content fields
  # are cleared. Read items are pruned one week after `read_date`; unread items
  # two weeks after `inserted_at`.
  defp prune_old_items() do
    {count, _} =
      from(i in Item,
        where: not i.tombstone,
        # todo: these time intervals should be configurable by the admin
        where:
          (i.read and i.read_date <= from_now(-1, "week")) or
            (not i.read and i.inserted_at <= from_now(-2, "week")),
        update: [
          set: [tombstone: true, content: nil, creator: nil, date: nil, url: nil, title: nil]
        ]
      )
      |> Repo.update_all([])

    Logger.info("Converted #{count} read items to tombstones")
  end

  # Dispatches a feed refresh based on the scheme of the feed URL.
  defp update_feed(feed) do
    Logger.debug("Updating #{feed.feed_url}")

    case URI.parse(feed.feed_url) do
      %URI{scheme: "gemini"} = uri ->
        update_feed_gemini(feed, uri)

      %URI{scheme: scheme} when scheme in ["http", "https"] ->
        update_feed_http(feed)

      %URI{scheme: scheme} ->
        Logger.warn("Unhandled scheme for feed: #{scheme}")
    end
  end

  # Fetches a feed over HTTP(S) and hands the body to the parser.
  # Non-200 responses and responses without a content-type header are logged
  # and skipped instead of crashing the server.
  defp update_feed_http(feed) do
    case Network.http_get(feed.feed_url) do
      {:ok, %Mojito.Response{status_code: 200, body: body, headers: headers}} ->
        case List.keyfind(headers, "content-type", 0) do
          {_, content_type} ->
            do_update_feed(feed, media_type(content_type), body)

          nil ->
            Logger.error("Couldn't load feed #{feed.feed_url}: missing content-type header")
        end

      {:ok, %Mojito.Response{status_code: status_code}} ->
        Logger.error("Couldn't load feed #{feed.feed_url}: unexpected HTTP status #{status_code}")

      {:error, reason} ->
        Logger.error("Couldn't load feed #{feed.feed_url}: #{inspect(reason)}")
    end
  end

  # Strips parameters from a content-type header value by returning the first
  # ";"-separated segment that contains no "=",
  # e.g. "application/rss+xml; charset=utf-8" -> "application/rss+xml".
  defp media_type(content_type) do
    content_type
    |> String.split(";")
    |> Enum.map(&String.trim/1)
    |> Enum.find(fn s -> !String.contains?(s, "=") end)
  end

  # Fetches a feed over Gemini; the response's `meta` field is used as the
  # content type.
  defp update_feed_gemini(feed, feed_uri) do
    case Network.gemini_request(feed_uri) do
      {:ok, %Gemini.Response{meta: content_type, body: body}} ->
        do_update_feed(feed, content_type, body)

      {:error, reason} ->
        Logger.error("Couldn't load feed #{feed.feed_url}: #{inspect(reason)}")
    end
  end

  # Parses the fetched data and applies the result to the stored feed.
  defp do_update_feed(feed, content_type, data) do
    case FeedParser.parse(data, content_type) do
      {:ok, rss} ->
        update_feed_from_rss(feed, rss)

      {:error, reason} ->
        Logger.error("Unable to parse feed at '#{feed.feed_url}': #{inspect(reason)}")
    end
  end

  # Updates the feed's metadata from the parsed feed, fetches a favicon if the
  # feed has none, and starts a CreateItem task for every entry whose guid is
  # not already present among the feed's items.
  defp update_feed_from_rss(feed, %FeedParser.Feed{} = rss) do
    changeset =
      Feed.changeset(feed, %{
        title: rss.title,
        site_url: rss.site_url,
        last_updated: (rss.last_updated || DateTime.utc_now()) |> Timex.Timezone.convert(:utc)
      })

    {:ok, feed} = Repo.update(changeset)

    if is_nil(feed.favicon) do
      FetchFavicon.run(feed)
    end

    feed = Repo.preload(feed, [:items])

    # todo: use Repo.exists for this
    # Collect the known guids once so each entry is a set lookup rather than a
    # scan over every stored item.
    known_guids = MapSet.new(feed.items, & &1.guid)

    Enum.each(rss.items, fn entry ->
      if !MapSet.member?(known_guids, entry.guid) do
        CreateItem.start_link(feed, entry)
      end
    end)
  end
end