mirror of
https://github.com/girlbossceo/conduwuit.git
synced 2025-03-14 18:55:37 +00:00
pipeline pdu fetch for federation sending destination
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
b2a565b0b4
commit
ffd0fd4242
1 changed files with 64 additions and 71 deletions
|
@ -8,12 +8,12 @@ use std::{
|
|||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
|
||||
use conduwuit::{
|
||||
debug, err, error,
|
||||
result::LogErr,
|
||||
trace,
|
||||
utils::{calculate_hash, continue_exponential_backoff_secs, ReadyExt},
|
||||
utils::{calculate_hash, continue_exponential_backoff_secs, stream::IterStream, ReadyExt},
|
||||
warn, Error, Result,
|
||||
};
|
||||
use futures::{
|
||||
|
@ -38,7 +38,9 @@ use ruma::{
|
|||
push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent,
|
||||
GlobalAccountDataEventType,
|
||||
},
|
||||
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
|
||||
push,
|
||||
serde::Raw,
|
||||
uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName,
|
||||
OwnedUserId, RoomId, RoomVersionId, ServerName, UInt,
|
||||
};
|
||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||
|
@ -633,7 +635,7 @@ impl Service {
|
|||
}
|
||||
|
||||
fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> {
|
||||
//debug_assert!(!events.is_empty(), "sending empty transaction");
|
||||
debug_assert!(!events.is_empty(), "sending empty transaction");
|
||||
match dest {
|
||||
| Destination::Federation(server) =>
|
||||
self.send_events_dest_federation(server, events).boxed(),
|
||||
|
@ -698,7 +700,7 @@ impl Service {
|
|||
| SendingEvent::Flush => None,
|
||||
}));
|
||||
|
||||
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
|
||||
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
|
||||
|
||||
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
||||
// transaction");
|
||||
|
@ -796,81 +798,72 @@ impl Service {
|
|||
Ok(Destination::Push(user_id, pushkey))
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "fed",
|
||||
level = "debug",
|
||||
skip(self, events),
|
||||
fields(
|
||||
events = %events.len(),
|
||||
),
|
||||
)]
|
||||
async fn send_events_dest_federation(
|
||||
&self,
|
||||
server: OwnedServerName,
|
||||
events: Vec<SendingEvent>,
|
||||
) -> SendingResult {
|
||||
let mut pdu_jsons = Vec::with_capacity(
|
||||
events
|
||||
.iter()
|
||||
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
|
||||
.count(),
|
||||
);
|
||||
let mut edu_jsons = Vec::with_capacity(
|
||||
events
|
||||
.iter()
|
||||
.filter(|event| matches!(event, SendingEvent::Edu(_)))
|
||||
.count(),
|
||||
);
|
||||
let pdus: Vec<_> = events
|
||||
.iter()
|
||||
.filter_map(|pdu| match pdu {
|
||||
| SendingEvent::Pdu(pdu) => Some(pdu),
|
||||
| _ => None,
|
||||
})
|
||||
.stream()
|
||||
.then(|pdu_id| self.services.timeline.get_pdu_json_from_id(pdu_id))
|
||||
.ready_filter_map(Result::ok)
|
||||
.then(|pdu| self.convert_to_outgoing_federation_event(pdu))
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
for event in &events {
|
||||
match event {
|
||||
// TODO: check room version and remove event_id if needed
|
||||
| SendingEvent::Pdu(pdu_id) => {
|
||||
if let Ok(pdu) = self.services.timeline.get_pdu_json_from_id(pdu_id).await {
|
||||
pdu_jsons.push(self.convert_to_outgoing_federation_event(pdu).await);
|
||||
}
|
||||
},
|
||||
| SendingEvent::Edu(edu) =>
|
||||
if let Ok(raw) = serde_json::from_slice(edu) {
|
||||
edu_jsons.push(raw);
|
||||
},
|
||||
| SendingEvent::Flush => {}, // flush only; no new content
|
||||
let edus: Vec<Raw<Edu>> = events
|
||||
.iter()
|
||||
.filter_map(|edu| match edu {
|
||||
| SendingEvent::Edu(edu) => Some(edu.as_ref()),
|
||||
| _ => None,
|
||||
})
|
||||
.map(serde_json::from_slice)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
if pdus.is_empty() && edus.is_empty() {
|
||||
return Ok(Destination::Federation(server));
|
||||
}
|
||||
|
||||
let preimage = pdus
|
||||
.iter()
|
||||
.map(|raw| raw.get().as_bytes())
|
||||
.chain(edus.iter().map(|raw| raw.json().get().as_bytes()));
|
||||
|
||||
let txn_hash = calculate_hash(preimage);
|
||||
let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
|
||||
let request = send_transaction_message::v1::Request {
|
||||
transaction_id: txn_id.into(),
|
||||
origin: self.server.name.clone(),
|
||||
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||
pdus,
|
||||
edus,
|
||||
};
|
||||
|
||||
let result = self
|
||||
.services
|
||||
.federation
|
||||
.execute_on(&self.services.client.sender, &server, request)
|
||||
.await;
|
||||
|
||||
for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
|
||||
if let Err(e) = result {
|
||||
warn!(
|
||||
%txn_id, %server,
|
||||
"error sending PDU {event_id} to remote server: {e:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
|
||||
// transaction");
|
||||
|
||||
let txn_hash = calculate_hash(events.iter().filter_map(|e| match e {
|
||||
| SendingEvent::Edu(b) => Some(&**b),
|
||||
| SendingEvent::Pdu(b) => Some(b.as_ref()),
|
||||
| SendingEvent::Flush => None,
|
||||
}));
|
||||
|
||||
let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(txn_hash);
|
||||
|
||||
let request = send_transaction_message::v1::Request {
|
||||
origin: self.server.name.clone(),
|
||||
pdus: pdu_jsons,
|
||||
edus: edu_jsons,
|
||||
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||
transaction_id: txn_id.into(),
|
||||
};
|
||||
|
||||
let client = &self.services.client.sender;
|
||||
self.services.federation.execute_on(client, &server, request)
|
||||
.await
|
||||
.inspect(|response| {
|
||||
response
|
||||
.pdus
|
||||
.iter()
|
||||
.filter(|(_, res)| res.is_err())
|
||||
.for_each(
|
||||
|(pdu_id, res)| warn!(%txn_id, %server, "error sending PDU {pdu_id} to remote server: {res:?}"),
|
||||
);
|
||||
})
|
||||
.map_err(|e| (Destination::Federation(server.clone()), e))
|
||||
.map(|_| Destination::Federation(server))
|
||||
match result {
|
||||
| Err(error) => Err((Destination::Federation(server), error)),
|
||||
| Ok(_) => Ok(Destination::Federation(server)),
|
||||
}
|
||||
}
|
||||
|
||||
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
|
||||
|
|
Loading…
Add table
Reference in a new issue