defmodule Clacks.ActivityPub.Federator do require Logger alias Clacks.{Repo, Actor, User, Keys, Activity} import Ecto.Query @public "https://www.w3.org/ns/activitystreams#Public" @spec federate_to_involved(activity :: Activity.t(), actor :: Actor.t()) :: :ok | {:error, any()} def federate_to_involved(%Activity{data: %{"to" => to, "cc" => cc}} = activity, actor) do activity_for_federating = Activity.data_with_object(activity) addressed = (to ++ cc) |> Enum.uniq() |> List.delete(@public) addressed_actors = if actor.data["followers"] in addressed do addressed = List.delete(addressed, actor.data["followers"]) [actor.followers | addressed] else addressed end Actor |> where([a], a.ap_id in ^addressed_actors) |> select([a], %{ shared_inbox: fragment("?->'endpoints'->>'sharedInbox'", a.data), inbox: fragment("?->>'inbox'", a.data) }) |> Repo.all() |> Enum.map(&inbox_for(activity_for_federating, &1)) |> Enum.uniq() |> Enum.reduce_while(:ok, fn inbox, _acc -> case do_federate(activity_for_federating, inbox) do {:error, _} = err -> {:halt, err} _ -> {:cont, :ok} end end) end @spec federate(Activity.t(), String.t()) :: :ok | {:error, any()} def federate(activity, inbox) do activity_for_federating = Activity.data_with_object(activity) do_federate(activity_for_federating, inbox) end @spec do_federate(activity :: map(), inbox :: String.t()) :: :ok | {:error, any()} defp do_federate(%{"actor" => actor_id} = activity, inbox) do Logger.debug("Federating #{activity["id"]} to #{inbox}") %{host: inbox_host, path: inbox_path} = URI.parse(inbox) {:ok, body} = Jason.encode(activity) digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, body)) date = signature_timestamp() signature_params = %{ "(request-target)": "post #{inbox_path}", host: inbox_host, "content-length": byte_size(body), digest: digest, date: date } {private_key, key_id} = private_key_for_actor(actor_id) signature_string = HTTPSignatures.sign(private_key, key_id, signature_params) headers = [ {"Content-Type", "application/activity+json"}, {"Date", date}, {"Signature", signature_string}, {"Digest", digest} ] opts = [hackney: Application.get_env(:clacks, :hackney_opts, [])] case HTTPoison.post(inbox, body, headers, opts) do {:ok, %HTTPoison.Response{status_code: status_code}} when status_code in 200..299 -> :ok {:ok, %HTTPoison.Response{status_code: status_code, body: body}} -> {:error, "unexpected response code #{status_code}: #{inspect(body)}"} {:error, _} = err -> err end end # see https://www.w3.org/TR/activitypub/#shared-inbox-delivery @spec inbox_for(activity :: map(), actor :: map()) :: String.t() defp inbox_for(activity, actor) do cond do @public in activity["to"] or @public in activity["cc"] -> shared_inbox_for(actor) actor["followers"] in activity["to"] or actor["followers"] in activity["cc"] -> shared_inbox_for(actor) true -> actor.inbox end end @spec shared_inbox_for(actor :: map()) :: String.t() defp shared_inbox_for(%{shared_inbox: shared}) when not is_nil(shared), do: shared defp shared_inbox_for(%{inbox: inbox}), do: inbox @spec signature_timestamp() :: String.t() defp signature_timestamp(date \\ NaiveDateTime.utc_now()) do Timex.format!(date, "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT") end defp private_key_for_actor(ap_id) do %Actor{user: %User{private_key: pem}} = Actor.get_by_ap_id(ap_id) |> Repo.preload(:user) {:ok, private_key, _} = Keys.keys_from_private_key_pem(pem) {private_key, ap_id <> "#main-key"} end end