@@ -163,24 +163,22 @@ class SlidingSyncMembershipInfo:
163
163
sender : str
164
164
membership_event_id : str
165
165
membership : str
166
+
167
+
168
+ @attr .s (slots = True , auto_attribs = True )
169
+ class SlidingSyncMembershipInfoWithEventPos (SlidingSyncMembershipInfo ):
170
+ """
171
+ SlidingSyncMembershipInfo + `stream_ordering`/`instance_name` of the membership
172
+ event
173
+ """
174
+
166
175
membership_event_stream_ordering : int
167
176
membership_event_instance_name : str
168
177
169
178
170
179
@attr .s (slots = True , auto_attribs = True )
171
180
class SlidingSyncTableChanges :
172
181
room_id : str
173
- # `stream_ordering` of the most recent event being persisted in the room. This doesn't
174
- # need to be perfect, we just need *some* answer that points to a real event in the
175
- # room in case we are the first ones inserting into the `sliding_sync_joined_rooms`
176
- # table because of the `NON NULL` constraint on `event_stream_ordering`. In reality,
177
- # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
178
- # `_update_current_state_txn()` whenever a new event is persisted to update it to the
179
- # correct latest value.
180
- #
181
- # This should be *some* value that points to a real event in the room if we are
182
- # still joined to the room and some state is changing (`to_insert` or `to_delete`).
183
- joined_room_best_effort_most_recent_stream_ordering : Optional [int ]
184
182
# If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to
185
183
# fully-insert it which means we also need to include a `bump_stamp` value to use
186
184
# for the row. This should only be populated when we're trying to fully-insert a
@@ -401,6 +399,9 @@ async def _calculate_sliding_sync_table_changes(
401
399
`stream_ordering`).
402
400
delta_state: Deltas that are going to be used to update the
403
401
`current_state_events` table. Changes to the current state of the room.
402
+
403
+ Returns:
404
+ SlidingSyncTableChanges
404
405
"""
405
406
to_insert = delta_state .to_insert
406
407
to_delete = delta_state .to_delete
@@ -410,7 +411,6 @@ async def _calculate_sliding_sync_table_changes(
410
411
if not to_insert and not to_delete :
411
412
return SlidingSyncTableChanges (
412
413
room_id = room_id ,
413
- joined_room_best_effort_most_recent_stream_ordering = None ,
414
414
joined_room_bump_stamp_to_fully_insert = None ,
415
415
joined_room_updates = {},
416
416
membership_snapshot_shared_insert_values = {},
@@ -469,24 +469,24 @@ async def _calculate_sliding_sync_table_changes(
469
469
membership_event_id ,
470
470
user_id ,
471
471
) in membership_event_id_to_user_id_map .items ():
472
- # We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point
473
- membership_event_stream_ordering = membership_event_map [
474
- membership_event_id
475
- ].internal_metadata .stream_ordering
476
- assert membership_event_stream_ordering is not None
477
- membership_event_instance_name = membership_event_map [
478
- membership_event_id
479
- ].internal_metadata .instance_name
480
- assert membership_event_instance_name is not None
481
-
482
472
membership_infos_to_insert_membership_snapshots .append (
473
+ # XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
474
+ # because we're sourcing the event from `events_and_contexts`, we
475
+ # can't rely on `stream_ordering`/`instance_name` being correct. We
476
+ # could be working with events that were previously persisted as an
477
+ # `outlier` with one `stream_ordering` but are now being persisted
478
+ # again and de-outliered and assigned a different `stream_ordering`
479
+ # that won't end up being used. Since we call
480
+ # `_calculate_sliding_sync_table_changes()` before
481
+ # `_update_outliers_txn()` which fixes this discrepancy (always use
482
+ # the `stream_ordering` from the first time it was persisted), we're
483
+ # working with an unreliable `stream_ordering` value that will
484
+ # possibly be unused and not make it into the `events` table.
483
485
SlidingSyncMembershipInfo (
484
486
user_id = user_id ,
485
487
sender = membership_event_map [membership_event_id ].sender ,
486
488
membership_event_id = membership_event_id ,
487
489
membership = membership_event_map [membership_event_id ].membership ,
488
- membership_event_stream_ordering = membership_event_stream_ordering ,
489
- membership_event_instance_name = membership_event_instance_name ,
490
490
)
491
491
)
492
492
@@ -568,7 +568,6 @@ async def _calculate_sliding_sync_table_changes(
568
568
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`)
569
569
#
570
570
joined_room_updates : SlidingSyncStateInsertValues = {}
571
- best_effort_most_recent_stream_ordering : Optional [int ] = None
572
571
bump_stamp_to_fully_insert : Optional [int ] = None
573
572
if not delta_state .no_longer_in_room :
574
573
current_state_ids_map = {}
@@ -632,9 +631,7 @@ async def _calculate_sliding_sync_table_changes(
632
631
633
632
# Otherwise, we need to find a couple events that we were reset to.
634
633
if missing_event_ids :
635
- remaining_events = await self .store .get_events (
636
- current_state_ids_map .values ()
637
- )
634
+ remaining_events = await self .store .get_events (missing_event_ids )
638
635
# There shouldn't be any missing events
639
636
assert (
640
637
remaining_events .keys () == missing_event_ids
@@ -657,52 +654,9 @@ async def _calculate_sliding_sync_table_changes(
657
654
elif state_key == (EventTypes .Name , "" ):
658
655
joined_room_updates ["room_name" ] = None
659
656
660
- # Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to
661
- # be perfect, we just need *some* answer that points to a real event in the
662
- # room in case we are the first ones inserting into the
663
- # `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on
664
- # `event_stream_ordering`. In reality,
665
- # `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after
666
- # `_update_current_state_txn()` whenever a new event is persisted to update
667
- # it to the correct latest value.
668
- #
669
- if len (events_and_contexts ) > 0 :
670
- # Since the list is sorted ascending by `stream_ordering`, the last event
671
- # should have the highest `stream_ordering`.
672
- best_effort_most_recent_stream_ordering = events_and_contexts [- 1 ][
673
- 0
674
- ].internal_metadata .stream_ordering
675
- else :
676
- # If there are no `events_and_contexts`, we assume it's one of two scenarios:
677
- # 1. If there are new state `to_insert` but no `events_and_contexts`,
678
- # then it's a state reset.
679
- # 2. Otherwise, it's some partial-state room re-syncing the current state and
680
- # going through un-partial process.
681
- #
682
- # Either way, we assume no new events are being persisted and we can
683
- # find the latest already in the database. Since this is a best-effort
684
- # value, we don't need to be perfect although I think we're pretty close
685
- # here.
686
- most_recent_event_pos_results = (
687
- await self .store .get_last_event_pos_in_room (
688
- room_id , event_types = None
689
- )
690
- )
691
- assert most_recent_event_pos_results , (
692
- f"We should not be seeing `None` here because we are still in the room ({ room_id } ) and "
693
- + "it should at-least have a join membership event that's keeping us here."
694
- )
695
- best_effort_most_recent_stream_ordering = most_recent_event_pos_results [
696
- 1
697
- ].stream
698
-
699
- # We should have found a value if we are still in the room
700
- assert best_effort_most_recent_stream_ordering is not None
701
-
702
657
return SlidingSyncTableChanges (
703
658
room_id = room_id ,
704
659
# For `sliding_sync_joined_rooms`
705
- joined_room_best_effort_most_recent_stream_ordering = best_effort_most_recent_stream_ordering ,
706
660
joined_room_bump_stamp_to_fully_insert = bump_stamp_to_fully_insert ,
707
661
joined_room_updates = joined_room_updates ,
708
662
# For `sliding_sync_membership_snapshots`
@@ -1773,31 +1727,53 @@ def _update_current_state_txn(
1773
1727
#
1774
1728
# We only need to update when one of the relevant state values has changed
1775
1729
if sliding_sync_table_changes .joined_room_updates :
1776
- # This should be *some* value that points to a real event in the room if
1777
- # we are still joined to the room.
1778
- assert (
1779
- sliding_sync_table_changes . joined_room_best_effort_most_recent_stream_ordering
1780
- is not None
1730
+ sliding_sync_updates_keys = (
1731
+ sliding_sync_table_changes . joined_room_updates . keys ()
1732
+ )
1733
+ sliding_sync_updates_values = (
1734
+ sliding_sync_table_changes . joined_room_updates . values ()
1781
1735
)
1782
1736
1783
- self .db_pool .simple_upsert_txn (
1784
- txn ,
1785
- table = "sliding_sync_joined_rooms" ,
1786
- keyvalues = {"room_id" : room_id },
1787
- values = sliding_sync_table_changes .joined_room_updates ,
1788
- insertion_values = {
1789
- # The reason we're only *inserting* (not *updating*)
1790
- # `event_stream_ordering` here is because the column has a `NON
1791
- # NULL` constraint and we need *some* answer. And if the row
1792
- # already exists, it already has the correct value and it's
1793
- # better to just rely on
1794
- # `_update_sliding_sync_tables_with_new_persisted_events_txn()`
1795
- # to do the right thing (same for `bump_stamp`).
1796
- "event_stream_ordering" : sliding_sync_table_changes .joined_room_best_effort_most_recent_stream_ordering ,
1797
- # If we're trying to fully-insert a row, we need to provide a
1798
- # value for `bump_stamp` if it exists for the room.
1799
- "bump_stamp" : sliding_sync_table_changes .joined_room_bump_stamp_to_fully_insert ,
1800
- },
1737
+ args : List [Any ] = [
1738
+ room_id ,
1739
+ room_id ,
1740
+ sliding_sync_table_changes .joined_room_bump_stamp_to_fully_insert ,
1741
+ ]
1742
+ args .extend (iter (sliding_sync_updates_values ))
1743
+
1744
+ # XXX: We use a sub-query for `stream_ordering` because it's unreliable to
1745
+ # pre-calculate from `events_and_contexts` at the time when
1746
+ # `_calculate_sliding_sync_table_changes()` is ran. We could be working
1747
+ # with events that were previously persisted as an `outlier` with one
1748
+ # `stream_ordering` but are now being persisted again and de-outliered
1749
+ # and assigned a different `stream_ordering`. Since we call
1750
+ # `_calculate_sliding_sync_table_changes()` before
1751
+ # `_update_outliers_txn()` which fixes this discrepancy (always use the
1752
+ # `stream_ordering` from the first time it was persisted), we're working
1753
+ # with an unreliable `stream_ordering` value that will possibly be
1754
+ # unused and not make it into the `events` table.
1755
+ #
1756
+ # We don't update `event_stream_ordering` `ON CONFLICT` because it's
1757
+ # simpler and we can just rely on
1758
+ # `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do
1759
+ # the right thing (same for `bump_stamp`). The only reason we're
1760
+ # inserting `event_stream_ordering` here is because the column has a
1761
+ # `NON NULL` constraint and we need some answer.
1762
+ txn .execute (
1763
+ f"""
1764
+ INSERT INTO sliding_sync_joined_rooms
1765
+ (room_id, event_stream_ordering, bump_stamp, { ", " .join (sliding_sync_updates_keys )} )
1766
+ VALUES (
1767
+ ?,
1768
+ (SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1),
1769
+ ?,
1770
+ { ", " .join ("?" for _ in sliding_sync_updates_values )}
1771
+ )
1772
+ ON CONFLICT (room_id)
1773
+ DO UPDATE SET
1774
+ { ", " .join (f"{ key } = EXCLUDED.{ key } " for key in sliding_sync_updates_keys )}
1775
+ """ ,
1776
+ args ,
1801
1777
)
1802
1778
1803
1779
# We now update `local_current_membership`. We do this regardless
@@ -1854,38 +1830,63 @@ def _update_current_state_txn(
1854
1830
if sliding_sync_table_changes .to_insert_membership_snapshots :
1855
1831
# Update the `sliding_sync_membership_snapshots` table
1856
1832
#
1857
- # We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys`
1858
- # because there are other fields in the `ON CONFLICT` upsert to run (see
1859
- # inherit case above for more context when this happens).
1860
- self .db_pool .simple_upsert_many_txn (
1861
- txn = txn ,
1862
- table = "sliding_sync_membership_snapshots" ,
1863
- key_names = ("room_id" , "user_id" ),
1864
- key_values = [
1865
- (room_id , membership_info .user_id )
1866
- for membership_info in sliding_sync_table_changes .to_insert_membership_snapshots
1867
- ],
1868
- value_names = [
1869
- "sender" ,
1870
- "membership_event_id" ,
1871
- "membership" ,
1872
- "event_stream_ordering" ,
1873
- "event_instance_name" ,
1874
- ]
1875
- + list (
1876
- sliding_sync_table_changes .membership_snapshot_shared_insert_values .keys ()
1877
- ),
1878
- value_values = [
1833
+ sliding_sync_snapshot_keys = (
1834
+ sliding_sync_table_changes .membership_snapshot_shared_insert_values .keys ()
1835
+ )
1836
+ sliding_sync_snapshot_values = (
1837
+ sliding_sync_table_changes .membership_snapshot_shared_insert_values .values ()
1838
+ )
1839
+ # We need to insert/update regardless of whether we have
1840
+ # `sliding_sync_snapshot_keys` because there are other fields in the `ON
1841
+ # CONFLICT` upsert to run (see inherit case (explained in
1842
+ # `_calculate_sliding_sync_table_changes()`) for more context when this
1843
+ # happens).
1844
+ #
1845
+ # XXX: We use a sub-query for `stream_ordering` because it's unreliable to
1846
+ # pre-calculate from `events_and_contexts` at the time when
1847
+ # `_calculate_sliding_sync_table_changes()` is ran. We could be working with
1848
+ # events that were previously persisted as an `outlier` with one
1849
+ # `stream_ordering` but are now being persisted again and de-outliered and
1850
+ # assigned a different `stream_ordering` that won't end up being used. Since
1851
+ # we call `_calculate_sliding_sync_table_changes()` before
1852
+ # `_update_outliers_txn()` which fixes this discrepancy (always use the
1853
+ # `stream_ordering` from the first time it was persisted), we're working
1854
+ # with an unreliable `stream_ordering` value that will possibly be unused
1855
+ # and not make it into the `events` table.
1856
+ txn .execute_batch (
1857
+ f"""
1858
+ INSERT INTO sliding_sync_membership_snapshots
1859
+ (room_id, user_id, sender, membership_event_id, membership, event_stream_ordering, event_instance_name
1860
+ { ("," + ", " .join (sliding_sync_snapshot_keys )) if sliding_sync_snapshot_keys else "" } )
1861
+ VALUES (
1862
+ ?, ?, ?, ?, ?,
1863
+ (SELECT stream_ordering FROM events WHERE event_id = ?),
1864
+ (SELECT instance_name FROM events WHERE event_id = ?)
1865
+ { ("," + ", " .join ("?" for _ in sliding_sync_snapshot_values )) if sliding_sync_snapshot_values else "" }
1866
+ )
1867
+ ON CONFLICT (room_id, user_id)
1868
+ DO UPDATE SET
1869
+ sender = EXCLUDED.sender,
1870
+ membership_event_id = EXCLUDED.membership_event_id,
1871
+ membership = EXCLUDED.membership,
1872
+ event_stream_ordering = EXCLUDED.event_stream_ordering
1873
+ { ("," + ", " .join (f"{ key } = EXCLUDED.{ key } " for key in sliding_sync_snapshot_keys )) if sliding_sync_snapshot_keys else "" }
1874
+ """ ,
1875
+ [
1879
1876
[
1877
+ room_id ,
1878
+ membership_info .user_id ,
1880
1879
membership_info .sender ,
1881
1880
membership_info .membership_event_id ,
1882
1881
membership_info .membership ,
1883
- membership_info .membership_event_stream_ordering ,
1884
- membership_info .membership_event_instance_name ,
1882
+ # XXX: We do not use `membership_info.membership_event_stream_ordering` here
1883
+ # because it is an unreliable value. See XXX note above.
1884
+ membership_info .membership_event_id ,
1885
+ # XXX: We do not use `membership_info.membership_event_instance_name` here
1886
+ # because it is an unreliable value. See XXX note above.
1887
+ membership_info .membership_event_id ,
1885
1888
]
1886
- + list (
1887
- sliding_sync_table_changes .membership_snapshot_shared_insert_values .values ()
1888
- )
1889
+ + list (sliding_sync_snapshot_values )
1889
1890
for membership_info in sliding_sync_table_changes .to_insert_membership_snapshots
1890
1891
],
1891
1892
)
0 commit comments