gracefully ignore unknown columns; add dropped flag in descriptor

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-10 22:23:02 +00:00
parent 16fa2eca87
commit 66231676f1
3 changed files with 42 additions and 25 deletions

View file

@ -72,7 +72,7 @@ fn descriptor_cf_options(
}
fn set_table_options(opts: &mut Options, desc: &Descriptor, cache: Option<&Cache>) -> Result {
let mut table = table_options(desc);
let mut table = table_options(desc, cache.is_some());
if let Some(cache) = cache {
table.set_block_cache(cache);
} else {
@ -119,13 +119,13 @@ fn uc_options(desc: &Descriptor) -> UniversalCompactOptions {
opts
}
fn table_options(desc: &Descriptor) -> BlockBasedOptions {
fn table_options(desc: &Descriptor, has_cache: bool) -> BlockBasedOptions {
let mut opts = BlockBasedOptions::default();
opts.set_block_size(desc.block_size);
opts.set_metadata_block_size(desc.index_size);
opts.set_cache_index_and_filter_blocks(true);
opts.set_cache_index_and_filter_blocks(has_cache);
opts.set_pin_top_level_index_and_filter(false);
opts.set_pin_l0_filter_and_index_blocks_in_cache(false);
opts.set_partition_pinning_tier(BlockBasedPinningTier::None);
@ -144,10 +144,13 @@ fn table_options(desc: &Descriptor) -> BlockBasedOptions {
}
fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
let config = &ctx.server.config;
if desc.dropped {
return None;
}
// Some cache capacities are overriden by server config in a strange but
// legacy-compat way
let config = &ctx.server.config;
let cap = match desc.name {
| "eventid_pduid" => Some(config.eventid_pdu_cache_capacity),
| "eventid_shorteventid" => Some(config.eventidshort_cache_capacity),

View file

@ -14,6 +14,7 @@ pub(crate) enum CacheDisp {
#[derive(Debug, Clone)]
pub(crate) struct Descriptor {
pub(crate) name: &'static str,
pub(crate) dropped: bool,
pub(crate) cache_disp: CacheDisp,
pub(crate) key_size_hint: Option<usize>,
pub(crate) val_size_hint: Option<usize>,
@ -39,6 +40,7 @@ pub(crate) struct Descriptor {
pub(crate) static BASE: Descriptor = Descriptor {
name: EMPTY,
dropped: false,
cache_disp: CacheDisp::Shared,
key_size_hint: None,
val_size_hint: None,

View file

@ -4,11 +4,15 @@ use std::{
sync::{atomic::AtomicU32, Arc},
};
use conduwuit::{debug, debug_warn, implement, info, warn, Result};
use conduwuit::{debug, implement, info, warn, Result};
use rocksdb::{ColumnFamilyDescriptor, Options};
use super::{
cf_opts::cf_options, db_opts::db_options, descriptor::Descriptor, repair::repair, Db, Engine,
cf_opts::cf_options,
db_opts::db_options,
descriptor::{self, Descriptor},
repair::repair,
Db, Engine,
};
use crate::{or_else, Context};
@ -72,38 +76,46 @@ fn configure_cfds(
let config = &server.config;
let path = &config.database_path;
let existing = Self::discover_cfs(path, db_opts);
debug!(
"Found {} existing columns; have {} described columns",
existing.len(),
desc.len()
);
existing
let creating = desc.iter().filter(|desc| !existing.contains(desc.name));
let missing = existing
.iter()
.filter(|&name| name != "default")
.filter(|&name| !desc.iter().any(|desc| desc.name == name))
.for_each(|name| {
debug_warn!("Found unknown column {name:?} in database which will not be opened.");
});
.filter(|&name| !desc.iter().any(|desc| desc.name == name));
desc.iter()
.filter(|desc| !existing.contains(desc.name))
.for_each(|desc| {
debug!(
"Creating new column {:?} which was not found in the existing database.",
desc.name,
);
});
debug!(
existing = existing.len(),
described = desc.len(),
missing = missing.clone().count(),
creating = creating.clone().count(),
"Discovered database columns"
);
missing.clone().for_each(|name| {
debug!("Found unrecognized column {name:?} in existing database.");
});
creating.map(|desc| desc.name).for_each(|name| {
debug!("Creating new column {name:?} not previously found in existing database.");
});
let missing_descriptors = missing
.clone()
.map(|_| Descriptor { dropped: true, ..descriptor::BASE });
let cfopts: Vec<_> = desc
.iter()
.map(|desc| cf_options(ctx, db_opts.clone(), desc))
.cloned()
.chain(missing_descriptors)
.map(|ref desc| cf_options(ctx, db_opts.clone(), desc))
.collect::<Result<_>>()?;
let cfds: Vec<_> = desc
.iter()
.map(|desc| desc.name)
.map(ToOwned::to_owned)
.chain(missing.cloned())
.zip(cfopts.into_iter())
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
.collect();