#![feature(let_chains)] mod activitypub; mod generator; use crate::generator::{HtmlContent, Post}; use axum::{ body::{boxed, Body, Bytes}, http::{Request, StatusCode}, response::{Html, IntoResponse, Redirect, Response}, routing::{get, get_service}, Extension, Router, }; use clap::{arg, command, Command}; use generator::output_path; use log::error; use log::info; use notify_debouncer_mini::new_debouncer; use once_cell::sync::Lazy; use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePoolOptions}, ConnectOptions, SqlitePool, }; use std::path::Path; use std::{convert::Infallible, net::SocketAddr}; use tokio::io::AsyncReadExt; use tokio_cron_scheduler::{Job, JobScheduler}; use tower::Service; use tower_http::services::{ServeDir, ServeFile}; #[tokio::main] async fn main() { env_logger::init(); let matches = command!() .subcommand_required(true) .arg_required_else_help(true) .subcommand(Command::new("gen")) .subcommand( Command::new("serve") .arg(arg!(--watch "Watch the site directory and regenerate on changes")), ) .get_matches(); if cfg!(debug_assertions) { info!("Running in dev mode"); } else { info!("Running in release mode"); } match matches.subcommand() { Some(("gen", _)) => { let _ = generate().await; } Some(("serve", matches)) => { // ensure that the keys are loadable _ = Lazy::force(&activitypub::keys::PUB_KEY_PEM); _ = Lazy::force(&activitypub::keys::PRIV_KEY_PEM); let posts = generate().await.expect("initial generation"); info!("Generated"); let pool = setup_db().await; activitypub::articles::insert_posts(&posts, &pool).await; if matches.is_present("watch") { start_watcher(); } let pool_ = pool.clone(); tokio::spawn(async move { match activitypub::articles::federate_outgoing(&pool_).await { Ok(()) => (), Err(e) => error!("Federating outgoing articles: {:?}", e), } }); let sched = JobScheduler::new().await.expect("create JobScheduler"); let digest_schedule = if cfg!(debug_assertions) { // every 5 minutes in debug "0 1/5 * * * *" } else { // every day at midnight utc "0 0 0 * * * *" }; let pool_ = pool.clone(); sched .add( Job::new_async(digest_schedule, move |_, _| { // this closure executes multiple times, so we need to clone the pool every // time rather than moving it into the closure let pool = pool_.clone(); Box::pin(async move { activitypub::digester::send_digest_if_necessary(&pool).await; }) }) .expect("creating digest job"), ) .await .expect("adding digest job"); sched.start().await.expect("starting JobScheduler"); serve(&posts, pool).await; } _ => unreachable!(), } } async fn generate() -> anyhow::Result>> { generator::generate().await } fn start_watcher() { let handle = tokio::runtime::Handle::current(); std::thread::spawn(move || { let (tx, rx) = std::sync::mpsc::channel(); let mut debouncer = new_debouncer(std::time::Duration::from_millis(100), None, tx).expect("debouncer"); debouncer .watcher() .watch(Path::new("site/"), notify::RecursiveMode::Recursive) .expect("watch"); info!("Started watcher"); for events in rx { let events = events.unwrap(); let paths = events.iter().map(|ev| &ev.path).collect::>(); info!("Regenerating due to changes at {:?}", paths); handle.spawn(async move { match generate().await { Ok(_) => (), Err(e) => error!("Regeneration failed: {:?}", e), } }); } }); } async fn setup_db() -> SqlitePool { let mut options = SqliteConnectOptions::new() .filename(std::env::var("DB_PATH").unwrap_or("db.sqlite".to_owned())); options.log_statements(log::LevelFilter::Debug); let pool = SqlitePoolOptions::new() .connect_with(options) .await .unwrap(); sqlx::migrate!() .run(&pool) .await .expect("database migrated"); pool } async fn serve(posts: &[Post], pool: SqlitePool) { let articles_or_fallback = tower::service_fn(handle_ap_article_or_serve_dir); let app = Router::new() .merge(activitypub::router(pool.clone())) .merge(redirect_router(posts)) .fallback(get_service(articles_or_fallback)) .layer(Extension(pool)); let addr = if cfg!(debug_assertions) { SocketAddr::from(([0, 0, 0, 0], 8084)) } else { SocketAddr::from(([127, 0, 0, 1], 8084)) }; info!("Listening on {}", addr); axum::Server::bind(&addr) .serve(app.into_make_service()) .await .unwrap(); } async fn handle_ap_article_or_serve_dir(req: Request) -> Result { let articles_res = activitypub::articles::handle(&req).await; match articles_res { Some(res) => Ok(res.into_response()), None => { let mut serve_dir = ServeDir::new("out").fallback(ServeFile::new(output_path("404.html"))); match serve_dir.call(req).await { Ok(resp) => Ok(resp.map(boxed)), Err(e) => Ok(handle_error(e).await.into_response()), } } } } fn redirect_router(posts: &[Post]) -> Router { let mut r = Router::new(); for post in posts.iter() { for path in post.metadata.old_permalink.iter().flatten() { let new_permalink = post.permalink(); r = r.route( path, get(|_: Request| async move { Redirect::permanent(&new_permalink) }), ); } } r } async fn handle_error(err: std::io::Error) -> impl IntoResponse { error!("Unhandled error: {}", err); ( StatusCode::INTERNAL_SERVER_ERROR, serve_500().await.unwrap_or_else(|err| { error!("Unhandled error serving 500 page: {}", &err); "Internal server error".into_response() }), ) } async fn serve_500() -> Result { let mut buf = vec![]; let mut f = tokio::fs::File::open(output_path("500.html")).await?; f.read_to_end(&mut buf).await?; Ok(Html(Bytes::from(buf)).into_response()) }