v6/src/activitypub/digester.rs

126 lines
3.9 KiB
Rust

use crate::activitypub::{actor::ID, db, gen_ap_id};
use activitystreams::{
activity::Create,
base::AnyBase,
context, iri,
link::Mention,
object::{Article, Note},
prelude::{ApActorExt, BaseExt, ExtendsExt, LinkExt, ObjectExt},
time::OffsetDateTime,
};
use anyhow::anyhow;
use log::{error, info};
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::fmt::Write;
use super::{
conversation_context, federate, gen_converation_id, get_actor,
types::{Conversation, NoteExt},
};
pub async fn send_digest_if_necessary(pool: &SqlitePool) {
match do_send_digest(pool).await {
Ok(()) => (),
Err(e) => error!("Sending digest failed: {:?}", e),
}
}
async fn do_send_digest(pool: &SqlitePool) -> anyhow::Result<()> {
let digest_recipient =
std::env::var("DIGEST_RECIPIENT_URL").map_err(|_| anyhow!("No digest recipient set"))?;
let digest_recipient_actor = get_actor(&digest_recipient, pool).await?;
let undigested = db::get_undigested_notes(pool).await?;
if undigested.is_empty() {
info!("No undigested notes, not sending digest");
return Ok(());
} else {
info!("Sending digest for {} notes", undigested.len());
}
// conversation => count
let mut counts = HashMap::<String, usize>::new();
for (_, conv) in undigested.iter() {
if let Some(count) = counts.get_mut(conv) {
*count += 1;
} else {
counts.insert(conv.to_owned(), 1);
}
}
let mut content = format!(
r#"<p><a href="{}" class="mention">@<span>{}</span></a> Received {} comment{} on {} post{}:</p>"#,
digest_recipient,
digest_recipient_actor.preferred_username().unwrap(),
undigested.len(),
if undigested.len() == 1 { "" } else { "s" },
counts.len(),
if counts.len() == 1 { "" } else { "s" },
);
for (conv, count) in counts {
let article = db::get_article_for_conversation(&conv, pool)
.await
.ok_or(anyhow!("Missing article for conversation"))?;
// all this unwrapping is safe because we generate this article ourselves
let mut article: Article = serde_json::from_str(&article).unwrap();
let url = article.take_id().unwrap();
let title = article
.take_name()
.unwrap()
.one()
.unwrap()
.xsd_string()
.unwrap();
write!(
content,
r#"<p><a href="{}">{}</a>: {} comment{}</p>"#,
url,
title,
count,
if count == 1 { "" } else { "s" }
)
.unwrap();
}
let mut note = NoteExt::new(Note::new(), Conversation::new());
note.add_context(context());
note.add_context(conversation_context());
note.set_id(iri!(gen_ap_id()));
note.add_to(digest_recipient_actor.id_unchecked().unwrap().to_owned());
note.set_many_ccs::<_, AnyBase>([]);
note.set_attributed_to(ID.clone());
note.set_content(content);
note.set_published(OffsetDateTime::now_utc());
note.ext_one.conversation = Some(gen_converation_id());
let mut mention = Mention::new();
mention.set_href(digest_recipient_actor.id_unchecked().unwrap().to_owned());
mention.set_name(format!(
"@{}",
digest_recipient_actor
.preferred_username()
.unwrap()
.to_owned()
));
note.set_tag(AnyBase::from_extended(mention)?);
let mut create = Create::new(ID.as_str(), note.into_any_base()?);
create.add_context(context());
create.add_context(conversation_context());
create.set_id(iri!(gen_ap_id()));
create.add_to(digest_recipient_actor.id_unchecked().unwrap().to_owned());
create.set_many_ccs::<_, AnyBase>([]);
let inbox = digest_recipient_actor.inbox()?.as_str();
federate::sign_and_send(&create, inbox).await?;
for (id, _) in undigested {
db::set_note_digested(pool, &id).await?;
}
Ok(())
}