mirror of
https://github.com/girlbossceo/conduwuit.git
synced 2025-03-14 18:55:37 +00:00
simplify request handler task base
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
ff8bbd4cfa
commit
69837671bb
6 changed files with 35 additions and 76 deletions
|
@ -19,8 +19,6 @@ pub struct Metrics {
|
|||
runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
|
||||
|
||||
// TODO: move stats
|
||||
pub requests_spawn_active: AtomicU32,
|
||||
pub requests_spawn_finished: AtomicU32,
|
||||
pub requests_handle_active: AtomicU32,
|
||||
pub requests_handle_finished: AtomicU32,
|
||||
pub requests_panic: AtomicU32,
|
||||
|
@ -48,8 +46,6 @@ impl Metrics {
|
|||
#[cfg(tokio_unstable)]
|
||||
runtime_intervals: std::sync::Mutex::new(runtime_intervals),
|
||||
|
||||
requests_spawn_active: AtomicU32::new(0),
|
||||
requests_spawn_finished: AtomicU32::new(0),
|
||||
requests_handle_active: AtomicU32::new(0),
|
||||
requests_handle_finished: AtomicU32::new(0),
|
||||
requests_panic: AtomicU32::new(0),
|
||||
|
|
|
@ -5,7 +5,7 @@ use axum::{
|
|||
Router,
|
||||
};
|
||||
use axum_client_ip::SecureClientIpSource;
|
||||
use conduwuit::{error, Result, Server};
|
||||
use conduwuit::{debug, error, Result, Server};
|
||||
use conduwuit_api::router::state::Guard;
|
||||
use conduwuit_service::Services;
|
||||
use http::{
|
||||
|
@ -50,7 +50,6 @@ pub(crate) fn build(services: &Arc<Services>) -> Result<(Router, Guard)> {
|
|||
|
||||
let layers = layers
|
||||
.layer(SetSensitiveHeadersLayer::new([header::AUTHORIZATION]))
|
||||
.layer(axum::middleware::from_fn_with_state(Arc::clone(services), request::spawn))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(tracing_span::<_>)
|
||||
|
@ -196,20 +195,26 @@ fn catch_panic(
|
|||
}
|
||||
|
||||
fn tracing_span<T>(request: &http::Request<T>) -> tracing::Span {
|
||||
let path = request.extensions().get::<MatchedPath>().map_or_else(
|
||||
|| {
|
||||
request
|
||||
.uri()
|
||||
.path_and_query()
|
||||
.expect("all requests have a path")
|
||||
.as_str()
|
||||
},
|
||||
truncated_matched_path,
|
||||
);
|
||||
let path = request
|
||||
.extensions()
|
||||
.get::<MatchedPath>()
|
||||
.map_or_else(|| request_path_str(request), truncated_matched_path);
|
||||
|
||||
let method = request.method();
|
||||
tracing::span! {
|
||||
parent: None,
|
||||
debug::INFO_SPAN_LEVEL,
|
||||
"router",
|
||||
method = %request.method(),
|
||||
%path,
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug_span!(parent: None, "router", %method, %path)
|
||||
fn request_path_str<T>(request: &http::Request<T>) -> &str {
|
||||
request
|
||||
.uri()
|
||||
.path_and_query()
|
||||
.expect("all requests have a path")
|
||||
.as_str()
|
||||
}
|
||||
|
||||
fn truncated_matched_path(path: &MatchedPath) -> &str {
|
||||
|
|
|
@ -8,48 +8,6 @@ use conduwuit::{debug, debug_error, debug_warn, err, error, trace, Result};
|
|||
use conduwuit_service::Services;
|
||||
use http::{Method, StatusCode, Uri};
|
||||
|
||||
#[tracing::instrument(
|
||||
parent = None,
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
handled = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_spawn_finished
|
||||
.fetch_add(1, Ordering::Relaxed),
|
||||
active = %services
|
||||
.server
|
||||
.metrics
|
||||
.requests_spawn_active
|
||||
.fetch_add(1, Ordering::Relaxed),
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn spawn(
|
||||
State(services): State<Arc<Services>>,
|
||||
req: http::Request<axum::body::Body>,
|
||||
next: axum::middleware::Next,
|
||||
) -> Result<Response, StatusCode> {
|
||||
let server = &services.server;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
conduwuit::defer! {{
|
||||
_ = server
|
||||
.metrics
|
||||
.requests_spawn_active
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}};
|
||||
|
||||
if !server.running() {
|
||||
debug_warn!("unavailable pending shutdown");
|
||||
return Err(StatusCode::SERVICE_UNAVAILABLE);
|
||||
}
|
||||
|
||||
let fut = next.run(req);
|
||||
let task = server.runtime().spawn(fut);
|
||||
task.await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
|
@ -71,17 +29,15 @@ pub(crate) async fn handle(
|
|||
req: http::Request<axum::body::Body>,
|
||||
next: axum::middleware::Next,
|
||||
) -> Result<Response, StatusCode> {
|
||||
let server = &services.server;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
conduwuit::defer! {{
|
||||
_ = server
|
||||
_ = services.server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.fetch_sub(1, Ordering::Relaxed);
|
||||
}};
|
||||
|
||||
if !server.running() {
|
||||
if !services.server.running() {
|
||||
debug_warn!(
|
||||
method = %req.method(),
|
||||
uri = %req.uri(),
|
||||
|
@ -91,10 +47,15 @@ pub(crate) async fn handle(
|
|||
return Err(StatusCode::SERVICE_UNAVAILABLE);
|
||||
}
|
||||
|
||||
let uri = req.uri().clone();
|
||||
let method = req.method().clone();
|
||||
let result = next.run(req).await;
|
||||
handle_result(&method, &uri, result)
|
||||
let uri = req.uri().clone();
|
||||
services
|
||||
.server
|
||||
.runtime()
|
||||
.spawn(next.run(req))
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.and_then(|result| handle_result(&method, &uri, result))
|
||||
}
|
||||
|
||||
fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Response, StatusCode> {
|
||||
|
|
|
@ -125,7 +125,6 @@ async fn handle_shutdown(server: Arc<Server>, tx: Sender<()>, handle: axum_serve
|
|||
let timeout = Duration::from_secs(36);
|
||||
debug!(
|
||||
?timeout,
|
||||
spawn_active = ?server.metrics.requests_spawn_active.load(Ordering::Relaxed),
|
||||
handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed),
|
||||
"Notifying for graceful shutdown"
|
||||
);
|
||||
|
|
|
@ -24,27 +24,20 @@ pub(super) async fn serve(
|
|||
info!("Listening on {addrs:?}");
|
||||
while join_set.join_next().await.is_some() {}
|
||||
|
||||
let spawn_active = server.metrics.requests_spawn_active.load(Ordering::Relaxed);
|
||||
let handle_active = server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.load(Ordering::Relaxed);
|
||||
debug_info!(
|
||||
spawn_finished = server
|
||||
.metrics
|
||||
.requests_spawn_finished
|
||||
.load(Ordering::Relaxed),
|
||||
handle_finished = server
|
||||
.metrics
|
||||
.requests_handle_finished
|
||||
.load(Ordering::Relaxed),
|
||||
panics = server.metrics.requests_panic.load(Ordering::Relaxed),
|
||||
spawn_active,
|
||||
handle_active,
|
||||
"Stopped listening on {addrs:?}",
|
||||
);
|
||||
|
||||
debug_assert!(spawn_active == 0, "active request tasks are not joined");
|
||||
debug_assert!(handle_active == 0, "active request handles still pending");
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -159,7 +159,12 @@ async fn fini(server: &Arc<Server>, listener: UnixListener, mut tasks: JoinSet<(
|
|||
drop(listener);
|
||||
|
||||
debug!("Waiting for requests to finish...");
|
||||
while server.metrics.requests_spawn_active.load(Ordering::Relaxed) > 0 {
|
||||
while server
|
||||
.metrics
|
||||
.requests_handle_active
|
||||
.load(Ordering::Relaxed)
|
||||
.gt(&0)
|
||||
{
|
||||
tokio::select! {
|
||||
task = tasks.join_next() => if task.is_none() { break; },
|
||||
() = sleep(FINI_POLL_INTERVAL) => {},
|
||||
|
|
Loading…
Add table
Reference in a new issue