126 lines
3.9 KiB
Rust
126 lines
3.9 KiB
Rust
use crate::activitypub::{actor::ID, db, gen_ap_id};
|
|
use activitystreams::{
|
|
activity::Create,
|
|
base::AnyBase,
|
|
context, iri,
|
|
link::Mention,
|
|
object::{Article, Note},
|
|
prelude::{ApActorExt, BaseExt, ExtendsExt, LinkExt, ObjectExt},
|
|
time::OffsetDateTime,
|
|
};
|
|
use anyhow::anyhow;
|
|
use log::{error, info};
|
|
use sqlx::SqlitePool;
|
|
use std::collections::HashMap;
|
|
use std::fmt::Write;
|
|
|
|
use super::{
|
|
conversation_context, federate, gen_converation_id, get_actor,
|
|
types::{Conversation, NoteExt},
|
|
};
|
|
|
|
pub async fn send_digest_if_necessary(pool: &SqlitePool) {
|
|
match do_send_digest(pool).await {
|
|
Ok(()) => (),
|
|
Err(e) => error!("Sending digest failed: {:?}", e),
|
|
}
|
|
}
|
|
|
|
async fn do_send_digest(pool: &SqlitePool) -> anyhow::Result<()> {
|
|
let digest_recipient =
|
|
std::env::var("DIGEST_RECIPIENT_URL").map_err(|_| anyhow!("No digest recipient set"))?;
|
|
|
|
let digest_recipient_actor = get_actor(&digest_recipient, pool).await?;
|
|
|
|
let undigested = db::get_undigested_notes(pool).await?;
|
|
|
|
if undigested.is_empty() {
|
|
info!("No undigested notes, not sending digest");
|
|
return Ok(());
|
|
} else {
|
|
info!("Sending digest for {} notes", undigested.len());
|
|
}
|
|
|
|
// conversation => count
|
|
let mut counts = HashMap::<String, usize>::new();
|
|
for (_, conv) in undigested.iter() {
|
|
if let Some(count) = counts.get_mut(conv) {
|
|
*count += 1;
|
|
} else {
|
|
counts.insert(conv.to_owned(), 1);
|
|
}
|
|
}
|
|
|
|
let mut content = format!(
|
|
r#"<p><a href="{}" class="mention">@<span>{}</span></a> Received {} comment{} on {} post{}:</p>"#,
|
|
digest_recipient,
|
|
digest_recipient_actor.preferred_username().unwrap(),
|
|
undigested.len(),
|
|
if undigested.len() == 1 { "" } else { "s" },
|
|
counts.len(),
|
|
if counts.len() == 1 { "" } else { "s" },
|
|
);
|
|
|
|
for (conv, count) in counts {
|
|
let article = db::get_article_for_conversation(&conv, pool)
|
|
.await
|
|
.ok_or(anyhow!("Missing article for conversation"))?;
|
|
// all this unwrapping is safe because we generate this article ourselves
|
|
let mut article: Article = serde_json::from_str(&article).unwrap();
|
|
let url = article.take_id().unwrap();
|
|
let title = article
|
|
.take_name()
|
|
.unwrap()
|
|
.one()
|
|
.unwrap()
|
|
.xsd_string()
|
|
.unwrap();
|
|
write!(
|
|
content,
|
|
r#"<p><a href="{}">{}</a>: {} comment{}</p>"#,
|
|
url,
|
|
title,
|
|
count,
|
|
if count == 1 { "" } else { "s" }
|
|
)
|
|
.unwrap();
|
|
}
|
|
|
|
let mut note = NoteExt::new(Note::new(), Conversation::new());
|
|
note.add_context(context());
|
|
note.add_context(conversation_context());
|
|
note.set_id(iri!(gen_ap_id()));
|
|
note.add_to(digest_recipient_actor.id_unchecked().unwrap().to_owned());
|
|
note.set_many_ccs::<_, AnyBase>([]);
|
|
note.set_attributed_to(ID.clone());
|
|
note.set_content(content);
|
|
note.set_published(OffsetDateTime::now_utc());
|
|
note.ext_one.conversation = Some(gen_converation_id());
|
|
let mut mention = Mention::new();
|
|
mention.set_href(digest_recipient_actor.id_unchecked().unwrap().to_owned());
|
|
mention.set_name(format!(
|
|
"@{}",
|
|
digest_recipient_actor
|
|
.preferred_username()
|
|
.unwrap()
|
|
.to_owned()
|
|
));
|
|
note.set_tag(AnyBase::from_extended(mention)?);
|
|
|
|
let mut create = Create::new(ID.as_str(), note.into_any_base()?);
|
|
create.add_context(context());
|
|
create.add_context(conversation_context());
|
|
create.set_id(iri!(gen_ap_id()));
|
|
create.add_to(digest_recipient_actor.id_unchecked().unwrap().to_owned());
|
|
create.set_many_ccs::<_, AnyBase>([]);
|
|
|
|
let inbox = digest_recipient_actor.inbox()?.as_str();
|
|
federate::sign_and_send(&create, inbox).await?;
|
|
|
|
for (id, _) in undigested {
|
|
db::set_note_digested(pool, &id).await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|