defmodule Clacks.Object do
  @moduledoc """
  Ecto schema and lookup helpers for rows of the `objects` table.

  Each row keeps one object as a map in the `data` field. Objects are looked
  up by the `"id"` key inside `data`; when no stored copy is wanted or
  available they are retrieved through
  `Clacks.ActivityPub.Fetcher.fetch_object/1` and persisted.
  """

  use Ecto.Schema

  import Ecto.Changeset
  import Ecto.Query

  alias Clacks.Repo

  require Logger

  @type t() :: %__MODULE__{}

  schema "objects" do
    field :data, :map

    timestamps()
  end

  @doc """
  Builds a changeset for an object.

  Only the `:data` field is cast from `attrs`, and it is required.
  """
  @spec changeset(schema :: t(), attrs :: map()) :: Ecto.Changeset.t()
  def changeset(%__MODULE__{} = schema, attrs) do
    schema
    |> cast(attrs, [:data])
    |> validate_required([:data])
  end

  @doc """
  Returns the object identified by `ap_id`.

  When `force_refetch` is falsy the stored copy is returned if there is one,
  and `fetch/2` is only used as a fallback. When `force_refetch` is truthy
  the stored copy is ignored and `fetch/2` is always called.

  `synthesize_create` is passed through to `fetch/2`.

  Returns `nil` when the object is neither stored nor fetchable.
  """
  @spec get_by_ap_id(
          ap_id :: String.t(),
          force_refetch :: boolean(),
          synthesize_create :: boolean()
        ) :: t() | nil
  def get_by_ap_id(ap_id, force_refetch \\ false, synthesize_create \\ true)
      when is_binary(ap_id) do
    if force_refetch do
      fetch(ap_id, synthesize_create)
    else
      get_cached_by_ap_id(ap_id) || fetch(ap_id, synthesize_create)
    end
  end

  @doc """
  Returns the stored object whose `data["id"]` equals `ap_id`, or `nil`.

  Uses `Repo.one/1`, so it raises if more than one row matches.
  """
  @spec get_cached_by_ap_id(ap_id :: String.t()) :: t() | nil
  def get_cached_by_ap_id(ap_id) when is_binary(ap_id) do
    Repo.one(from o in __MODULE__, where: fragment("?->>'id'", o.data) == ^ap_id)
  end

  @doc """
  Fetches the object at `ap_id` and stores it.

  If a row already exists for the fetched object's `"id"`, that row is
  updated with the new data; otherwise a new row is inserted.

  When `synthesize_create` is truthy and the object was not stored before,
  a Create activity built by `Clacks.ActivityPub.synthesized_create/1` is
  stored alongside it. Refetching an object that is already stored does not
  synthesize another Create, so repeated fetches do not pile up duplicate
  activities. As a consequence, an object that was first stored without a
  Create does not gain one on a later refetch.

  Returns the stored object, or `nil` when the fetcher returns `nil` or the
  object could not be stored (the failure is logged). Raises a `MatchError`
  if the synthesized Create activity cannot be stored.
  """
  @spec fetch(ap_id :: String.t(), synthesize_create :: boolean()) :: t() | nil
  def fetch(ap_id, synthesize_create \\ true) when is_binary(ap_id) do
    case Clacks.ActivityPub.Fetcher.fetch_object(ap_id) do
      nil -> nil
      data -> store_fetched(ap_id, data, synthesize_create)
    end
  end

  # Inserts or updates the row for freshly fetched `data`, synthesizing a
  # Create activity only for objects that were not stored before.
  defp store_fetched(ap_id, data, synthesize_create) do
    # Look up by the fetched object's own id, which is what rows are keyed on;
    # it is not necessarily the same string as the URL that was requested.
    existing = get_cached_by_ap_id(data["id"])

    result =
      (existing || %__MODULE__{})
      |> changeset(%{data: data})
      |> Repo.insert_or_update()

    case result do
      {:ok, object} ->
        # Only a newly inserted object gets a synthesized Create. An object
        # that already had a row is merely being refreshed, and inserting a
        # fresh activity on every refetch would duplicate it.
        if synthesize_create && is_nil(existing) do
          store_synthesized_create(data)
        end

        object

      {:error, changeset} ->
        Logger.error("Couldn't store remote object #{ap_id}: #{inspect(changeset)}")
        nil
    end
  end

  # Stores a non-local Create activity wrapping `data`; raises on failure.
  defp store_synthesized_create(data) do
    create = Clacks.ActivityPub.synthesized_create(data)

    changeset =
      Clacks.Activity.changeset(%Clacks.Activity{}, %{
        data: create,
        local: false,
        actor: data["actor"]
      })

    {:ok, _create} = Repo.insert_or_update(changeset)
    :ok
  end
end