228 lines
6.9 KiB
Rust
228 lines
6.9 KiB
Rust
#![feature(let_chains)]
|
|
|
|
mod activitypub;
|
|
mod generator;
|
|
|
|
use crate::generator::{HtmlContent, Post};
|
|
use axum::{
|
|
body::{boxed, Body, Bytes},
|
|
http::{Request, StatusCode},
|
|
response::{Html, IntoResponse, Redirect, Response},
|
|
routing::{get, get_service},
|
|
Extension, Router,
|
|
};
|
|
use clap::{arg, command, Command};
|
|
use generator::output_path;
|
|
use log::error;
|
|
use log::info;
|
|
use notify_debouncer_mini::new_debouncer;
|
|
use once_cell::sync::Lazy;
|
|
use sqlx::{
|
|
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
|
|
ConnectOptions, SqlitePool,
|
|
};
|
|
use std::path::Path;
|
|
use std::{convert::Infallible, net::SocketAddr};
|
|
use tokio::io::AsyncReadExt;
|
|
use tokio_cron_scheduler::{Job, JobScheduler};
|
|
use tower::Service;
|
|
use tower_http::services::{ServeDir, ServeFile};
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
env_logger::init();
|
|
|
|
let matches = command!()
|
|
.subcommand_required(true)
|
|
.arg_required_else_help(true)
|
|
.subcommand(Command::new("gen"))
|
|
.subcommand(
|
|
Command::new("serve")
|
|
.arg(arg!(--watch "Watch the site directory and regenerate on changes")),
|
|
)
|
|
.get_matches();
|
|
|
|
if cfg!(debug_assertions) {
|
|
info!("Running in dev mode");
|
|
} else {
|
|
info!("Running in release mode");
|
|
}
|
|
|
|
match matches.subcommand() {
|
|
Some(("gen", _)) => {
|
|
let _ = generate().await;
|
|
}
|
|
Some(("serve", matches)) => {
|
|
// ensure that the keys are loadable
|
|
_ = Lazy::force(&activitypub::keys::PUB_KEY_PEM);
|
|
_ = Lazy::force(&activitypub::keys::PRIV_KEY_PEM);
|
|
|
|
let posts = generate().await.expect("initial generation");
|
|
info!("Generated");
|
|
|
|
let pool = setup_db().await;
|
|
activitypub::articles::insert_posts(&posts, &pool).await;
|
|
|
|
if matches.is_present("watch") {
|
|
start_watcher();
|
|
}
|
|
|
|
let pool_ = pool.clone();
|
|
tokio::spawn(async move {
|
|
match activitypub::articles::federate_outgoing(&pool_).await {
|
|
Ok(()) => (),
|
|
Err(e) => error!("Federating outgoing articles: {:?}", e),
|
|
}
|
|
});
|
|
|
|
let sched = JobScheduler::new().await.expect("create JobScheduler");
|
|
let digest_schedule = if cfg!(debug_assertions) {
|
|
// every 5 minutes in debug
|
|
"0 1/5 * * * *"
|
|
} else {
|
|
// every day at midnight utc
|
|
"0 0 0 * * * *"
|
|
};
|
|
let pool_ = pool.clone();
|
|
sched
|
|
.add(
|
|
Job::new_async(digest_schedule, move |_, _| {
|
|
// this closure executes multiple times, so we need to clone the pool every
|
|
// time rather than moving it into the closure
|
|
let pool = pool_.clone();
|
|
Box::pin(async move {
|
|
activitypub::digester::send_digest_if_necessary(&pool).await;
|
|
})
|
|
})
|
|
.expect("creating digest job"),
|
|
)
|
|
.await
|
|
.expect("adding digest job");
|
|
sched.start().await.expect("starting JobScheduler");
|
|
|
|
serve(&posts, pool).await;
|
|
}
|
|
_ => unreachable!(),
|
|
}
|
|
}
|
|
|
|
async fn generate() -> anyhow::Result<Vec<Post<HtmlContent>>> {
|
|
generator::generate().await
|
|
}
|
|
|
|
fn start_watcher() {
|
|
let handle = tokio::runtime::Handle::current();
|
|
|
|
std::thread::spawn(move || {
|
|
let (tx, rx) = std::sync::mpsc::channel();
|
|
|
|
let mut debouncer =
|
|
new_debouncer(std::time::Duration::from_millis(100), None, tx).expect("debouncer");
|
|
|
|
debouncer
|
|
.watcher()
|
|
.watch(Path::new("site/"), notify::RecursiveMode::Recursive)
|
|
.expect("watch");
|
|
|
|
info!("Started watcher");
|
|
|
|
for events in rx {
|
|
let events = events.unwrap();
|
|
let paths = events.iter().map(|ev| &ev.path).collect::<Vec<_>>();
|
|
info!("Regenerating due to changes at {:?}", paths);
|
|
handle.spawn(async move {
|
|
match generate().await {
|
|
Ok(_) => (),
|
|
Err(e) => error!("Regeneration failed: {:?}", e),
|
|
}
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
async fn setup_db() -> SqlitePool {
|
|
let mut options = SqliteConnectOptions::new()
|
|
.filename(std::env::var("DB_PATH").unwrap_or("db.sqlite".to_owned()));
|
|
options.log_statements(log::LevelFilter::Debug);
|
|
let pool = SqlitePoolOptions::new()
|
|
.connect_with(options)
|
|
.await
|
|
.unwrap();
|
|
|
|
sqlx::migrate!()
|
|
.run(&pool)
|
|
.await
|
|
.expect("database migrated");
|
|
|
|
pool
|
|
}
|
|
|
|
async fn serve(posts: &[Post<HtmlContent>], pool: SqlitePool) {
|
|
let articles_or_fallback = tower::service_fn(handle_ap_article_or_serve_dir);
|
|
|
|
let app = Router::new()
|
|
.merge(activitypub::router(pool.clone()))
|
|
.merge(redirect_router(posts))
|
|
.fallback(get_service(articles_or_fallback))
|
|
.layer(Extension(pool));
|
|
|
|
let addr = if cfg!(debug_assertions) {
|
|
SocketAddr::from(([0, 0, 0, 0], 8084))
|
|
} else {
|
|
SocketAddr::from(([127, 0, 0, 1], 8084))
|
|
};
|
|
info!("Listening on {}", addr);
|
|
axum::Server::bind(&addr)
|
|
.serve(app.into_make_service())
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
async fn handle_ap_article_or_serve_dir(req: Request<Body>) -> Result<Response, Infallible> {
|
|
let articles_res = activitypub::articles::handle(&req).await;
|
|
|
|
match articles_res {
|
|
Some(res) => Ok(res.into_response()),
|
|
None => {
|
|
let mut serve_dir =
|
|
ServeDir::new("out").fallback(ServeFile::new(output_path("404.html")));
|
|
match serve_dir.call(req).await {
|
|
Ok(resp) => Ok(resp.map(boxed)),
|
|
Err(e) => Ok(handle_error(e).await.into_response()),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn redirect_router(posts: &[Post<HtmlContent>]) -> Router {
|
|
let mut r = Router::new();
|
|
for post in posts.iter() {
|
|
for path in post.metadata.old_permalink.iter().flatten() {
|
|
let new_permalink = post.permalink();
|
|
r = r.route(
|
|
path,
|
|
get(|_: Request<Body>| async move { Redirect::permanent(&new_permalink) }),
|
|
);
|
|
}
|
|
}
|
|
r
|
|
}
|
|
|
|
async fn handle_error(err: std::io::Error) -> impl IntoResponse {
|
|
error!("Unhandled error: {}", err);
|
|
(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
serve_500().await.unwrap_or_else(|err| {
|
|
error!("Unhandled error serving 500 page: {}", &err);
|
|
"Internal server error".into_response()
|
|
}),
|
|
)
|
|
}
|
|
|
|
async fn serve_500() -> Result<Response, std::io::Error> {
|
|
let mut buf = vec![];
|
|
let mut f = tokio::fs::File::open(output_path("500.html")).await?;
|
|
f.read_to_end(&mut buf).await?;
|
|
Ok(Html(Bytes::from(buf)).into_response())
|
|
}
|