Skip to content

Commit 9cae44f

Browse files
authored
Track unconverted device list outbound pokes using a position instead (#14516)
When a local device list change is added to `device_lists_changes_in_room`, the `converted_to_destinations` flag is set to `FALSE` and the `_handle_new_device_update_async` background process is started. This background process looks for unconverted rows in `device_lists_changes_in_room`, copies them to `device_lists_outbound_pokes` and updates the flag. To update the `converted_to_destinations` flag, the database performs a `DELETE` and `INSERT` internally, which fragments the table. To avoid this, track unconverted rows using a `(stream ID, room ID)` position instead of the flag. From now on, the `converted_to_destinations` column indicates rows that need converting to outbound pokes, but does not indicate whether the conversion has already taken place. Closes #14037. Signed-off-by: Sean Quah <[email protected]>
1 parent 7eb7460 commit 9cae44f

File tree

6 files changed

+158
-49
lines changed

6 files changed

+158
-49
lines changed

changelog.d/14516.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor conversion of device list changes in room to outbound pokes to track unconverted rows using a `(stream ID, room ID)` position instead of updating the `converted_to_destinations` flag on every row.

synapse/handlers/device.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -682,13 +682,33 @@ async def _handle_new_device_update_async(self) -> None:
682682
hosts_already_sent_to: Set[str] = set()
683683

684684
try:
685+
stream_id, room_id = await self.store.get_device_change_last_converted_pos()
686+
685687
while True:
686688
self._handle_new_device_update_new_data = False
687-
rows = await self.store.get_uncoverted_outbound_room_pokes()
689+
max_stream_id = self.store.get_device_stream_token()
690+
rows = await self.store.get_uncoverted_outbound_room_pokes(
691+
stream_id, room_id
692+
)
688693
if not rows:
689694
# If the DB returned nothing then there is nothing left to
690695
# do, *unless* a new device list update happened during the
691696
# DB query.
697+
698+
# Advance `(stream_id, room_id)`.
699+
# `max_stream_id` comes from *before* the query for unconverted
700+
# rows, which means that any unconverted rows must have a larger
701+
# stream ID.
702+
if max_stream_id > stream_id:
703+
stream_id, room_id = max_stream_id, ""
704+
await self.store.set_device_change_last_converted_pos(
705+
stream_id, room_id
706+
)
707+
else:
708+
assert max_stream_id == stream_id
709+
# Avoid moving `room_id` backwards.
710+
pass
711+
692712
if self._handle_new_device_update_new_data:
693713
continue
694714
else:
@@ -718,7 +738,6 @@ async def _handle_new_device_update_async(self) -> None:
718738
user_id=user_id,
719739
device_id=device_id,
720740
room_id=room_id,
721-
stream_id=stream_id,
722741
hosts=hosts,
723742
context=opentracing_context,
724743
)
@@ -752,6 +771,12 @@ async def _handle_new_device_update_async(self) -> None:
752771
hosts_already_sent_to.update(hosts)
753772
current_stream_id = stream_id
754773

774+
# Advance `(stream_id, room_id)`.
775+
_, _, room_id, stream_id, _ = rows[-1]
776+
await self.store.set_device_change_last_converted_pos(
777+
stream_id, room_id
778+
)
779+
755780
finally:
756781
self._handle_new_device_update_is_processing = False
757782

@@ -834,7 +859,6 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:
834859
user_id=user_id,
835860
device_id=device_id,
836861
room_id=room_id,
837-
stream_id=None,
838862
hosts=potentially_changed_hosts,
839863
context=None,
840864
)

synapse/storage/database.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2075,13 +2075,14 @@ def simple_select_one_txn(
20752075
retcols: Collection[str],
20762076
allow_none: bool = False,
20772077
) -> Optional[Dict[str, Any]]:
2078-
select_sql = "SELECT %s FROM %s WHERE %s" % (
2079-
", ".join(retcols),
2080-
table,
2081-
" AND ".join("%s = ?" % (k,) for k in keyvalues),
2082-
)
2078+
select_sql = "SELECT %s FROM %s" % (", ".join(retcols), table)
2079+
2080+
if keyvalues:
2081+
select_sql += " WHERE %s" % (" AND ".join("%s = ?" % k for k in keyvalues),)
2082+
txn.execute(select_sql, list(keyvalues.values()))
2083+
else:
2084+
txn.execute(select_sql)
20832085

2084-
txn.execute(select_sql, list(keyvalues.values()))
20852086
row = txn.fetchone()
20862087

20872088
if not row:

synapse/storage/databases/main/devices.py

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,27 +2008,48 @@ def _add_device_outbound_room_poke_txn(
20082008
)
20092009

20102010
async def get_uncoverted_outbound_room_pokes(
2011-
self, limit: int = 10
2011+
self, start_stream_id: int, start_room_id: str, limit: int = 10
20122012
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
20132013
"""Get device list changes by room that have not yet been handled and
20142014
written to `device_lists_outbound_pokes`.
20152015
2016+
Args:
2017+
start_stream_id: Together with `start_room_id`, indicates the position after
2018+
which to return device list changes.
2019+
start_room_id: Together with `start_stream_id`, indicates the position after
2020+
which to return device list changes.
2021+
limit: The maximum number of device list changes to return.
2022+
20162023
Returns:
2017-
A list of user ID, device ID, room ID, stream ID and optional opentracing context.
2024+
A list of user ID, device ID, room ID, stream ID and optional opentracing
2025+
context, in order of ascending (stream ID, room ID).
20182026
"""
20192027

20202028
sql = """
20212029
SELECT user_id, device_id, room_id, stream_id, opentracing_context
20222030
FROM device_lists_changes_in_room
2023-
WHERE NOT converted_to_destinations
2024-
ORDER BY stream_id
2031+
WHERE
2032+
(stream_id, room_id) > (?, ?) AND
2033+
stream_id <= ? AND
2034+
NOT converted_to_destinations
2035+
ORDER BY stream_id ASC, room_id ASC
20252036
LIMIT ?
20262037
"""
20272038

20282039
def get_uncoverted_outbound_room_pokes_txn(
20292040
txn: LoggingTransaction,
20302041
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
2031-
txn.execute(sql, (limit,))
2042+
txn.execute(
2043+
sql,
2044+
(
2045+
start_stream_id,
2046+
start_room_id,
2047+
# Avoid returning rows if there may be uncommitted device list
2048+
# changes with smaller stream IDs.
2049+
self._device_list_id_gen.get_current_token(),
2050+
limit,
2051+
),
2052+
)
20322053

20332054
return [
20342055
(
@@ -2050,49 +2071,25 @@ async def add_device_list_outbound_pokes(
20502071
user_id: str,
20512072
device_id: str,
20522073
room_id: str,
2053-
stream_id: Optional[int],
20542074
hosts: Collection[str],
20552075
context: Optional[Dict[str, str]],
20562076
) -> None:
20572077
"""Queue the device update to be sent to the given set of hosts,
20582078
calculated from the room ID.
2059-
2060-
Marks the associated row in `device_lists_changes_in_room` as handled,
2061-
if `stream_id` is provided.
20622079
"""
2080+
if not hosts:
2081+
return
20632082

20642083
def add_device_list_outbound_pokes_txn(
20652084
txn: LoggingTransaction, stream_ids: List[int]
20662085
) -> None:
2067-
if hosts:
2068-
self._add_device_outbound_poke_to_stream_txn(
2069-
txn,
2070-
user_id=user_id,
2071-
device_id=device_id,
2072-
hosts=hosts,
2073-
stream_ids=stream_ids,
2074-
context=context,
2075-
)
2076-
2077-
if stream_id:
2078-
self.db_pool.simple_update_txn(
2079-
txn,
2080-
table="device_lists_changes_in_room",
2081-
keyvalues={
2082-
"user_id": user_id,
2083-
"device_id": device_id,
2084-
"stream_id": stream_id,
2085-
"room_id": room_id,
2086-
},
2087-
updatevalues={"converted_to_destinations": True},
2088-
)
2089-
2090-
if not hosts:
2091-
# If there are no hosts then we don't try and generate stream IDs.
2092-
return await self.db_pool.runInteraction(
2093-
"add_device_list_outbound_pokes",
2094-
add_device_list_outbound_pokes_txn,
2095-
[],
2086+
self._add_device_outbound_poke_to_stream_txn(
2087+
txn,
2088+
user_id=user_id,
2089+
device_id=device_id,
2090+
hosts=hosts,
2091+
stream_ids=stream_ids,
2092+
context=context,
20962093
)
20972094

20982095
async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
@@ -2156,3 +2153,37 @@ def get_pending_remote_device_list_updates_for_room_txn(
21562153
"get_pending_remote_device_list_updates_for_room",
21572154
get_pending_remote_device_list_updates_for_room_txn,
21582155
)
2156+
2157+
async def get_device_change_last_converted_pos(self) -> Tuple[int, str]:
2158+
"""
2159+
Get the position of the last row in `device_lists_changes_in_room` that has been
2160+
converted to `device_lists_outbound_pokes`.
2161+
2162+
Rows with a strictly greater position where `converted_to_destinations` is
2163+
`FALSE` have not been converted.
2164+
"""
2165+
2166+
row = await self.db_pool.simple_select_one(
2167+
table="device_lists_changes_converted_stream_position",
2168+
keyvalues={},
2169+
retcols=["stream_id", "room_id"],
2170+
desc="get_device_change_last_converted_pos",
2171+
)
2172+
return row["stream_id"], row["room_id"]
2173+
2174+
async def set_device_change_last_converted_pos(
2175+
self,
2176+
stream_id: int,
2177+
room_id: str,
2178+
) -> None:
2179+
"""
2180+
Set the position of the last row in `device_lists_changes_in_room` that has been
2181+
converted to `device_lists_outbound_pokes`.
2182+
"""
2183+
2184+
await self.db_pool.simple_update_one(
2185+
table="device_lists_changes_converted_stream_position",
2186+
keyvalues={},
2187+
updatevalues={"stream_id": stream_id, "room_id": room_id},
2188+
desc="set_device_change_last_converted_pos",
2189+
)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/* Copyright 2022 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
-- Prior to this schema delta, we tracked the set of unconverted rows in
17+
-- `device_lists_changes_in_room` using the `converted_to_destinations` flag. When rows
18+
-- were converted to `device_lists_outbound_pokes`, the `converted_to_destinations` flag
19+
-- would be set.
20+
--
21+
-- After this schema delta, the `converted_to_destinations` flag is still populated like
22+
-- before, but the set of unconverted rows is determined by the `(stream_id, room_id)` position in the new
23+
-- `device_lists_changes_converted_stream_position` table.
24+
--
25+
-- If rolled back, Synapse will re-send all device list changes that happened since the
26+
-- schema delta.
27+
28+
CREATE TABLE IF NOT EXISTS device_lists_changes_converted_stream_position(
29+
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
30+
-- The (stream id, room id) of the last row in `device_lists_changes_in_room` that
31+
-- has been converted to `device_lists_outbound_pokes`. Rows with a strictly larger
32+
-- (stream id, room id) where `converted_to_destinations` is `FALSE` have not been
33+
-- converted.
34+
stream_id BIGINT NOT NULL,
35+
-- `room_id` may be an empty string, which compares less than all valid room IDs.
36+
room_id TEXT NOT NULL,
37+
CHECK (Lock='X')
38+
);
39+
40+
INSERT INTO device_lists_changes_converted_stream_position (stream_id, room_id) VALUES (
41+
(
42+
SELECT COALESCE(
43+
-- The last converted stream id is the smallest unconverted stream id minus
44+
-- one.
45+
MIN(stream_id) - 1,
46+
-- If there is no unconverted stream id, the last converted stream id is the
47+
-- largest stream id.
48+
-- If `device_lists_changes_in_room` is empty, pick 1, since stream ids start at 2.
49+
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_in_room)
50+
) FROM device_lists_changes_in_room WHERE NOT converted_to_destinations
51+
),
52+
''
53+
);

tests/storage/test_devices.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def add_device_change(self, user_id, device_ids, host):
2828
"""
2929

3030
for device_id in device_ids:
31-
stream_id = self.get_success(
31+
self.get_success(
3232
self.store.add_device_change_to_streams(
3333
user_id, [device_id], ["!some:room"]
3434
)
@@ -39,7 +39,6 @@ def add_device_change(self, user_id, device_ids, host):
3939
user_id=user_id,
4040
device_id=device_id,
4141
room_id="!some:room",
42-
stream_id=stream_id,
4342
hosts=[host],
4443
context={},
4544
)

0 commit comments

Comments
 (0)