defmodule Frenzy.UpdateFeeds do
  @moduledoc """
  GenServer that periodically refreshes every stored feed, creates items for
  new feed entries, and prunes read items down to tombstones.

  A full refresh of all feeds runs once on startup and again on a fixed
  interval thereafter. Individual feeds can be refreshed synchronously via
  `refresh/2`.
  """

  use GenServer

  alias Frenzy.{Repo, Feed, Item, FilterEngine}
  import Ecto.Query
  require Logger

  # Interval between automatic refreshes of all feeds: 15 minutes.
  @update_interval_ms 15 * 60 * 1000

  # Maximum number of 301/302 redirects to follow when fetching an article.
  @max_redirects 5

  ## Client API

  def start_link(state) do
    GenServer.start_link(__MODULE__, :ok, state)
  end

  @doc """
  Synchronously refreshes `feed` and returns the reloaded feed with its
  items preloaded.
  """
  def refresh(pid, feed) do
    GenServer.call(pid, {:refresh, feed})
  end

  ## Server callbacks

  @impl true
  def init(state) do
    update_feeds()
    schedule_update()
    {:ok, state}
  end

  @impl true
  def handle_call({:refresh, feed}, _from, state) do
    update_feed(feed)
    new_feed = Feed |> Repo.get(feed.id) |> Repo.preload(:items)
    {:reply, new_feed, state}
  end

  @impl true
  def handle_info(:update_feeds, state) do
    update_feeds()
    schedule_update()
    {:noreply, state}
  end

  ## Internals

  defp schedule_update() do
    Process.send_after(self(), :update_feeds, @update_interval_ms)
  end

  # Refreshes every feed (with its filter preloaded), then prunes old items.
  defp update_feeds() do
    Logger.info("Updating all feeds")

    Repo.all(from Feed, preload: [:filter])
    |> Enum.each(&update_feed/1)

    prune_old_items()
  end

  # Converts read, non-tombstone items older than the cutoff into tombstones,
  # clearing their content so only the guid survives for dedup purposes.
  defp prune_old_items() do
    {count, _} =
      from(i in Item,
        where: i.read and not i.tombstone,
        # NOTE(review): a 1-minute cutoff looks like a debugging value; the
        # commented-out 1-week cutoff is probably the intended production
        # setting — confirm before shipping.
        # where: i.read_date <= from_now(-1, "week"),
        where: i.read_date <= from_now(-1, "minute"),
        update: [
          set: [tombstone: true, content: nil, creator: nil, date: nil, url: nil, title: nil]
        ]
      )
      |> Repo.update_all([])

    Logger.info("Converted #{count} read items to tombstones")
  end

  # Fetches a feed's URL and syncs its items. A 301/302 response persists the
  # new URL on the feed and retries once with the updated record.
  defp update_feed(feed) do
    Logger.debug("Updating #{feed.feed_url}")

    case HTTPoison.get(feed.feed_url) do
      {:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
        case Fiet.parse(body) do
          {:ok, rss} ->
            update_feed_from_rss(feed, rss)

          {:error, reason} ->
            # Fix: a parse failure previously raised CaseClauseError and
            # crashed the refresh loop for all remaining feeds.
            Logger.error("Couldn't parse RSS feed #{feed.feed_url}: #{inspect(reason)}")
        end

      {:ok, %HTTPoison.Response{status_code: 404}} ->
        Logger.warn("RSS feed #{feed.feed_url} not found")

      {:ok, %HTTPoison.Response{status_code: status_code, headers: headers}}
      when status_code in [301, 302] ->
        case location_header(headers) do
          nil ->
            # Fix: the bare {"Location", new_url} match raised MatchError when
            # the header was absent or sent in a different case.
            Logger.error("Redirect from #{feed.feed_url} carried no Location header")

          new_url ->
            Logger.debug(
              "Got #{status_code} redirect from #{feed.feed_url} to #{new_url}, updating feed URL"
            )

            changeset = Feed.changeset(feed, %{feed_url: new_url})
            # Fix: the feed already exists, so the new URL must be persisted
            # with update/1 — insert/1 tried to create a duplicate row.
            {:ok, feed} = Repo.update(changeset)
            update_feed(feed)
        end

      {:error, %HTTPoison.Error{reason: reason}} ->
        # Fix: interpolating a tuple reason (e.g. {:tls_alert, _}) raised
        # Protocol.UndefinedError; inspect/1 handles every shape.
        Logger.error("Couldn't load RSS feed: #{inspect(reason)}")
    end
  end

  # Returns the value of the Location header (case-insensitive), or nil.
  defp location_header(headers) do
    Enum.find_value(headers, fn {name, value} ->
      if String.downcase(name) == "location", do: value
    end)
  end

  # Updates feed metadata from the parsed document and creates items for any
  # entries whose guid isn't already stored.
  defp update_feed_from_rss(feed, rss) do
    last_updated =
      if rss.updated_at do
        parse_date(rss.updated_at)
      else
        DateTime.utc_now()
      end

    changeset =
      Feed.changeset(feed, %{
        title: rss.title,
        # NOTE(review): assumes rss.link is always present — a feed without a
        # <link> would crash here; confirm against Fiet's parse result.
        site_url: rss.link.href,
        last_updated: last_updated
      })

    Repo.update(changeset)

    feed = Repo.preload(feed, items: [], filter: [:rules])

    Enum.each(rss.items, fn entry ->
      # TODO: use Repo.exists?/1 instead of scanning the preloaded items
      if not Enum.any?(feed.items, fn item -> item.guid == entry.id end) do
        create_item(feed, entry)
      end
    end)
  end

  # Builds and inserts an item for one entry, fetching the article body and
  # applying the feed's filter. Filtered-out entries are stored as tombstones
  # so they aren't re-created on the next refresh.
  defp create_item(feed, entry) do
    url = get_real_url(entry)

    Logger.debug("Creating item for #{url}")

    content =
      case get_article_content(url) do
        {:ok, content} ->
          content

        {:err, reason} ->
          # Fall back to the feed-supplied summary when the article itself
          # can't be fetched.
          Logger.warn("Unable to fetch article for #{url}: #{reason}")
          entry.description
      end

    item_params = %{
      guid: entry.id,
      title: entry.title,
      url: url,
      date: parse_date(entry.published_at),
      creator: "",
      content: content
    }

    result =
      if feed.filter_enabled do
        case {feed.filter.mode, FilterEngine.matches?(item_params, feed.filter)} do
          {"accept", true} ->
            :store

          {"reject", false} ->
            :store

          _ ->
            Logger.debug("Skipping item #{url} due to feed filter")
            :tombstone
        end
      else
        :store
      end

    changeset =
      case result do
        :store ->
          Ecto.build_assoc(feed, :items, item_params)

        :tombstone ->
          Ecto.build_assoc(feed, :items, %{guid: entry.id, tombstone: true})
      end

    Repo.insert(changeset)
  end

  # Parses an RFC 1123 date, falling back to ISO 8601; the result is
  # converted to UTC. A string matching neither format raises (let-it-crash
  # for that entry's insert).
  defp parse_date(str) do
    case Timex.parse(str, "{RFC1123}") do
      {:ok, date} ->
        Timex.Timezone.convert(date, :utc)

      _ ->
        {:ok, date, _} = DateTime.from_iso8601(str)
        Timex.Timezone.convert(date, :utc)
    end
  end

  # Picks the best permalink for an entry: prefer rel="related", then
  # rel="alternate", then the first remaining non-shorturl link.
  defp get_real_url(entry) do
    links = Enum.reject(entry.links, fn l -> l.rel == "shorturl" end)

    case Enum.find(links, fn l -> l.rel == "related" end) do
      nil ->
        case Enum.find(links, fn l -> l.rel == "alternate" end) do
          nil -> Enum.fetch!(links, 0).href
          link -> link.href
        end

      link ->
        link.href
    end
  end

  # Fetches a page and extracts readable article HTML via Readability.
  # Follows up to `redirects_left` 301/302 redirects (fix: the original
  # recursed without a bound and could loop forever on a redirect cycle).
  # Returns {:ok, html} or {:err, reason}.
  defp get_article_content(url, redirects_left \\ @max_redirects)

  defp get_article_content(_url, 0), do: {:err, "too many redirects"}

  defp get_article_content(url, redirects_left) do
    Logger.debug("Getting article from #{url}")

    case HTTPoison.get(url) do
      {:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
        article = Readability.article(body)
        {:ok, Readability.readable_html(article)}

      {:ok, %HTTPoison.Response{status_code: 404}} ->
        {:err, "404 not found"}

      {:ok, %HTTPoison.Response{status_code: status_code, headers: headers}}
      when status_code in [301, 302] ->
        case location_header(headers) do
          nil ->
            {:err, "redirect with no Location header"}

          new_url ->
            Logger.debug("Got #{status_code} redirect from #{url} to #{new_url}")
            get_article_content(new_url, redirects_left - 1)
        end

      {:ok, %HTTPoison.Response{status_code: 403}} ->
        {:err, "403 Forbidden"}

      {:error, %HTTPoison.Error{reason: reason}} ->
        {:err, reason}
    end
  end
end