clacks/lib/clacks/activity.ex

73 lines
1.7 KiB
Elixir

defmodule Clacks.Activity do
require Logger
use Ecto.Schema
import Ecto.Changeset
alias Clacks.Repo
import Ecto.Query
@type t() :: %__MODULE__{}
@primary_key {:id, FlakeId.Ecto.Type, autogenerate: true}
schema "activities" do
field :data, :map
field :local, :boolean
field :actor, :string
has_one :object, Clacks.Object, on_delete: :nothing, foreign_key: :id
timestamps()
end
def changeset(%__MODULE__{} = schema, attrs) do
schema
|> cast(attrs, [:data, :local, :actor])
|> validate_required([:data, :local, :actor])
end
@spec get_by_ap_id(ap_id :: String.t(), force_refetch :: boolean()) :: t() | nil
def get_by_ap_id(ap_id, force_refetch \\ false) do
if force_refetch do
fetch(ap_id)
else
get_cached_by_ap_id(ap_id) || fetch(ap_id)
end
end
@spec get_cached_by_ap_id(ap_id :: String.t()) :: t() | nil
def get_cached_by_ap_id(ap_id) do
Repo.one(from a in __MODULE__, where: fragment("?->>'id'", a.data) == ^ap_id)
end
@spec fetch(ap_id :: String.t()) :: t() | nil
def fetch(ap_id) do
case Clacks.ActivityPub.Fetcher.fetch_activity(ap_id) do
nil ->
nil
data ->
actor = data["actor"] || data["attributedTo"]
existing = get_cached_by_ap_id(data["id"])
changeset =
changeset(existing || %__MODULE__{}, %{
data: data,
local: false,
actor: actor
})
case Repo.insert_or_update(changeset) do
{:ok, activity} ->
_ = Clacks.Actor.get_by_ap_id(actor)
activity
{:error, changeset} ->
Logger.error("Couldn't store remote activity #{ap_id}: #{inspect(changeset)}")
nil
end
end
end
end