Mirror of https://github.com/element-hq/synapse.git, synced 2025-03-14 09:45:51 +00:00
Define delayed event ratelimit category (#18019)
Apply ratelimiting on delayed event management separately from messages.

### Pull Request Checklist

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
  - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
* [ ] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
parent 0fa7ffd76f
commit fbb21b29bb

8 changed files with 243 additions and 4 deletions
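For a quick picture of what this adds, here is a minimal `homeserver.yaml` sketch using the new option. The option name and its defaults (`per_second: 1`, `burst_count: 5`) come from the documentation change below; the `rc_message` values shown are purely illustrative.

```yaml
# Sketch only: limits restart/cancel/view of existing delayed events,
# applied per (user ID, device ID).
rc_delayed_event_mgmt:
  per_second: 1
  burst_count: 5

# Creating or sending delayed events is still governed by rc_message
# (illustrative values, not part of this change).
rc_message:
  per_second: 0.2
  burst_count: 10
```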
changelog.d/18019.feature (new file)

@@ -0,0 +1 @@
+Define ratelimit configuration for delayed event management.

@@ -142,6 +142,9 @@ for port in 8080 8081 8082; do
   per_user:
     per_second: 1000
     burst_count: 1000
+rc_delayed_event_mgmt:
+  per_second: 1000
+  burst_count: 1000
 RC
 )
 echo "${ratelimiting}" >> "$port.config"

@@ -94,6 +94,10 @@ rc_presence:
     per_second: 9999
     burst_count: 9999
 
+rc_delayed_event_mgmt:
+  per_second: 9999
+  burst_count: 9999
+
 federation_rr_transactions_per_room_per_second: 9999
 
 allow_device_name_lookup_over_federation: true

@@ -1947,6 +1947,29 @@ rc_presence:
     burst_count: 1
 ```
 ---
+### `rc_delayed_event_mgmt`
+
+Ratelimiting settings for delayed event management.
+
+This is a ratelimiting option that ratelimits
+attempts to restart, cancel, or view delayed events
+based on the sending client's account and device ID.
+It defaults to: `per_second: 1`, `burst_count: 5`.
+
+Attempts to create or send delayed events are ratelimited not by this setting, but by `rc_message`.
+
+Setting this to a high value allows clients to make delayed event management requests often
+(such as repeatedly restarting a delayed event with a short timeout,
+or restarting several different delayed events all at once)
+without the risk of being ratelimited.
+
+Example configuration:
+```yaml
+rc_delayed_event_mgmt:
+  per_second: 2
+  burst_count: 20
+```
+---
 ### `federation_rr_transactions_per_room_per_second`
 
 Sets outgoing federation transaction frequency for sending read-receipts,

@@ -234,3 +234,9 @@ class RatelimitConfig(Config):
             "rc_presence.per_user",
             defaults={"per_second": 0.1, "burst_count": 1},
         )
+
+        self.rc_delayed_event_mgmt = RatelimitSettings.parse(
+            config,
+            "rc_delayed_event_mgmt",
+            defaults={"per_second": 1, "burst_count": 5},
+        )

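For orientation, the settings parsed above feed a `Ratelimiter` keyed on the requester's user ID and device ID; the handler diff in the next section is the authoritative change. The sketch below merely condenses that pattern: the wrapper class and method name are illustrative and not part of Synapse, while the `Ratelimiter` constructor and `ratelimit` call signature are taken from this diff.

```python
from synapse.api.ratelimiting import Ratelimiter


class DelayedEventMgmtRatelimitSketch:
    """Illustration only: how rc_delayed_event_mgmt is applied per (user ID, device ID)."""

    def __init__(self, hs) -> None:
        # Ratelimiter configured from the new rc_delayed_event_mgmt settings,
        # mirroring DelayedEventsHandler.__init__ in the diff below.
        self._delayed_event_mgmt_ratelimiter = Ratelimiter(
            store=hs.get_datastores().main,
            clock=hs.get_clock(),
            cfg=hs.config.ratelimiting.rc_delayed_event_mgmt,
        )

    async def check_management_limit(self, requester) -> None:
        # Applied before cancel/restart/view operations; once the burst is
        # exhausted the call raises a limit-exceeded error (surfaced as HTTP 429).
        await self._delayed_event_mgmt_ratelimiter.ratelimit(
            requester,
            (requester.user.to_string(), requester.device_id),
        )
```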
@@ -19,6 +19,7 @@ from twisted.internet.interfaces import IDelayedCall
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import ShadowBanError
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
 from synapse.logging.opentracing import set_tag
 from synapse.metrics import event_processing_positions
@@ -57,10 +58,19 @@ class DelayedEventsHandler:
         self._storage_controllers = hs.get_storage_controllers()
         self._config = hs.config
         self._clock = hs.get_clock()
-        self._request_ratelimiter = hs.get_request_ratelimiter()
         self._event_creation_handler = hs.get_event_creation_handler()
         self._room_member_handler = hs.get_room_member_handler()
 
+        self._request_ratelimiter = hs.get_request_ratelimiter()
+
+        # Ratelimiter for management of existing delayed events,
+        # keyed by the sending user ID & device ID.
+        self._delayed_event_mgmt_ratelimiter = Ratelimiter(
+            store=self._store,
+            clock=self._clock,
+            cfg=self._config.ratelimiting.rc_delayed_event_mgmt,
+        )
+
         self._next_delayed_event_call: Optional[IDelayedCall] = None
 
         # The current position in the current_state_delta stream
@@ -227,6 +237,9 @@ class DelayedEventsHandler:
         Raises:
             SynapseError: if the delayed event fails validation checks.
         """
+        # Use standard request limiter for scheduling new delayed events.
+        # TODO: Instead apply ratelimiting based on the scheduled send time.
+        # See https://github.com/element-hq/synapse/issues/18021
         await self._request_ratelimiter.ratelimit(requester)
 
         self._event_creation_handler.validator.validate_builder(
@@ -285,7 +298,10 @@
             NotFoundError: if no matching delayed event could be found.
         """
         assert self._is_master
-        await self._request_ratelimiter.ratelimit(requester)
+        await self._delayed_event_mgmt_ratelimiter.ratelimit(
+            requester,
+            (requester.user.to_string(), requester.device_id),
+        )
         await self._initialized_from_db
 
         next_send_ts = await self._store.cancel_delayed_event(
@@ -308,7 +324,10 @@
             NotFoundError: if no matching delayed event could be found.
         """
         assert self._is_master
-        await self._request_ratelimiter.ratelimit(requester)
+        await self._delayed_event_mgmt_ratelimiter.ratelimit(
+            requester,
+            (requester.user.to_string(), requester.device_id),
+        )
         await self._initialized_from_db
 
         next_send_ts = await self._store.restart_delayed_event(
@@ -332,6 +351,8 @@
             NotFoundError: if no matching delayed event could be found.
         """
         assert self._is_master
+        # Use standard request limiter for sending delayed events on-demand,
+        # as an on-demand send is similar to sending a regular event.
         await self._request_ratelimiter.ratelimit(requester)
         await self._initialized_from_db
 
@@ -415,7 +436,10 @@
 
     async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
         """Return all pending delayed events requested by the given user."""
-        await self._request_ratelimiter.ratelimit(requester)
+        await self._delayed_event_mgmt_ratelimiter.ratelimit(
+            requester,
+            (requester.user.to_string(), requester.device_id),
+        )
         return await self._store.get_all_delayed_events_for_user(
             requester.user.localpart
         )

@@ -109,6 +109,27 @@ class DelayedEventsTestCase(HomeserverTestCase):
         )
         self.assertEqual(setter_expected, content.get(setter_key), content)
 
+    @unittest.override_config(
+        {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
+    )
+    def test_get_delayed_events_ratelimit(self) -> None:
+        args = ("GET", PATH_PREFIX)
+
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+        # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+        self.get_success(
+            self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+        )
+
+        # Test that the request isn't ratelimited anymore.
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
     def test_update_delayed_event_without_id(self) -> None:
         channel = self.make_request(
             "POST",
@@ -206,6 +227,46 @@ class DelayedEventsTestCase(HomeserverTestCase):
             expect_code=HTTPStatus.NOT_FOUND,
         )
 
+    @unittest.override_config(
+        {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
+    )
+    def test_cancel_delayed_event_ratelimit(self) -> None:
+        delay_ids = []
+        for _ in range(2):
+            channel = self.make_request(
+                "POST",
+                _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
+                {},
+            )
+            self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+            delay_id = channel.json_body.get("delay_id")
+            self.assertIsNotNone(delay_id)
+            delay_ids.append(delay_id)
+
+        channel = self.make_request(
+            "POST",
+            f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+            {"action": "cancel"},
+        )
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+        args = (
+            "POST",
+            f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+            {"action": "cancel"},
+        )
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+        # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+        self.get_success(
+            self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+        )
+
+        # Test that the request isn't ratelimited anymore.
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
     def test_send_delayed_state_event(self) -> None:
         state_key = "to_send_on_request"
 
@@ -250,6 +311,44 @@ class DelayedEventsTestCase(HomeserverTestCase):
         )
         self.assertEqual(setter_expected, content.get(setter_key), content)
 
+    @unittest.override_config({"rc_message": {"per_second": 3.5, "burst_count": 4}})
+    def test_send_delayed_event_ratelimit(self) -> None:
+        delay_ids = []
+        for _ in range(2):
+            channel = self.make_request(
+                "POST",
+                _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
+                {},
+            )
+            self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+            delay_id = channel.json_body.get("delay_id")
+            self.assertIsNotNone(delay_id)
+            delay_ids.append(delay_id)
+
+        channel = self.make_request(
+            "POST",
+            f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+            {"action": "send"},
+        )
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+        args = (
+            "POST",
+            f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+            {"action": "send"},
+        )
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+        # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+        self.get_success(
+            self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+        )
+
+        # Test that the request isn't ratelimited anymore.
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
     def test_restart_delayed_state_event(self) -> None:
         state_key = "to_send_on_restarted_timeout"
 
@@ -309,6 +408,46 @@ class DelayedEventsTestCase(HomeserverTestCase):
         )
         self.assertEqual(setter_expected, content.get(setter_key), content)
 
+    @unittest.override_config(
+        {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
+    )
+    def test_restart_delayed_event_ratelimit(self) -> None:
+        delay_ids = []
+        for _ in range(2):
+            channel = self.make_request(
+                "POST",
+                _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
+                {},
+            )
+            self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+            delay_id = channel.json_body.get("delay_id")
+            self.assertIsNotNone(delay_id)
+            delay_ids.append(delay_id)
+
+        channel = self.make_request(
+            "POST",
+            f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+            {"action": "restart"},
+        )
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
+        args = (
+            "POST",
+            f"{PATH_PREFIX}/{delay_ids.pop(0)}",
+            {"action": "restart"},
+        )
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+        # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+        self.get_success(
+            self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+        )
+
+        # Test that the request isn't ratelimited anymore.
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
     def test_delayed_state_events_are_cancelled_by_more_recent_state(self) -> None:
         state_key = "to_be_cancelled"
 
@@ -374,3 +513,7 @@ def _get_path_for_delayed_state(
     room_id: str, event_type: str, state_key: str, delay_ms: int
 ) -> str:
     return f"rooms/{room_id}/state/{event_type}/{state_key}?org.matrix.msc4140.delay={delay_ms}"
+
+
+def _get_path_for_delayed_send(room_id: str, event_type: str, delay_ms: int) -> str:
+    return f"rooms/{room_id}/send/{event_type}?org.matrix.msc4140.delay={delay_ms}"

@@ -2399,6 +2399,41 @@ class RoomDelayedEventTestCase(RoomBase):
         )
         self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
 
+    @unittest.override_config(
+        {
+            "max_event_delay_duration": "24h",
+            "rc_message": {"per_second": 1, "burst_count": 2},
+        }
+    )
+    def test_add_delayed_event_ratelimit(self) -> None:
+        """Test that requests to schedule new delayed events are ratelimited by a RateLimiter,
+        which ratelimits them correctly, including by not limiting when the requester is
+        exempt from ratelimiting.
+        """
+
+        # Test that new delayed events are correctly ratelimited.
+        args = (
+            "POST",
+            (
+                "rooms/%s/send/m.room.message?org.matrix.msc4140.delay=2000"
+                % self.room_id
+            ).encode("ascii"),
+            {"body": "test", "msgtype": "m.text"},
+        )
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
+
+        # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
+        self.get_success(
+            self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
+        )
+
+        # Test that the new delayed events aren't ratelimited anymore.
+        channel = self.make_request(*args)
+        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
+
 
 class RoomSearchTestCase(unittest.HomeserverTestCase):
     servlets = [