47
47
)
48
48
from synapse .storage .databases .main .state_deltas import StateDeltasStore
49
49
from synapse .storage .databases .main .stream import StreamWorkerStore
50
+ from synapse .storage .engines import PostgresEngine
50
51
from synapse .storage .types import Cursor
51
52
from synapse .types import JsonDict , RoomStreamToken , StateMap , StrCollection
52
53
from synapse .types .handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -1877,9 +1878,29 @@ async def _sliding_sync_membership_snapshots_bg_update(
1877
1878
def _find_memberships_to_update_txn (
1878
1879
txn : LoggingTransaction ,
1879
1880
) -> List [
1880
- Tuple [str , Optional [str ], str , str , str , str , int , Optional [str ], bool ]
1881
+ Tuple [
1882
+ str ,
1883
+ Optional [str ],
1884
+ Optional [str ],
1885
+ str ,
1886
+ str ,
1887
+ str ,
1888
+ str ,
1889
+ int ,
1890
+ Optional [str ],
1891
+ bool ,
1892
+ ]
1881
1893
]:
1882
1894
# Fetch the set of event IDs that we want to update
1895
+ #
1896
+ # We skip over rows which we've already handled, i.e. have a
1897
+ # matching row in `sliding_sync_membership_snapshots` with the same
1898
+ # room, user and event ID.
1899
+ #
1900
+ # We also ignore rooms that the user has left themselves (i.e. not
1901
+ # kicked). This is to avoid having to port lots of old rooms that we
1902
+ # will never send down sliding sync (as we exclude such rooms from
1903
+ # initial syncs).
1883
1904
1884
1905
if initial_phase :
1885
1906
# There are some old out-of-band memberships (before
@@ -1892,6 +1913,7 @@ def _find_memberships_to_update_txn(
1892
1913
SELECT
1893
1914
c.room_id,
1894
1915
r.room_id,
1916
+ r.room_version,
1895
1917
c.user_id,
1896
1918
e.sender,
1897
1919
c.event_id,
@@ -1900,9 +1922,11 @@ def _find_memberships_to_update_txn(
1900
1922
e.instance_name,
1901
1923
e.outlier
1902
1924
FROM local_current_membership AS c
1925
+ LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
1903
1926
INNER JOIN events AS e USING (event_id)
1904
1927
LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
1905
1928
WHERE (c.room_id, c.user_id) > (?, ?)
1929
+ AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
1906
1930
ORDER BY c.room_id ASC, c.user_id ASC
1907
1931
LIMIT ?
1908
1932
""" ,
@@ -1922,7 +1946,8 @@ def _find_memberships_to_update_txn(
1922
1946
"""
1923
1947
SELECT
1924
1948
c.room_id,
1925
- c.room_id,
1949
+ r.room_id,
1950
+ r.room_version,
1926
1951
c.user_id,
1927
1952
e.sender,
1928
1953
c.event_id,
@@ -1931,9 +1956,12 @@ def _find_memberships_to_update_txn(
1931
1956
e.instance_name,
1932
1957
e.outlier
1933
1958
FROM local_current_membership AS c
1959
+ LEFT JOIN sliding_sync_membership_snapshots AS m USING (room_id, user_id)
1934
1960
INNER JOIN events AS e USING (event_id)
1935
- WHERE event_stream_ordering > ?
1936
- ORDER BY event_stream_ordering ASC
1961
+ LEFT JOIN rooms AS r ON (c.room_id = r.room_id)
1962
+ WHERE c.event_stream_ordering > ?
1963
+ AND (m.user_id IS NULL OR c.event_id != m.membership_event_id)
1964
+ ORDER BY c.event_stream_ordering ASC
1937
1965
LIMIT ?
1938
1966
""" ,
1939
1967
(last_event_stream_ordering , batch_size ),
@@ -1944,7 +1972,16 @@ def _find_memberships_to_update_txn(
1944
1972
memberships_to_update_rows = cast (
1945
1973
List [
1946
1974
Tuple [
1947
- str , Optional [str ], str , str , str , str , int , Optional [str ], bool
1975
+ str ,
1976
+ Optional [str ],
1977
+ Optional [str ],
1978
+ str ,
1979
+ str ,
1980
+ str ,
1981
+ str ,
1982
+ int ,
1983
+ Optional [str ],
1984
+ bool ,
1948
1985
]
1949
1986
],
1950
1987
txn .fetchall (),
@@ -1977,7 +2014,7 @@ def _find_memberships_to_update_txn(
1977
2014
1978
2015
def _find_previous_invite_or_knock_membership_txn (
1979
2016
txn : LoggingTransaction , room_id : str , user_id : str , event_id : str
1980
- ) -> Tuple [str , str ]:
2017
+ ) -> Optional [ Tuple [str , str ] ]:
1981
2018
# Find the previous invite/knock event before the leave event
1982
2019
#
1983
2020
# Here are some notes on how we landed on this query:
@@ -2027,8 +2064,13 @@ def _find_previous_invite_or_knock_membership_txn(
2027
2064
)
2028
2065
row = txn .fetchone ()
2029
2066
2030
- # We should see a corresponding previous invite/knock event
2031
- assert row is not None
2067
+ if row is None :
2068
+ # Generally we should have an invite or knock event for leaves
2069
+ # that are outliers, however this may not always be the case
2070
+ # (e.g. a local user got kicked but the kick event got pulled in
2071
+ # as an outlier).
2072
+ return None
2073
+
2032
2074
event_id , membership = row
2033
2075
2034
2076
return event_id , membership
@@ -2043,6 +2085,7 @@ def _find_previous_invite_or_knock_membership_txn(
2043
2085
for (
2044
2086
room_id ,
2045
2087
room_id_from_rooms_table ,
2088
+ room_version_id ,
2046
2089
user_id ,
2047
2090
sender ,
2048
2091
membership_event_id ,
@@ -2061,6 +2104,14 @@ def _find_previous_invite_or_knock_membership_txn(
2061
2104
Membership .BAN ,
2062
2105
)
2063
2106
2107
+ if (
2108
+ room_version_id is not None
2109
+ and room_version_id not in KNOWN_ROOM_VERSIONS
2110
+ ):
2111
+ # Ignore rooms with unknown room versions (these were
2112
+ # experimental rooms, that we no longer support).
2113
+ continue
2114
+
2064
2115
# There are some old out-of-band memberships (before
2065
2116
# https://github.com/matrix-org/synapse/issues/6983) where we don't have the
2066
2117
# corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
@@ -2148,14 +2199,17 @@ def _find_previous_invite_or_knock_membership_txn(
2148
2199
# in the events table though. We'll just say that we don't
2149
2200
# know the state for these rooms and continue on with our
2150
2201
# day.
2151
- sliding_sync_membership_snapshots_insert_map ["has_known_state" ] = (
2152
- False
2153
- )
2202
+ sliding_sync_membership_snapshots_insert_map = {
2203
+ "has_known_state" : False ,
2204
+ "room_type" : None ,
2205
+ "room_name" : None ,
2206
+ "is_encrypted" : False ,
2207
+ }
2154
2208
elif membership in (Membership .INVITE , Membership .KNOCK ) or (
2155
2209
membership in (Membership .LEAVE , Membership .BAN ) and is_outlier
2156
2210
):
2157
- invite_or_knock_event_id = membership_event_id
2158
- invite_or_knock_membership = membership
2211
+ invite_or_knock_event_id = None
2212
+ invite_or_knock_membership = None
2159
2213
2160
2214
# If the event is an `out_of_band_membership` (special case of
2161
2215
# `outlier`), we never had historical state so we have to pull from
@@ -2164,35 +2218,55 @@ def _find_previous_invite_or_knock_membership_txn(
2164
2218
# membership (i.e. the room shouldn't disappear if your using the
2165
2219
# `is_encrypted` filter and you leave).
2166
2220
if membership in (Membership .LEAVE , Membership .BAN ) and is_outlier :
2167
- (
2168
- invite_or_knock_event_id ,
2169
- invite_or_knock_membership ,
2170
- ) = await self .db_pool .runInteraction (
2221
+ previous_membership = await self .db_pool .runInteraction (
2171
2222
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn" ,
2172
2223
_find_previous_invite_or_knock_membership_txn ,
2173
2224
room_id ,
2174
2225
user_id ,
2175
2226
membership_event_id ,
2176
2227
)
2228
+ if previous_membership is not None :
2229
+ (
2230
+ invite_or_knock_event_id ,
2231
+ invite_or_knock_membership ,
2232
+ ) = previous_membership
2233
+ else :
2234
+ invite_or_knock_event_id = membership_event_id
2235
+ invite_or_knock_membership = membership
2177
2236
2178
- # Pull from the stripped state on the invite/knock event
2179
- invite_or_knock_event = await self .get_event (invite_or_knock_event_id )
2180
-
2181
- raw_stripped_state_events = None
2182
- if invite_or_knock_membership == Membership .INVITE :
2183
- invite_room_state = invite_or_knock_event .unsigned .get (
2184
- "invite_room_state"
2185
- )
2186
- raw_stripped_state_events = invite_room_state
2187
- elif invite_or_knock_membership == Membership .KNOCK :
2188
- knock_room_state = invite_or_knock_event .unsigned .get (
2189
- "knock_room_state"
2237
+ if (
2238
+ invite_or_knock_event_id is not None
2239
+ and invite_or_knock_membership is not None
2240
+ ):
2241
+ # Pull from the stripped state on the invite/knock event
2242
+ invite_or_knock_event = await self .get_event (
2243
+ invite_or_knock_event_id
2190
2244
)
2191
- raw_stripped_state_events = knock_room_state
2192
2245
2193
- sliding_sync_membership_snapshots_insert_map = PersistEventsStore ._get_sliding_sync_insert_values_from_stripped_state (
2194
- raw_stripped_state_events
2195
- )
2246
+ raw_stripped_state_events = None
2247
+ if invite_or_knock_membership == Membership .INVITE :
2248
+ invite_room_state = invite_or_knock_event .unsigned .get (
2249
+ "invite_room_state"
2250
+ )
2251
+ raw_stripped_state_events = invite_room_state
2252
+ elif invite_or_knock_membership == Membership .KNOCK :
2253
+ knock_room_state = invite_or_knock_event .unsigned .get (
2254
+ "knock_room_state"
2255
+ )
2256
+ raw_stripped_state_events = knock_room_state
2257
+
2258
+ sliding_sync_membership_snapshots_insert_map = PersistEventsStore ._get_sliding_sync_insert_values_from_stripped_state (
2259
+ raw_stripped_state_events
2260
+ )
2261
+ else :
2262
+ # We couldn't find any state for the membership, so we just have to
2263
+ # leave it as empty.
2264
+ sliding_sync_membership_snapshots_insert_map = {
2265
+ "has_known_state" : False ,
2266
+ "room_type" : None ,
2267
+ "room_name" : None ,
2268
+ "is_encrypted" : False ,
2269
+ }
2196
2270
2197
2271
# We should have some insert values for each room, even if no
2198
2272
# stripped state is on the event because we still want to record
@@ -2311,19 +2385,42 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
2311
2385
)
2312
2386
# We need to find the `forgotten` value during the transaction because
2313
2387
# we can't risk inserting stale data.
2314
- txn .execute (
2315
- """
2316
- UPDATE sliding_sync_membership_snapshots
2317
- SET
2318
- forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
2319
- WHERE room_id = ? and user_id = ?
2320
- """ ,
2321
- (
2322
- membership_event_id ,
2323
- room_id ,
2324
- user_id ,
2325
- ),
2326
- )
2388
+ if isinstance (txn .database_engine , PostgresEngine ):
2389
+ txn .execute (
2390
+ """
2391
+ UPDATE sliding_sync_membership_snapshots
2392
+ SET
2393
+ forgotten = m.forgotten
2394
+ FROM room_memberships AS m
2395
+ WHERE sliding_sync_membership_snapshots.room_id = ?
2396
+ AND sliding_sync_membership_snapshots.user_id = ?
2397
+ AND membership_event_id = ?
2398
+ AND membership_event_id = m.event_id
2399
+ AND m.event_id IS NOT NULL
2400
+ """ ,
2401
+ (
2402
+ room_id ,
2403
+ user_id ,
2404
+ membership_event_id ,
2405
+ ),
2406
+ )
2407
+ else :
2408
+ # SQLite doesn't support UPDATE FROM before 3.33.0, so we do
2409
+ # this via sub-selects.
2410
+ txn .execute (
2411
+ """
2412
+ UPDATE sliding_sync_membership_snapshots
2413
+ SET
2414
+ forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
2415
+ WHERE room_id = ? and user_id = ? AND membership_event_id = ?
2416
+ """ ,
2417
+ (
2418
+ membership_event_id ,
2419
+ room_id ,
2420
+ user_id ,
2421
+ membership_event_id ,
2422
+ ),
2423
+ )
2327
2424
2328
2425
await self .db_pool .runInteraction (
2329
2426
"sliding_sync_membership_snapshots_bg_update" , _fill_table_txn
@@ -2333,6 +2430,7 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:
2333
2430
(
2334
2431
room_id ,
2335
2432
_room_id_from_rooms_table ,
2433
+ _room_version_id ,
2336
2434
user_id ,
2337
2435
_sender ,
2338
2436
_membership_event_id ,
0 commit comments