defmodule Clacks.Inbox do require Logger alias Clacks.{Repo, Activity, Object, Actor, ActivityPub, Notification} defp store_activity(%{"actor" => actor, "id" => ap_id} = activity, local \\ false) when is_binary(actor) do # remove the embedded object (if there is one) from the activity activity_without_embedded_object = case Map.get(activity, "object") do %{"id" => object_id} -> # todo: this assumes we already have stored the object Map.put(activity, "object", object_id) _ -> activity end changeset = Activity.changeset(Activity.get_cached_by_ap_id(ap_id) || %Activity{}, %{ data: activity_without_embedded_object, local: local, actor_ap_id: actor }) case Repo.insert_or_update(changeset) do {:ok, activity} -> Notification.process_notifications_for_incoming(activity) {:ok, activity} {:error, reason} -> {:error, reason} end end @spec handle(activity :: map()) :: :ok | {:error, reason :: any()} def handle(%{"type" => "Create", "object" => object} = activity) do object = Clacks.Inbox.Transformer.restrict_incoming_object(object) changeset = Object.changeset_for_creating(object) with {:ok, _object} <- Repo.insert(changeset), {:ok, _activity} <- store_activity(activity) do :ok else {:error, changeset} -> Logger.error("Couldn't store object or activity: #{inspect(changeset)}") {:error, "Couldn't store activity"} end end def handle(%{"type" => "Follow", "object" => followed_id, "actor" => follower_id} = activity) when is_binary(followed_id) do followed = Actor.get_by_ap_id(followed_id) follower = Actor.get_by_ap_id(follower_id) store_activity(activity) new_followers = [follower_id | followed.followers] |> Enum.uniq() changeset = Actor.changeset(followed, %{followers: new_followers}) case Repo.update(changeset) do {:error, changeset} -> Logger.error("Couldn't store updated followers: #{inspect(changeset)}") {:error, "Couldn't store updated followers"} {:ok, _followed} -> accept = ActivityPub.accept_follow(activity) case store_activity(accept, true) do {:error, changeset} -> Logger.error("Couldn't store Accept activity: #{inspect(changeset)}") {:error, "Couldn't store Accept activity"} {:ok, accept} -> ActivityPub.Federator.federate(accept, follower.data["inbox"]) end end end def handle( %{ "type" => "Accept", "actor" => followee_id, "object" => %{"type" => "Follow", "id" => follow_activity_id, "actor" => follower_id} } = activity ) do followee = Actor.get_by_ap_id(followee_id) store_activity(activity) follow_activity = Activity.get_cached_by_ap_id(follow_activity_id) changeset = Activity.changeset(follow_activity, %{ data: %{follow_activity.data | "state" => "accepted"} }) case Repo.update(changeset) do {:error, changeset} -> Logger.error("Couldn't store updated Follow activity: #{inspect(changeset)}") {:error, "Couldn't store updated Follow activity"} {:ok, _follow_activity} -> new_followers = [follower_id | followee.followers] |> Enum.uniq() changeset = Actor.changeset(followee, %{followers: new_followers}) case Repo.update(changeset) do {:error, changeset} -> Logger.error("Couldn't store updated followers: #{inspect(changeset)}") {:error, "Couldn't store updated followers"} {:ok, _followee} -> :ok end end end # as a fallback, just store the activity def handle(activity) do Logger.debug("Unhandled activity: #{inspect(activity)}") case store_activity(activity) do {:error, changeset} -> Logger.error("Could not store activity: #{inspect(changeset)}") {:error, "Could not store activity"} {:ok, _activity} -> :ok end end end