From 3fe98f35f2571f80d39d251977208b1b074c1622 Mon Sep 17 00:00:00 2001 From: strawberry Date: Sat, 23 Nov 2024 13:45:27 -0500 Subject: [PATCH] remove queued push keys on pusher deletion, use more refs Signed-off-by: strawberry --- src/api/client/push.rs | 5 ++++- src/service/appservice/mod.rs | 14 ++++++-------- src/service/pusher/mod.rs | 12 ++++++++++-- src/service/sending/mod.rs | 35 ++++++++++++++++++++++++++++------- 4 files changed, 48 insertions(+), 18 deletions(-) diff --git a/src/api/client/push.rs b/src/api/client/push.rs index f2376e7c..f27ead1f 100644 --- a/src/api/client/push.rs +++ b/src/api/client/push.rs @@ -441,7 +441,10 @@ pub(crate) async fn set_pushers_route( ) -> Result { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - services.pusher.set_pusher(sender_user, &body.action)?; + services + .pusher + .set_pusher(sender_user, &body.action) + .await?; Ok(set_pusher::v3::Response::new()) } diff --git a/src/service/appservice/mod.rs b/src/service/appservice/mod.rs index 1617e6e6..4a20b130 100644 --- a/src/service/appservice/mod.rs +++ b/src/service/appservice/mod.rs @@ -79,26 +79,24 @@ impl Service { /// /// # Arguments /// - /// * `service_name` - the name you send to register the service previously - pub async fn unregister_appservice(&self, service_name: &str) -> Result<()> { + /// * `service_name` - the registration ID of the appservice + pub async fn unregister_appservice(&self, appservice_id: &str) -> Result<()> { // removes the appservice registration info self.registration_info .write() .await - .remove(service_name) + .remove(appservice_id) .ok_or(err!("Appservice not found"))?; // remove the appservice from the database - self.db.id_appserviceregistrations.remove(service_name); + self.db.id_appserviceregistrations.del(appservice_id); // deletes all active requests for the appservice if there are any so we stop // sending to the URL self.services .sending - .cleanup_events(service_name.to_owned()) - .await; - - Ok(()) + .cleanup_events(Some(appservice_id), None, None) + .await } pub async fn get_registration(&self, id: &str) -> Option { diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index fb43fdb8..6b02c7f8 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -26,7 +26,7 @@ use ruma::{ uint, RoomId, UInt, UserId, }; -use crate::{client, globals, rooms, users, Dep}; +use crate::{client, globals, rooms, sending, users, Dep}; pub struct Service { db: Data, @@ -39,6 +39,7 @@ struct Services { state_accessor: Dep, state_cache: Dep, users: Dep, + sending: Dep, } struct Data { @@ -57,6 +58,7 @@ impl crate::Service for Service { state_accessor: args.depend::("rooms::state_accessor"), state_cache: args.depend::("rooms::state_cache"), users: args.depend::("users"), + sending: args.depend::("sending"), }, })) } @@ -65,7 +67,7 @@ impl crate::Service for Service { } impl Service { - pub fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) -> Result { + pub async fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) -> Result { match pusher { set_pusher::v3::PusherAction::Post(data) => { let pushkey = data.pusher.ids.pushkey.as_str(); @@ -84,6 +86,12 @@ impl Service { set_pusher::v3::PusherAction::Delete(ids) => { let key = (sender, ids.pushkey.as_str()); self.db.senderkey_pusher.del(key); + + self.services + .sending + .cleanup_events(None, Some(sender), Some(ids.pushkey.as_str())) + .await + .ok(); }, } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 5a070306..611940be 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -8,7 +8,7 @@ use std::{fmt::Debug, iter::once, sync::Arc}; use async_trait::async_trait; use conduit::{ - err, + debug_warn, err, utils::{ReadyExt, TryReadyExt}, warn, Result, Server, }; @@ -285,13 +285,34 @@ impl Service { appservice::send_request(client, registration, request).await } - /// Cleanup event data - /// Used for instance after we remove an appservice registration + /// Clean up queued sending event data + /// + /// Used after we remove an appservice registration or a user deletes a push + /// key #[tracing::instrument(skip(self), level = "debug")] - pub async fn cleanup_events(&self, appservice_id: String) { - self.db - .delete_all_requests_for(&Destination::Appservice(appservice_id)) - .await; + pub async fn cleanup_events( + &self, appservice_id: Option<&str>, user_id: Option<&UserId>, push_key: Option<&str>, + ) -> Result { + match (appservice_id, user_id, push_key) { + (None, Some(user_id), Some(push_key)) => { + self.db + .delete_all_requests_for(&Destination::Push(user_id.to_owned(), push_key.to_owned())) + .await; + + Ok(()) + }, + (Some(appservice_id), None, None) => { + self.db + .delete_all_requests_for(&Destination::Appservice(appservice_id.to_owned())) + .await; + + Ok(()) + }, + _ => { + debug_warn!("cleanup_events called with too many or too few arguments"); + Ok(()) + }, + } } fn dispatch(&self, msg: Msg) -> Result<()> {