Skip to content

Commit fbb21b2

Browse files
Define delayed event ratelimit category (#18019)
Apply ratelimiting on delayed event management separately from messages. ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [ ] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Andrew Morgan <[email protected]>
1 parent 0fa7ffd commit fbb21b2

File tree

8 files changed

+243
-4
lines changed

8 files changed

+243
-4
lines changed

changelog.d/18019.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Define ratelimit configuration for delayed event management.

demo/start.sh

+3
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ for port in 8080 8081 8082; do
142142
per_user:
143143
per_second: 1000
144144
burst_count: 1000
145+
rc_delayed_event_mgmt:
146+
per_second: 1000
147+
burst_count: 1000
145148
RC
146149
)
147150
echo "${ratelimiting}" >> "$port.config"

docker/complement/conf/workers-shared-extra.yaml.j2

+4
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ rc_presence:
9494
per_second: 9999
9595
burst_count: 9999
9696

97+
rc_delayed_event_mgmt:
98+
per_second: 9999
99+
burst_count: 9999
100+
97101
federation_rr_transactions_per_room_per_second: 9999
98102

99103
allow_device_name_lookup_over_federation: true

docs/usage/configuration/config_documentation.md

+23
Original file line numberDiff line numberDiff line change
@@ -1947,6 +1947,29 @@ rc_presence:
19471947
burst_count: 1
19481948
```
19491949
---
1950+
### `rc_delayed_event_mgmt`
1951+
1952+
Ratelimiting settings for delayed event management.
1953+
1954+
This is a ratelimiting option that ratelimits
1955+
attempts to restart, cancel, or view delayed events
1956+
based on the sending client's account and device ID.
1957+
It defaults to: `per_second: 1`, `burst_count: 5`.
1958+
1959+
Attempts to create or send delayed events are ratelimited not by this setting, but by `rc_message`.
1960+
1961+
Setting this to a high value allows clients to make delayed event management requests often
1962+
(such as repeatedly restarting a delayed event with a short timeout,
1963+
or restarting several different delayed events all at once)
1964+
without the risk of being ratelimited.
1965+
1966+
Example configuration:
1967+
```yaml
1968+
rc_delayed_event_mgmt:
1969+
per_second: 2
1970+
burst_count: 20
1971+
```
1972+
---
19501973
### `federation_rr_transactions_per_room_per_second`
19511974

19521975
Sets outgoing federation transaction frequency for sending read-receipts,

synapse/config/ratelimiting.py

+6
Original file line numberDiff line numberDiff line change
@@ -234,3 +234,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
234234
"rc_presence.per_user",
235235
defaults={"per_second": 0.1, "burst_count": 1},
236236
)
237+
238+
self.rc_delayed_event_mgmt = RatelimitSettings.parse(
239+
config,
240+
"rc_delayed_event_mgmt",
241+
defaults={"per_second": 1, "burst_count": 5},
242+
)

synapse/handlers/delayed_events.py

+28-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from synapse.api.constants import EventTypes
2121
from synapse.api.errors import ShadowBanError
22+
from synapse.api.ratelimiting import Ratelimiter
2223
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
2324
from synapse.logging.opentracing import set_tag
2425
from synapse.metrics import event_processing_positions
@@ -57,10 +58,19 @@ def __init__(self, hs: "HomeServer"):
5758
self._storage_controllers = hs.get_storage_controllers()
5859
self._config = hs.config
5960
self._clock = hs.get_clock()
60-
self._request_ratelimiter = hs.get_request_ratelimiter()
6161
self._event_creation_handler = hs.get_event_creation_handler()
6262
self._room_member_handler = hs.get_room_member_handler()
6363

64+
self._request_ratelimiter = hs.get_request_ratelimiter()
65+
66+
# Ratelimiter for management of existing delayed events,
67+
# keyed by the sending user ID & device ID.
68+
self._delayed_event_mgmt_ratelimiter = Ratelimiter(
69+
store=self._store,
70+
clock=self._clock,
71+
cfg=self._config.ratelimiting.rc_delayed_event_mgmt,
72+
)
73+
6474
self._next_delayed_event_call: Optional[IDelayedCall] = None
6575

6676
# The current position in the current_state_delta stream
@@ -227,6 +237,9 @@ async def add(
227237
Raises:
228238
SynapseError: if the delayed event fails validation checks.
229239
"""
240+
# Use standard request limiter for scheduling new delayed events.
241+
# TODO: Instead apply ratelimiting based on the scheduled send time.
242+
# See https://github.com/element-hq/synapse/issues/18021
230243
await self._request_ratelimiter.ratelimit(requester)
231244

232245
self._event_creation_handler.validator.validate_builder(
@@ -285,7 +298,10 @@ async def cancel(self, requester: Requester, delay_id: str) -> None:
285298
NotFoundError: if no matching delayed event could be found.
286299
"""
287300
assert self._is_master
288-
await self._request_ratelimiter.ratelimit(requester)
301+
await self._delayed_event_mgmt_ratelimiter.ratelimit(
302+
requester,
303+
(requester.user.to_string(), requester.device_id),
304+
)
289305
await self._initialized_from_db
290306

291307
next_send_ts = await self._store.cancel_delayed_event(
@@ -308,7 +324,10 @@ async def restart(self, requester: Requester, delay_id: str) -> None:
308324
NotFoundError: if no matching delayed event could be found.
309325
"""
310326
assert self._is_master
311-
await self._request_ratelimiter.ratelimit(requester)
327+
await self._delayed_event_mgmt_ratelimiter.ratelimit(
328+
requester,
329+
(requester.user.to_string(), requester.device_id),
330+
)
312331
await self._initialized_from_db
313332

314333
next_send_ts = await self._store.restart_delayed_event(
@@ -332,6 +351,8 @@ async def send(self, requester: Requester, delay_id: str) -> None:
332351
NotFoundError: if no matching delayed event could be found.
333352
"""
334353
assert self._is_master
354+
# Use standard request limiter for sending delayed events on-demand,
355+
# as an on-demand send is similar to sending a regular event.
335356
await self._request_ratelimiter.ratelimit(requester)
336357
await self._initialized_from_db
337358

@@ -415,7 +436,10 @@ def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
415436

416437
async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
417438
"""Return all pending delayed events requested by the given user."""
418-
await self._request_ratelimiter.ratelimit(requester)
439+
await self._delayed_event_mgmt_ratelimiter.ratelimit(
440+
requester,
441+
(requester.user.to_string(), requester.device_id),
442+
)
419443
return await self._store.get_all_delayed_events_for_user(
420444
requester.user.localpart
421445
)

tests/rest/client/test_delayed_events.py

+143
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,27 @@ def test_delayed_state_events_are_sent_on_timeout(self) -> None:
109109
)
110110
self.assertEqual(setter_expected, content.get(setter_key), content)
111111

112+
@unittest.override_config(
113+
{"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
114+
)
115+
def test_get_delayed_events_ratelimit(self) -> None:
116+
args = ("GET", PATH_PREFIX)
117+
118+
channel = self.make_request(*args)
119+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
120+
121+
channel = self.make_request(*args)
122+
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
123+
124+
# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
125+
self.get_success(
126+
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
127+
)
128+
129+
# Test that the request isn't ratelimited anymore.
130+
channel = self.make_request(*args)
131+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
132+
112133
def test_update_delayed_event_without_id(self) -> None:
113134
channel = self.make_request(
114135
"POST",
@@ -206,6 +227,46 @@ def test_cancel_delayed_state_event(self) -> None:
206227
expect_code=HTTPStatus.NOT_FOUND,
207228
)
208229

230+
@unittest.override_config(
231+
{"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
232+
)
233+
def test_cancel_delayed_event_ratelimit(self) -> None:
234+
delay_ids = []
235+
for _ in range(2):
236+
channel = self.make_request(
237+
"POST",
238+
_get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
239+
{},
240+
)
241+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
242+
delay_id = channel.json_body.get("delay_id")
243+
self.assertIsNotNone(delay_id)
244+
delay_ids.append(delay_id)
245+
246+
channel = self.make_request(
247+
"POST",
248+
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
249+
{"action": "cancel"},
250+
)
251+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
252+
253+
args = (
254+
"POST",
255+
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
256+
{"action": "cancel"},
257+
)
258+
channel = self.make_request(*args)
259+
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
260+
261+
# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
262+
self.get_success(
263+
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
264+
)
265+
266+
# Test that the request isn't ratelimited anymore.
267+
channel = self.make_request(*args)
268+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
269+
209270
def test_send_delayed_state_event(self) -> None:
210271
state_key = "to_send_on_request"
211272

@@ -250,6 +311,44 @@ def test_send_delayed_state_event(self) -> None:
250311
)
251312
self.assertEqual(setter_expected, content.get(setter_key), content)
252313

314+
@unittest.override_config({"rc_message": {"per_second": 3.5, "burst_count": 4}})
315+
def test_send_delayed_event_ratelimit(self) -> None:
316+
delay_ids = []
317+
for _ in range(2):
318+
channel = self.make_request(
319+
"POST",
320+
_get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
321+
{},
322+
)
323+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
324+
delay_id = channel.json_body.get("delay_id")
325+
self.assertIsNotNone(delay_id)
326+
delay_ids.append(delay_id)
327+
328+
channel = self.make_request(
329+
"POST",
330+
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
331+
{"action": "send"},
332+
)
333+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
334+
335+
args = (
336+
"POST",
337+
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
338+
{"action": "send"},
339+
)
340+
channel = self.make_request(*args)
341+
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
342+
343+
# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
344+
self.get_success(
345+
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
346+
)
347+
348+
# Test that the request isn't ratelimited anymore.
349+
channel = self.make_request(*args)
350+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
351+
253352
def test_restart_delayed_state_event(self) -> None:
254353
state_key = "to_send_on_restarted_timeout"
255354

@@ -309,6 +408,46 @@ def test_restart_delayed_state_event(self) -> None:
309408
)
310409
self.assertEqual(setter_expected, content.get(setter_key), content)
311410

411+
@unittest.override_config(
412+
{"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
413+
)
414+
def test_restart_delayed_event_ratelimit(self) -> None:
415+
delay_ids = []
416+
for _ in range(2):
417+
channel = self.make_request(
418+
"POST",
419+
_get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
420+
{},
421+
)
422+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
423+
delay_id = channel.json_body.get("delay_id")
424+
self.assertIsNotNone(delay_id)
425+
delay_ids.append(delay_id)
426+
427+
channel = self.make_request(
428+
"POST",
429+
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
430+
{"action": "restart"},
431+
)
432+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
433+
434+
args = (
435+
"POST",
436+
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
437+
{"action": "restart"},
438+
)
439+
channel = self.make_request(*args)
440+
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
441+
442+
# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
443+
self.get_success(
444+
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
445+
)
446+
447+
# Test that the request isn't ratelimited anymore.
448+
channel = self.make_request(*args)
449+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
450+
312451
def test_delayed_state_events_are_cancelled_by_more_recent_state(self) -> None:
313452
state_key = "to_be_cancelled"
314453

@@ -374,3 +513,7 @@ def _get_path_for_delayed_state(
374513
room_id: str, event_type: str, state_key: str, delay_ms: int
375514
) -> str:
376515
return f"rooms/{room_id}/state/{event_type}/{state_key}?org.matrix.msc4140.delay={delay_ms}"
516+
517+
518+
def _get_path_for_delayed_send(room_id: str, event_type: str, delay_ms: int) -> str:
519+
return f"rooms/{room_id}/send/{event_type}?org.matrix.msc4140.delay={delay_ms}"

tests/rest/client/test_rooms.py

+35
Original file line numberDiff line numberDiff line change
@@ -2399,6 +2399,41 @@ def test_send_delayed_state_event(self) -> None:
23992399
)
24002400
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
24012401

2402+
@unittest.override_config(
2403+
{
2404+
"max_event_delay_duration": "24h",
2405+
"rc_message": {"per_second": 1, "burst_count": 2},
2406+
}
2407+
)
2408+
def test_add_delayed_event_ratelimit(self) -> None:
2409+
"""Test that requests to schedule new delayed events are ratelimited by a RateLimiter,
2410+
which ratelimits them correctly, including by not limiting when the requester is
2411+
exempt from ratelimiting.
2412+
"""
2413+
2414+
# Test that new delayed events are correctly ratelimited.
2415+
args = (
2416+
"POST",
2417+
(
2418+
"rooms/%s/send/m.room.message?org.matrix.msc4140.delay=2000"
2419+
% self.room_id
2420+
).encode("ascii"),
2421+
{"body": "test", "msgtype": "m.text"},
2422+
)
2423+
channel = self.make_request(*args)
2424+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
2425+
channel = self.make_request(*args)
2426+
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)
2427+
2428+
# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
2429+
self.get_success(
2430+
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
2431+
)
2432+
2433+
# Test that the new delayed events aren't ratelimited anymore.
2434+
channel = self.make_request(*args)
2435+
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
2436+
24022437

24032438
class RoomSearchTestCase(unittest.HomeserverTestCase):
24042439
servlets = [

0 commit comments

Comments
 (0)