use std::convert::Infallible; use anyhow::anyhow; use futures::{SinkExt, stream::FusedStream}; use http::{Request, Response}; use http_body_util::{BodyExt, combinators::UnsyncBoxBody}; use hyper::body::Bytes; use hyper_tungstenite::{HyperWebsocket, tungstenite::Message}; use log::error; use tokio::sync::broadcast::error::RecvError; use tower::Service; use tower_http::{ services::{ServeDir, ServeFile}, set_status::SetStatus, }; use crate::CloneableReceiver; pub async fn handle( mut request: Request, mut fallback: ServeDir>, regen_complete_rx: CloneableReceiver<()>, ) -> anyhow::Result>> { if request.uri().path() == "/_dev/live_reload" && hyper_tungstenite::is_upgrade_request(&request) { let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?; tokio::spawn(async move { if let Err(e) = serve_websocket(websocket, regen_complete_rx).await { error!("Error serving websocket: {e:?}"); } }); Ok(response.map(|b| b.map_err(|_: Infallible| unreachable!()).boxed_unsync())) } else { let fallback_resp = fallback.call(request).await?; Ok(fallback_resp.map(|b| b.boxed_unsync())) } } async fn serve_websocket( websocket: HyperWebsocket, mut regen_complete_rx: CloneableReceiver<()>, ) -> anyhow::Result<()> { let mut websocket = websocket.await?; 'outer: loop { match regen_complete_rx.0.recv().await { Err(RecvError::Closed) => { break 'outer; } Err(_) => (), Ok(_) => { if websocket.is_terminated() { return Ok(()); } let result = websocket.send(Message::text("regenerated")).await; if let Err(e) = result { return Err(anyhow!(e)); } } } } Ok(()) }