
Commit 1a6b718

Sliding Sync: Pre-populate room data for quick filtering/sorting (#17512)
Pre-populate room data for quick filtering/sorting in the Sliding Sync API

Spawning from #17450 (comment)

This PR is acting as the Synapse version `N+1` step in the gradual migration being tracked by #17623

Adding two new database tables:

 - `sliding_sync_joined_rooms`: A table for storing room meta data that the local server is still participating in. The info here can be shared across all `Membership.JOIN`. Keyed on `(room_id)` and updated when the relevant room current state changes or a new event is sent in the room.
 - `sliding_sync_membership_snapshots`: A table for storing a snapshot of room meta data at the time of the local user's membership. Keyed on `(room_id, user_id)` and only updated when a user's membership in a room changes.

Also adds background updates to populate these tables with all of the existing data.

We want to have the guarantee that if a row exists in the sliding sync tables, we are able to rely on it (accurate data). And if a row doesn't exist, we use a fallback to get the same info until the background updates fill in the rows or a new event comes in triggering it to be fully inserted. This means we need a couple extra things in place until we bump `SCHEMA_COMPAT_VERSION` and run the foreground update in the `N+2` part of the gradual migration. For context on why we can't rely on the tables without these things see [1].

 1. On start-up, block until we clear out any rows for the rooms that have had events since the max-`stream_ordering` of the `sliding_sync_joined_rooms` table (compare to max-`stream_ordering` of the `events` table). For `sliding_sync_membership_snapshots`, we can compare to the max-`stream_ordering` of `local_current_membership`
    - This accounts for when someone downgrades their Synapse version and then upgrades it again. This will ensure that we don't have any stale/out-of-date data in the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables since any new events sent in rooms would have also needed to be written to the sliding sync tables. For example a new event needs to bump `event_stream_ordering` in `sliding_sync_joined_rooms` table or some state in the room changing (like the room name). Or another example of someone's membership changing in a room affecting `sliding_sync_membership_snapshots`.
 1. Add another background update that will catch-up with any rows that were just deleted from the sliding sync tables (based on the activity in the `events`/`local_current_membership`). The rooms that need recalculating are added to the `sliding_sync_joined_rooms_to_recalculate` table.
 1. Making sure rows are fully inserted. Instead of partially inserting, we need to check if the row already exists and fully insert all data if not.

All of this extra functionality can be removed once the `SCHEMA_COMPAT_VERSION` is bumped with support for the new sliding sync tables so people can no longer downgrade (the `N+2` part of the gradual migration).

<details>
<summary><sup>[1]</sup></summary>

For `sliding_sync_joined_rooms`, since we partially insert rows as state comes in, we can't rely on the existence of the row for a given `room_id`. We can't even rely on looking at whether the background update has finished. There could still be partial rows from when someone reverted their Synapse version after the background update finished, had some state changes (or new rooms), then upgraded again and more state changes happen leaving a partial row.

For `sliding_sync_membership_snapshots`, we insert items as a whole except for the `forgotten` column ~~so we can rely on rows existing and just need to always use a fallback for the `forgotten` data. We can't use the `forgotten` column in the table for the same reasons above about `sliding_sync_joined_rooms`.~~ We could have an out-of-date membership from when someone reverted their Synapse version. (same problems as outlined for `sliding_sync_joined_rooms` above)

Discussed in an [internal meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.dz5x6ef4mxz7)

</details>

### TODO

 - [x] Update `stream_ordering`/`bump_stamp`
 - [x] Handle remote invites
 - [x] Handle state resets
 - [x] Consider adding `sender` so we can filter `LEAVE` memberships and distinguish from kicks.
    - [x] We should add it to be able to tell leaves from kicks
 - [x] Consider adding `tombstone` state to help address #17540
    - [x] We should add it `tombstone_successor_room_id`
 - [x] Consider adding `forgotten` status to avoid extra lookup/table-join on `room_memberships`
    - [x] We should add it
 - [x] Background update to fill in values for all joined rooms and non-join membership
 - [x] Clean-up tables when room is deleted
 - [ ] Make sure tables are useful to our use case
    - First explored in https://github.com/element-hq/synapse/compare/erikj/ss_use_new_tables
    - Also explored in 76b5a57
 - [x] Plan for how can we use this with a fallback
    - See plan discussed above in main area of the issue description
    - Discussed in an [internal meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.dz5x6ef4mxz7)
 - [x] Plan for how we can rely on this new table without a fallback
    - Synapse version `N+1`: (this PR) Bump `SCHEMA_VERSION` to `87`. Add new tables and background update to backfill all rows. Since this is a new table, we don't have to add any `NOT VALID` constraints and validate them when the background update completes. Read from new tables with a fallback in cases where the rows aren't filled in yet.
    - Synapse version `N+2`: Bump `SCHEMA_VERSION` to `88` and bump `SCHEMA_COMPAT_VERSION` to `87` because we don't want people to downgrade and miss writes while they are on an older version. Add a foreground update to finish off the backfill so we can read from new tables without the fallback. Application code can now rely on the new tables being populated.
    - Discussed in an [internal meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.hh7shg4cxdhj)

### Dev notes

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.storage.test_events.SlidingSyncPrePopulatedTablesTestCase

SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.storage.test_events.SlidingSyncPrePopulatedTablesTestCase
```

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.FilterRoomsTestCase
```

Reference:

 - [Development docs on background updates and worked examples of gradual migrations](https://github.com/element-hq/synapse/blob/1dfa59b238cee0dc62163588cc9481896c288979/docs/development/database_schema.md#background-updates)
 - A real example of a gradual migration: matrix-org/synapse#15649 (comment)
 - Adding `rooms.creator` field that needed a background update to backfill data, matrix-org/synapse#10697
 - Adding `rooms.room_version` that needed a background update to backfill data, matrix-org/synapse#6729
 - Adding `room_stats_state.room_type` that needed a background update to backfill data, matrix-org/synapse#13031
 - Tables from MSC2716: `insertion_events`, `insertion_event_edges`, `insertion_event_extremities`, `batch_events`
 - `current_state_events` updated in `synapse/storage/databases/main/events.py`

---

```
persist_event (adds to queue)
_persist_event_batch
_persist_events_and_state_updates (assigns `stream_ordering` to events)
_persist_events_txn
_store_event_txn
_update_metadata_tables_txn
_store_room_members_txn
_update_current_state_txn
```

---

> Concatenated Indexes [...] (also known as multi-column, composite or combined index)
>
> [...] key consists of multiple columns.
>
> We can take advantage of the fact that the first index column is always usable for searching
>
> *-- https://use-the-index-luke.com/sql/where-clause/the-equals-operator/concatenated-keys*

---

Dealing with `portdb` (`synapse/_scripts/synapse_port_db.py`), #17512 (comment)

---

<details>
<summary>SQL queries:</summary>

Both of these are equivalent and work in SQLite and Postgres

Option 1:
```sql
WITH data_table (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)}) AS (
    VALUES (
        ?, ?, ?,
        (SELECT membership FROM room_memberships WHERE event_id = ?),
        (SELECT stream_ordering FROM events WHERE event_id = ?),
        {", ".join("?" for _ in insert_values)}
    )
)
INSERT INTO sliding_sync_non_join_memberships
    (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)})
SELECT * FROM data_table
WHERE membership != ?
ON CONFLICT (room_id, user_id)
DO UPDATE SET
    membership_event_id = EXCLUDED.membership_event_id,
    membership = EXCLUDED.membership,
    event_stream_ordering = EXCLUDED.event_stream_ordering,
    {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
```

Option 2:
```sql
INSERT INTO sliding_sync_non_join_memberships
    (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)})
SELECT
    column1 as room_id,
    column2 as user_id,
    column3 as membership_event_id,
    column4 as membership,
    column5 as event_stream_ordering,
    {", ".join("column" + str(i) for i in range(6, 6 + len(insert_keys)))}
FROM (
    VALUES (
        ?, ?, ?,
        (SELECT membership FROM room_memberships WHERE event_id = ?),
        (SELECT stream_ordering FROM events WHERE event_id = ?),
        {", ".join("?" for _ in insert_values)}
    )
) as v
WHERE membership != ?
ON CONFLICT (room_id, user_id)
DO UPDATE SET
    membership_event_id = EXCLUDED.membership_event_id,
    membership = EXCLUDED.membership,
    event_stream_ordering = EXCLUDED.event_stream_ordering,
    {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
```

If we don't need the `membership` condition, we could use:

```sql
INSERT INTO sliding_sync_non_join_memberships
    (room_id, membership_event_id, user_id, membership, event_stream_ordering, {", ".join(insert_keys)})
VALUES (
    ?, ?, ?,
    (SELECT membership FROM room_memberships WHERE event_id = ?),
    (SELECT stream_ordering FROM events WHERE event_id = ?),
    {", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id, user_id)
DO UPDATE SET
    membership_event_id = EXCLUDED.membership_event_id,
    membership = EXCLUDED.membership,
    event_stream_ordering = EXCLUDED.event_stream_ordering,
    {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
```

</details>

### Pull Request Checklist

<!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
  - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
* [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <[email protected]>
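
The start-up clean-up and read-with-fallback behaviour described in the commit message can be sketched roughly as follows. This is a minimal illustration using an in-memory SQLite database, not Synapse's actual implementation: the table schemas are heavily simplified, and the helper names `clear_stale_rows` and `get_latest_stream_ordering` are hypothetical.

```python
import sqlite3

# Simplified stand-ins for the real tables (assumed schemas, for illustration only).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (room_id TEXT NOT NULL, stream_ordering INTEGER NOT NULL);
    CREATE TABLE sliding_sync_joined_rooms (
        room_id TEXT PRIMARY KEY,
        event_stream_ordering INTEGER NOT NULL
    );
    CREATE TABLE sliding_sync_joined_rooms_to_recalculate (room_id TEXT PRIMARY KEY);
    """
)


def clear_stale_rows(conn):
    """Hypothetical start-up step: drop rows for rooms that have events newer
    than anything the sliding sync table knows about, and queue those rooms
    for recalculation."""
    (max_known,) = conn.execute(
        "SELECT COALESCE(MAX(event_stream_ordering), 0) FROM sliding_sync_joined_rooms"
    ).fetchone()
    stale = [
        row[0]
        for row in conn.execute(
            "SELECT DISTINCT room_id FROM events WHERE stream_ordering > ? ORDER BY room_id",
            (max_known,),
        )
    ]
    for room_id in stale:
        conn.execute("DELETE FROM sliding_sync_joined_rooms WHERE room_id = ?", (room_id,))
        conn.execute(
            "INSERT OR IGNORE INTO sliding_sync_joined_rooms_to_recalculate (room_id) VALUES (?)",
            (room_id,),
        )
    return stale


def get_latest_stream_ordering(conn, room_id):
    """Hypothetical read path: trust the pre-populated row if it exists,
    otherwise fall back to computing the value from `events`."""
    row = conn.execute(
        "SELECT event_stream_ordering FROM sliding_sync_joined_rooms WHERE room_id = ?",
        (room_id,),
    ).fetchone()
    if row is not None:
        return row[0]
    row = conn.execute(
        "SELECT MAX(stream_ordering) FROM events WHERE room_id = ?", (room_id,)
    ).fetchone()
    return row[0]


# Two rooms, both fully reflected in the sliding sync table.
conn.executemany("INSERT INTO events VALUES (?, ?)", [("!a", 1), ("!b", 2)])
conn.executemany("INSERT INTO sliding_sync_joined_rooms VALUES (?, ?)", [("!a", 1), ("!b", 2)])
# Simulate a downgrade: an event is persisted without updating the sliding sync table.
conn.execute("INSERT INTO events VALUES (?, ?)", ("!a", 3))

stale_rooms = clear_stale_rows(conn)
latest_a = get_latest_stream_ordering(conn, "!a")  # row was cleared, so the fallback is used
latest_b = get_latest_stream_ordering(conn, "!b")  # row still exists and is trusted
```

In this sketch a row either exists and is accurate or is absent, which is the guarantee the commit message asks for. The recalculation queue is what a later background update would drain.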
1 parent 594cd5f commit 1a6b718

22 files changed (+7408 -109 lines changed)

changelog.d/17512.misc

+1
@@ -0,0 +1 @@
+Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

synapse/_scripts/synapse_port_db.py

+5
@@ -129,6 +129,11 @@
     "remote_media_cache": ["authenticated"],
     "room_stats_state": ["is_federatable"],
     "rooms": ["is_public", "has_auth_chain_index"],
+    "sliding_sync_joined_rooms": ["is_encrypted"],
+    "sliding_sync_membership_snapshots": [
+        "has_known_state",
+        "is_encrypted",
+    ],
     "users": ["shadow_banned", "approved", "locked", "suspended"],
     "un_partial_stated_event_stream": ["rejection_status_changed"],
     "users_who_share_rooms": ["share_private"],

synapse/api/constants.py

+2
@@ -245,6 +245,8 @@ class EventContentFields:
     # `m.room.encryption`` algorithm field
     ENCRYPTION_ALGORITHM: Final = "algorithm"
 
+    TOMBSTONE_SUCCESSOR_ROOM: Final = "replacement_room"
+
 
 class EventUnsignedContentFields:
     """Fields found inside the 'unsigned' data on events"""

synapse/handlers/sliding_sync/__init__.py

+8-14
@@ -57,7 +57,11 @@
     StreamKeyType,
     StreamToken,
 )
-from synapse.types.handlers import SlidingSyncConfig, SlidingSyncResult
+from synapse.types.handlers import (
+    SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+    SlidingSyncConfig,
+    SlidingSyncResult,
+)
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
@@ -75,18 +79,6 @@
 )
 
 
-# The event types that clients should consider as new activity.
-DEFAULT_BUMP_EVENT_TYPES = {
-    EventTypes.Create,
-    EventTypes.Message,
-    EventTypes.Encrypted,
-    EventTypes.Sticker,
-    EventTypes.CallInvite,
-    EventTypes.PollStart,
-    EventTypes.LiveLocationShareStart,
-}
-
-
 class SlidingSyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
@@ -986,7 +978,9 @@ async def get_room_sync_data(
         # Figure out the last bump event in the room
         last_bump_event_result = (
             await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+                room_id,
+                to_token.room_key,
+                event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
             )
         )
 
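
To illustrate what the bump event types are used for, here is a small in-memory sketch that finds the most recent "bump" event at or before a given stream position. It is only an assumption-laden illustration: the helper `last_bump_stream_ordering` is hypothetical, the set below is an illustrative subset written as raw Matrix event type strings, and the real lookup is done in the database by `get_last_event_pos_in_room_before_stream_ordering`.

```python
from typing import Iterable, Optional, Tuple

# Illustrative subset of event types that count as new activity (assumed strings).
BUMP_EVENT_TYPES = frozenset(
    {
        "m.room.create",
        "m.room.message",
        "m.room.encrypted",
        "m.sticker",
        "m.call.invite",
    }
)


def last_bump_stream_ordering(
    events: Iterable[Tuple[str, int]], up_to: int
) -> Optional[int]:
    """Return the highest stream ordering of a bump event at or before `up_to`.

    `events` is an iterable of (event_type, stream_ordering) pairs.
    Returns None when the room has no qualifying event.
    """
    candidates = [
        stream_ordering
        for event_type, stream_ordering in events
        if event_type in BUMP_EVENT_TYPES and stream_ordering <= up_to
    ]
    return max(candidates, default=None)


room_events = [
    ("m.room.create", 1),
    ("m.room.message", 2),
    ("m.room.member", 3),  # membership changes do not bump the room
    ("m.reaction", 4),  # reactions do not bump the room either
]
bump = last_bump_stream_ordering(room_events, up_to=4)
```

Here the latest event in the room is at position 4, but the room's bump position stays at 2 because only the message counts as new activity.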

synapse/storage/controllers/persist_events.py

+8-1
@@ -502,8 +502,15 @@ async def _update_current_state(
         """
         state = await self._calculate_current_state(room_id)
         delta = await self._calculate_state_delta(room_id, state)
+        sliding_sync_table_changes = (
+            await self.persist_events_store._calculate_sliding_sync_table_changes(
+                room_id, [], delta
+            )
+        )
 
-        await self.persist_events_store.update_current_state(room_id, delta)
+        await self.persist_events_store.update_current_state(
+            room_id, delta, sliding_sync_table_changes
+        )
 
     async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
         """Calculate the current state of a room, based on the forward extremities

synapse/storage/database.py

+15-14
@@ -35,6 +35,7 @@
     Iterable,
     Iterator,
     List,
+    Mapping,
     Optional,
     Sequence,
     Tuple,
@@ -1254,9 +1255,9 @@ def simple_upsert_txn(
         self,
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        values: Dict[str, Any],
-        insertion_values: Optional[Dict[str, Any]] = None,
+        keyvalues: Mapping[str, Any],
+        values: Mapping[str, Any],
+        insertion_values: Optional[Mapping[str, Any]] = None,
         where_clause: Optional[str] = None,
     ) -> bool:
         """
@@ -1299,9 +1300,9 @@ def simple_upsert_txn_emulated(
         self,
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        values: Dict[str, Any],
-        insertion_values: Optional[Dict[str, Any]] = None,
+        keyvalues: Mapping[str, Any],
+        values: Mapping[str, Any],
+        insertion_values: Optional[Mapping[str, Any]] = None,
         where_clause: Optional[str] = None,
         lock: bool = True,
     ) -> bool:
@@ -1322,7 +1323,7 @@ def simple_upsert_txn_emulated(
 
         if lock:
             # We need to lock the table :(
-            self.engine.lock_table(txn, table)
+            txn.database_engine.lock_table(txn, table)
 
         def _getwhere(key: str) -> str:
             # If the value we're passing in is None (aka NULL), we need to use
@@ -1376,13 +1377,13 @@ def _getwhere(key: str) -> str:
         # successfully inserted
         return True
 
+    @staticmethod
     def simple_upsert_txn_native_upsert(
-        self,
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        values: Dict[str, Any],
-        insertion_values: Optional[Dict[str, Any]] = None,
+        keyvalues: Mapping[str, Any],
+        values: Mapping[str, Any],
+        insertion_values: Optional[Mapping[str, Any]] = None,
         where_clause: Optional[str] = None,
     ) -> bool:
         """
@@ -1535,8 +1536,8 @@ def simple_upsert_many_txn_emulated(
 
             self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
 
+    @staticmethod
     def simple_upsert_many_txn_native_upsert(
-        self,
         txn: LoggingTransaction,
         table: str,
         key_names: Collection[str],
@@ -1966,8 +1967,8 @@ async def simple_update(
     def simple_update_txn(
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        updatevalues: Dict[str, Any],
+        keyvalues: Mapping[str, Any],
+        updatevalues: Mapping[str, Any],
     ) -> int:
         """
         Update rows in the given database table.
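
One practical effect of widening `Dict[str, Any]` to `Mapping[str, Any]` in these signatures is that callers can pass read-only mappings, and the helper signals that it will not mutate its arguments. The sketch below shows the idea with a standalone `upsert` function against SQLite. The function is hypothetical and much simpler than Synapse's `simple_upsert_txn_native_upsert`; it assumes an SQLite version with `ON CONFLICT ... DO UPDATE` support (3.24 or later).

```python
import sqlite3
from types import MappingProxyType
from typing import Any, Mapping


def upsert(
    conn: sqlite3.Connection,
    table: str,
    keyvalues: Mapping[str, Any],
    values: Mapping[str, Any],
) -> None:
    """Hypothetical native upsert: insert the row, or update `values` when a
    row with the same `keyvalues` already exists.

    Table and column names are interpolated into the SQL, so they must come
    from trusted code, never from user input.
    """
    columns = list(keyvalues) + list(values)
    placeholders = ", ".join("?" for _ in columns)
    if values:
        updates = ", ".join(f"{column} = EXCLUDED.{column}" for column in values)
        conflict_action = f"DO UPDATE SET {updates}"
    else:
        # Nothing to update, so an existing row is left untouched.
        conflict_action = "DO NOTHING"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(keyvalues)}) {conflict_action}"
    )
    conn.execute(sql, list(keyvalues.values()) + list(values.values()))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rooms (room_id TEXT PRIMARY KEY, room_name TEXT)")

# A read-only mapping is accepted because the parameter is typed as Mapping.
key = MappingProxyType({"room_id": "!a"})
upsert(conn, "rooms", key, {"room_name": "Old name"})
upsert(conn, "rooms", key, {"room_name": "New name"})

rows = conn.execute("SELECT room_id, room_name FROM rooms").fetchall()
```

The second call hits the primary-key conflict and updates the existing row instead of inserting a duplicate.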
