pipeline pdu fetch for federation sending destination

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-26 21:46:46 +00:00
parent b2a565b0b4
commit ffd0fd4242

View file

@ -8,12 +8,12 @@ use std::{
time::{Duration, Instant},
};
use base64::{engine::general_purpose, Engine as _};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use conduwuit::{
debug, err, error,
result::LogErr,
trace,
utils::{calculate_hash, continue_exponential_backoff_secs, ReadyExt},
utils::{calculate_hash, continue_exponential_backoff_secs, stream::IterStream, ReadyExt},
warn, Error, Result,
};
use futures::{
@ -38,7 +38,9 @@ use ruma::{
push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent,
GlobalAccountDataEventType,
},
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
push,
serde::Raw,
uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
OwnedUserId, RoomId, RoomVersionId, ServerName, UInt,
};
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
@ -633,7 +635,7 @@ impl Service {
}
fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> {
//debug_assert!(!events.is_empty(), "sending empty transaction");
debug_assert!(!events.is_empty(), "sending empty transaction");
match dest {
| Destination::Federation(server) =>
self.send_events_dest_federation(server, events).boxed(),
@ -698,7 +700,7 @@ impl Service {
| SendingEvent::Flush => None,
}));
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
@ -796,81 +798,72 @@ impl Service {
Ok(Destination::Push(user_id, pushkey))
}
#[tracing::instrument(
name = "fed",
level = "debug",
skip(self, events),
fields(
events = %events.len(),
),
)]
async fn send_events_dest_federation(
&self,
server: OwnedServerName,
events: Vec<SendingEvent>,
) -> SendingResult {
let mut pdu_jsons = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(),
);
let mut edu_jsons = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Edu(_)))
.count(),
);
let pdus: Vec<_> = events
.iter()
.filter_map(|pdu| match pdu {
| SendingEvent::Pdu(pdu) => Some(pdu),
| _ => None,
})
.stream()
.then(|pdu_id| self.services.timeline.get_pdu_json_from_id(pdu_id))
.ready_filter_map(Result::ok)
.then(|pdu| self.convert_to_outgoing_federation_event(pdu))
.collect()
.await;
for event in &events {
match event {
// TODO: check room version and remove event_id if needed
| SendingEvent::Pdu(pdu_id) => {
if let Ok(pdu) = self.services.timeline.get_pdu_json_from_id(pdu_id).await {
pdu_jsons.push(self.convert_to_outgoing_federation_event(pdu).await);
}
},
| SendingEvent::Edu(edu) =>
if let Ok(raw) = serde_json::from_slice(edu) {
edu_jsons.push(raw);
},
| SendingEvent::Flush => {}, // flush only; no new content
let edus: Vec<Raw<Edu>> = events
.iter()
.filter_map(|edu| match edu {
| SendingEvent::Edu(edu) => Some(edu.as_ref()),
| _ => None,
})
.map(serde_json::from_slice)
.filter_map(Result::ok)
.collect();
if pdus.is_empty() && edus.is_empty() {
return Ok(Destination::Federation(server));
}
let preimage = pdus
.iter()
.map(|raw| raw.get().as_bytes())
.chain(edus.iter().map(|raw| raw.json().get().as_bytes()));
let txn_hash = calculate_hash(preimage);
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
let request = send_transaction_message::v1::Request {
transaction_id: txn_id.into(),
origin: self.server.name.clone(),
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
pdus,
edus,
};
let result = self
.services
.federation
.execute_on(&self.services.client.sender, &server, request)
.await;
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
if let Err(e) = result {
warn!(
%txn_id, %server,
"error sending PDU {event_id} to remote server: {e:?}"
);
}
}
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let txn_hash = calculate_hash(events.iter().filter_map(|e| match e {
| SendingEvent::Edu(b) => Some(&**b),
| SendingEvent::Pdu(b) => Some(b.as_ref()),
| SendingEvent::Flush => None,
}));
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
let request = send_transaction_message::v1::Request {
origin: self.server.name.clone(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: txn_id.into(),
};
let client = &self.services.client.sender;
self.services.federation.execute_on(client, &server, request)
.await
.inspect(|response| {
response
.pdus
.iter()
.filter(|(_, res)| res.is_err())
.for_each(
|(pdu_id, res)| warn!(%txn_id, %server, "error sending PDU {pdu_id} to remote server: {res:?}"),
);
})
.map_err(|e| (Destination::Federation(server.clone()), e))
.map(|_| Destination::Federation(server))
match result {
| Err(error) => Err((Destination::Federation(server), error)),
| Ok(_) => Ok(Destination::Federation(server)),
}
}
/// This does not return a full `Pdu` it is only to satisfy ruma's types.