add configurables for frontend pool options

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-11-28 06:52:23 +00:00
parent 3ad6aa59f9
commit 2a9bb1ce11
4 changed files with 44 additions and 10 deletions

View file

@ -1338,6 +1338,18 @@
#
#admin_room_notices = true
# Sets the number of worker threads in the frontend-pool of the database.
# This number should reflect the I/O capabilities of the system,
# specifically the queue-depth or the number of simultaneous requests in
# flight. Defaults to 32 or number of CPU cores, whichever is greater.
#
#db_pool_workers = 32
# Size of the queue feeding the database's frontend-pool. Defaults to 256
# or eight times the number of CPU cores, whichever is greater.
#
#db_pool_queue_size = 256
[global.tls]
# Path to a valid TLS certificate file.

View file

@ -1500,6 +1500,20 @@ pub struct Config {
#[serde(default = "true_fn")]
pub admin_room_notices: bool,
/// Sets the number of worker threads in the frontend-pool of the database.
/// This number should reflect the I/O capabilities of the system,
/// specifically the queue-depth or the number of simultaneous requests in
/// flight. Defaults to 32 or number of CPU cores, whichever is greater.
/// default: 32
#[serde(default = "default_db_pool_workers")]
pub db_pool_workers: usize,
/// Size of the queue feeding the database's frontend-pool. Defaults to 256
/// or eight times the number of CPU cores, whichever is greater.
/// default: 256
#[serde(default = "default_db_pool_queue_size")]
pub db_pool_queue_size: usize,
#[serde(flatten)]
#[allow(clippy::zero_sized_map_values)] // this is a catchall, the map shouldn't be zero at runtime
catchall: BTreeMap<String, IgnoredAny>,
@ -2265,3 +2279,7 @@ fn parallelism_scaled_u32(val: u32) -> u32 {
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
fn default_trusted_server_batch_size() -> usize { 256 }
fn default_db_pool_workers() -> usize { sys::available_parallelism().max(32) }
fn default_db_pool_queue_size() -> usize { sys::available_parallelism().saturating_mul(8).max(256) }

View file

@ -103,6 +103,11 @@ impl Engine {
"Opened database."
);
let pool_opts = pool::Opts {
queue_size: config.db_pool_queue_size,
worker_num: config.db_pool_workers,
};
Ok(Arc::new(Self {
server: server.clone(),
row_cache,
@ -114,7 +119,7 @@ impl Engine {
corks: AtomicU32::new(0),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
pool: Pool::new(&pool::Opts::default())?,
pool: Pool::new(&pool_opts)?,
}))
}

View file

@ -17,15 +17,14 @@ pub(crate) struct Pool {
send: Sender<Cmd>,
}
#[derive(Default)]
pub(crate) struct Opts {
queue_size: Option<usize>,
worker_num: Option<usize>,
pub(crate) queue_size: usize,
pub(crate) worker_num: usize,
}
const QUEUE_LIMIT: (usize, usize) = (1, 8192);
const WORKER_LIMIT: (usize, usize) = (1, 512);
const WORKER_THREAD_NAME: &str = "conduwuit:db";
const DEFAULT_QUEUE_SIZE: usize = 1024;
const DEFAULT_WORKER_NUM: usize = 32;
#[derive(Debug)]
pub(crate) enum Cmd {
@ -43,7 +42,7 @@ type ResultSender = oneshot::Sender<Result<Handle<'static>>>;
#[implement(Pool)]
pub(crate) fn new(opts: &Opts) -> Result<Arc<Self>> {
let queue_size = opts.queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
let queue_size = opts.queue_size.clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1);
let (send, recv) = bounded(queue_size);
let pool = Arc::new(Self {
@ -52,7 +51,7 @@ pub(crate) fn new(opts: &Opts) -> Result<Arc<Self>> {
send,
});
let worker_num = opts.worker_num.unwrap_or(DEFAULT_WORKER_NUM);
let worker_num = opts.worker_num.clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);
pool.spawn_until(worker_num)?;
Ok(pool)
@ -147,12 +146,12 @@ fn worker(self: Arc<Self>, id: usize) {
#[implement(Pool)]
fn worker_loop(&self, id: usize) {
while let Ok(mut cmd) = self.recv.recv_blocking() {
self.handle(id, &mut cmd);
self.worker_handle(id, &mut cmd);
}
}
#[implement(Pool)]
fn handle(&self, id: usize, cmd: &mut Cmd) {
fn worker_handle(&self, id: usize, cmd: &mut Cmd) {
match cmd {
Cmd::Get(get) => self.handle_get(id, get),
}