use super::{ actor::ID, conversation_context, db, db::get_article_for_conversation, federate, gen_ap_id, gen_converation_id, types::{ActorExt, Conversation, NoteExt}, util::sanitize::sanitize_html, }; use activitystreams::{ activity::{kind::FollowType, Accept, ActorAndObject, Create, Follow}, actor::ApActorExt, base::AnyBase, context, iri, link::{LinkExt, Mention}, object::{kind::NoteType, Note, ObjectExt}, prelude::*, primitives::OneOrMany, public, time::{format_description, OffsetDateTime}, }; use anyhow::anyhow; use axum::{ body::Bytes, http::request::Parts, response::{IntoResponse, Response}, }; use http_signature_normalization::Config; use hyper::{body, Body, Request, StatusCode}; use log::{debug, error, info}; use openssl::{hash::MessageDigest, pkey::PKey, sha::Sha256, sign::Verifier}; use serde::{Deserialize, Serialize}; use sqlx::SqlitePool; use std::collections::BTreeMap; use thiserror::Error; pub async fn handle(req: Request) -> Response { let pool = req.extensions().get::().unwrap().clone(); match try_handle_inbox(req, &pool).await { Ok(res) => res, Err(e) => { error!("Error handling inbox: {:?}", &e); ( StatusCode::INTERNAL_SERVER_ERROR, format!("Unhandled error: {}", e), ) .into_response() } } } async fn try_handle_inbox(req: Request, pool: &SqlitePool) -> anyhow::Result { let (parts, body) = req.into_parts(); let data = body::to_bytes(body).await?; let activity: AcceptedActivity = serde_json::from_slice(&data)?; match verify_signature_twice(&activity, &data, &parts, pool).await { Ok(()) => (), Err(VerifySigError::MissingActor(_)) if activity.is_kind(&ValidTypes::Delete) => { // We may not be able to verify the signature for a Delete activity if the actor has // already been deleted and we don't have it cached, so just tell the remote server we // accepted it (to prevent a flood of retries) and don't do anything. return Ok(StatusCode::OK.into_response()); } Err(e) => { Err(e)?; } } debug!("Verified signature for inbox activity: {:?}", activity); match activity.kind().unwrap() { ValidTypes::Follow => handle_follow(activity, pool).await, ValidTypes::Create => { let object = activity.object_unchecked().as_single_base().unwrap(); if let Ok(note_base) = object.to_owned().solidify::() && let Ok(note) = note_base.extend::() { handle_create_note(activity, note, pool).await } else { // unhandled, but we say ok so the remote doesn't keep retrying Ok(StatusCode::OK.into_response()) } } ValidTypes::Undo => { let object = activity.object_unchecked().as_single_base().unwrap(); if let Ok(follow_base) = object.to_owned().solidify::() { let follow = follow_base.extend::().unwrap(); handle_undo_follow(activity, follow, pool).await } else { // we don't handle anything else that can be undone, so just ignore it and say ok Ok(StatusCode::OK.into_response()) } } ValidTypes::Delete => handle_delete(activity, pool).await, ValidTypes::Like | ValidTypes::Announce => Ok(StatusCode::OK.into_response()), } } #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "PascalCase")] pub enum ValidTypes { Follow, Create, Undo, Delete, // we don't do anything with likes/announces, but we accept them in case anyone hits the button Like, Announce, } pub type AcceptedActivity = ActorAndObject; async fn verify_signature_twice( activity: &AcceptedActivity, body: &Bytes, req_parts: &Parts, pool: &SqlitePool, ) -> Result<(), VerifySigError> { match verify_signature(activity, body, req_parts, pool, false).await { Ok(()) => Ok(()), Err(_) => verify_signature(activity, body, req_parts, pool, true).await, } } async fn verify_signature( activity: &AcceptedActivity, body: &Bytes, req_parts: &Parts, pool: &SqlitePool, force_refetch_actor: bool, ) -> Result<(), VerifySigError> { let actor_id = activity .actor() .or(Err(VerifySigError::MissingActorId))? .as_single_xsd_any_uri() .ok_or(VerifySigError::MissingActorId)? .as_str(); let actor = if force_refetch_actor { super::fetch_actor(actor_id, pool).await } else { super::get_actor(actor_id, pool).await } .map_err(|err| VerifySigError::MissingActor(err))?; let pem = &actor.ext_one.public_key.public_key_pem; let public_key = PKey::public_key_from_pem(pem.as_bytes()).map_err(|e| VerifySigError::PubKey(e))?; let mut headers_btreemap = BTreeMap::new(); for (k, v) in req_parts.headers.iter() { let k = k.as_str().to_lowercase(); // recompute the digest of the body, rather than taking the sender's word for it if k.as_str() == "digest" { let mut sha256 = Sha256::new(); sha256.update(&body); let digest = base64::encode(sha256.finish()); headers_btreemap.insert(k, format!("SHA-256={}", digest)); } else { headers_btreemap.insert( k, v.to_str() .map_err(|_| VerifySigError::BadHeader)? .to_owned(), ); } } Config::default() .begin_verify( req_parts.method.as_str(), req_parts.uri.path_and_query().unwrap().as_str(), headers_btreemap, ) .map_err(|e| VerifySigError::PrepareVerifyError(e))? .verify(|sig, signing_str| { let decoded_sig = base64::decode(sig).map_err(|e| VerifySigError::DecodingSignature(e))?; let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key) .map_err(|e| VerifySigError::CreatingVerifier(e))?; verifier .update(signing_str.as_bytes()) .map_err(|e| VerifySigError::UpdatingVerifier(e))?; let verified = verifier .verify(&decoded_sig) .map_err(|e| VerifySigError::Verifying(e))?; if verified { Ok(()) } else { Err(VerifySigError::Invalid) } })?; Ok(()) } #[derive(Error, Debug)] enum VerifySigError { #[error("Missing actor id")] MissingActorId, #[error("Missing actor: {0:?}")] MissingActor(anyhow::Error), #[error("Parsing public key: {0:?}")] PubKey(openssl::error::ErrorStack), #[error("Could not convert header to string")] BadHeader, #[error("Preparing: {0:?}")] PrepareVerifyError(http_signature_normalization::PrepareVerifyError), #[error("Decoding signature from base64: {0:?}")] DecodingSignature(base64::DecodeError), #[error("Creating verifier: {0:?}")] CreatingVerifier(openssl::error::ErrorStack), #[error("Updating verifier: {0:?}")] UpdatingVerifier(openssl::error::ErrorStack), #[error("Verifying: {0:?}")] Verifying(openssl::error::ErrorStack), #[error("Invalid signature")] Invalid, } async fn handle_follow(follow: AcceptedActivity, pool: &SqlitePool) -> anyhow::Result { // must have an actor, and actor must be a string otherwise sig validation would have failed // and we wouldn't have reached this point let actor_id = follow .actor_unchecked() .as_single_xsd_any_uri() .unwrap() .as_str() .to_owned(); if !follow.object_is(&*ID) { return Ok((StatusCode::BAD_REQUEST, "No such actor").into_response()); } // need a new pool handle that we can move into the closure let pool = pool.clone(); tokio::spawn(async move { match db::set_follow_state(&actor_id, true, &pool).await { Ok(()) => (), Err(e) => { error!("Setting follow state: {:?}", e); return; } } match send_accept(follow, &pool, actor_id).await { Ok(()) => (), Err(e) => error!("Handling follow: {:?}", e), } }); Ok(StatusCode::OK.into_response()) } async fn send_accept( follow: AcceptedActivity, pool: &SqlitePool, actor_id: String, ) -> anyhow::Result<()> { let mut accept = Accept::new(ID.as_str(), AnyBase::from_extended(follow)?); accept.set_context(context()); accept.set_id(iri!(gen_ap_id())); let actor = super::get_actor(&actor_id, pool).await?; federate::sign_and_send(&accept, actor.inbox_unchecked().as_str()) .await .map_err(|e| anyhow!("Sending accept: {:?}", e))?; Ok(()) } const ACCEPT_NON_PUBLIC_NOTES: bool = !cfg!(release); async fn handle_create_note( _create: AcceptedActivity, note: NoteExt, pool: &SqlitePool, ) -> anyhow::Result { let attributed_to = note .attributed_to() .and_then(|one_or_many| one_or_many.as_single_id()) .ok_or(anyhow!( "Missing attributed_to for Note {}", note.id_unchecked().unwrap() ))?; let actor = super::fetch_actor(attributed_to.as_str(), pool).await?; if !ACCEPT_NON_PUBLIC_NOTES && !contains_public(note.to()) && !contains_public(note.cc()) { debug!("Ignoring non-public post: {}", note.id_unchecked().unwrap()); tokio::spawn(async move { match send_non_public_reply(note, actor).await { Ok(()) => (), Err(e) => error!("Sending non-public reply Note: {:?}", e), } }); return Ok(StatusCode::OK.into_response()); } let conv: &str; if let Some(s) = ¬e.ext_one.conversation { conv = s; } else { info!( "Ignoring incoming Note without conversation: {}", note.id_unchecked().unwrap() ); return Ok(StatusCode::OK.into_response()); } let article_id = if let Some(id) = get_article_for_conversation(conv, pool).await { id } else { info!( "Ignoring incoming Note not in response to an article: {}", note.id_unchecked().unwrap() ); return Ok(StatusCode::OK.into_response()); }; let content: &str; if let Some(s) = note.content().and_then(|one_or_many| one_or_many.one()) { content = s.as_str(); } else { info!( "Ignoring incoming Note without content: {}", note.id_unchecked().unwrap() ); return Ok(StatusCode::OK.into_response()); } let sanitized = sanitize_html(content); // if we reach this point, we've validated the signature so the actor must be in the db let note_id = note.id_unchecked().unwrap().as_str(); let in_reply_to = note .in_reply_to() .and_then(|one_or_many| one_or_many.as_single_id()) .map(|ri_str| ri_str.as_str()) // if there's no in_reply_to, we just attach it directly to the article .unwrap_or(&article_id); let published = note .published() .unwrap_or(OffsetDateTime::now_utc()) .format(&format_description::well_known::Rfc3339)?; let actor_id = attributed_to.as_str(); db::store_note( note_id, &sanitized, in_reply_to, conv, &published, actor_id, pool, ) .await?; info!("inserted note {}", note_id); Ok(StatusCode::OK.into_response()) } fn contains_public<'a>(addresses: Option<&'a OneOrMany>) -> bool { addresses .map(|one_or_many| { one_or_many.as_slice().iter().any(|anybase| { anybase .as_xsd_any_uri() .map(|iri| public().eq(iri)) .unwrap_or(false) }) }) .unwrap_or(false) } async fn send_non_public_reply(note: NoteExt, actor: ActorExt) -> anyhow::Result<()> { let mut reply = NoteExt::new(Note::new(), Conversation::new()); reply.add_context(context()); reply.add_context(conversation_context()); reply.set_id(iri!(gen_ap_id())); reply.add_to(actor.id_unchecked().unwrap().to_owned()); reply.set_many_ccs::<_, AnyBase>([]); reply.set_attributed_to(ID.clone()); let url = actor .url() .and_then(|one_or_many| one_or_many.as_single_id()) .unwrap_or(actor.id_unchecked().unwrap()); let preferred_username = actor.preferred_username().unwrap(); let content = format!( r#"@{} Non-public posts are not accepted. To respond to a blog post, use either Public or Unlisted."#, url, preferred_username ); reply.set_content(content); reply.set_published(OffsetDateTime::now_utc()); reply.set_in_reply_to(note.id_unchecked().unwrap().to_owned()); reply.ext_one.conversation = Some( note.ext_one .conversation .unwrap_or_else(|| gen_converation_id()), ); let mut mention = Mention::new(); mention.set_href(actor.id_unchecked().unwrap().to_owned()); mention.set_name(format!("@{}", actor.preferred_username().unwrap())); reply.set_tag(AnyBase::from_extended(mention)?); let mut create = Create::new(ID.as_str(), reply.into_any_base()?); create.add_context(context()); create.add_context(conversation_context()); create.set_id(iri!(gen_ap_id())); create.add_to(actor.id_unchecked().unwrap().to_owned()); create.set_many_ccs::<_, AnyBase>([]); let inbox = actor.inbox()?.as_str(); federate::sign_and_send(&create, inbox).await } async fn handle_undo_follow( undo: AcceptedActivity, follow: Follow, pool: &SqlitePool, ) -> anyhow::Result { // we don't care about unfollows of anyone other than ourself if !follow.object_is(&*ID) { return Ok(StatusCode::OK.into_response()); } let actor_id = undo.actor_unchecked().as_single_id().unwrap(); // don't let anyone force other people to unfollow if !follow.actor_is(actor_id) { return Ok(StatusCode::BAD_REQUEST.into_response()); } db::set_follow_state(actor_id.as_str(), false, pool).await?; Ok(StatusCode::OK.into_response()) } async fn handle_delete(delete: AcceptedActivity, pool: &SqlitePool) -> anyhow::Result { let object_id = if let Some(id) = delete.object()?.as_single_id() { id } else { return Ok(StatusCode::OK.into_response()); }; db::delete_note( object_id.as_str(), delete.actor_unchecked().as_single_id().unwrap().as_str(), pool, ) .await?; Ok(StatusCode::OK.into_response()) }