From fbb21b29bb3e7942c4cd051d3db7316a668cdaef Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti Date: Tue, 25 Feb 2025 12:22:01 -0500 Subject: [PATCH] Define delayed event ratelimit category (#18019) Apply ratelimiting on delayed event management separately from messages. ### Pull Request Checklist * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [ ] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/18019.feature | 1 + demo/start.sh | 3 + .../conf/workers-shared-extra.yaml.j2 | 4 + .../configuration/config_documentation.md | 23 +++ synapse/config/ratelimiting.py | 6 + synapse/handlers/delayed_events.py | 32 +++- tests/rest/client/test_delayed_events.py | 143 ++++++++++++++++++ tests/rest/client/test_rooms.py | 35 +++++ 8 files changed, 243 insertions(+), 4 deletions(-) create mode 100644 changelog.d/18019.feature diff --git a/changelog.d/18019.feature b/changelog.d/18019.feature new file mode 100644 index 0000000000..74e22df74a --- /dev/null +++ b/changelog.d/18019.feature @@ -0,0 +1 @@ +Define ratelimit configuration for delayed event management. diff --git a/demo/start.sh b/demo/start.sh index 7636c41f1f..e010302bf4 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -142,6 +142,9 @@ for port in 8080 8081 8082; do per_user: per_second: 1000 burst_count: 1000 + rc_delayed_event_mgmt: + per_second: 1000 + burst_count: 1000 RC ) echo "${ratelimiting}" >> "$port.config" diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index ac0c4bb851..9ab8fedcae 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -94,6 +94,10 @@ rc_presence: per_second: 9999 burst_count: 9999 +rc_delayed_event_mgmt: + per_second: 9999 + burst_count: 9999 + federation_rr_transactions_per_room_per_second: 9999 allow_device_name_lookup_over_federation: true diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 8d9a71fb5f..8523c5f65f 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1947,6 +1947,29 @@ rc_presence: burst_count: 1 ``` --- +### `rc_delayed_event_mgmt` + +Ratelimiting settings for delayed event management. + +This is a ratelimiting option that ratelimits +attempts to restart, cancel, or view delayed events +based on the sending client's account and device ID. +It defaults to: `per_second: 1`, `burst_count: 5`. + +Attempts to create or send delayed events are ratelimited not by this setting, but by `rc_message`. + +Setting this to a high value allows clients to make delayed event management requests often +(such as repeatedly restarting a delayed event with a short timeout, +or restarting several different delayed events all at once) +without the risk of being ratelimited. + +Example configuration: +```yaml +rc_delayed_event_mgmt: + per_second: 2 + burst_count: 20 +``` +--- ### `federation_rr_transactions_per_room_per_second` Sets outgoing federation transaction frequency for sending read-receipts, diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 06af4da3c5..eb1dc2dacb 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -234,3 +234,9 @@ class RatelimitConfig(Config): "rc_presence.per_user", defaults={"per_second": 0.1, "burst_count": 1}, ) + + self.rc_delayed_event_mgmt = RatelimitSettings.parse( + config, + "rc_delayed_event_mgmt", + defaults={"per_second": 1, "burst_count": 5}, + ) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index 3c88a96fd3..b3f40809a1 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -19,6 +19,7 @@ from twisted.internet.interfaces import IDelayedCall from synapse.api.constants import EventTypes from synapse.api.errors import ShadowBanError +from synapse.api.ratelimiting import Ratelimiter from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME from synapse.logging.opentracing import set_tag from synapse.metrics import event_processing_positions @@ -57,10 +58,19 @@ class DelayedEventsHandler: self._storage_controllers = hs.get_storage_controllers() self._config = hs.config self._clock = hs.get_clock() - self._request_ratelimiter = hs.get_request_ratelimiter() self._event_creation_handler = hs.get_event_creation_handler() self._room_member_handler = hs.get_room_member_handler() + self._request_ratelimiter = hs.get_request_ratelimiter() + + # Ratelimiter for management of existing delayed events, + # keyed by the sending user ID & device ID. + self._delayed_event_mgmt_ratelimiter = Ratelimiter( + store=self._store, + clock=self._clock, + cfg=self._config.ratelimiting.rc_delayed_event_mgmt, + ) + self._next_delayed_event_call: Optional[IDelayedCall] = None # The current position in the current_state_delta stream @@ -227,6 +237,9 @@ class DelayedEventsHandler: Raises: SynapseError: if the delayed event fails validation checks. """ + # Use standard request limiter for scheduling new delayed events. + # TODO: Instead apply ratelimiting based on the scheduled send time. + # See https://github.com/element-hq/synapse/issues/18021 await self._request_ratelimiter.ratelimit(requester) self._event_creation_handler.validator.validate_builder( @@ -285,7 +298,10 @@ class DelayedEventsHandler: NotFoundError: if no matching delayed event could be found. """ assert self._is_master - await self._request_ratelimiter.ratelimit(requester) + await self._delayed_event_mgmt_ratelimiter.ratelimit( + requester, + (requester.user.to_string(), requester.device_id), + ) await self._initialized_from_db next_send_ts = await self._store.cancel_delayed_event( @@ -308,7 +324,10 @@ class DelayedEventsHandler: NotFoundError: if no matching delayed event could be found. """ assert self._is_master - await self._request_ratelimiter.ratelimit(requester) + await self._delayed_event_mgmt_ratelimiter.ratelimit( + requester, + (requester.user.to_string(), requester.device_id), + ) await self._initialized_from_db next_send_ts = await self._store.restart_delayed_event( @@ -332,6 +351,8 @@ class DelayedEventsHandler: NotFoundError: if no matching delayed event could be found. """ assert self._is_master + # Use standard request limiter for sending delayed events on-demand, + # as an on-demand send is similar to sending a regular event. await self._request_ratelimiter.ratelimit(requester) await self._initialized_from_db @@ -415,7 +436,10 @@ class DelayedEventsHandler: async def get_all_for_user(self, requester: Requester) -> List[JsonDict]: """Return all pending delayed events requested by the given user.""" - await self._request_ratelimiter.ratelimit(requester) + await self._delayed_event_mgmt_ratelimiter.ratelimit( + requester, + (requester.user.to_string(), requester.device_id), + ) return await self._store.get_all_delayed_events_for_user( requester.user.localpart ) diff --git a/tests/rest/client/test_delayed_events.py b/tests/rest/client/test_delayed_events.py index 1793b38c4a..2c938390c8 100644 --- a/tests/rest/client/test_delayed_events.py +++ b/tests/rest/client/test_delayed_events.py @@ -109,6 +109,27 @@ class DelayedEventsTestCase(HomeserverTestCase): ) self.assertEqual(setter_expected, content.get(setter_key), content) + @unittest.override_config( + {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}} + ) + def test_get_delayed_events_ratelimit(self) -> None: + args = ("GET", PATH_PREFIX) + + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result) + + # Add the current user to the ratelimit overrides, allowing them no ratelimiting. + self.get_success( + self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0) + ) + + # Test that the request isn't ratelimited anymore. + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + def test_update_delayed_event_without_id(self) -> None: channel = self.make_request( "POST", @@ -206,6 +227,46 @@ class DelayedEventsTestCase(HomeserverTestCase): expect_code=HTTPStatus.NOT_FOUND, ) + @unittest.override_config( + {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}} + ) + def test_cancel_delayed_event_ratelimit(self) -> None: + delay_ids = [] + for _ in range(2): + channel = self.make_request( + "POST", + _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000), + {}, + ) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + delay_id = channel.json_body.get("delay_id") + self.assertIsNotNone(delay_id) + delay_ids.append(delay_id) + + channel = self.make_request( + "POST", + f"{PATH_PREFIX}/{delay_ids.pop(0)}", + {"action": "cancel"}, + ) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + + args = ( + "POST", + f"{PATH_PREFIX}/{delay_ids.pop(0)}", + {"action": "cancel"}, + ) + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result) + + # Add the current user to the ratelimit overrides, allowing them no ratelimiting. + self.get_success( + self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0) + ) + + # Test that the request isn't ratelimited anymore. + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + def test_send_delayed_state_event(self) -> None: state_key = "to_send_on_request" @@ -250,6 +311,44 @@ class DelayedEventsTestCase(HomeserverTestCase): ) self.assertEqual(setter_expected, content.get(setter_key), content) + @unittest.override_config({"rc_message": {"per_second": 3.5, "burst_count": 4}}) + def test_send_delayed_event_ratelimit(self) -> None: + delay_ids = [] + for _ in range(2): + channel = self.make_request( + "POST", + _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000), + {}, + ) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + delay_id = channel.json_body.get("delay_id") + self.assertIsNotNone(delay_id) + delay_ids.append(delay_id) + + channel = self.make_request( + "POST", + f"{PATH_PREFIX}/{delay_ids.pop(0)}", + {"action": "send"}, + ) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + + args = ( + "POST", + f"{PATH_PREFIX}/{delay_ids.pop(0)}", + {"action": "send"}, + ) + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result) + + # Add the current user to the ratelimit overrides, allowing them no ratelimiting. + self.get_success( + self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0) + ) + + # Test that the request isn't ratelimited anymore. + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + def test_restart_delayed_state_event(self) -> None: state_key = "to_send_on_restarted_timeout" @@ -309,6 +408,46 @@ class DelayedEventsTestCase(HomeserverTestCase): ) self.assertEqual(setter_expected, content.get(setter_key), content) + @unittest.override_config( + {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}} + ) + def test_restart_delayed_event_ratelimit(self) -> None: + delay_ids = [] + for _ in range(2): + channel = self.make_request( + "POST", + _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000), + {}, + ) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + delay_id = channel.json_body.get("delay_id") + self.assertIsNotNone(delay_id) + delay_ids.append(delay_id) + + channel = self.make_request( + "POST", + f"{PATH_PREFIX}/{delay_ids.pop(0)}", + {"action": "restart"}, + ) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + + args = ( + "POST", + f"{PATH_PREFIX}/{delay_ids.pop(0)}", + {"action": "restart"}, + ) + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result) + + # Add the current user to the ratelimit overrides, allowing them no ratelimiting. + self.get_success( + self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0) + ) + + # Test that the request isn't ratelimited anymore. + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + def test_delayed_state_events_are_cancelled_by_more_recent_state(self) -> None: state_key = "to_be_cancelled" @@ -374,3 +513,7 @@ def _get_path_for_delayed_state( room_id: str, event_type: str, state_key: str, delay_ms: int ) -> str: return f"rooms/{room_id}/state/{event_type}/{state_key}?org.matrix.msc4140.delay={delay_ms}" + + +def _get_path_for_delayed_send(room_id: str, event_type: str, delay_ms: int) -> str: + return f"rooms/{room_id}/send/{event_type}?org.matrix.msc4140.delay={delay_ms}" diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index a7108b905a..dd8350ddd1 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -2399,6 +2399,41 @@ class RoomDelayedEventTestCase(RoomBase): ) self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + @unittest.override_config( + { + "max_event_delay_duration": "24h", + "rc_message": {"per_second": 1, "burst_count": 2}, + } + ) + def test_add_delayed_event_ratelimit(self) -> None: + """Test that requests to schedule new delayed events are ratelimited by a RateLimiter, + which ratelimits them correctly, including by not limiting when the requester is + exempt from ratelimiting. + """ + + # Test that new delayed events are correctly ratelimited. + args = ( + "POST", + ( + "rooms/%s/send/m.room.message?org.matrix.msc4140.delay=2000" + % self.room_id + ).encode("ascii"), + {"body": "test", "msgtype": "m.text"}, + ) + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result) + + # Add the current user to the ratelimit overrides, allowing them no ratelimiting. + self.get_success( + self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0) + ) + + # Test that the new delayed events aren't ratelimited anymore. + channel = self.make_request(*args) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + class RoomSearchTestCase(unittest.HomeserverTestCase): servlets = [