119 lines
3.7 KiB
Elixir
119 lines
3.7 KiB
Elixir
defmodule Clacks.ActivityPub.Federator do
|
|
require Logger
|
|
alias Clacks.{Repo, Actor, User, Keys, Activity}
|
|
import Ecto.Query
|
|
|
|
@public "https://www.w3.org/ns/activitystreams#Public"
|
|
|
|
@spec federate_to_involved(activity :: Activity.t(), actor :: Actor.t()) ::
|
|
:ok | {:error, any()}
|
|
def federate_to_involved(%Activity{data: %{"to" => to, "cc" => cc}} = activity, actor) do
|
|
activity_for_federating = Activity.data_with_object(activity)
|
|
|
|
addressed =
|
|
(to ++ cc)
|
|
|> Enum.uniq()
|
|
|> List.delete(@public)
|
|
|
|
addressed_actors =
|
|
if actor.data["followers"] in addressed do
|
|
addressed = List.delete(addressed, actor.data["followers"])
|
|
[actor.followers | addressed]
|
|
else
|
|
addressed
|
|
end
|
|
|
|
Actor
|
|
|> where([a], a.ap_id in ^addressed_actors)
|
|
|> select([a], %{
|
|
shared_inbox: fragment("?->'endpoints'->>'sharedInbox'", a.data),
|
|
inbox: fragment("?->>'inbox'", a.data)
|
|
})
|
|
|> Repo.all()
|
|
|> Enum.map(&inbox_for(activity_for_federating, &1))
|
|
|> Enum.uniq()
|
|
|> Enum.reduce_while(:ok, fn inbox, _acc ->
|
|
case do_federate(activity_for_federating, inbox) do
|
|
{:error, _} = err ->
|
|
{:halt, err}
|
|
|
|
_ ->
|
|
{:cont, :ok}
|
|
end
|
|
end)
|
|
end
|
|
|
|
@spec federate(Activity.t(), String.t()) :: :ok | {:error, any()}
|
|
def federate(activity, inbox) do
|
|
activity_for_federating = Activity.data_with_object(activity)
|
|
do_federate(activity_for_federating, inbox)
|
|
end
|
|
|
|
@spec do_federate(activity :: map(), inbox :: String.t()) :: :ok | {:error, any()}
|
|
defp do_federate(%{"actor" => actor_id} = activity, inbox) do
|
|
Logger.debug("Federating #{activity["id"]} to #{inbox}")
|
|
%{host: inbox_host, path: inbox_path} = URI.parse(inbox)
|
|
|
|
{:ok, body} = Jason.encode(activity)
|
|
digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, body))
|
|
date = signature_timestamp()
|
|
|
|
signature_params = %{
|
|
"(request-target)": "post #{inbox_path}",
|
|
host: inbox_host,
|
|
"content-length": byte_size(body),
|
|
digest: digest
|
|
}
|
|
|
|
{private_key, key_id} = private_key_for_actor(actor_id)
|
|
signature_string = HTTPSignatures.sign(private_key, key_id, signature_params)
|
|
|
|
headers = [
|
|
{"Content-Type", "application/activity+json"},
|
|
{"Date", date},
|
|
{"Signature", signature_string},
|
|
{"Digest", digest}
|
|
]
|
|
|
|
opts = [hackney: Application.get_env(:clacks, :hackney_opts, [])]
|
|
|
|
case HTTPoison.post(inbox, body, headers, opts) do
|
|
{:ok, %HTTPoison.Response{status_code: status_code}} when status_code in 200..299 ->
|
|
:ok
|
|
|
|
{:error, _} = err ->
|
|
err
|
|
end
|
|
end
|
|
|
|
# see https://www.w3.org/TR/activitypub/#shared-inbox-delivery
|
|
@spec inbox_for(activity :: map(), actor :: map()) :: String.t()
|
|
defp inbox_for(activity, actor) do
|
|
cond do
|
|
@public in activity["to"] or @public in activity["cc"] ->
|
|
shared_inbox_for(actor)
|
|
|
|
actor["followers"] in activity["to"] or actor["followers"] in activity["cc"] ->
|
|
shared_inbox_for(actor)
|
|
|
|
true ->
|
|
actor.inbox
|
|
end
|
|
end
|
|
|
|
@spec shared_inbox_for(actor :: map()) :: String.t()
|
|
defp shared_inbox_for(%{shared_inbox: shared}) when not is_nil(shared), do: shared
|
|
defp shared_inbox_for(%{inbox: inbox}), do: inbox
|
|
|
|
@spec signature_timestamp() :: String.t()
|
|
defp signature_timestamp(date \\ NaiveDateTime.utc_now()) do
|
|
Timex.format!(date, "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT")
|
|
end
|
|
|
|
defp private_key_for_actor(ap_id) do
|
|
%Actor{user: %User{private_key: pem}} = Actor.get_by_ap_id(ap_id) |> Repo.preload(:user)
|
|
{:ok, private_key, _} = Keys.keys_from_private_key_pem(pem)
|
|
{private_key, ap_id <> "#main-key"}
|
|
end
|
|
end
|