Sliding Sync: Get `bump_stamp` from new sliding sync tables because it's faster #17658
Changes from all commits: d0a6e2f, d24eb32, b5ea1e2, 7830b7a, c8103d9, 523ecbc, db31c3b
@@ -0,0 +1 @@
Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster.
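The new storage method `get_latest_bump_stamp_for_room` is referenced by the handler diff below, but its implementation is not shown on this page. A minimal sketch of what such a lookup could look like, assuming the `sliding_sync_joined_rooms` table keeps a nullable `bump_stamp` column keyed by `room_id` (the column name and the use of `simple_select_one_onecol` are assumptions, not the literal Synapse implementation):

```python
from typing import Optional, cast


async def get_latest_bump_stamp_for_room(self, room_id: str) -> Optional[int]:
    """Read the pre-computed bump stamp for a joined room, if any.

    Sketch of a method on the sliding sync store: assumes the
    `sliding_sync_joined_rooms` table has a `bump_stamp` column which may be
    NULL (e.g. a freshly joined remote room where every event was backfilled).
    """
    return cast(
        Optional[int],
        await self.db_pool.simple_select_one_onecol(
            table="sliding_sync_joined_rooms",
            keyvalues={"room_id": room_id},
            retcol="bump_stamp",
            allow_none=True,
            desc="get_latest_bump_stamp_for_room",
        ),
    )
```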
@@ -1040,29 +1040,67 @@ async def get_room_sync_data(
)
)

# By default, just choose the membership event position
# Figure out the last bump event in the room
#
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream

# Figure out the last bump event in the room if we're in the room.
# If we're joined to the room, we need to find the last bump event before the
# `to_token`
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
# We can quickly query for the latest bump event in the room using the
# sliding sync tables.
latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
room_id
)
Comment on lines +1050 to 1054: Here is the new functionality that this PR is introducing. We're fetching the …
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result
min_to_token_position = to_token.room_key.stream

# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
# If we can rely on the new sliding sync tables and the `bump_stamp` is
# `None`, just fallback to the membership event position. This can happen
# when we've just joined a remote room and all the events are backfilled.
if (
# FIXME: The background job check can be removed once we bump
# `SCHEMA_COMPAT_VERSION` and run the foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
# (tracked by https://github.com/element-hq/synapse/issues/17623)
await self.store.have_finished_sliding_sync_background_jobs()
Review comment: Don't we need to check this for the second clause too?

Reply: We only need it for the first clause. If the value is … But if we have some value in the table, we can use it (second clause). Instead of using …
and latest_room_bump_stamp is None
):
pass

# The `bump_stamp` stored in the database might be ahead of our token. Since
# `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure
# that's before the `to_token` in all scenarios. The only scenario we can be
# sure of is if the `bump_stamp` is totally before the minimum position from
# the token.
#
# We don't need to check if the background update has finished, as if the
# returned bump stamp is not None then it must be up to date.
elif (
latest_room_bump_stamp is not None
and latest_room_bump_stamp < min_to_token_position
):
bump_stamp = latest_room_bump_stamp

# Otherwise, if it's within or after the `to_token`, we need to find the
# last bump event before the `to_token`.
else:
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result

# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream

unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
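Putting the hunk above back together, the new `bump_stamp` selection reduces to the decision flow below. This is a simplified, re-indented restatement for readability rather than the literal handler code; `store`, `to_token`, `membership`, `membership_event_pos`, and `bump_event_types` stand in for the handler's real state (the last corresponding to `SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES`):

```python
async def choose_bump_stamp(
    store, room_id: str, to_token, membership: str, membership_event_pos, bump_event_types
) -> int:
    # Default: the membership event position (used for any non-join membership).
    bump_stamp = membership_event_pos.stream

    if membership == "join":
        # Fast path: read the pre-computed value from the new sliding sync tables.
        latest_room_bump_stamp = await store.get_latest_bump_stamp_for_room(room_id)
        min_to_token_position = to_token.room_key.stream

        if (
            await store.have_finished_sliding_sync_background_jobs()
            and latest_room_bump_stamp is None
        ):
            # Tables are up to date but hold no bump stamp (e.g. a freshly
            # joined remote room where everything was backfilled): keep the
            # membership event position.
            pass
        elif (
            latest_room_bump_stamp is not None
            and latest_room_bump_stamp < min_to_token_position
        ):
            # The stored value is definitely before the `to_token`, so it is
            # safe to use directly.
            bump_stamp = latest_room_bump_stamp
        else:
            # Otherwise fall back to the slower per-room scan, bounded by the token.
            result = await store.get_last_event_pos_in_room_before_stream_ordering(
                room_id, to_token.room_key, event_types=bump_event_types
            )
            if result is not None:
                _, new_bump_event_pos = result
                # Ignore backfilled events, which have negative stream orderings.
                if new_bump_event_pos.stream > 0:
                    bump_stamp = new_bump_event_pos.stream

    return bump_stamp
```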
@@ -327,6 +327,13 @@ async def _persist_events_and_state_updates(
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
# XXX: We can't rely on `stream_ordering`/`instance_name` being correct
# at this point. We could be working with events that were previously
# persisted as an `outlier` with one `stream_ordering` but are now being
# persisted again and de-outliered and are being assigned a different
# `stream_ordering` here that won't end up being used.
# `_update_outliers_txn()` will fix this discrepancy (always use the
# `stream_ordering` from the first time it was persisted).
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name
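The `_update_outliers_txn()` fix-up that this comment relies on is not part of this diff. A toy illustration of the idea it describes, keeping whatever `stream_ordering` an event was given the first time it was persisted (names here are placeholders, not Synapse APIs):

```python
from typing import Dict


def resolve_stream_ordering(
    event_id: str, newly_assigned: int, already_persisted: Dict[str, int]
) -> int:
    """Toy version of the fix-up described above: if the event was already
    persisted (e.g. as an outlier), the original stream ordering wins and the
    newly assigned one is discarded."""
    return already_persisted.get(event_id, newly_assigned)


persisted: Dict[str, int] = {"$outlier_event": 5}

# Re-persisting (de-outliering) the event with a fresh ordering of 12 still
# resolves to the original 5.
assert resolve_stream_ordering("$outlier_event", 12, persisted) == 5
# A brand new event keeps whatever ordering it was just assigned.
assert resolve_stream_ordering("$new_event", 13, persisted) == 13
```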
@@ -470,11 +477,11 @@ async def _calculate_sliding_sync_table_changes(
membership_infos_to_insert_membership_snapshots.append(
# XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
# because we're sourcing the event from `events_and_contexts`, we
# can't rely on `stream_ordering`/`instance_name` being correct. We
# could be working with events that were previously persisted as an
# `outlier` with one `stream_ordering` but are now being persisted
# again and de-outliered and assigned a different `stream_ordering`
# that won't end up being used. Since we call
# can't rely on `stream_ordering`/`instance_name` being correct at
# this point. We could be working with events that were previously
# persisted as an `outlier` with one `stream_ordering` but are now
# being persisted again and de-outliered and assigned a different
# `stream_ordering` that won't end up being used. Since we call
# `_calculate_sliding_sync_table_changes()` before
# `_update_outliers_txn()` which fixes this discrepancy (always use
# the `stream_ordering` from the first time it was persisted), we're
@@ -591,11 +598,17 @@ async def _calculate_sliding_sync_table_changes(
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
bump_stamp_to_fully_insert = (
most_recent_bump_event_pos_results[1].stream
if most_recent_bump_event_pos_results is not None
else None
)
if most_recent_bump_event_pos_results is not None:
_, new_bump_event_pos = most_recent_bump_event_pos_results

# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead just leave it as `None` in the table and we will use their
# membership event position as the bump event position in the
# Sliding Sync API.
if new_bump_event_pos.stream > 0:
bump_stamp_to_fully_insert = new_bump_event_pos.stream
Comment on lines +604 to +611: Had to add this bug fix here because now that we're using the new tables, the rest-layer tests would fail with the previous flawed logic storing the negative backfilled … I also added a new test, …
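For context on the backfill handling above: backfilled events are assigned negative stream orderings from a separate counter, so they cannot be meaningfully compared with live (positive) positions. A toy illustration with made-up values:

```python
# Live events get positive, ascending stream orderings; backfilled events get
# negative ones from a separate counter. A backfilled event therefore always
# sorts before every live event, regardless of how recent it actually is in
# the room's history, so its ordering is useless as a bump stamp.
live_positions = [101, 102, 103]
backfilled_positions = [-3, -2, -1]

assert max(backfilled_positions) < min(live_positions)
```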
current_state_ids_map = dict(
await self.store.get_partial_filtered_current_state_ids(
@@ -2123,31 +2136,26 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn(
if len(events_and_contexts) == 0:
return

# We only update the sliding sync tables for non-backfilled events.
#
# Check if the first event is a backfilled event (with a negative
# `stream_ordering`). If one event is backfilled, we assume this whole batch was
# backfilled.
first_event_stream_ordering = events_and_contexts[0][
0
].internal_metadata.stream_ordering
# This should exist for persisted events
assert first_event_stream_ordering is not None
if first_event_stream_ordering < 0:
return

# Since the list is sorted ascending by `stream_ordering`, the last event should
# have the highest `stream_ordering`.
max_stream_ordering = events_and_contexts[-1][
0
].internal_metadata.stream_ordering
# `stream_ordering` should be assigned for persisted events
assert max_stream_ordering is not None
# Check if the event is a backfilled event (with a negative `stream_ordering`).
# If one event is backfilled, we assume this whole batch was backfilled.
if max_stream_ordering < 0:
# We only update the sliding sync tables for non-backfilled events.
return
Comment on lines +2144 to +2150: This is unrelated to the change to using new tables or the bugfix. Just re-arranging to be a little more sane. Should have no functional difference from before.
max_bump_stamp = None
for event, _ in reversed(events_and_contexts):
# Sanity check that all events belong to the same room
assert event.room_id == room_id

if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
# This should exist for persisted events
# `stream_ordering` should be assigned for persisted events
assert event.internal_metadata.stream_ordering is not None

max_bump_stamp = event.internal_metadata.stream_ordering
@@ -2156,11 +2164,6 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn(
# matching bump event which should have the highest `stream_ordering`.
break

# We should have exited earlier if there were no events
assert (
max_stream_ordering is not None
), "Expected to have a stream_ordering if we have events"

# Handle updating the `sliding_sync_joined_rooms` table.
#
txn.execute(
Review comment: As a separate PR maybe: we should also check in `timeline` for events that are bump events and use the stream ordering from that.

Reply: #17684
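A rough sketch of that follow-up suggestion (tracked in #17684), not code from this PR: after the room's timeline has been loaded, scan it for bump events and prefer their stream ordering over the value computed earlier. `timeline_events` and `bump_event_types` are placeholders; in Synapse the latter would be `SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES`.

```python
def bump_stamp_from_timeline(timeline_events, bump_event_types, current_bump_stamp: int) -> int:
    """Walk the already-fetched timeline from newest to oldest and use the
    first bump event's stream ordering if it beats the current value."""
    for event in reversed(timeline_events):
        if event.type in bump_event_types:
            stream = event.internal_metadata.stream_ordering
            # As elsewhere, ignore backfilled events (negative stream orderings).
            if stream is not None and stream > 0:
                return max(current_bump_stamp, stream)
    return current_bump_stamp
```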