add unconstrained feature to service worker

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-02-06 23:58:45 +00:00 committed by strawberry
parent 5428526120
commit 59c073d0d8
3 changed files with 23 additions and 4 deletions

View file

@ -1,7 +1,7 @@
use std::{panic::AssertUnwindSafe, sync::Arc, time::Duration};
use conduwuit::{debug, debug_warn, error, trace, utils::time, warn, Err, Error, Result, Server};
use futures::FutureExt;
use futures::{FutureExt, TryFutureExt};
use tokio::{
sync::{Mutex, MutexGuard},
task::{JoinHandle, JoinSet},
@ -183,9 +183,14 @@ async fn worker(service: Arc<dyn Service>) -> WorkerResult {
let service_ = Arc::clone(&service);
let result = AssertUnwindSafe(service_.worker())
.catch_unwind()
.await
.map_err(Error::from_panic);
let result = if service.unconstrained() {
tokio::task::unconstrained(result).await
} else {
result.await
};
// flattens JoinError for panic into worker's Error
(service, result.unwrap_or_else(Err))
}

View file

@ -22,7 +22,7 @@ use ruma::{
RoomId, ServerName, UserId,
};
use smallvec::SmallVec;
use tokio::task::JoinSet;
use tokio::{task, task::JoinSet};
use self::data::Data;
pub use self::{
@ -111,8 +111,15 @@ impl crate::Service for Service {
.enumerate()
.fold(JoinSet::new(), |mut joinset, (id, _)| {
let self_ = self.clone();
let worker = self_.sender(id);
let worker = if self.unconstrained() {
task::unconstrained(worker).boxed()
} else {
worker.boxed()
};
let runtime = self.server.runtime();
let _abort = joinset.spawn_on(self_.sender(id).boxed(), runtime);
let _abort = joinset.spawn_on(worker, runtime);
joinset
});
@ -139,6 +146,8 @@ impl crate::Service for Service {
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
fn unconstrained(&self) -> bool { true }
}
impl Service {

View file

@ -39,6 +39,11 @@ pub(crate) trait Service: Any + Send + Sync {
/// Return the name of the service.
/// i.e. `crate::service::make_name(std::module_path!())`
fn name(&self) -> &str;
/// Return true if the service worker opts out of the tokio cooperative
/// budgeting. This can reduce tail latency at the risk of event loop
/// starvation.
fn unconstrained(&self) -> bool { false }
}
/// Args are passed to `Service::build` when a service is constructed. This