optimize incremental sync state diff

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-02-02 07:40:08 +00:00
parent da4b94d80d
commit 106bcd30b7
2 changed files with 474 additions and 415 deletions

View file

@ -7,13 +7,13 @@ use std::{
use axum::extract::State;
use conduwuit::{
at, err, error, extract_variant, is_equal_to, pair_of,
pdu::EventHash,
pdu::{Event, EventHash},
ref_at,
result::FlatOk,
utils::{
self,
future::OptionExt,
math::ruma_from_u64,
stream::{BroadbandExt, Tools, WidebandExt},
stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
},
PduCount, PduEvent, Result,
@ -53,19 +53,16 @@ use ruma::{
serde::Raw,
uint, DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
};
use service::rooms::short::{ShortEventId, ShortStateKey};
use super::{load_timeline, share_encrypted_room};
use crate::{
client::{ignored_filter, lazy_loading_witness},
Ruma, RumaResponse,
};
use crate::{client::ignored_filter, Ruma, RumaResponse};
#[derive(Default)]
struct StateChanges {
heroes: Option<Vec<OwnedUserId>>,
joined_member_count: Option<u64>,
invited_member_count: Option<u64>,
joined_since_last_sync: bool,
state_events: Vec<PduEvent>,
device_list_updates: HashSet<OwnedUserId>,
left_encrypted_users: HashSet<OwnedUserId>,
@ -625,6 +622,40 @@ async fn load_joined_room(
.await?;
let (timeline_pdus, limited) = timeline;
let initial = since_shortstatehash.is_none();
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled();
let lazy_loading_context = &lazy_loading::Context {
user_id: sender_user,
device_id: sender_device,
room_id,
token: Some(since),
options: Some(&filter.room.state.lazy_load_options),
};
// Reset lazy loading because this is an initial sync
let lazy_load_reset: OptionFuture<_> = initial
.then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
.into();
lazy_load_reset.await;
let witness: OptionFuture<_> = lazy_loading_enabled
.then(|| {
let witness: Witness = timeline_pdus
.iter()
.map(ref_at!(1))
.map(Event::sender)
.map(Into::into)
.chain(receipt_events.keys().map(Into::into))
.collect();
services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
})
.into();
let last_notification_read: OptionFuture<_> = timeline_pdus
.is_empty()
@ -646,41 +677,20 @@ async fn load_joined_room(
})
.into();
let (last_notification_read, since_sender_member, witness) =
join3(last_notification_read, since_sender_member, witness).await;
let joined_since_last_sync =
since_sender_member
.await
.flatten()
.is_none_or(|content: RoomMemberEventContent| {
content.membership != MembershipState::Join
});
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter.room.timeline.lazy_load_options.is_enabled();
let lazy_reset = since_shortstatehash.is_none();
let lazy_loading_context = &lazy_loading::Context {
user_id: sender_user,
device_id: sender_device,
room_id,
token: None,
options: Some(&filter.room.state.lazy_load_options),
};
// Reset lazy loading because this is an initial sync
let lazy_load_reset: OptionFuture<_> = lazy_reset
.then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
.into();
lazy_load_reset.await;
let witness: OptionFuture<_> = lazy_loading_enabled
.then(|| lazy_loading_witness(services, lazy_loading_context, timeline_pdus.iter()))
.into();
let StateChanges {
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync,
state_events,
mut device_list_updates,
left_encrypted_users,
@ -693,7 +703,7 @@ async fn load_joined_room(
since_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
witness.await.as_ref(),
witness.as_ref(),
)
.boxed()
.await?;
@ -719,28 +729,7 @@ async fn load_joined_room(
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let typing_events = services
.rooms
.typing
.last_typing_update(room_id)
.and_then(|count| async move {
if count <= since {
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
}
let typings = services
.rooms
.typing
.typings_all(room_id, sender_user)
.await?;
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
})
.unwrap_or(Vec::new());
let send_notification_counts = last_notification_read
.is_none_or(|&count| count > since)
.await;
let send_notification_counts = last_notification_read.is_none_or(|count| count > since);
let notification_count: OptionFuture<_> = send_notification_counts
.then(|| {
@ -764,8 +753,27 @@ async fn load_joined_room(
})
.into();
let events = join3(room_events, account_data_events, typing_events);
let typing_events = services
.rooms
.typing
.last_typing_update(room_id)
.and_then(|count| async move {
if count <= since {
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
}
let typings = services
.rooms
.typing
.typings_all(room_id, sender_user)
.await?;
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
})
.unwrap_or(Vec::new());
let unread_notifications = join(notification_count, highlight_count);
let events = join3(room_events, account_data_events, typing_events);
let (unread_notifications, events, device_updates) =
join3(unread_notifications, events, device_updates)
.boxed()
@ -942,7 +950,6 @@ async fn calculate_state_initial(
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync: true,
state_events,
..Default::default()
})
@ -952,7 +959,7 @@ async fn calculate_state_initial(
#[allow(clippy::too_many_arguments)]
async fn calculate_state_incremental<'a>(
services: &Services,
sender_user: &UserId,
sender_user: &'a UserId,
room_id: &RoomId,
full_state: bool,
_filter: &FilterDefinition,
@ -965,102 +972,130 @@ async fn calculate_state_incremental<'a>(
let state_changed = since_shortstatehash != current_shortstatehash;
let state_get_id = |user_id: &'a UserId| {
services
.rooms
.state_accessor
.state_get_id(current_shortstatehash, &StateEventType::RoomMember, user_id.as_str())
.ok()
};
let lazy_state_ids: OptionFuture<_> = witness
.map(|witness| {
witness
.iter()
.stream()
.broad_filter_map(|user_id| state_get_id(user_id))
.collect::<Vec<OwnedEventId>>()
})
.into();
let current_state_ids: OptionFuture<_> = state_changed
.then(|| {
services
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.collect::<Vec<(_, OwnedEventId)>>()
})
.into();
let since_state_ids: OptionFuture<_> = (state_changed && !full_state)
.then(|| {
services
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.collect::<HashMap<_, OwnedEventId>>()
})
.into();
let lazy_state_ids = lazy_state_ids
.map(Option::into_iter)
.map(|iter| iter.flat_map(Vec::into_iter))
.map(IterStream::stream)
.flatten_stream();
let ref since_state_ids = since_state_ids.shared();
let delta_state_events = current_state_ids
.map(Option::into_iter)
.map(|iter| iter.flat_map(Vec::into_iter))
.map(IterStream::stream)
.flatten_stream()
.filter_map(|(shortstatekey, event_id): (u64, OwnedEventId)| async move {
since_state_ids
.clone()
.await
.is_none_or(|since_state| since_state.get(&shortstatekey) != Some(&event_id))
.then_some(event_id)
})
.chain(lazy_state_ids)
.broad_filter_map(|event_id: OwnedEventId| async move {
services
.rooms
.timeline
.get_pdu(&event_id)
.await
.map(move |pdu| (event_id, pdu))
.ok()
})
.collect::<HashMap<_, _>>();
let since_encryption = services
.rooms
.state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
.is_ok()
.await;
let (delta_state_events, encrypted_room) = join(delta_state_events, encrypted_room).await;
let state_get_shorteventid = |user_id: &'a UserId| {
services
.rooms
.state_accessor
.state_get_shortid(
current_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
.ok()
};
let (mut device_list_updates, left_encrypted_users) = delta_state_events
.values()
let lazy_state_ids: OptionFuture<_> = witness
.filter(|_| !full_state && !encrypted_room)
.map(|witness| {
witness
.iter()
.stream()
.broad_filter_map(|user_id| state_get_shorteventid(user_id))
.into_future()
})
.into();
let state_diff: OptionFuture<_> = (!full_state && state_changed)
.then(|| {
services
.rooms
.state_accessor
.state_added((since_shortstatehash, current_shortstatehash))
.boxed()
.into_future()
})
.into();
let current_state_ids: OptionFuture<_> = full_state
.then(|| {
services
.rooms
.state_accessor
.state_full_shortids(current_shortstatehash)
.expect_ok()
.boxed()
.into_future()
})
.into();
let lazy_state_ids = lazy_state_ids
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten();
let state_diff_ids = state_diff
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten();
let state_events = current_state_ids
.map(|opt| {
opt.map(|(curr, next)| {
let opt = curr;
let iter = Option::into_iter(opt);
IterStream::stream(iter).chain(next)
})
})
.map(Option::into_iter)
.map(IterStream::stream)
.flatten_stream()
.flatten()
.chain(state_diff_ids)
.broad_filter_map(|(shortstatekey, shorteventid)| async move {
if witness.is_none() || encrypted_room {
return Some(shorteventid);
}
lazy_filter(services, sender_user, shortstatekey, shorteventid).await
})
.chain(lazy_state_ids)
.broad_filter_map(|shorteventid| {
services
.rooms
.short
.get_eventid_from_short(shorteventid)
.ok()
})
.broad_filter_map(|event_id: OwnedEventId| async move {
services.rooms.timeline.get_pdu(&event_id).await.ok()
})
.collect::<Vec<_>>()
.await;
let (device_list_updates, left_encrypted_users) = state_events
.iter()
.stream()
.ready_filter(|_| encrypted_room)
.ready_filter(|state_event| state_event.kind == RoomMember)
.ready_filter_map(|state_event| {
let content = state_event.get_content().ok()?;
let user_id = state_event.state_key.as_ref()?.parse().ok()?;
let content: RoomMemberEventContent = state_event.get_content().ok()?;
let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?;
Some((content, user_id))
})
.ready_filter(|(_, user_id): &(RoomMemberEventContent, OwnedUserId)| {
user_id != sender_user
})
.fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move {
use MembershipState::*;
@ -1068,8 +1103,9 @@ async fn calculate_state_incremental<'a>(
|user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id));
match content.membership {
| Join if !shares_encrypted_room(&user_id).await => dlu.insert(user_id),
| Leave => leu.insert(user_id),
| Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await =>
dlu.insert(user_id),
| _ => false,
};
@ -1077,29 +1113,7 @@ async fn calculate_state_incremental<'a>(
})
.await;
// If the user is in a new encrypted room, give them all joined users
let new_encrypted_room = encrypted_room && !since_encryption.await;
if joined_since_last_sync && encrypted_room || new_encrypted_room {
services
.rooms
.state_cache
.room_members(room_id)
.ready_filter(|&user_id| sender_user != user_id)
.map(ToOwned::to_owned)
.broad_filter_map(|user_id| async move {
share_encrypted_room(services, sender_user, &user_id, Some(room_id))
.await
.or_some(user_id)
})
.ready_for_each(|user_id| {
device_list_updates.insert(user_id);
})
.await;
}
let send_member_count = delta_state_events
.values()
.any(|event| event.kind == RoomMember);
let send_member_count = state_events.iter().any(|event| event.kind == RoomMember);
let (joined_member_count, invited_member_count, heroes) = if send_member_count {
calculate_counts(services, room_id, sender_user).await?
@ -1111,13 +1125,29 @@ async fn calculate_state_incremental<'a>(
heroes,
joined_member_count,
invited_member_count,
joined_since_last_sync,
state_events,
device_list_updates,
left_encrypted_users,
state_events: delta_state_events.into_values().collect(),
})
}
async fn lazy_filter(
services: &Services,
sender_user: &UserId,
shortstatekey: ShortStateKey,
shorteventid: ShortEventId,
) -> Option<ShortEventId> {
let (event_type, state_key) = services
.rooms
.short
.get_statekey_from_short(shortstatekey)
.await
.ok()?;
(event_type != StateEventType::RoomMember || state_key == sender_user.as_str())
.then_some(shorteventid)
}
async fn calculate_counts(
services: &Services,
room_id: &RoomId,

View file

@ -6,7 +6,7 @@ use std::{
};
use conduwuit::{
at, err, error,
at, err, error, pair_of,
pdu::PduBuilder,
utils,
utils::{
@ -17,7 +17,7 @@ use conduwuit::{
Err, Error, PduEvent, Result,
};
use database::{Deserialized, Map};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use futures::{future::try_join, FutureExt, Stream, StreamExt, TryFutureExt};
use lru_cache::LruCache;
use ruma::{
events::{
@ -48,7 +48,7 @@ use crate::{
rooms::{
short::{ShortEventId, ShortStateHash, ShortStateKey},
state::RoomMutexGuard,
state_compressor::{compress_state_event, parse_compressed_state_event},
state_compressor::{compress_state_event, parse_compressed_state_event, CompressedState},
},
Dep,
};
@ -143,6 +143,256 @@ impl crate::Service for Service {
}
impl Service {
/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).
pub async fn room_state_get_content<T>(
&self,
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<T>
where
T: for<'de> Deserialize<'de>,
{
self.room_state_get(room_id, event_type, state_key)
.await
.and_then(|event| event.get_content())
}
/// Returns the full room state.
#[tracing::instrument(skip(self), level = "debug")]
pub fn room_state_full<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = Result<((StateEventType, String), PduEvent)>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok))
.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
.try_flatten_stream()
}
/// Returns the full room state pdus
#[tracing::instrument(skip(self), level = "debug")]
pub fn room_state_full_pdus<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = Result<PduEvent>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|shortstatehash| self.state_full_pdus(shortstatehash).map(Ok))
.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
.try_flatten_stream()
}
/// Returns a single EventId from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn room_state_get_id<Id>(
&self,
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<Id>
where
Id: for<'de> Deserialize<'de> + Sized + ToOwned,
<Id as ToOwned>::Owned: Borrow<EventId>,
{
self.services
.state
.get_room_shortstatehash(room_id)
.and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key))
.await
}
/// Returns a single PDU from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn room_state_get(
&self,
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<PduEvent> {
self.services
.state
.get_room_shortstatehash(room_id)
.and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key))
.await
}
/// The user was a joined member at this state (potentially in the past)
#[inline]
async fn user_was_joined(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool {
self.user_membership(shortstatehash, user_id).await == MembershipState::Join
}
/// The user was an invited or joined room member at this state (potentially
/// in the past)
#[inline]
async fn user_was_invited(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool {
let s = self.user_membership(shortstatehash, user_id).await;
s == MembershipState::Join || s == MembershipState::Invite
}
/// Get membership for given user in state
async fn user_membership(
&self,
shortstatehash: ShortStateHash,
user_id: &UserId,
) -> MembershipState {
self.state_get_content(shortstatehash, &StateEventType::RoomMember, user_id.as_str())
.await
.map_or(MembershipState::Leave, |c: RoomMemberEventContent| c.membership)
}
/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).
pub async fn state_get_content<T>(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<T>
where
T: for<'de> Deserialize<'de>,
{
self.state_get(shortstatehash, event_type, state_key)
.await
.and_then(|event| event.get_content())
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_contains(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> bool {
let Ok(shortstatekey) = self
.services
.short
.get_shortstatekey(event_type, state_key)
.await
else {
return false;
};
self.state_contains_shortstatekey(shortstatehash, shortstatekey)
.await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_contains_shortstatekey(
&self,
shortstatehash: ShortStateHash,
shortstatekey: ShortStateKey,
) -> bool {
let start = compress_state_event(shortstatekey, 0);
let end = compress_state_event(shortstatekey, u64::MAX);
self.load_full_state(shortstatehash)
.map_ok(|full_state| full_state.range(start..end).next().copied())
.await
.flat_ok()
.is_some()
}
/// Returns a single PDU from `room_id` with key (`event_type`,
/// `state_key`).
pub async fn state_get(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<PduEvent> {
self.state_get_id(shortstatehash, event_type, state_key)
.and_then(|event_id: OwnedEventId| async move {
self.services.timeline.get_pdu(&event_id).await
})
.await
}
/// Returns a single EventId from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_get_id<Id>(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<Id>
where
Id: for<'de> Deserialize<'de> + Sized + ToOwned,
<Id as ToOwned>::Owned: Borrow<EventId>,
{
let shorteventid = self
.state_get_shortid(shortstatehash, event_type, state_key)
.await?;
self.services
.short
.get_eventid_from_short(shorteventid)
.await
}
/// Returns a single EventId from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_get_shortid(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<ShortEventId> {
let shortstatekey = self
.services
.short
.get_shortstatekey(event_type, state_key)
.await?;
let start = compress_state_event(shortstatekey, 0);
let end = compress_state_event(shortstatekey, u64::MAX);
self.load_full_state(shortstatehash)
.map_ok(|full_state| {
full_state
.range(start..end)
.next()
.copied()
.map(parse_compressed_state_event)
.map(at!(1))
.ok_or(err!(Request(NotFound("Not found in room state"))))
})
.await?
}
/// Returns the state events removed between the interval (present in .0 but
/// not in .1)
#[inline]
pub fn state_removed(
&self,
shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ {
self.state_added((shortstatehash.1, shortstatehash.0))
}
/// Returns the state events added between the interval (present in .1 but
/// not in .0)
#[tracing::instrument(skip(self), level = "debug")]
pub fn state_added<'a>(
&'a self,
shortstatehash: pair_of!(ShortStateHash),
) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + 'a {
let a = self.load_full_state(shortstatehash.0);
let b = self.load_full_state(shortstatehash.1);
try_join(a, b)
.map_ok(|(a, b)| b.difference(&a).copied().collect::<Vec<_>>())
.map_ok(IterStream::try_stream)
.try_flatten_stream()
.expect_ok()
.map(parse_compressed_state_event)
}
pub fn state_full(
&self,
shortstatehash: ShortStateHash,
@ -208,110 +458,11 @@ impl Service {
.ready_filter_map(|(event_id, shortstatekey)| Some((shortstatekey, event_id.ok()?)))
}
/// Returns a single EventId from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_get_id<Id>(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<Id>
where
Id: for<'de> Deserialize<'de> + Sized + ToOwned,
<Id as ToOwned>::Owned: Borrow<EventId>,
{
let shorteventid = self
.state_get_shortid(shortstatehash, event_type, state_key)
.await?;
self.services
.short
.get_eventid_from_short(shorteventid)
.await
}
/// Returns a single EventId from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_get_shortid(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<ShortEventId> {
let shortstatekey = self
.services
.short
.get_shortstatekey(event_type, state_key)
.await?;
let start = compress_state_event(shortstatekey, 0);
let end = compress_state_event(shortstatekey, u64::MAX);
self.services
.state_compressor
.load_shortstatehash_info(shortstatehash)
.map_ok(|vec| vec.last().expect("at least one layer").full_state.clone())
.map_ok(|full_state| {
full_state
.range(start..end)
.next()
.copied()
.map(parse_compressed_state_event)
.map(at!(1))
.ok_or(err!(Request(NotFound("Not found in room state"))))
})
.await?
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_contains(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> bool {
let Ok(shortstatekey) = self
.services
.short
.get_shortstatekey(event_type, state_key)
.await
else {
return false;
};
self.state_contains_shortstatekey(shortstatehash, shortstatekey)
.await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn state_contains_shortstatekey(
&self,
shortstatehash: ShortStateHash,
shortstatekey: ShortStateKey,
) -> bool {
let start = compress_state_event(shortstatekey, 0);
let end = compress_state_event(shortstatekey, u64::MAX);
self.services
.state_compressor
.load_shortstatehash_info(shortstatehash)
.map_ok(|vec| vec.last().expect("at least one layer").full_state.clone())
.map_ok(|full_state| full_state.range(start..end).next().copied())
.await
.flat_ok()
.is_some()
}
pub fn state_full_shortids(
&self,
shortstatehash: ShortStateHash,
) -> impl Stream<Item = Result<(ShortStateKey, ShortEventId)>> + Send + '_ {
self.services
.state_compressor
.load_shortstatehash_info(shortstatehash)
.map_err(|e| err!(Database("Missing state IDs: {e}")))
.map_ok(|vec| vec.last().expect("at least one layer").full_state.clone())
self.load_full_state(shortstatehash)
.map_ok(|full_state| {
full_state
.deref()
@ -324,59 +475,32 @@ impl Service {
.try_flatten_stream()
}
/// Returns a single PDU from `room_id` with key (`event_type`,
/// `state_key`).
pub async fn state_get(
async fn load_full_state(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<PduEvent> {
self.state_get_id(shortstatehash, event_type, state_key)
.and_then(|event_id: OwnedEventId| async move {
self.services.timeline.get_pdu(&event_id).await
) -> Result<Arc<CompressedState>> {
self.services
.state_compressor
.load_shortstatehash_info(shortstatehash)
.map_err(|e| err!(Database("Missing state IDs: {e}")))
.map_ok(|vec| vec.last().expect("at least one layer").full_state.clone())
.await
}
/// Returns the state hash for this pdu.
pub async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<ShortStateHash> {
const BUFSIZE: usize = size_of::<ShortEventId>();
self.services
.short
.get_shorteventid(event_id)
.and_then(|shorteventid| {
self.db
.shorteventid_shortstatehash
.aqry::<BUFSIZE, _>(&shorteventid)
})
.await
}
/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).
pub async fn state_get_content<T>(
&self,
shortstatehash: ShortStateHash,
event_type: &StateEventType,
state_key: &str,
) -> Result<T>
where
T: for<'de> Deserialize<'de>,
{
self.state_get(shortstatehash, event_type, state_key)
.await
.and_then(|event| event.get_content())
}
/// Get membership for given user in state
async fn user_membership(
&self,
shortstatehash: ShortStateHash,
user_id: &UserId,
) -> MembershipState {
self.state_get_content(shortstatehash, &StateEventType::RoomMember, user_id.as_str())
.await
.map_or(MembershipState::Leave, |c: RoomMemberEventContent| c.membership)
}
/// The user was a joined member at this state (potentially in the past)
#[inline]
async fn user_was_joined(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool {
self.user_membership(shortstatehash, user_id).await == MembershipState::Join
}
/// The user was an invited or joined room member at this state (potentially
/// in the past)
#[inline]
async fn user_was_invited(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool {
let s = self.user_membership(shortstatehash, user_id).await;
s == MembershipState::Join || s == MembershipState::Invite
.deserialized()
}
/// Whether a server is allowed to see an event through federation, based on
@ -521,101 +645,6 @@ impl Service {
}
}
/// Returns the state hash for this pdu.
pub async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<ShortStateHash> {
const BUFSIZE: usize = size_of::<ShortEventId>();
self.services
.short
.get_shorteventid(event_id)
.and_then(|shorteventid| {
self.db
.shorteventid_shortstatehash
.aqry::<BUFSIZE, _>(&shorteventid)
})
.await
.deserialized()
}
/// Returns the full room state.
#[tracing::instrument(skip(self), level = "debug")]
pub fn room_state_full<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = Result<((StateEventType, String), PduEvent)>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|shortstatehash| self.state_full(shortstatehash).map(Ok))
.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
.try_flatten_stream()
}
/// Returns the full room state pdus
#[tracing::instrument(skip(self), level = "debug")]
pub fn room_state_full_pdus<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = Result<PduEvent>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|shortstatehash| self.state_full_pdus(shortstatehash).map(Ok))
.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
.try_flatten_stream()
}
/// Returns a single EventId from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn room_state_get_id<Id>(
&self,
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<Id>
where
Id: for<'de> Deserialize<'de> + Sized + ToOwned,
<Id as ToOwned>::Owned: Borrow<EventId>,
{
self.services
.state
.get_room_shortstatehash(room_id)
.and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key))
.await
}
/// Returns a single PDU from `room_id` with key (`event_type`,
/// `state_key`).
#[tracing::instrument(skip(self), level = "debug")]
pub async fn room_state_get(
&self,
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<PduEvent> {
self.services
.state
.get_room_shortstatehash(room_id)
.and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key))
.await
}
/// Returns a single PDU from `room_id` with key (`event_type`,`state_key`).
pub async fn room_state_get_content<T>(
&self,
room_id: &RoomId,
event_type: &StateEventType,
state_key: &str,
) -> Result<T>
where
T: for<'de> Deserialize<'de>,
{
self.room_state_get(room_id, event_type, state_key)
.await
.and_then(|event| event.get_content())
}
pub async fn get_name(&self, room_id: &RoomId) -> Result<String> {
self.room_state_get_content(room_id, &StateEventType::RoomName, "")
.await