v6/src/activitypub/inbox.rs

456 lines
15 KiB
Rust
Raw Normal View History

2022-12-10 18:15:32 +00:00
use super::{
actor::ID,
conversation_context, db,
db::get_article_for_conversation,
federate, gen_ap_id, gen_converation_id,
types::{ActorExt, Conversation, NoteExt},
util::sanitize::sanitize_html,
};
use activitystreams::{
activity::{kind::FollowType, Accept, ActorAndObject, Create, Follow},
actor::ApActorExt,
base::AnyBase,
context, iri,
link::{LinkExt, Mention},
object::{kind::NoteType, Note, ObjectExt},
prelude::*,
primitives::OneOrMany,
public,
time::{format_description, OffsetDateTime},
};
use anyhow::anyhow;
use axum::{
body::Bytes,
http::request::Parts,
response::{IntoResponse, Response},
};
use http_signature_normalization::Config;
use hyper::{body, Body, Request, StatusCode};
use log::{debug, error, info};
use openssl::{hash::MessageDigest, pkey::PKey, sha::Sha256, sign::Verifier};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
use std::collections::BTreeMap;
use thiserror::Error;
pub async fn handle(req: Request<Body>) -> Response {
let pool = req.extensions().get::<SqlitePool>().unwrap().clone();
match try_handle_inbox(req, &pool).await {
Ok(res) => res,
Err(e) => {
error!("Error handling inbox: {:?}", &e);
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Unhandled error: {}", e),
)
.into_response()
}
}
}
/// Reads, authenticates and dispatches a single inbox delivery.
///
/// The request body is parsed as an [`AcceptedActivity`], its HTTP signature is checked with
/// `verify_signature_twice`, and the activity is then routed by type:
///
/// * `Follow` -> `handle_follow`
/// * `Create` with an embedded `Note` -> `handle_create_note`
/// * `Undo` with an embedded `Follow` -> `handle_undo_follow`
/// * `Delete` -> `handle_delete`
/// * `Like` / `Announce` and anything else we cannot interpret -> `200 OK`, nothing done
///
/// Activities that arrive without a `type` are answered with `400 Bad Request`.
///
/// # Errors
///
/// Returns an error if the body cannot be read or parsed, if signature verification fails
/// (except for the `Delete` special case described below), or if the selected handler fails.
async fn try_handle_inbox(req: Request<Body>, pool: &SqlitePool) -> anyhow::Result<Response> {
    let (parts, body) = req.into_parts();
    let data = body::to_bytes(body).await?;
    let activity: AcceptedActivity = serde_json::from_slice(&data)?;
    match verify_signature_twice(&activity, &data, &parts, pool).await {
        Ok(()) => (),
        Err(VerifySigError::MissingActor(_)) if activity.is_kind(&ValidTypes::Delete) => {
            // We may not be able to verify the signature for a Delete activity if the actor has
            // already been deleted and we don't have it cached, so just tell the remote server we
            // accepted it (to prevent a flood of retries) and don't do anything.
            return Ok(StatusCode::OK.into_response());
        }
        Err(e) => return Err(e.into()),
    }
    debug!("Verified signature for inbox activity: {:?}", activity);

    // The `type` property is optional while deserializing, so a signed activity can still
    // arrive without it. Reject it instead of panicking.
    let kind = match activity.kind() {
        Some(kind) => kind.clone(),
        None => {
            return Ok((StatusCode::BAD_REQUEST, "Missing activity type").into_response());
        }
    };

    match kind {
        ValidTypes::Follow => handle_follow(activity, pool).await,
        ValidTypes::Create => {
            // The object may be a bare IRI or a list rather than one embedded object; those
            // cases fall through to the "unhandled" branch together with non-Note objects.
            if let Some(object) = activity.object_unchecked().as_single_base()
                && let Ok(note_base) = object.to_owned().solidify::<NoteType>()
                && let Ok(note) = note_base.extend::<NoteExt>()
            {
                handle_create_note(activity, note, pool).await
            } else {
                // unhandled, but we say ok so the remote doesn't keep retrying
                Ok(StatusCode::OK.into_response())
            }
        }
        ValidTypes::Undo => {
            // Same reasoning as for Create: only a single, embedded, well-formed Follow is
            // acted upon; a malformed one is ignored rather than crashing the handler.
            if let Some(object) = activity.object_unchecked().as_single_base()
                && let Ok(follow_base) = object.to_owned().solidify::<FollowType>()
                && let Ok(follow) = follow_base.extend::<Follow>()
            {
                handle_undo_follow(activity, follow, pool).await
            } else {
                // we don't handle anything else that can be undone, so just ignore it and say ok
                Ok(StatusCode::OK.into_response())
            }
        }
        ValidTypes::Delete => handle_delete(activity, pool).await,
        ValidTypes::Like | ValidTypes::Announce => Ok(StatusCode::OK.into_response()),
    }
}
/// Activity types that the inbox is willing to deserialize.
///
/// Variant names are (de)serialized in PascalCase, per the `serde(rename_all)` attribute.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub enum ValidTypes {
/// Routed to `handle_follow`.
Follow,
/// Routed to `handle_create_note` when the object is a Note; otherwise acknowledged and ignored.
Create,
/// Routed to `handle_undo_follow` when the object is a Follow; otherwise acknowledged and ignored.
Undo,
/// Routed to `handle_delete`.
Delete,
// we don't do anything with likes/announces, but we accept them in case anyone hits the button
/// Acknowledged with `200 OK` and otherwise ignored.
Like,
/// Acknowledged with `200 OK` and otherwise ignored.
Announce,
}
/// An incoming activity with an actor and an object, whose type is one of [`ValidTypes`].
pub type AcceptedActivity = ActorAndObject<ValidTypes>;
/// Verifies the request signature, retrying once with a freshly fetched actor.
///
/// The first attempt calls `verify_signature` without forcing a refetch. If it fails for any
/// reason, a second attempt is made with `force_refetch_actor` set, and the result of that
/// second attempt is what the caller gets.
async fn verify_signature_twice(
    activity: &AcceptedActivity,
    body: &Bytes,
    req_parts: &Parts,
    pool: &SqlitePool,
) -> Result<(), VerifySigError> {
    if verify_signature(activity, body, req_parts, pool, false)
        .await
        .is_ok()
    {
        return Ok(());
    }
    // The error from the first attempt is discarded; only the retry's outcome is reported.
    verify_signature(activity, body, req_parts, pool, true).await
}
/// Checks the HTTP signature of an inbox request against the public key of the activity's actor.
///
/// * `activity` - the parsed activity; its `actor` must be a single IRI.
/// * `body` - the raw request body, used to recompute the `Digest` header value.
/// * `req_parts` - method, URI and headers of the request.
/// * `pool` - database handle passed on to the actor lookup.
/// * `force_refetch_actor` - when `true` the actor is loaded with `super::fetch_actor`,
///   otherwise with `super::get_actor`.
///
/// # Errors
///
/// Returns the [`VerifySigError`] variant matching the step that failed; `Invalid` means every
/// step succeeded but the signature did not match.
///
/// # Panics
///
/// Panics if the request URI has no path-and-query component.
async fn verify_signature(
    activity: &AcceptedActivity,
    body: &Bytes,
    req_parts: &Parts,
    pool: &SqlitePool,
    force_refetch_actor: bool,
) -> Result<(), VerifySigError> {
    let actor_id = activity
        .actor()
        .map_err(|_| VerifySigError::MissingActorId)?
        .as_single_xsd_any_uri()
        .ok_or(VerifySigError::MissingActorId)?
        .as_str();

    let lookup = if force_refetch_actor {
        super::fetch_actor(actor_id, pool).await
    } else {
        super::get_actor(actor_id, pool).await
    };
    let actor = lookup.map_err(VerifySigError::MissingActor)?;

    let pem = &actor.ext_one.public_key.public_key_pem;
    let public_key = PKey::public_key_from_pem(pem.as_bytes()).map_err(VerifySigError::PubKey)?;

    // NOTE(review): the digest is only recomputed when the request carries a Digest header.
    // Whether the signature is required to cover that header depends on the defaults of
    // `Config` - confirm against the http-signature-normalization documentation.
    let mut signed_headers = BTreeMap::new();
    for (name, value) in req_parts.headers.iter() {
        let name = name.as_str().to_lowercase();
        let value = if name == "digest" {
            // recompute the digest of the body, rather than taking the sender's word for it
            let mut hasher = Sha256::new();
            hasher.update(body);
            format!("SHA-256={}", base64::encode(hasher.finish()))
        } else {
            value
                .to_str()
                .map_err(|_| VerifySigError::BadHeader)?
                .to_owned()
        };
        signed_headers.insert(name, value);
    }

    let unverified = Config::default()
        .begin_verify(
            req_parts.method.as_str(),
            req_parts.uri.path_and_query().unwrap().as_str(),
            signed_headers,
        )
        .map_err(VerifySigError::PrepareVerifyError)?;

    unverified.verify(|sig, signing_str| {
        let signature = base64::decode(sig).map_err(VerifySigError::DecodingSignature)?;
        let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)
            .map_err(VerifySigError::CreatingVerifier)?;
        verifier
            .update(signing_str.as_bytes())
            .map_err(VerifySigError::UpdatingVerifier)?;
        match verifier
            .verify(&signature)
            .map_err(VerifySigError::Verifying)?
        {
            true => Ok(()),
            false => Err(VerifySigError::Invalid),
        }
    })
}
/// Reasons why `verify_signature` can reject a request.
#[derive(Error, Debug)]
enum VerifySigError {
/// The activity has no actor, or its actor is not a single IRI.
#[error("Missing actor id")]
MissingActorId,
/// Loading the actor (via `super::get_actor` or `super::fetch_actor`) failed.
#[error("Missing actor: {0:?}")]
MissingActor(anyhow::Error),
/// The actor's public key PEM could not be parsed.
#[error("Parsing public key: {0:?}")]
PubKey(openssl::error::ErrorStack),
/// A request header value could not be converted to a string.
#[error("Could not convert header to string")]
BadHeader,
/// `Config::begin_verify` rejected the request.
#[error("Preparing: {0:?}")]
PrepareVerifyError(http_signature_normalization::PrepareVerifyError),
/// The signature was not valid base64.
#[error("Decoding signature from base64: {0:?}")]
DecodingSignature(base64::DecodeError),
/// OpenSSL failed to create the verifier.
#[error("Creating verifier: {0:?}")]
CreatingVerifier(openssl::error::ErrorStack),
/// OpenSSL failed while feeding the signing string to the verifier.
#[error("Updating verifier: {0:?}")]
UpdatingVerifier(openssl::error::ErrorStack),
/// OpenSSL reported an error while checking the signature.
#[error("Verifying: {0:?}")]
Verifying(openssl::error::ErrorStack),
/// Verification ran to completion, but the signature does not match.
#[error("Invalid signature")]
Invalid,
}
async fn handle_follow(follow: AcceptedActivity, pool: &SqlitePool) -> anyhow::Result<Response> {
// must have an actor, and actor must be a string otherwise sig validation would have failed
// and we wouldn't have reached this point
let actor_id = follow
.actor_unchecked()
.as_single_xsd_any_uri()
.unwrap()
.as_str()
.to_owned();
if !follow.object_is(&*ID) {
return Ok((StatusCode::BAD_REQUEST, "No such actor").into_response());
}
// need a new pool handle that we can move into the closure
let pool = pool.clone();
tokio::spawn(async move {
match db::set_follow_state(&actor_id, true, &pool).await {
Ok(()) => (),
Err(e) => {
error!("Setting follow state: {:?}", e);
return;
}
}
match send_accept(follow, &pool, actor_id).await {
Ok(()) => (),
Err(e) => error!("Handling follow: {:?}", e),
}
});
Ok(StatusCode::OK.into_response())
}
/// Builds an `Accept` wrapping the given follow and delivers it to the follower's inbox.
///
/// * `follow` - the activity being accepted; it becomes the object of the `Accept`.
/// * `pool` - database handle used to look up the follower.
/// * `actor_id` - id of the follower whose inbox receives the `Accept`.
///
/// # Errors
///
/// Fails if the follow cannot be wrapped, the actor cannot be loaded, or delivery fails.
async fn send_accept(
    follow: AcceptedActivity,
    pool: &SqlitePool,
    actor_id: String,
) -> anyhow::Result<()> {
    let accepted = AnyBase::from_extended(follow)?;
    let mut accept = Accept::new(ID.as_str(), accepted);
    accept.set_context(context());
    accept.set_id(iri!(gen_ap_id()));

    let follower = super::get_actor(&actor_id, pool).await?;
    let inbox = follower.inbox_unchecked().as_str();
    federate::sign_and_send(&accept, inbox)
        .await
        .map_err(|e| anyhow!("Sending accept: {:?}", e))
}
2023-01-06 05:26:21 +00:00
/// When `true`, `handle_create_note` skips its public-addressing check. Enabled only in builds
/// compiled with debug assertions.
const ACCEPT_NON_PUBLIC_NOTES: bool = cfg!(debug_assertions);
2022-12-10 18:15:32 +00:00
async fn handle_create_note(
_create: AcceptedActivity,
note: NoteExt,
pool: &SqlitePool,
) -> anyhow::Result<Response> {
let attributed_to = note
.attributed_to()
.and_then(|one_or_many| one_or_many.as_single_id())
.ok_or(anyhow!(
"Missing attributed_to for Note {}",
note.id_unchecked().unwrap()
))?;
let actor = super::fetch_actor(attributed_to.as_str(), pool).await?;
if !ACCEPT_NON_PUBLIC_NOTES && !contains_public(note.to()) && !contains_public(note.cc()) {
debug!("Ignoring non-public post: {}", note.id_unchecked().unwrap());
tokio::spawn(async move {
match send_non_public_reply(note, actor).await {
Ok(()) => (),
Err(e) => error!("Sending non-public reply Note: {:?}", e),
}
});
return Ok(StatusCode::OK.into_response());
}
let conv: &str;
if let Some(s) = &note.ext_one.conversation {
conv = s;
} else {
info!(
"Ignoring incoming Note without conversation: {}",
note.id_unchecked().unwrap()
);
return Ok(StatusCode::OK.into_response());
}
let article_id = if let Some(id) = get_article_for_conversation(conv, pool).await {
id
} else {
info!(
"Ignoring incoming Note not in response to an article: {}",
note.id_unchecked().unwrap()
);
return Ok(StatusCode::OK.into_response());
};
let content: &str;
if let Some(s) = note.content().and_then(|one_or_many| one_or_many.one()) {
content = s.as_str();
} else {
info!(
"Ignoring incoming Note without content: {}",
note.id_unchecked().unwrap()
);
return Ok(StatusCode::OK.into_response());
}
let sanitized = sanitize_html(content);
// if we reach this point, we've validated the signature so the actor must be in the db
let note_id = note.id_unchecked().unwrap().as_str();
let in_reply_to = note
.in_reply_to()
.and_then(|one_or_many| one_or_many.as_single_id())
.map(|ri_str| ri_str.as_str())
// if there's no in_reply_to, we just attach it directly to the article
.unwrap_or(&article_id);
let published = note
.published()
.unwrap_or(OffsetDateTime::now_utc())
.format(&format_description::well_known::Rfc3339)?;
let actor_id = attributed_to.as_str();
db::store_note(
note_id,
&sanitized,
in_reply_to,
conv,
&published,
actor_id,
pool,
)
.await?;
info!("inserted note {}", note_id);
Ok(StatusCode::OK.into_response())
}
/// Returns `true` if any entry of `addresses` is an IRI equal to the value of `public()`.
///
/// `None` and entries that are not plain IRIs never match.
fn contains_public<'a>(addresses: Option<&'a OneOrMany<AnyBase>>) -> bool {
    match addresses {
        Some(recipients) => recipients
            .as_slice()
            .iter()
            .filter_map(|entry| entry.as_xsd_any_uri())
            .any(|iri| public().eq(iri)),
        None => false,
    }
}
async fn send_non_public_reply(note: NoteExt, actor: ActorExt) -> anyhow::Result<()> {
let mut reply = NoteExt::new(Note::new(), Conversation::new());
reply.add_context(context());
reply.add_context(conversation_context());
reply.set_id(iri!(gen_ap_id()));
reply.add_to(actor.id_unchecked().unwrap().to_owned());
reply.set_many_ccs::<_, AnyBase>([]);
reply.set_attributed_to(ID.clone());
let url = actor
.url()
.and_then(|one_or_many| one_or_many.as_single_id())
.unwrap_or(actor.id_unchecked().unwrap());
let preferred_username = actor.preferred_username().unwrap();
let content = format!(
r#"<a href="{}" class="mention">@<span>{}</span></a> Non-public posts are not accepted. To respond to a blog post, use either Public or Unlisted."#,
url, preferred_username
);
reply.set_content(content);
reply.set_published(OffsetDateTime::now_utc());
reply.set_in_reply_to(note.id_unchecked().unwrap().to_owned());
reply.ext_one.conversation = Some(
note.ext_one
.conversation
.unwrap_or_else(|| gen_converation_id()),
);
let mut mention = Mention::new();
mention.set_href(actor.id_unchecked().unwrap().to_owned());
mention.set_name(format!("@{}", actor.preferred_username().unwrap()));
reply.set_tag(AnyBase::from_extended(mention)?);
let mut create = Create::new(ID.as_str(), reply.into_any_base()?);
create.add_context(context());
create.add_context(conversation_context());
create.set_id(iri!(gen_ap_id()));
create.add_to(actor.id_unchecked().unwrap().to_owned());
create.set_many_ccs::<_, AnyBase>([]);
let inbox = actor.inbox()?.as_str();
federate::sign_and_send(&create, inbox).await
}
/// Handles an `Undo` whose object is a `Follow`.
///
/// * An undone follow of anyone but [`ID`] is acknowledged with `200 OK` and ignored.
/// * If the actor of the `Undo` differs from the actor of the `Follow`, the request is rejected
///   with `400 Bad Request`.
/// * Otherwise the follow state for that actor is cleared and `200 OK` is returned.
///
/// # Errors
///
/// Fails if the follow state cannot be updated in the database.
async fn handle_undo_follow(
    undo: AcceptedActivity,
    follow: Follow,
    pool: &SqlitePool,
) -> anyhow::Result<Response> {
    // we don't care about unfollows of anyone other than ourself
    let targets_us = follow.object_is(&*ID);
    if !targets_us {
        return Ok(StatusCode::OK.into_response());
    }

    let undoing_actor = undo.actor_unchecked().as_single_id().unwrap();

    // don't let anyone force other people to unfollow
    let is_own_follow = follow.actor_is(undoing_actor);
    if !is_own_follow {
        return Ok(StatusCode::BAD_REQUEST.into_response());
    }

    db::set_follow_state(undoing_actor.as_str(), false, pool).await?;
    Ok(StatusCode::OK.into_response())
}
/// Handles a `Delete` by asking the database layer to delete the referenced note.
///
/// The id of the activity's actor is passed to `db::delete_note` alongside the object id.
/// If the object does not resolve to a single id, nothing is deleted. Either way the response
/// is `200 OK`.
///
/// # Errors
///
/// Fails if the activity's object cannot be read or the database call fails.
async fn handle_delete(delete: AcceptedActivity, pool: &SqlitePool) -> anyhow::Result<Response> {
    let target = match delete.object()?.as_single_id() {
        Some(id) => id,
        None => return Ok(StatusCode::OK.into_response()),
    };
    let requester = delete.actor_unchecked().as_single_id().unwrap();
    db::delete_note(target.as_str(), requester.as_str(), pool).await?;
    Ok(StatusCode::OK.into_response())
}