parallelize get_auth_chain outer

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-20 09:02:50 +00:00
parent ea25dc04b2
commit 4c0ae8c2f7

View file

@ -7,11 +7,14 @@ use std::{
};
use conduwuit::{
debug, debug_error, trace,
utils::{stream::ReadyExt, IterStream},
at, debug, debug_error, trace,
utils::{
stream::{ReadyExt, TryBroadbandExt},
IterStream,
},
validated, warn, Err, Result,
};
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use ruma::{EventId, OwnedEventId, RoomId};
use self::data::Data;
@ -112,66 +115,61 @@ impl Service {
"start",
);
let mut hits: usize = 0;
let mut misses: usize = 0;
let mut full_auth_chain = Vec::with_capacity(buckets.len());
for chunk in buckets {
if chunk.is_empty() {
continue;
}
let full_auth_chain: Vec<_> = buckets
.into_iter()
.try_stream()
.broad_and_then(|chunk| async move {
let chunk_key: Vec<ShortEventId> = chunk.iter().map(at!(0)).collect();
let chunk_key: Vec<ShortEventId> =
chunk.iter().map(|(short, _)| short).copied().collect();
if let Ok(cached) = self.get_cached_eventid_authchain(&chunk_key).await {
trace!("Found cache entry for whole chunk");
full_auth_chain.extend(cached.iter().copied());
hits = hits.saturating_add(1);
continue;
}
if chunk_key.is_empty() {
return Ok(Vec::new());
}
let mut hits2: usize = 0;
let mut misses2: usize = 0;
let mut chunk_cache = Vec::with_capacity(chunk.len());
for (sevent_id, event_id) in chunk {
if let Ok(cached) = self.get_cached_eventid_authchain(&[sevent_id]).await {
trace!(?event_id, "Found cache entry for event");
chunk_cache.extend(cached.iter().copied());
hits2 = hits2.saturating_add(1);
} else {
let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?;
self.cache_auth_chain(vec![sevent_id], &auth_chain);
chunk_cache.extend(auth_chain.iter());
misses2 = misses2.saturating_add(1);
debug!(
event_id = ?event_id,
chain_length = ?auth_chain.len(),
chunk_cache_length = ?chunk_cache.len(),
elapsed = ?started.elapsed(),
"Cache missed event"
);
};
}
if let Ok(cached) = self.get_cached_eventid_authchain(&chunk_key).await {
return Ok(cached.to_vec());
}
chunk_cache.sort_unstable();
chunk_cache.dedup();
self.cache_auth_chain_vec(chunk_key, &chunk_cache);
full_auth_chain.extend(chunk_cache.iter());
misses = misses.saturating_add(1);
debug!(
chunk_cache_length = ?chunk_cache.len(),
hits = ?hits2,
misses = ?misses2,
elapsed = ?started.elapsed(),
"Chunk missed",
);
}
let chunk_cache: Vec<_> = chunk
.into_iter()
.try_stream()
.broad_and_then(|(shortid, event_id)| async move {
if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await {
return Ok(cached.to_vec());
}
let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?;
self.cache_auth_chain_vec(vec![shortid], auth_chain.as_slice());
debug!(
?event_id,
elapsed = ?started.elapsed(),
"Cache missed event"
);
Ok(auth_chain)
})
.try_collect()
.await?;
let mut chunk_cache: Vec<_> = chunk_cache.into_iter().flatten().collect();
chunk_cache.sort_unstable();
chunk_cache.dedup();
self.cache_auth_chain_vec(chunk_key, chunk_cache.as_slice());
debug!(
chunk_cache_length = ?chunk_cache.len(),
elapsed = ?started.elapsed(),
"Cache missed chunk",
);
Ok(chunk_cache)
})
.try_collect()
.await?;
let mut full_auth_chain: Vec<_> = full_auth_chain.into_iter().flatten().collect();
full_auth_chain.sort_unstable();
full_auth_chain.dedup();
debug!(
chain_length = ?full_auth_chain.len(),
hits = ?hits,
misses = ?misses,
elapsed = ?started.elapsed(),
"done",
);
@ -184,7 +182,7 @@ impl Service {
&self,
room_id: &RoomId,
event_id: &EventId,
) -> Result<HashSet<ShortEventId>> {
) -> Result<Vec<ShortEventId>> {
let mut todo: VecDeque<_> = [event_id.to_owned()].into();
let mut found = HashSet::new();
@ -226,7 +224,7 @@ impl Service {
}
}
Ok(found)
Ok(found.into_iter().collect())
}
#[inline]