From 6a78c63d4738030cf2216a9dc90ba25ebb28737a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 08:56:54 +0100 Subject: [PATCH 1/7] We need to order by room_id and user_id --- synapse/storage/databases/main/events_bg_updates.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index cb23f433bcb..5c9a42bea3c 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1857,6 +1857,7 @@ async def _sliding_sync_membership_snapshots_bg_update( initial_phase = True last_room_id = progress.get("last_room_id", "") + last_user_id = progress.get("last_user_id", "") last_event_stream_ordering = progress["last_event_stream_ordering"] def _find_memberships_to_update_txn( @@ -1887,11 +1888,11 @@ def _find_memberships_to_update_txn( FROM local_current_membership AS c INNER JOIN events AS e USING (event_id) LEFT JOIN rooms AS r ON (c.room_id = r.room_id) - WHERE c.room_id > ? - ORDER BY c.room_id ASC + WHERE (c.room_id, c.user_id) > (?, ?) + ORDER BY c.room_id ASC, c.user_id ASC LIMIT ? """, - (last_room_id, batch_size), + (last_room_id, last_user_id, batch_size), ) elif last_event_stream_ordering is not None: # It's important to sort by `event_stream_ordering` *ascending* (oldest to @@ -2296,7 +2297,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None: ( room_id, _room_id_from_rooms_table, - _user_id, + user_id, _sender, _membership_event_id, _membership, @@ -2308,6 +2309,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None: progress = { "initial_phase": initial_phase, "last_room_id": room_id, + "last_user_id": user_id, "last_event_stream_ordering": membership_event_stream_ordering, } From 86e50ea90ea35a06d860bbf1e977648e12a90754 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 08:57:02 +0100 Subject: [PATCH 2/7] Handle corrupt events in DB --- .../databases/main/events_bg_updates.py | 25 +++++++++++++++--- .../storage/databases/main/events_worker.py | 26 ++++++++++++++++--- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 5c9a42bea3c..85f3c7d5f74 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -41,6 +41,7 @@ SlidingSyncMembershipSnapshotSharedInsertValues, SlidingSyncStateInsertValues, ) +from synapse.storage.databases.main.events_worker import DatabaseCorruptionError from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.types import Cursor @@ -2082,9 +2083,17 @@ def _find_previous_membership_txn( # have `current_state_events` and we should have some current state # for each room if current_state_ids_map: - fetched_events = await self.get_events( - current_state_ids_map.values() - ) + try: + fetched_events = await self.get_events( + current_state_ids_map.values() + ) + except DatabaseCorruptionError as e: + logger.warning( + "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", + room_id, + e, + ) + continue current_state_map: StateMap[EventBase] = { state_key: fetched_events[event_id] @@ -2183,7 +2192,15 @@ def _find_previous_membership_txn( await_full_state=False, ) - fetched_events = await self.get_events(state_ids_map.values()) + try: + fetched_events = await self.get_events(state_ids_map.values()) + except DatabaseCorruptionError as e: + logger.warning( + "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", + room_id, + e, + ) + continue state_map: StateMap[EventBase] = { state_key: fetched_events[event_id] diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6079cc4a52c..1d833908279 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -98,6 +98,26 @@ logger = logging.getLogger(__name__) +class DatabaseCorruptionError(RuntimeError): + """We found an event in the DB that has a persisted event ID that doesn't + match its computed event ID.""" + + def __init__( + self, room_id: str, persisted_event_id: str, computed_event_id: str + ) -> None: + self.room_id = room_id + self.persisted_event_id = persisted_event_id + self.computed_event_id = computed_event_id + + message = ( + f"Database corruption: Event {persisted_event_id} in room {room_id} " + f"from the database appears to have been modified (calculated " + f"event id {computed_event_id})" + ) + + super().__init__(message) + + # These values are used in the `enqueue_event` and `_fetch_loop` methods to # control how we batch/bulk fetch events from the database. # The values are plucked out of thing air to make initial sync run faster @@ -1364,10 +1384,8 @@ async def _fetch_event_ids_and_get_outstanding_redactions( if original_ev.event_id != event_id: # it's difficult to see what to do here. Pretty much all bets are off # if Synapse cannot rely on the consistency of its database. - raise RuntimeError( - f"Database corruption: Event {event_id} in room {d['room_id']} " - f"from the database appears to have been modified (calculated " - f"event id {original_ev.event_id})" + raise DatabaseCorruptionError( + d["room_id"], event_id, original_ev.event_id ) event_map[event_id] = original_ev From 2676c7ca2e0a0dce8fa681c9196ea6615d11b81c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 09:01:14 +0100 Subject: [PATCH 3/7] Handle multiple leaves/bans --- synapse/storage/databases/main/events_bg_updates.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 85f3c7d5f74..e805589d6c5 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1995,6 +1995,7 @@ def _find_previous_membership_txn( WHERE room_id = ? AND m.user_id = ? + AND (m.membership = ? OR m.membership = ?) AND e.event_id != ? ORDER BY e.topological_ordering DESC LIMIT 1 @@ -2002,6 +2003,8 @@ def _find_previous_membership_txn( ( room_id, user_id, + Membership.INVITE, + Membership.KNOCK, event_id, ), ) From e4cd5b32238274357c91e78e338963df1c2a2a51 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 09:01:39 +0100 Subject: [PATCH 4/7] Handle bans --- synapse/storage/databases/main/events_bg_updates.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index e805589d6c5..dab620ec0e5 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2137,7 +2137,7 @@ def _find_previous_membership_txn( False ) elif membership in (Membership.INVITE, Membership.KNOCK) or ( - membership == Membership.LEAVE and is_outlier + membership in (Membership.LEAVE, Membership.BAN) and is_outlier ): invite_or_knock_event_id = membership_event_id invite_or_knock_membership = membership @@ -2148,7 +2148,7 @@ def _find_previous_membership_txn( # us a consistent view of the room state regardless of your # membership (i.e. the room shouldn't disappear if your using the # `is_encrypted` filter and you leave). - if membership == Membership.LEAVE and is_outlier: + if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: invite_or_knock_event_id, invite_or_knock_membership = ( await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_bg_update._find_previous_membership", From fac2fe89d0f80a91cae8bdca18b25c9005a51aea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 11:09:21 +0100 Subject: [PATCH 5/7] Don't update stream_ordering --- synapse/storage/databases/main/events_bg_updates.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index dab620ec0e5..e819364a164 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2330,8 +2330,10 @@ def _fill_table_txn(txn: LoggingTransaction) -> None: "initial_phase": initial_phase, "last_room_id": room_id, "last_user_id": user_id, - "last_event_stream_ordering": membership_event_stream_ordering, + "last_event_stream_ordering": last_event_stream_ordering, } + if not initial_phase: + progress["last_event_stream_ordering"] = membership_event_stream_ordering await self.db_pool.updates._background_update_progress( _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, From 12678888e70055bf361b4d0bb660c05b3760b230 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 12:57:02 +0100 Subject: [PATCH 6/7] Newsfile --- changelog.d/17636.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17636.misc diff --git a/changelog.d/17636.misc b/changelog.d/17636.misc new file mode 100644 index 00000000000..756918e2b21 --- /dev/null +++ b/changelog.d/17636.misc @@ -0,0 +1 @@ +Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. From 70c6722649f003d87e8df37db7a0c86564c50ac7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Aug 2024 13:04:08 +0100 Subject: [PATCH 7/7] Instance name may be null --- synapse/storage/databases/main/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index f3dbe5bba7c..e44b8d8e542 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1861,7 +1861,7 @@ def _update_current_state_txn( VALUES ( ?, ?, ?, ?, ?, (SELECT stream_ordering FROM events WHERE event_id = ?), - (SELECT instance_name FROM events WHERE event_id = ?) + (SELECT COALESCE(instance_name, 'master') FROM events WHERE event_id = ?) {("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""} ) ON CONFLICT (room_id, user_id)