modernize remove_to_device_events

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-02-02 08:59:14 +00:00
parent ffe3b0faf2
commit a774afe837

View file

@ -1,12 +1,12 @@
use std::{collections::BTreeMap, mem, mem::size_of, sync::Arc};
use std::{collections::BTreeMap, mem, sync::Arc};
use conduwuit::{
debug_warn, err, trace,
utils::{self, stream::TryIgnore, string::Unquoted, ReadyExt},
Err, Error, Result, Server,
};
use database::{Database, Deserialized, Ignore, Interfix, Json, Map};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use database::{Deserialized, Ignore, Interfix, Json, Map};
use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{
api::client::{device::Device, error::ErrorKind, filter::FilterDefinition},
encryption::{CrossSigningKey, DeviceKeys, OneTimeKey},
@ -28,7 +28,6 @@ pub struct Service {
struct Services {
server: Arc<Server>,
db: Arc<Database>,
account_data: Dep<account_data::Service>,
admin: Dep<admin::Service>,
globals: Dep<globals::Service>,
@ -64,7 +63,6 @@ impl crate::Service for Service {
Ok(Arc::new(Self {
services: Services {
server: args.server.clone(),
db: args.db.clone(),
account_data: args.depend::<account_data::Service>("account_data"),
admin: args.depend::<admin::Service>("admin"),
globals: args.depend::<globals::Service>("globals"),
@ -801,35 +799,28 @@ impl Service {
.map(|(_, val): (Ignore, Raw<AnyToDeviceEvent>)| val)
}
pub async fn remove_to_device_events(
pub async fn remove_to_device_events<Until>(
&self,
user_id: &UserId,
device_id: &DeviceId,
until: u64,
) {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xFF);
prefix.extend_from_slice(device_id.as_bytes());
prefix.push(0xFF);
until: Until,
) where
Until: Into<Option<u64>> + Send,
{
type Key<'a> = (&'a UserId, &'a DeviceId, u64);
let mut last = prefix.clone();
last.extend_from_slice(&until.to_be_bytes());
let _cork = self.services.db.cork_and_flush();
let until = until.into().unwrap_or(u64::MAX);
let from = (user_id, device_id, until);
self.db
.todeviceid_events
.rev_raw_keys_from(&last) // this includes last
.rev_keys_from(&from)
.ignore_err()
.ready_take_while(move |key| key.starts_with(&prefix))
.map(|key| {
let len = key.len();
let start = len.saturating_sub(size_of::<u64>());
let count = utils::u64_from_u8(&key[start..len]);
(key, count)
.ready_take_while(move |(user_id_, device_id_, _): &Key<'_>| {
user_id == *user_id_ && device_id == *device_id_
})
.ready_for_each(|key: Key<'_>| {
self.db.todeviceid_events.del(key);
})
.ready_take_while(move |(_, count)| *count <= until)
.ready_for_each(|(key, _)| self.db.todeviceid_events.remove(&key))
.boxed()
.await;
}