defmodule Clacks.Activity do
  @moduledoc """
  Ecto schema and lookup helpers for rows of the `"activities"` table.

  Each row stores the activity document itself in `:data` (a map), a `:local`
  flag, and the `:actor` string. Lookups "by AP id" match against the `"id"`
  key inside the stored `:data` document, not against the primary key.
  """

  use Ecto.Schema

  import Ecto.Changeset
  import Ecto.Query

  alias Clacks.Repo

  require Logger

  @type t() :: %__MODULE__{}

  @primary_key {:id, FlakeId.Ecto.Type, autogenerate: true}
  schema "activities" do
    field :data, :map
    field :local, :boolean
    field :actor, :string

    has_one :object, Clacks.Object, on_delete: :nothing, foreign_key: :id

    timestamps()
  end

  @doc """
  Builds a changeset for `schema` from `attrs`.

  Casts `:data`, `:local` and `:actor`, and requires all three to be present.
  """
  @spec changeset(schema :: t(), attrs :: map()) :: Ecto.Changeset.t()
  def changeset(%__MODULE__{} = schema, attrs) do
    schema
    |> cast(attrs, [:data, :local, :actor])
    |> validate_required([:data, :local, :actor])
  end

  @doc """
  Builds a changeset for a brand new activity row.

  `activity` is the activity document (a map with string keys); its `"actor"`
  entry is copied into the `:actor` field. `local` defaults to `false`.
  """
  @spec changeset_for_creating(activity :: map(), local :: boolean()) :: Ecto.Changeset.t()
  def changeset_for_creating(activity, local \\ false) do
    changeset(%__MODULE__{}, %{
      data: activity,
      local: local,
      actor: activity["actor"]
    })
  end

  @doc """
  Returns the activity whose primary key is `id`, or `nil` if there is none.
  """
  @spec get(id :: String.t()) :: t() | nil
  def get(id) do
    Repo.get(__MODULE__, id)
  end

  @doc """
  Returns the activity whose document id is `ap_id`.

  With `force_refetch` set, the stored copy is ignored and the activity is
  always fetched again via `fetch/1`. Otherwise the stored copy is returned
  when one exists, and `fetch/1` is only used as a fallback.
  """
  @spec get_by_ap_id(ap_id :: String.t(), force_refetch :: boolean()) :: t() | nil
  def get_by_ap_id(ap_id, force_refetch \\ false) do
    if force_refetch do
      fetch(ap_id)
    else
      get_cached_by_ap_id(ap_id) || fetch(ap_id)
    end
  end

  @doc """
  Returns the stored activity whose `data["id"]` equals `ap_id`, or `nil`.

  Only the database is consulted. `ap_id` must not be `nil`: Ecto refuses to
  compare against an interpolated `nil` and raises `ArgumentError`.
  """
  @spec get_cached_by_ap_id(ap_id :: String.t()) :: t() | nil
  def get_cached_by_ap_id(ap_id) do
    Repo.one(from a in __MODULE__, where: fragment("?->>'id'", a.data) == ^ap_id)
  end

  @doc """
  Fetches the activity at `ap_id` through `Clacks.ActivityPub.Fetcher` and
  stores it.

  If a row with the same document id already exists it is updated in place,
  otherwise a new row is inserted; either way the row is written with
  `local: false`. The actor is taken from `"actor"`, falling back to
  `"attributedTo"`.

  Returns the stored activity, or `nil` when the fetcher returns `nil`, when
  the fetched document has no string `"id"`, or when the row cannot be stored
  (the last two cases are logged).
  """
  @spec fetch(ap_id :: String.t()) :: t() | nil
  def fetch(ap_id) do
    case Clacks.ActivityPub.Fetcher.fetch_activity(ap_id) do
      nil ->
        nil

      %{"id" => fetched_id} = data when is_binary(fetched_id) ->
        store_fetched(ap_id, data)

      _malformed ->
        # Without a string id the lookup below would interpolate nil (or a
        # non-string) into the query and raise instead of returning nil.
        Logger.error("Couldn't store remote activity #{ap_id}: fetched document has no string id")
        nil
    end
  end

  # Inserts or updates the row for an already-validated fetched document.
  defp store_fetched(ap_id, %{"id" => fetched_id} = data) do
    actor = data["actor"] || data["attributedTo"]
    existing = get_cached_by_ap_id(fetched_id)

    activity_changeset =
      changeset(existing || %__MODULE__{}, %{
        data: data,
        local: false,
        actor: actor
      })

    case Repo.insert_or_update(activity_changeset) do
      {:ok, activity} ->
        # Result is deliberately discarded; presumably this makes sure the
        # actor is known locally as well - confirm against Clacks.Actor.
        _ = Clacks.Actor.get_by_ap_id(actor)
        activity

      {:error, failed_changeset} ->
        Logger.error("Couldn't store remote activity #{ap_id}: #{inspect(failed_changeset)}")
        nil
    end
  end
end