diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 15dfd74b..73cde12e 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -1,6 +1,6 @@ use std::{ cmp, - collections::{BTreeMap, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, fmt::Debug, sync::Arc, time::{Duration, Instant}, @@ -25,7 +25,7 @@ use ruma::{ events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; -use tokio::sync::{Mutex, Semaphore}; +use tokio::sync::{oneshot, Mutex, Semaphore}; use tracing::{error, warn}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; @@ -65,9 +65,26 @@ pub enum SendingEventType { } enum TransactionStatus { + /// Currently running (for the first time) Running, - Failed(u32, Instant), // number of times failed, time of last failure - Retrying(u32), // number of times failed + /// Failed, backing off for a retry + Failed { + failures: u32, + waker: Option>, + }, + /// Currently retrying + Retrying { + /// number of times failed + failures: u32, + }, +} + +/// A control-flow enum to dictate what the handler should do after (trying to) +/// prepare a transaction +enum TransactionPrepOutcome { + Send(Vec), + Wake(OutgoingDestination), + Nothing, } impl Service { @@ -274,9 +291,12 @@ impl Service { #[tracing::instrument(skip(self), name = "sender")] async fn handler(&self) -> Result<()> { - let receiver = self.receiver.lock().await; + let new_transactions = self.receiver.lock().await; + let (waking_sender, waking_receiver) = loole::unbounded(); + + let mut outgoing = FuturesUnordered::new(); + let mut retrying = FuturesUnordered::new(); - let mut futures = FuturesUnordered::new(); let mut current_transaction_status = HashMap::::new(); // Retry requests we could not finish yet @@ -300,13 +320,14 @@ impl Service { for (outgoing_kind, events) in initial_transactions { current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); - futures.push(handle_events(outgoing_kind.clone(), events)); + outgoing.push(handle_events(outgoing_kind.clone(), events)); } } loop { tokio::select! { - Some(response) = futures.next() => { + Some(response) = outgoing.next() => { + // Outgoing transaction succeeded match response { Ok(outgoing_kind) => { let _cork = services().globals.db.cork(); @@ -322,35 +343,84 @@ impl Service { if !new_events.is_empty() { // Insert pdus we found self.db.mark_as_active(&new_events)?; - futures.push(handle_events( - outgoing_kind.clone(), + + // Clear retries + current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); + + outgoing.push(handle_events( + outgoing_kind, new_events.into_iter().map(|(event, _)| event).collect(), )); } else { current_transaction_status.remove(&outgoing_kind); } } - Err((outgoing_kind, _)) => { - current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { - TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), - TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), - TransactionStatus::Failed(_, _) => { - error!("Request that was not even running failed?!"); - return - }, - }); + // Outgoing transaction failed + Err((destination, err)) => { + // Set status to Failed, create timer + let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone()); + + // Add timer to loop + retrying.push(timer); + + warn!("Outgoing request to {destination} failed: {err}"); } }; }, - event = receiver.recv_async() => { - if let Ok((outgoing_kind, event, key)) = event { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, - vec![(event, key)], - &mut current_transaction_status, - ) { - futures.push(handle_events(outgoing_kind, events)); + // Transaction retry timers firing + Some(dest) = retrying.next() => { + // Transition Failed => Retrying, return pending old transaction events + match self.select_events( + &dest, + vec![], // will be ignored because fresh == false + &mut current_transaction_status, + false, + ) { + Ok(TransactionPrepOutcome::Send(events)) => { + outgoing.push(handle_events(dest, events)); + } + Ok(_) => { + // Unreachable because fresh == false + unreachable!("select_events on a stale transaction {} did not return ::Send", dest) + } + + Err(err) => { + error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err); + + // transaction dropped, so drop destination as well. + current_transaction_status.remove(&dest); + } + } + }, + + // Explicit wakeups, makes a backoff timer return immediately + Ok(outgoing) = waking_receiver.recv_async() => { + if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) { + if let Some(waker) = waker.take() { + _ = waker.send(()); + } + } + }, + + // New transactions to be sent out (from server/user activity) + event = new_transactions.recv_async() => { + if let Ok((dest, event, key)) = event { + match self.select_events( + &dest, + vec![(event, key)], + &mut current_transaction_status, + true) { + Ok(TransactionPrepOutcome::Send(events)) => { + outgoing.push(handle_events(dest, events)); + }, + Ok(TransactionPrepOutcome::Wake(dest)) => { + waking_sender.send(dest).expect("nothing closes this channel but ourselves"); + }, + Ok(TransactionPrepOutcome::Nothing) => {}, + Err(err) => { + error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err); + } } } } @@ -358,18 +428,70 @@ impl Service { } } + /// Generates timer/oneshot, alters status to reflect Failed + /// + /// Returns timer/oneshot future to wake up loop for next retry + fn mark_failed_and_backoff( + status: &mut HashMap, dest: OutgoingDestination, + ) -> impl std::future::Future { + let now = Instant::now(); + + let entry = status + .get_mut(&dest) + .expect("guaranteed to be set before this function"); + + let failures = match entry { + // Running -> Failed + TransactionStatus::Running => 1, + // Retrying -> Failed + TransactionStatus::Retrying { + failures, + } => *failures + 1, + + // The transition of Failed -> Retrying is handled by handle_events + TransactionStatus::Failed { + .. + } => { + unreachable!( + "TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, \ + bailing..." + ) + }, + }; + + const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24); + + // Exponential backoff, clamp upper value to one day + let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY); + + let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup); + + *entry = TransactionStatus::Failed { + failures, + waker: Some(waker), + }; + + fut + } + + /// This prepares a transaction, checks the transaction state, and selects + /// appropriate events. #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] fn select_events( &self, outgoing_kind: &OutgoingDestination, new_events: Vec<(SendingEventType, Vec)>, // Events we want to send: event and full key current_transaction_status: &mut HashMap, - ) -> Result>> { - let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?; + fresh: bool, // Wether or not this transaction came from server activity. + ) -> Result { + let (allow, retry, wake_up) = + self.select_events_current(outgoing_kind.clone(), current_transaction_status, fresh)?; // Nothing can be done for this remote, bail out. - if !allow { - return Ok(None); + if wake_up { + return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone())); + } else if !allow { + return Ok(TransactionPrepOutcome::Nothing); } let _cork = services().globals.db.cork(); @@ -377,12 +499,14 @@ impl Service { // Must retry any previous transaction for this remote. if retry { - self.db + // We retry the previous transaction + for (_, e) in self + .db .active_requests_for(outgoing_kind) .filter_map(Result::ok) - .for_each(|(_, e)| events.push(e)); - - return Ok(Some(events)); + { + events.push(e); + } } // Compose the next transaction @@ -402,37 +526,72 @@ impl Service { } } - Ok(Some(events)) + Ok(TransactionPrepOutcome::Send(events)) } #[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))] fn select_events_current( &self, outgoing_kind: OutgoingDestination, - current_transaction_status: &mut HashMap, - ) -> Result<(bool, bool)> { - let (mut allow, mut retry) = (true, false); - current_transaction_status - .entry(outgoing_kind) - .and_modify(|e| match e { - TransactionStatus::Failed(tries, time) => { - // Fail if a request has failed recently (exponential backoff) - const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); - let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries); - min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION); - if time.elapsed() < min_elapsed_duration { - allow = false; - } else { - retry = true; - *e = TransactionStatus::Retrying(*tries); + current_transaction_status: &mut HashMap, fresh: bool, + ) -> Result<(bool, bool, bool)> { + let (mut allow, mut retry, mut wake_up) = (true, false, false); + + let entry = current_transaction_status.entry(outgoing_kind); + + if fresh { + // If its fresh, we initialise the status if we need to. + // + // We do nothing if it is already running or retrying. + // + // We return with a wake if it is in the Failed state. + entry + .and_modify(|e| match e { + TransactionStatus::Running + | TransactionStatus::Retrying { + .. + } => { + allow = false; // already running + }, + TransactionStatus::Failed { + .. + } => { + // currently sleeping + wake_up = true; + }, + }) + .or_insert(TransactionStatus::Running); + } else { + // If it's not fresh, we expect an entry. + // + // We also expect us to be the only one who are touching this destination right + // now, and its a stale transaction, so it must be in the Failed state + match entry { + Entry::Occupied(mut e) => { + let e = e.get_mut(); + match e { + TransactionStatus::Failed { + failures, + .. + } => { + *e = TransactionStatus::Retrying { + failures: *failures, + }; + retry = true; + }, + + _ => unreachable!( + "Encountered bad state when preparing stale transaction: expected Failed state, got \ + Running or Retrying" + ), } }, - TransactionStatus::Running | TransactionStatus::Retrying(_) => { - allow = false; // already running - }, - }) - .or_insert(TransactionStatus::Running); + Entry::Vacant(_) => unreachable!( + "Encountered bad state when preparing stale transaction: expected Failed state, got vacant state" + ), + } + } - Ok((allow, retry)) + Ok((allow, retry, wake_up)) } #[tracing::instrument(skip(self, server_name))] @@ -721,7 +880,7 @@ async fn handle_events_kind_push( let Some(pusher) = services() .pusher .get_pusher(userid, pushkey) - .map_err(|e| (kind.clone(), e))? + .map_err(|e| (OutgoingDestination::Push(userid.clone(), pushkey.clone()), e))? else { continue; }; @@ -858,4 +1017,40 @@ impl OutgoingDestination { prefix } + + /// This wraps the OutgoingDestination key in an interruptible sleep future. + /// + /// The first return value is the future, the second is the oneshot that + /// interrupts that future, and causes it to return instantly. + fn wrap_in_interruptible_sleep( + self, at: Instant, + ) -> (impl std::future::Future, oneshot::Sender<()>) { + let (tx, rx) = oneshot::channel(); + let at = tokio::time::Instant::from_std(at); + + ( + async move { + _ = tokio::time::timeout_at(at, rx).await; + + self + }, + tx, + ) + } +} + +impl std::fmt::Display for OutgoingDestination { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutgoingDestination::Appservice(appservice_id) => { + write!(f, "Appservice (ID {:?})", appservice_id) + }, + OutgoingDestination::Push(user, push_key) => { + write!(f, "User Push Service (for {:?}, with key {:?})", user, push_key) + }, + OutgoingDestination::Normal(server) => { + write!(f, "Matrix Server ({:?})", server) + }, + } + } }