pipeline prologue of handle_incoming_pdu

simplify room_version/first_pdu_in_room argument passing

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-26 06:15:01 +00:00
parent 2b730a30ad
commit 677316631a
7 changed files with 62 additions and 56 deletions

View file

@ -10,10 +10,11 @@ use conduwuit::{
};
use futures::TryFutureExt;
use ruma::{
api::federation::event::get_event, CanonicalJsonValue, OwnedEventId, RoomId, RoomVersionId,
ServerName,
api::federation::event::get_event, CanonicalJsonValue, OwnedEventId, RoomId, ServerName,
};
use super::get_room_version_id;
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
@ -30,7 +31,6 @@ pub(super) async fn fetch_and_handle_outliers<'a>(
events: &'a [OwnedEventId],
create_event: &'a PduEvent,
room_id: &'a RoomId,
room_version_id: &'a RoomVersionId,
) -> Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)> {
let back_off = |id| match self
.services
@ -113,8 +113,13 @@ pub(super) async fn fetch_and_handle_outliers<'a>(
{
| Ok(res) => {
debug!("Got {next_id} over federation");
let Ok(room_version_id) = get_room_version_id(create_event) else {
back_off((*next_id).to_owned());
continue;
};
let Ok((calculated_event_id, value)) =
pdu::gen_event_id_canonical_json(&res.pdu, room_version_id)
pdu::gen_event_id_canonical_json(&res.pdu, &room_version_id)
else {
back_off((*next_id).to_owned());
continue;

View file

@ -8,8 +8,7 @@ use futures::{future, FutureExt};
use ruma::{
int,
state_res::{self},
uint, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, RoomVersionId,
ServerName,
uint, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, UInt,
};
use super::check_room_id;
@ -26,7 +25,7 @@ pub(super) async fn fetch_prev(
origin: &ServerName,
create_event: &PduEvent,
room_id: &RoomId,
room_version_id: &RoomVersionId,
first_ts_in_room: UInt,
initial_set: Vec<OwnedEventId>,
) -> Result<(
Vec<OwnedEventId>,
@ -36,21 +35,13 @@ pub(super) async fn fetch_prev(
let mut eventid_info = HashMap::new();
let mut todo_outlier_stack: VecDeque<OwnedEventId> = initial_set.into();
let first_pdu_in_room = self.services.timeline.first_pdu_in_room(room_id).await?;
let mut amount = 0;
while let Some(prev_event_id) = todo_outlier_stack.pop_front() {
self.services.server.check_running()?;
if let Some((pdu, mut json_opt)) = self
.fetch_and_handle_outliers(
origin,
&[prev_event_id.clone()],
create_event,
room_id,
room_version_id,
)
.fetch_and_handle_outliers(origin, &[prev_event_id.clone()], create_event, room_id)
.boxed()
.await
.pop()
@ -74,7 +65,7 @@ pub(super) async fn fetch_prev(
}
if let Some(json) = json_opt {
if pdu.origin_server_ts > first_pdu_in_room.origin_server_ts {
if pdu.origin_server_ts > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in &pdu.prev_events {
if !graph.contains_key(prev_prev) {

View file

@ -4,7 +4,7 @@ use conduwuit::{debug, debug_warn, implement, Err, Error, PduEvent, Result};
use futures::FutureExt;
use ruma::{
api::federation::event::get_room_state_ids, events::StateEventType, EventId, OwnedEventId,
RoomId, RoomVersionId, ServerName,
RoomId, ServerName,
};
use crate::rooms::short::ShortStateKey;
@ -23,7 +23,6 @@ pub(super) async fn fetch_state(
origin: &ServerName,
create_event: &PduEvent,
room_id: &RoomId,
room_version_id: &RoomVersionId,
event_id: &EventId,
) -> Result<Option<HashMap<u64, OwnedEventId>>> {
let res = self
@ -38,7 +37,7 @@ pub(super) async fn fetch_state(
debug!("Fetching state events");
let state_vec = self
.fetch_and_handle_outliers(origin, &res.pdu_ids, create_event, room_id, room_version_id)
.fetch_and_handle_outliers(origin, &res.pdu_ids, create_event, room_id)
.boxed()
.await;

View file

@ -1,14 +1,15 @@
use std::{
collections::{hash_map, BTreeMap},
sync::Arc,
time::Instant,
};
use conduwuit::{debug, err, implement, warn, Err, Result};
use futures::{FutureExt, TryFutureExt};
use futures::{
future::{try_join5, OptionFuture},
FutureExt,
};
use ruma::{events::StateEventType, CanonicalJsonValue, EventId, RoomId, ServerName, UserId};
use super::{check_room_id, get_room_version_id};
use crate::rooms::timeline::RawPduId;
/// When receiving an event one needs to:
@ -59,19 +60,13 @@ pub async fn handle_incoming_pdu<'a>(
}
// 1.1 Check the server is in the room
if !self.services.metadata.exists(room_id).await {
return Err!(Request(NotFound("Room is unknown to this server")));
}
let meta_exists = self.services.metadata.exists(room_id).map(Ok);
// 1.2 Check if the room is disabled
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(
"Federation of this room is currently disabled on this server."
)));
}
let is_disabled = self.services.metadata.is_disabled(room_id).map(Ok);
// 1.3.1 Check room ACL on origin field/server
self.acl_check(origin, room_id).await?;
let origin_acl_check = self.acl_check(origin, room_id);
// 1.3.2 Check room ACL on sender's server name
let sender: &UserId = value
@ -79,36 +74,53 @@ pub async fn handle_incoming_pdu<'a>(
.try_into()
.map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
if sender.server_name() != origin {
self.acl_check(sender.server_name(), room_id).await?;
}
let sender_acl_check: OptionFuture<_> = sender
.server_name()
.ne(origin)
.then(|| self.acl_check(sender.server_name(), room_id))
.into();
// Fetch create event
let create_event = self
.services
.state_accessor
.room_state_get(room_id, &StateEventType::RoomCreate, "")
.map_ok(Arc::new)
.await?;
let create_event =
self.services
.state_accessor
.room_state_get(room_id, &StateEventType::RoomCreate, "");
// Procure the room version
let room_version_id = get_room_version_id(&create_event)?;
let (meta_exists, is_disabled, (), (), create_event) = try_join5(
meta_exists,
is_disabled,
origin_acl_check,
sender_acl_check.map(|o| o.unwrap_or(Ok(()))),
create_event,
)
.await?;
let first_pdu_in_room = self.services.timeline.first_pdu_in_room(room_id).await?;
if !meta_exists {
return Err!(Request(NotFound("Room is unknown to this server")));
}
if is_disabled {
return Err!(Request(Forbidden("Federation of this room is disabled by this server.")));
}
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false)
.boxed()
.await?;
check_room_id(room_id, &incoming_pdu)?;
// 8. if not timeline event: stop
if !is_timeline_event {
return Ok(None);
}
// Skip old events
if incoming_pdu.origin_server_ts < first_pdu_in_room.origin_server_ts {
let first_ts_in_room = self
.services
.timeline
.first_pdu_in_room(room_id)
.await?
.origin_server_ts;
if incoming_pdu.origin_server_ts < first_ts_in_room {
return Ok(None);
}
@ -119,7 +131,7 @@ pub async fn handle_incoming_pdu<'a>(
origin,
&create_event,
room_id,
&room_version_id,
first_ts_in_room,
incoming_pdu.prev_events.clone(),
)
.await?;
@ -134,7 +146,7 @@ pub async fn handle_incoming_pdu<'a>(
room_id,
&mut eventid_info,
&create_event,
&first_pdu_in_room,
first_ts_in_room,
&prev_id,
)
.await

View file

@ -84,7 +84,6 @@ pub(super) async fn handle_outlier_pdu<'a>(
&incoming_pdu.auth_events,
create_event,
room_id,
&room_version_id,
))
.await;
}

View file

@ -7,7 +7,7 @@ use std::{
use conduwuit::{
debug, implement, utils::continue_exponential_backoff_secs, Err, PduEvent, Result,
};
use ruma::{CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName};
use ruma::{CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, UInt};
#[implement(super::Service)]
#[allow(clippy::type_complexity)]
@ -27,8 +27,8 @@ pub(super) async fn handle_prev_pdu<'a>(
OwnedEventId,
(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>),
>,
create_event: &Arc<PduEvent>,
first_pdu_in_room: &PduEvent,
create_event: &PduEvent,
first_ts_in_room: UInt,
prev_id: &EventId,
) -> Result {
// Check for disabled again because it might have changed
@ -62,7 +62,7 @@ pub(super) async fn handle_prev_pdu<'a>(
if let Some((pdu, json)) = eventid_info.remove(prev_id) {
// Skip old events
if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts {
if pdu.origin_server_ts < first_ts_in_room {
return Ok(());
}

View file

@ -63,7 +63,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
if state_at_incoming_event.is_none() {
state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, &room_version_id, &incoming_pdu.event_id)
.fetch_state(origin, create_event, room_id, &incoming_pdu.event_id)
.await?;
}