unfinished untested impl of room deletion

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-02-20 22:40:46 -05:00
parent 0593dce8a6
commit a94bf2cf9f
31 changed files with 400 additions and 9 deletions

View file

@ -319,7 +319,7 @@ async fn sync_helper(
)? {
Some(e) => e,
None => {
error!("Left room but no left state event");
error!("Left room {room_id} but no left state event");
continue;
}
};
@ -331,7 +331,7 @@ async fn sync_helper(
{
Some(s) => s,
None => {
error!("Leave event has no state");
error!("Leave event {left_event_id} has no state");
continue;
}
};

View file

@ -56,7 +56,7 @@ impl service::globals::Data for KeyValueDatabase {
let mut futures = FuturesUnordered::new();
// Return when *any* user changed his key
// Return when *any* user changed their key
// TODO: only send for user they share a room with
futures.push(self.todeviceid_events.watch_prefix(&userdeviceid_prefix));

View file

@ -3,6 +3,7 @@ use std::mem;
use ruma::{
events::receipt::ReceiptEvent, serde::Raw, CanonicalJsonObject, OwnedUserId, RoomId, UserId,
};
use tracing::debug;
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
@ -117,6 +118,23 @@ impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
.insert(&key, &services().globals.next_count()?.to_be_bytes())
}
fn delete_all_private_read_receipts(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.roomuserid_privateread.scan_prefix(prefix.clone()) {
debug!("Removing key {:?}", key);
self.roomuserid_privateread.remove(&key)?;
}
for (key, _) in self.roomuserid_lastprivatereadupdate.scan_prefix(prefix) {
debug!("Removing key {:?}", key);
self.roomuserid_lastprivatereadupdate.remove(&key)?;
}
Ok(())
}
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);

View file

@ -1,6 +1,7 @@
use std::{collections::HashSet, mem};
use ruma::{OwnedUserId, RoomId, UserId};
use tracing::debug;
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
@ -107,6 +108,18 @@ impl service::rooms::edus::typing::Data for KeyValueDatabase {
.unwrap_or(0))
}
fn delete_all_typing_updates(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.roomid_lasttypingupdate.scan_prefix(prefix) {
debug!("Removing key {:?}", key);
self.roomid_lasttypingupdate.remove(&key)?;
}
Ok(())
}
fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<OwnedUserId>> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);

View file

@ -1,6 +1,7 @@
use std::{mem, sync::Arc};
use ruma::{EventId, RoomId, UserId};
use tracing::debug;
use crate::{
database::KeyValueDatabase,
@ -72,6 +73,18 @@ impl service::rooms::pdu_metadata::Data for KeyValueDatabase {
Ok(())
}
fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.referencedevents.scan_prefix(prefix) {
debug!("Removing key: {:?}", key);
self.referencedevents.remove(&key)?;
}
Ok(())
}
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool> {
let mut key = room_id.as_bytes().to_vec();
key.extend_from_slice(event_id.as_bytes());

View file

@ -1,4 +1,5 @@
use ruma::RoomId;
use tracing::debug;
use crate::{database::KeyValueDatabase, service, services, utils, Result};
@ -22,6 +23,18 @@ impl service::rooms::search::Data for KeyValueDatabase {
self.tokenids.insert_batch(&mut batch)
}
fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.tokenids.scan_prefix(prefix) {
debug!("Removing key: {:?}", key);
self.tokenids.remove(&key)?;
}
Ok(())
}
fn search_pdus<'a>(&'a self, room_id: &RoomId, search_string: &str) -> SearchPdusResult<'a> {
let prefix = services()
.rooms

View file

@ -1,7 +1,7 @@
use std::sync::Arc;
use ruma::{events::StateEventType, EventId, RoomId};
use tracing::warn;
use tracing::{error, warn};
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
@ -216,4 +216,23 @@ impl service::rooms::short::Data for KeyValueDatabase {
}
})
}
/// Attempts to delete a shortroomid from the kv database
fn delete_shortroomid(&self, room_id: &RoomId) -> Result<()> {
match self.roomid_shortroomid.get(room_id.as_bytes())? {
Some(short) => {
self.roomid_shortroomid.remove(&short).map_err(|e| {
error!("Failed to remove shortroomid in database: {e}");
Error::bad_database("Failed to remove shortroomid in database")
})?;
}
None => {
return Err(Error::bad_database(
"Invalid or non-existent shortroomid in db.",
))?
}
}
Ok(())
}
}

View file

@ -1,5 +1,6 @@
use ruma::{EventId, OwnedEventId, RoomId};
use std::collections::HashSet;
use tracing::debug;
use std::sync::Arc;
use tokio::sync::MutexGuard;
@ -28,6 +29,15 @@ impl service::rooms::state::Data for KeyValueDatabase {
Ok(())
}
fn delete_room_shortstatehash(
&self,
room_id: &RoomId,
_mutex_lock: &MutexGuard<'_, ()>,
) -> Result<()> {
self.roomid_shortstatehash.remove(room_id.as_bytes())?;
Ok(())
}
fn set_event_state(&self, shorteventid: u64, shortstatehash: u64) -> Result<()> {
self.shorteventid_shortstatehash
.insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?;
@ -70,4 +80,16 @@ impl service::rooms::state::Data for KeyValueDatabase {
Ok(())
}
fn delete_all_rooms_forward_extremities(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.roomid_pduleaves.scan_prefix(prefix) {
debug!("Removing key: {:?}", key);
self.roomid_pduleaves.remove(&key)?;
}
Ok(())
}
}

View file

@ -7,6 +7,7 @@ use ruma::{
serde::Raw,
OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
};
use tracing::debug;
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
@ -99,6 +100,28 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
Ok(())
}
fn delete_room_join_counts(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.roomid_joinedcount.scan_prefix(prefix.clone()) {
debug!("Removing key: {:?}", key);
self.roomid_joinedcount.remove(&key)?;
}
for (key, _) in self.roomid_invitedcount.scan_prefix(prefix.clone()) {
debug!("Removing key: {:?}", key);
self.roomid_invitedcount.remove(&key)?;
}
for (key, _) in self.roomserverids.scan_prefix(prefix.clone()) {
debug!("Removing key: {:?}", key);
self.roomserverids.remove(&key)?;
}
Ok(())
}
fn update_joined_count(&self, room_id: &RoomId) -> Result<()> {
let mut joinedcount = 0_u64;
let mut invitedcount = 0_u64;
@ -190,7 +213,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
}
}
#[tracing::instrument(skip(self, room_id, appservice))]
/// Check our room state cache if an appservice is in the room ID
fn appservice_in_room(
&self,
room_id: &RoomId,
@ -280,8 +303,8 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
}))
}
#[tracing::instrument(skip(self))]
fn server_in_room<'a>(&'a self, server: &ServerName, room_id: &RoomId) -> Result<bool> {
/// Check our room state cache if a server is in the room ID
fn server_in_room(&self, server: &ServerName, room_id: &RoomId) -> Result<bool> {
let mut key = server.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(room_id.as_bytes());

View file

@ -1,6 +1,7 @@
use std::mem;
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
use tracing::debug;
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
@ -47,6 +48,18 @@ impl service::rooms::threads::Data for KeyValueDatabase {
))
}
fn delete_all_rooms_threads(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.threadid_userids.scan_prefix(prefix) {
debug!("Removing key: {:?}", key);
self.threadid_userids.remove(&key)?;
}
Ok(())
}
fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()> {
let users = participants
.iter()

View file

@ -3,7 +3,7 @@ use std::{collections::hash_map, mem::size_of, sync::Arc};
use ruma::{
api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId,
};
use tracing::error;
use tracing::{debug, error};
use crate::{
database::KeyValueDatabase,
@ -303,6 +303,18 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.increment_batch(&mut highlights_batch.into_iter())?;
Ok(())
}
fn delete_all_pdus_for_room(&self, room_id: &RoomId) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for (key, _) in self.pduid_pdu.scan_prefix(prefix) {
debug!("Removing key: {:?}", key);
self.pduid_pdu.remove(&key)?;
}
Ok(())
}
}
/// Returns the `count` of this pdu's id.

View file

@ -166,6 +166,9 @@ enum RoomCommand {
/// - List all rooms the server knows about
List { page: Option<usize> },
/// - Attempts to delete a room from our database using room ID
Delete { room_id: Box<RoomId> },
#[command(subcommand)]
/// - Manage moderation of remote or local rooms
Moderation(RoomModeration),
@ -828,6 +831,180 @@ impl Service {
}
},
AdminCommand::Rooms(command) => match command {
RoomCommand::Delete { room_id } => {
RoomMessageEventContent::text_plain("Deleting room, this may take a while.");
// 1. check if we know about this room in the first place
debug!("Checking if we have room {} in our database", &room_id);
if !services().rooms.metadata.exists(&room_id)? {
return Ok(RoomMessageEventContent::text_plain(
"Cannot delete a room we do not know about (would not exist in our database).",
));
}
let owned_room_id = RoomId::parse(&room_id).ok().unwrap();
// 2. disable incoming federation
debug!("Disabling incoming federation on room {}", &room_id);
services().rooms.metadata.disable_room(&room_id, true)?;
// ??. deleting all our room aliases from the room
debug!("Deleting all our room aliases for the room");
for alias in services()
.rooms
.alias
.local_aliases_for_room(&room_id)
.filter_map(|r| r.ok())
{
services().rooms.alias.remove_alias(&alias)?;
}
// ??. removing the room from our room directory
debug!("Removing/unpublishing room from our room directory");
services().rooms.directory.set_not_public(&room_id)?;
// 3. ban the room locally so new users cannot join while we're in the process of deleting it
debug!("Banning room {}", &room_id);
services().rooms.metadata.ban_room(&room_id, true)?;
// 3. attempt to make all our local users in that room leave
// TODO: add a "force" option to ignore errors making users leave
debug!("Making all users leave the room {}", &room_id);
for local_user in services()
.rooms
.state_cache
.room_members(&room_id)
.filter_map(|user| {
user.ok().filter(|local_user| {
local_user.server_name() == services().globals.server_name()
})
})
.collect::<Vec<OwnedUserId>>()
{
debug!(
"Attempting leave for user {} in room {}",
&local_user, &room_id
);
if let Err(e) = leave_room(&local_user, &room_id, None).await {
error!("Error attempting to delete room {} during local user leave step, re-enabling the room: {}", &room_id, e);
// undo our changes to the room to be safe
services().rooms.metadata.disable_room(&room_id, false)?;
services().rooms.metadata.ban_room(&room_id, false)?;
return Ok(RoomMessageEventContent::text_plain(format!("Error occurred while attempting to make user {} leave the room, re-enabling it. If you would like to ignore errors (potentially dangerous!), use --force", &local_user)));
}
}
// 4. make all our local users forget the room so they stop receiving new information about it (e.g. notifications)
for local_user in services()
.rooms
.state_cache
.room_members(&room_id)
.filter_map(|user| {
user.ok().filter(|local_user| {
local_user.server_name() == services().globals.server_name()
})
})
.collect::<Vec<OwnedUserId>>()
{
debug!(
"Attempting to forget room for user {} in room {}",
&local_user, &room_id
);
services().rooms.state_cache.forget(&room_id, &local_user)?;
}
debug!("Deleting room's threads from database");
services()
.rooms
.threads
.delete_all_rooms_threads(&room_id)?;
// ??. delete all the room's search token IDs from our database
debug!("Deleting all the room's search token IDs from our database");
services()
.rooms
.search
.delete_all_search_tokenids_for_room(&room_id)?;
// ??. delete all the room's forward extremities from our database
debug!("Deleting all room's forward extremities from our database");
services()
.rooms
.state
.delete_all_rooms_forward_extremities(&room_id)?;
// ??. delete all the room's event (PDU) references
debug!("Deleting all the room's event (PDU) references");
services()
.rooms
.pdu_metadata
.delete_all_referenced_for_room(&room_id)?;
// ??. delete all the room's member counts
debug!("Deleting all the room's member counts");
services()
.rooms
.state_cache
.delete_room_join_counts(&room_id)?;
// ??. delete all the room's private read receipts
debug!("Deleting all the room's private read receipts");
services()
.rooms
.edus
.read_receipt
.delete_all_private_read_receipts(&room_id)?;
// ??. delete all the room's typing indicator updates
debug!("Deleting all the room's typing indicator updates");
services()
.rooms
.edus
.typing
.delete_all_typing_updates(&room_id)?;
debug!("Final stages of deleting the room");
// 2. obtain a mutex state lock
debug!(
"Obtaining a mutex state lock for safety and future database operations"
);
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.unwrap()
.entry(owned_room_id.clone())
.or_default(),
);
let state_lock = mutex_state.lock().await;
// ??. delete the room state hash from our database
debug!("Deleting room state hash from our database");
services()
.rooms
.state
.delete_room_shortstatehash(&room_id, &state_lock)?;
// ??. delete the room ID from our database
debug!("Deleting internal room ID from our database");
services().rooms.short.delete_shortroomid(&room_id)?;
// unbanning and allowing incoming federation with room again
// TODO: add option to keep a room banned (`--block` or `--ban`)
services().rooms.metadata.disable_room(&room_id, false)?;
services().rooms.metadata.ban_room(&room_id, false)?;
// drop our state lock (we are done)
drop(state_lock);
debug!("Successfully deleted room {} from our database", &room_id);
return Ok(RoomMessageEventContent::text_plain(
"Successfully deleted the room from our database.",
));
}
RoomCommand::Moderation(command) => match command {
RoomModeration::BanRoom {
force,

View file

@ -29,4 +29,6 @@ pub trait Data: Send + Sync {
/// Returns the count of the last typing update in this room.
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
fn delete_all_private_read_receipts(&self, room_id: &RoomId) -> Result<()>;
}

View file

@ -52,4 +52,8 @@ impl Service {
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.last_privateread_update(user_id, room_id)
}
pub fn delete_all_private_read_receipts(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_all_private_read_receipts(room_id)
}
}

View file

@ -16,6 +16,8 @@ pub trait Data: Send + Sync {
/// Returns the count of the last typing update in this room.
fn last_typing_update(&self, room_id: &RoomId) -> Result<u64>;
fn delete_all_typing_updates(&self, room_id: &RoomId) -> Result<()>;
/// Returns all user ids currently typing.
fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<OwnedUserId>>;
}

View file

@ -33,6 +33,10 @@ impl Service {
self.db.last_typing_update(room_id)
}
pub fn delete_all_typing_updates(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_all_typing_updates(room_id)
}
/// Returns a new typing EDU.
pub fn typings_all(
&self,

View file

@ -16,6 +16,7 @@ pub trait Data: Send + Sync {
until: PduCount,
) -> PduData<'a>;
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>;
fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result<()>;
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>;
fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>;
fn is_event_soft_failed(&self, event_id: &EventId) -> Result<bool>;

View file

@ -173,6 +173,10 @@ impl Service {
self.db.mark_as_referenced(room_id, event_ids)
}
pub fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_all_referenced_for_room(room_id)
}
#[tracing::instrument(skip(self))]
pub fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool> {
self.db.is_event_referenced(room_id, event_id)

View file

@ -7,4 +7,6 @@ pub trait Data: Send + Sync {
fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()>;
fn search_pdus<'a>(&'a self, room_id: &RoomId, search_string: &str) -> SearchPdusResult<'a>;
fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result<()>;
}

View file

@ -23,4 +23,8 @@ impl Service {
) -> Result<Option<(impl Iterator<Item = Vec<u8>> + 'a, Vec<String>)>> {
self.db.search_pdus(room_id, search_string)
}
pub fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_all_search_tokenids_for_room(room_id)
}
}

View file

@ -28,4 +28,6 @@ pub trait Data: Send + Sync {
fn get_shortroomid(&self, room_id: &RoomId) -> Result<Option<u64>>;
fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result<u64>;
fn delete_shortroomid(&self, room_id: &RoomId) -> Result<()>;
}

View file

@ -51,4 +51,8 @@ impl Service {
pub fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result<u64> {
self.db.get_or_create_shortroomid(room_id)
}
pub fn delete_shortroomid(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_shortroomid(room_id)
}
}

View file

@ -15,6 +15,12 @@ pub trait Data: Send + Sync {
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()>;
fn delete_room_shortstatehash(
&self,
room_id: &RoomId,
_mutex_lock: &MutexGuard<'_, ()>,
) -> Result<()>;
/// Associates a state with an event.
fn set_event_state(&self, shorteventid: u64, shortstatehash: u64) -> Result<()>;
@ -28,4 +34,6 @@ pub trait Data: Send + Sync {
event_ids: Vec<OwnedEventId>,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()>;
fn delete_all_rooms_forward_extremities(&self, room_id: &RoomId) -> Result<()>;
}

View file

@ -321,6 +321,14 @@ impl Service {
self.db.set_room_state(room_id, shortstatehash, mutex_lock)
}
pub fn delete_room_shortstatehash(
&self,
room_id: &RoomId,
mutex_lock: &MutexGuard<'_, ()>,
) -> Result<()> {
self.db.delete_room_shortstatehash(room_id, mutex_lock)
}
/// Returns the room's version.
#[tracing::instrument(skip(self))]
pub fn get_room_version(&self, room_id: &RoomId) -> Result<RoomVersionId> {
@ -362,6 +370,10 @@ impl Service {
.set_forward_extremities(room_id, event_ids, state_lock)
}
pub fn delete_all_rooms_forward_extremities(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_all_rooms_forward_extremities(room_id)
}
/// This fetches auth events from the current state.
#[tracing::instrument(skip(self))]
pub fn get_auth_events(

View file

@ -27,6 +27,8 @@ pub trait Data: Send + Sync {
fn update_joined_count(&self, room_id: &RoomId) -> Result<()>;
fn delete_room_join_counts(&self, room_id: &RoomId) -> Result<()>;
fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>>;
fn appservice_in_room(

View file

@ -232,6 +232,10 @@ impl Service {
self.db.update_joined_count(room_id)
}
pub fn delete_room_join_counts(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_room_join_counts(room_id)
}
#[tracing::instrument(skip(self, room_id))]
pub fn get_our_real_users(&self, room_id: &RoomId) -> Result<Arc<HashSet<OwnedUserId>>> {
self.db.get_our_real_users(room_id)

View file

@ -13,5 +13,8 @@ pub trait Data: Send + Sync {
) -> PduEventIterResult<'a>;
fn update_participants(&self, root_id: &[u8], participants: &[OwnedUserId]) -> Result<()>;
fn get_participants(&self, root_id: &[u8]) -> Result<Option<Vec<OwnedUserId>>>;
fn delete_all_rooms_threads(&self, room_id: &RoomId) -> Result<()>;
}

View file

@ -26,6 +26,10 @@ impl Service {
self.db.threads_until(user_id, room_id, until, include)
}
pub fn delete_all_rooms_threads(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_all_rooms_threads(room_id)
}
pub fn add_to_thread(&self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> {
let root_id = &services()
.rooms

View file

@ -81,4 +81,6 @@ pub trait Data: Send + Sync {
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) -> Result<()>;
fn delete_all_pdus_for_room(&self, room_id: &RoomId) -> Result<()>;
}

View file

@ -100,6 +100,10 @@ pub struct Service {
}
impl Service {
pub fn delete_all_pdus_for_room(&self, room_id: &RoomId) -> Result<()> {
self.db.delete_all_pdus_for_room(room_id)
}
#[tracing::instrument(skip(self))]
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)?

View file

@ -480,7 +480,7 @@ impl Service {
/// Used for instance after we remove an appservice registration
///
#[tracing::instrument(skip(self))]
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
pub fn cleanup_appservice_events(&self, appservice_id: String) -> Result<()> {
self.db
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?;