v6/src/activitypub/db.rs

219 lines
6.6 KiB
Rust

use super::types::ActorExt;
use super::DOMAIN;
use activitystreams::object::Image;
use activitystreams::prelude::*;
use anyhow::anyhow;
use log::debug;
use serde::Deserialize;
use sqlx::SqlitePool;
/// A row of the `actors` table, as loaded by the `select *` queries in this file.
#[derive(Debug, Deserialize)]
pub struct Actor {
/// Actor id; the key used to look the row up.
pub id: String,
/// The actor serialized as a JSON string (written with `serde_json::to_string`).
pub actor_object: String,
/// Follower flag; written by `set_follow_state` and filtered on by `get_followers`.
pub is_follower: bool,
/// The actor's name, or its preferred username when the name is missing or empty.
pub display_name: Option<String>,
/// The actor's inbox, stored as a string.
pub inbox: String,
/// Shared inbox taken from the actor's endpoints, when one was stored.
pub shared_inbox: Option<String>,
/// URL of the actor's icon image, when a single one could be extracted.
pub icon_url: Option<String>,
/// Public key in PEM form, taken from the actor's public-key extension.
pub public_key_pem: Option<String>,
}
/// A row of the `notes` table joined with display data from its author's `actors` row
/// (the shape produced by the query in `get_notes`).
#[derive(Debug, Deserialize)]
pub struct Note {
/// Note id.
pub id: String,
/// Note content as stored by `store_note`.
pub content: String,
/// Value of the note's `in_reply_to` column.
pub in_reply_to: String,
/// Conversation the note belongs to; `get_notes` filters on this column.
pub conversation: String,
pub published: String, // RFC 3339 encoded UTC date time
/// Id of the authoring actor; the join key into the `actors` table.
pub actor_id: String,
/// `display_name` of the authoring actor, selected from the joined `actors` row.
pub actor_display_name: Option<String>,
/// `icon_url` of the authoring actor, selected from the joined `actors` row.
pub actor_icon_url: Option<String>,
/// Digest flag; `get_undigested_notes` selects rows where it is 0 and
/// `set_note_digested` sets it to 1.
pub digested: bool,
}
/// Looks up a cached actor row by its id.
///
/// Returns `None` both when no row matches and when the query fails; the
/// database error is discarded rather than propagated.
pub async fn fetch_cached_actor(id: &str, pool: &SqlitePool) -> Option<Actor> {
    let lookup = sqlx::query_as!(Actor, "select * from actors where id = ?", id)
        .fetch_optional(pool)
        .await;
    // A failed query is treated the same as a cache miss.
    lookup.unwrap_or(None)
}
/// Inserts or refreshes the cached row for an actor.
///
/// The row stores the actor serialized as JSON together with fields extracted
/// from it: display name, inbox, shared inbox, icon URL and public key PEM.
/// When a row with the same id is already cached it is updated in place;
/// otherwise a new row is inserted. Both paths write the same set of columns.
///
/// # Errors
///
/// Fails when the actor has no id, when it cannot be serialized to JSON, when
/// reading its endpoints or inbox fails, or when the database statement fails.
pub async fn add_cached_actor(actor: &ActorExt, pool: &SqlitePool) -> anyhow::Result<()> {
    let id = actor
        .id_unchecked()
        .map(|iri| iri.as_str())
        .ok_or_else(|| anyhow!("missing actor id"))?;
    debug!("adding cached actor {:?}", id);
    let actor_json = serde_json::to_string(&actor)?;
    // Prefer the actor's name; fall back to the preferred username when the
    // name is absent or empty.
    let mut display_name = actor
        .name()
        .and_then(|names| names.as_single_xsd_string().map(|s| s.to_owned()));
    if display_name.as_ref().map_or(true, |s| s.is_empty()) {
        display_name = actor.preferred_username().map(|s| s.to_owned());
    }
    let shared_inbox = actor
        .endpoints()?
        .and_then(|endpoints| endpoints.shared_inbox)
        .map(|s| s.as_str());
    let inbox = actor.inbox()?.as_str();
    // The icon is only used when it is a single entry that converts to an
    // `Image` carrying exactly one URL; anything else yields `None`.
    let icon = actor
        .icon()
        .and_then(|icon| icon.as_one())
        .and_then(|icon| Image::from_any_base(icon.clone()).ok().flatten())
        .and_then(|icon| {
            icon.url()
                .and_then(|urls| urls.as_single_id().map(|s| s.to_string()))
        });
    let pub_key_pem = &actor.ext_one.public_key.public_key_pem;
    // NOTE(review): the existence check and the write are separate statements,
    // so concurrent calls for the same id could both take the insert path.
    if fetch_cached_actor(id, pool).await.is_some() {
        sqlx::query!("update actors set actor_object = ?2, display_name = ?3, inbox = ?4, shared_inbox = ?5, icon_url = ?6, public_key_pem = ?7 where id = ?1", id, actor_json, display_name, inbox, shared_inbox, icon, pub_key_pem)
            .execute(pool)
            .await?;
    } else {
        // `shared_inbox` is written here as well, so a freshly cached actor
        // does not lose it until the next refresh.
        sqlx::query!("insert into actors (id, actor_object, display_name, inbox, shared_inbox, icon_url, public_key_pem) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)", id, actor_json, display_name, inbox, shared_inbox, icon, pub_key_pem)
            .execute(pool)
            .await?;
    }
    Ok(())
}
/// Returns every cached actor whose `is_follower` flag is set.
///
/// # Errors
///
/// Propagates any error from the underlying query.
pub async fn get_followers(pool: &SqlitePool) -> anyhow::Result<Vec<Actor>> {
    let followers = sqlx::query_as!(Actor, "select * from actors where is_follower = 1")
        .fetch_all(pool)
        .await?;
    Ok(followers)
}
/// Fetches the stored `article_object` for the article at `path`.
///
/// The lookup key is the permalink `https://{DOMAIN}{path}`, with a trailing
/// slash appended when `path` does not already end in one.
///
/// Returns `None` when no article matches or when the query fails.
pub async fn get_article_for_path(path: &str, pool: &SqlitePool) -> Option<String> {
    let already_terminated = path.ends_with("/");
    let suffix = if already_terminated { "" } else { "/" };
    let permalink = format!("https://{}{}{}", *DOMAIN, path, suffix);
    let lookup = sqlx::query!(
        "select article_object from articles where id = ?",
        permalink
    )
    .fetch_optional(pool)
    .await;
    match lookup {
        Ok(Some(row)) => Some(row.article_object),
        // Both "no such article" and a failed query collapse to `None`.
        Ok(None) | Err(_) => None,
    }
}
/// Fetches the stored `article_object` of the article whose `conversation`
/// column equals `conv`.
///
/// Returns `None` when no article matches or when the query fails.
pub async fn get_article_for_conversation(conv: &str, pool: &SqlitePool) -> Option<String> {
    // First `?` bails out on a query error, second `?` on a missing row.
    let row = sqlx::query!(
        "select article_object from articles where conversation = ?",
        conv
    )
    .fetch_optional(pool)
    .await
    .ok()??;
    Some(row.article_object)
}
/// Lists the articles whose `has_federated` flag is 0.
///
/// Each entry is an `(id, article_object)` pair.
///
/// # Errors
///
/// Propagates any error from the underlying query.
pub async fn get_unfederated_articles(pool: &SqlitePool) -> anyhow::Result<Vec<(String, String)>> {
    let rows = sqlx::query!("select id, article_object from articles where has_federated = 0")
        .fetch_all(pool)
        .await?;
    let mut pending = Vec::with_capacity(rows.len());
    for row in rows {
        pending.push((row.id, row.article_object));
    }
    Ok(pending)
}
/// Sets the `has_federated` flag of the article with id `article_id`.
///
/// The number of affected rows is not checked, so an unknown id is not
/// reported as an error.
///
/// # Errors
///
/// Propagates any error from executing the update.
pub async fn set_has_federated(
    article_id: &str,
    has_federated: bool,
    pool: &SqlitePool,
) -> anyhow::Result<()> {
    let outcome = sqlx::query!(
        "update articles set has_federated = ?1 where id = ?2",
        has_federated,
        article_id
    )
    .execute(pool)
    .await;
    match outcome {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
/// Returns the `conversation` value stored for the article with the given id.
///
/// # Errors
///
/// Propagates any error from the query, and fails with
/// "missing conversation for article" when no article row matches `id`.
pub async fn get_conversation_for_article_id(
    id: &str,
    pool: &SqlitePool,
) -> anyhow::Result<String> {
    sqlx::query!("select conversation from articles where id = ?1", id)
        .fetch_optional(pool)
        .await?
        .map(|row| row.conversation)
        // Build the error lazily so the success path does not construct it.
        .ok_or_else(|| anyhow!("missing conversation for article"))
}
pub async fn store_note(
id: &str,
content: &str,
in_reply_to: &str,
conversation: &str,
published: &str,
actor_id: &str,
pool: &SqlitePool,
) -> anyhow::Result<()> {
sqlx::query!("insert into notes (id, content, in_reply_to, conversation, published, actor_id) values (?1, ?2, ?3, ?4, ?5, ?6)", id, content, in_reply_to, conversation, published, actor_id)
.execute(pool)
.await?;
Ok(())
}
/// Sets the `is_follower` flag of the actor with id `actor_id`.
///
/// The number of affected rows is not checked, so an actor that is not
/// cached is not reported as an error.
///
/// # Errors
///
/// Propagates any error from executing the update.
pub async fn set_follow_state(
    actor_id: &str,
    is_follower: bool,
    pool: &SqlitePool,
) -> anyhow::Result<()> {
    let updated = sqlx::query!(
        "update actors set is_follower = ?1 where id = ?2",
        is_follower,
        actor_id
    )
    .execute(pool)
    .await;
    updated.map(drop).map_err(Into::into)
}
/// Deletes the note with id `note_id`, but only if it belongs to `actor_id`.
///
/// The number of affected rows is not checked, so a missing note or a
/// mismatched actor is not reported as an error.
///
/// # Errors
///
/// Propagates any error from executing the delete.
pub async fn delete_note(note_id: &str, actor_id: &str, pool: &SqlitePool) -> anyhow::Result<()> {
    let removal = sqlx::query!(
        "delete from notes where id = ?1 and actor_id = ?2",
        note_id,
        actor_id
    )
    .execute(pool)
    .await;
    match removal {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
/// Loads all notes in `conversation`, each joined with its author's
/// `display_name` and `icon_url` from the `actors` table.
///
/// Because this is an inner join, notes whose `actor_id` has no matching
/// `actors` row are not returned.
///
/// # Errors
///
/// Returns the `sqlx::Error` from the underlying query.
pub async fn get_notes(conversation: &str, pool: &SqlitePool) -> Result<Vec<Note>, sqlx::Error> {
    let notes = sqlx::query_as!(Note, "select notes.*, actors.display_name as actor_display_name, actors.icon_url as actor_icon_url from notes inner join actors on notes.actor_id = actors.id where conversation = ?1", conversation)
        .fetch_all(pool)
        .await?;
    Ok(notes)
}
/// Lists the notes whose `digested` flag is 0.
///
/// Each entry is an `(id, conversation)` pair.
///
/// # Errors
///
/// Returns the `sqlx::Error` from the underlying query.
pub async fn get_undigested_notes(pool: &SqlitePool) -> Result<Vec<(String, String)>, sqlx::Error> {
    let rows = sqlx::query!("select id, conversation from notes where digested = 0")
        .fetch_all(pool)
        .await?;
    let mut undigested = Vec::with_capacity(rows.len());
    for note in rows {
        undigested.push((note.id, note.conversation));
    }
    Ok(undigested)
}
/// Sets the `digested` flag to 1 on the note with the given id.
///
/// The number of affected rows is not checked, so an unknown id is not
/// reported as an error.
///
/// # Errors
///
/// Returns the `sqlx::Error` from executing the update.
pub async fn set_note_digested(pool: &SqlitePool, id: &str) -> sqlx::Result<()> {
    sqlx::query!("update notes set digested = 1 where id = ?1", id)
        .execute(pool)
        .await
        .map(|_| ())
}