@@ -327,6 +327,13 @@ async def _persist_events_and_state_updates(
327
327
328
328
async with stream_ordering_manager as stream_orderings :
329
329
for (event , _ ), stream in zip (events_and_contexts , stream_orderings ):
330
+ # XXX: We can't rely on `stream_ordering`/`instance_name` being correct
331
+ # at this point. We could be working with events that were previously
332
+ # persisted as an `outlier` with one `stream_ordering` but are now being
333
+ # persisted again and de-outliered and are being assigned a different
334
+ # `stream_ordering` here that won't end up being used.
335
+ # `_update_outliers_txn()` will fix this discrepancy (always use the
336
+ # `stream_ordering` from the first time it was persisted).
330
337
event .internal_metadata .stream_ordering = stream
331
338
event .internal_metadata .instance_name = self ._instance_name
332
339
@@ -470,11 +477,11 @@ async def _calculate_sliding_sync_table_changes(
470
477
membership_infos_to_insert_membership_snapshots .append (
471
478
# XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
472
479
# because we're sourcing the event from `events_and_contexts`, we
473
- # can't rely on `stream_ordering`/`instance_name` being correct. We
474
- # could be working with events that were previously persisted as an
475
- # `outlier` with one `stream_ordering` but are now being persisted
476
- # again and de-outliered and assigned a different `stream_ordering`
477
- # that won't end up being used. Since we call
480
+ # can't rely on `stream_ordering`/`instance_name` being correct at
481
+ # this point. We could be working with events that were previously
482
+ # persisted as an `outlier` with one `stream_ordering` but are now
483
+ # being persisted again and de-outliered and assigned a different
484
+ # `stream_ordering` that won't end up being used. Since we call
478
485
# `_calculate_sliding_sync_table_changes()` before
479
486
# `_update_outliers_txn()` which fixes this discrepancy (always use
480
487
# the `stream_ordering` from the first time it was persisted), we're
@@ -591,11 +598,17 @@ async def _calculate_sliding_sync_table_changes(
591
598
event_types = SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES ,
592
599
)
593
600
)
594
- bump_stamp_to_fully_insert = (
595
- most_recent_bump_event_pos_results [1 ].stream
596
- if most_recent_bump_event_pos_results is not None
597
- else None
598
- )
601
+ if most_recent_bump_event_pos_results is not None :
602
+ _ , new_bump_event_pos = most_recent_bump_event_pos_results
603
+
604
+ # If we've just joined a remote room, then the last bump event may
605
+ # have been backfilled (and so have a negative stream ordering).
606
+ # These negative stream orderings can't sensibly be compared, so
607
+ # instead just leave it as `None` in the table and we will use their
608
+ # membership event position as the bump event position in the
609
+ # Sliding Sync API.
610
+ if new_bump_event_pos .stream > 0 :
611
+ bump_stamp_to_fully_insert = new_bump_event_pos .stream
599
612
600
613
current_state_ids_map = dict (
601
614
await self .store .get_partial_filtered_current_state_ids (
@@ -2123,31 +2136,26 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn(
2123
2136
if len (events_and_contexts ) == 0 :
2124
2137
return
2125
2138
2126
- # We only update the sliding sync tables for non-backfilled events.
2127
- #
2128
- # Check if the first event is a backfilled event (with a negative
2129
- # `stream_ordering`). If one event is backfilled, we assume this whole batch was
2130
- # backfilled.
2131
- first_event_stream_ordering = events_and_contexts [0 ][
2132
- 0
2133
- ].internal_metadata .stream_ordering
2134
- # This should exist for persisted events
2135
- assert first_event_stream_ordering is not None
2136
- if first_event_stream_ordering < 0 :
2137
- return
2138
-
2139
2139
# Since the list is sorted ascending by `stream_ordering`, the last event should
2140
2140
# have the highest `stream_ordering`.
2141
2141
max_stream_ordering = events_and_contexts [- 1 ][
2142
2142
0
2143
2143
].internal_metadata .stream_ordering
2144
+ # `stream_ordering` should be assigned for persisted events
2145
+ assert max_stream_ordering is not None
2146
+ # Check if the event is a backfilled event (with a negative `stream_ordering`).
2147
+ # If one event is backfilled, we assume this whole batch was backfilled.
2148
+ if max_stream_ordering < 0 :
2149
+ # We only update the sliding sync tables for non-backfilled events.
2150
+ return
2151
+
2144
2152
max_bump_stamp = None
2145
2153
for event , _ in reversed (events_and_contexts ):
2146
2154
# Sanity check that all events belong to the same room
2147
2155
assert event .room_id == room_id
2148
2156
2149
2157
if event .type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES :
2150
- # This should exist for persisted events
2158
+ # `stream_ordering` should be assigned for persisted events
2151
2159
assert event .internal_metadata .stream_ordering is not None
2152
2160
2153
2161
max_bump_stamp = event .internal_metadata .stream_ordering
@@ -2156,11 +2164,6 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn(
2156
2164
# matching bump event which should have the highest `stream_ordering`.
2157
2165
break
2158
2166
2159
- # We should have exited earlier if there were no events
2160
- assert (
2161
- max_stream_ordering is not None
2162
- ), "Expected to have a stream_ordering if we have events"
2163
-
2164
2167
# Handle updating the `sliding_sync_joined_rooms` table.
2165
2168
#
2166
2169
txn .execute (
0 commit comments