defmodule Frenzy.UpdateFeeds do
  @moduledoc """
  GenServer that periodically refreshes every `Frenzy.Feed` and converts read
  items into tombstones.

  A refresh of all feeds is triggered right after the server starts and then
  every 15 minutes. A single feed can be refreshed on demand with `refresh/2`.
  """

  use GenServer
  alias Frenzy.{Repo, Feed, Item, FilterEngine}
  import Ecto.Query
  require Logger

  # Time between two full refreshes, in milliseconds (15 minutes).
  @update_interval 15 * 60 * 1000

  # Maximum number of consecutive redirects followed for one feed refresh.
  @max_redirects 5

  ## Client API

  @doc """
  Starts the feed updater.

  `state` is forwarded to `GenServer.start_link/3` as its options list (for
  example `[name: Frenzy.UpdateFeeds]`); the server's own state is `:ok`.
  """
  def start_link(state) do
    GenServer.start_link(__MODULE__, :ok, state)
  end

  @doc """
  Synchronously refreshes `feed` and returns it reloaded from the database with
  its `:items` preloaded (or `nil` if the feed no longer exists).

  Uses the default `GenServer.call/2` timeout.
  """
  def refresh(pid, feed) do
    GenServer.call(pid, {:refresh, feed})
  end

  ## Server callbacks

  @impl true
  def init(state) do
    # Kick off the first refresh from the process mailbox instead of doing the
    # network round-trips here, so starting the supervisor is not blocked. The
    # message is the first one in the mailbox, so it is still handled before
    # any `refresh/2` call.
    send(self(), :update_feeds)
    {:ok, state}
  end

  @impl true
  def handle_call({:refresh, feed}, _from, state) do
    update_feed(feed)
    new_feed = Feed |> Repo.get(feed.id) |> Repo.preload(:items)
    {:reply, new_feed, state}
  end

  @impl true
  def handle_info(:update_feeds, state) do
    update_feeds()
    schedule_update()
    {:noreply, state}
  end

  # Stray messages (e.g. late replies or socket-close notifications from the
  # HTTP client) must not crash the updater.
  def handle_info(message, state) do
    Logger.debug("Ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  ## Scheduling

  # Arms the timer for the next full refresh.
  defp schedule_update() do
    Process.send_after(self(), :update_feeds, @update_interval)
  end

  ## Refreshing

  # Refreshes every feed, then tombstones old read items.
  defp update_feeds() do
    Logger.info("Updating all feeds")

    query = from(Feed, preload: [:filter])

    query
    |> Repo.all()
    |> Enum.each(&update_feed/1)

    prune_old_items()
  end

  # Turns read, non-tombstone items into tombstones, wiping their content.
  defp prune_old_items() do
    {count, _} =
      from(i in Item,
        where: i.read and not i.tombstone,
        # NOTE(review): the threshold below is one *minute*; the commented-out
        # line suggests one week was intended. Confirm before changing, since
        # this permanently clears item content.
        # where: i.read_date <= from_now(-1, "week"),
        where: i.read_date <= from_now(-1, "minute"),
        update: [
          set: [tombstone: true, content: nil, creator: nil, date: nil, url: nil, title: nil]
        ]
      )
      |> Repo.update_all([])

    Logger.info("Converted #{count} read items to tombstones")
  end

  # Fetches `feed.feed_url` and dispatches on the HTTP response.
  # `redirects_left` bounds how many 301/302 hops are followed so that a
  # redirect loop cannot hang the server.
  defp update_feed(feed, redirects_left \\ @max_redirects) do
    Logger.debug("Updating #{feed.feed_url}")

    case HTTPoison.get(feed.feed_url) do
      {:ok, %HTTPoison.Response{status_code: 200, body: body, headers: headers}} ->
        parse_feed_body(feed, body, headers)

      {:ok, %HTTPoison.Response{status_code: 404}} ->
        Logger.warn("RSS feed #{feed.feed_url} not found")

      {:ok, %HTTPoison.Response{status_code: status_code, headers: headers}}
      when status_code in [301, 302] ->
        follow_redirect(feed, status_code, headers, redirects_left)

      {:ok, %HTTPoison.Response{} = response} ->
        Logger.error(
          "Couldn't load RSS feed #{feed.feed_url}, got unexpected response: #{inspect(response)}"
        )

      {:error, %HTTPoison.Error{reason: reason}} ->
        Logger.error("Couldn't load RSS feed #{feed.feed_url}: #{inspect(reason)}")
    end
  end

  # Parses a successful response body and stores the result. Missing content
  # types and parser failures are logged instead of crashing the server.
  defp parse_feed_body(feed, body, headers) do
    case content_type(headers) do
      nil ->
        Logger.error("Couldn't parse RSS feed #{feed.feed_url}: no usable Content-Type header")

      content_type ->
        case FeedParser.parse(body, content_type) do
          {:ok, rss} ->
            update_feed_from_rss(feed, rss)

          other ->
            Logger.error("Couldn't parse RSS feed #{feed.feed_url}: #{inspect(other)}")
        end
    end
  end

  # Extracts the bare media type from the Content-Type header, i.e. the first
  # `;`-separated part that is not a `key=value` parameter. Returns nil when
  # the header is absent or has no such part.
  defp content_type(headers) do
    case header_value(headers, "content-type") do
      nil ->
        nil

      value ->
        value
        |> String.split(";")
        |> Enum.map(&String.trim/1)
        |> Enum.find(fn part -> !String.contains?(part, "=") end)
    end
  end

  # Looks up a header by its lowercase name; header names are case-insensitive.
  defp header_value(headers, downcased_name) do
    Enum.find_value(headers, fn {name, value} ->
      if String.downcase(name) == downcased_name, do: value
    end)
  end

  # Persists the redirect target as the feed's new URL and retries the fetch.
  defp follow_redirect(feed, status_code, _headers, 0) do
    Logger.error(
      "Too many redirects for RSS feed #{feed.feed_url} (last status #{status_code}), giving up"
    )
  end

  defp follow_redirect(feed, status_code, headers, redirects_left) do
    case header_value(headers, "location") do
      nil ->
        Logger.error(
          "Got #{status_code} redirect from #{feed.feed_url} without a Location header"
        )

      location ->
        # Location may be relative; resolve it against the URL just fetched.
        new_url = feed.feed_url |> URI.merge(location) |> URI.to_string()

        Logger.debug(
          "Got #{status_code} redirect from #{feed.feed_url} to #{new_url}, updating feed URL"
        )

        case feed |> Feed.changeset(%{feed_url: new_url}) |> Repo.update() do
          {:ok, updated_feed} ->
            update_feed(updated_feed, redirects_left - 1)

          {:error, changeset} ->
            Logger.error(
              "Couldn't update URL of feed #{feed.feed_url} to #{new_url}: #{inspect(changeset)}"
            )
        end
    end
  end

  # Updates the feed's metadata from the parsed document and creates an item
  # for every entry whose guid is not already stored for this feed.
  defp update_feed_from_rss(feed, %FeedParser.Feed{} = rss) do
    changeset =
      Feed.changeset(feed, %{
        title: rss.title,
        site_url: rss.site_url,
        last_updated: (rss.last_updated || DateTime.utc_now()) |> Timex.Timezone.convert(:utc)
      })

    case Repo.update(changeset) do
      {:ok, _feed} ->
        :ok

      {:error, failed_changeset} ->
        Logger.error("Couldn't update feed #{feed.feed_url}: #{inspect(failed_changeset)}")
    end

    # Pipeline stages are preloaded once here so create_item/2 does not hit the
    # database again for every new entry.
    feed = Repo.preload(feed, items: [], pipeline_stages: [], filter: [:rules])

    # todo: query only the guids (or use Repo.exists?) instead of loading every item
    known_guids = MapSet.new(feed.items, & &1.guid)

    rss.items
    |> Enum.reject(fn entry -> MapSet.member?(known_guids, entry.guid) end)
    |> Enum.each(fn entry -> create_item(feed, entry) end)
  end

  # Runs `entry` through the feed's pipeline stages (in `index` order) and
  # inserts either the resulting item or a tombstone. Returns the inserted
  # item, or the result of the logging call on failure.
  defp create_item(feed, entry) do
    url = get_real_url(entry)
    Logger.debug("Creating item for #{url}")

    item_params = %{
      guid: entry.guid,
      title: entry.title,
      url: url,
      date: normalize_date(entry.date),
      creator: "",
      content: entry.content
    }

    # No-op when the caller already preloaded the association.
    feed = Repo.preload(feed, :pipeline_stages)

    result =
      feed.pipeline_stages
      |> Enum.sort_by(& &1.index)
      |> Enum.reduce({:ok, item_params}, fn
        stage, {:ok, params} -> apply_stage(stage, params)
        _stage, :tombstone -> :tombstone
        _stage, {:error, _error} = error -> error
      end)

    case result do
      {:error, error} ->
        Logger.error("Error running pipeline for item #{entry.guid}: #{inspect(error)}")

      {:ok, final_params} ->
        insert_item(feed, final_params, "Error inserting item #{entry.guid}")

      :tombstone ->
        insert_item(
          feed,
          %{guid: entry.guid, tombstone: true},
          "Error inserting tombstone for #{entry.guid}"
        )
    end
  end

  # Invokes the stage's module. `String.to_existing_atom/1` is used so that a
  # module name stored in the database can never create new atoms.
  defp apply_stage(stage, item_params) do
    module = String.to_existing_atom("Elixir." <> stage.module_name)
    apply(module, :apply, [stage.options, item_params])
  end

  # Builds and inserts an item belonging to `feed`, logging on failure.
  defp insert_item(feed, params, error_message) do
    case feed |> Ecto.build_assoc(:items, params) |> Repo.insert() do
      {:ok, item} ->
        item

      {:error, changeset} ->
        Logger.error("#{error_message}: #{inspect(changeset)}")
    end
  end

  # Converts an entry date to UTC with second precision; entries without a
  # date keep `nil`.
  defp normalize_date(nil), do: nil

  defp normalize_date(date) do
    date |> Timex.Timezone.convert(:utc) |> DateTime.truncate(:second)
  end

  # Picks the entry's canonical URL from its `{href, rel}` links: a "related"
  # link wins over an "alternate" one, which wins over any other link.
  # "shorturl" links are never used. Returns nil when no usable link exists.
  defp get_real_url(entry) do
    links = Enum.reject(entry.links, fn {_, rel} -> rel == "shorturl" end)

    link =
      Enum.find(links, fn {_, rel} -> rel == "related" end) ||
        Enum.find(links, fn {_, rel} -> rel == "alternate" end) ||
        List.first(links)

    case link do
      {href, _} -> href
      nil -> nil
    end
  end
end