4
4
//! same cache. It helps to define what it sometimes called a “cold start”, or a
5
5
//! “fast start”.
6
6
7
- use std:: collections:: BTreeMap ;
8
-
9
7
use matrix_sdk_base:: { StateStore , StoreError } ;
10
8
use matrix_sdk_common:: timer;
11
9
use ruma:: UserId ;
12
10
use tracing:: { trace, warn} ;
13
11
14
- use super :: {
15
- FrozenSlidingSync , FrozenSlidingSyncList , SlidingSync , SlidingSyncList ,
16
- SlidingSyncPositionMarkers ,
17
- } ;
12
+ use super :: { FrozenSlidingSyncList , SlidingSync , SlidingSyncPositionMarkers } ;
18
13
#[ cfg( feature = "e2e-encryption" ) ]
19
14
use crate :: sliding_sync:: FrozenSlidingSyncPos ;
20
15
use crate :: { sliding_sync:: SlidingSyncListCachePolicy , Client , Result } ;
@@ -48,30 +43,6 @@ async fn invalidate_cached_list(
48
43
let _ = storage. remove_custom_value ( storage_key_for_list. as_bytes ( ) ) . await ;
49
44
}
50
45
51
- /// Clean the storage for everything related to `SlidingSync` and all known
52
- /// lists.
53
- async fn clean_storage (
54
- client : & Client ,
55
- storage_key : & str ,
56
- lists : & BTreeMap < String , SlidingSyncList > ,
57
- ) {
58
- let storage = client. state_store ( ) ;
59
- for list_name in lists. keys ( ) {
60
- invalidate_cached_list ( storage, storage_key, list_name) . await ;
61
- }
62
- let instance_storage_key = format_storage_key_for_sliding_sync ( storage_key) ;
63
- let _ = storage. remove_custom_value ( instance_storage_key. as_bytes ( ) ) . await ;
64
-
65
- #[ cfg( feature = "e2e-encryption" ) ]
66
- if let Some ( olm_machine) = & * client. olm_machine ( ) . await {
67
- // Invalidate the value stored for the TERRIBLE HACK.
68
- let _ = olm_machine
69
- . store ( )
70
- . set_custom_value ( & instance_storage_key, "" . as_bytes ( ) . to_vec ( ) )
71
- . await ;
72
- }
73
- }
74
-
75
46
/// Store the `SlidingSync`'s state in the storage.
76
47
pub ( super ) async fn store_sliding_sync_state (
77
48
sliding_sync : & SlidingSync ,
@@ -83,15 +54,6 @@ pub(super) async fn store_sliding_sync_state(
83
54
trace ! ( storage_key, "Saving a `SlidingSync` to the state store" ) ;
84
55
let storage = sliding_sync. inner . client . state_store ( ) ;
85
56
86
- // Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside
87
- // the store.
88
- storage
89
- . set_custom_value (
90
- instance_storage_key. as_bytes ( ) ,
91
- serde_json:: to_vec ( & FrozenSlidingSync :: new ( ) ) ?,
92
- )
93
- . await ?;
94
-
95
57
#[ cfg( feature = "e2e-encryption" ) ]
96
58
{
97
59
let position = _position;
@@ -194,7 +156,6 @@ pub(super) struct RestoredFields {
194
156
pub ( super ) async fn restore_sliding_sync_state (
195
157
client : & Client ,
196
158
storage_key : & str ,
197
- lists : & BTreeMap < String , SlidingSyncList > ,
198
159
) -> Result < Option < RestoredFields > > {
199
160
let _timer = timer ! ( format!( "loading sliding sync {storage_key} state from DB" ) ) ;
200
161
@@ -210,61 +171,17 @@ pub(super) async fn restore_sliding_sync_state(
210
171
}
211
172
}
212
173
213
- let storage = client. state_store ( ) ;
214
174
let instance_storage_key = format_storage_key_for_sliding_sync ( storage_key) ;
215
175
216
176
// Preload the `SlidingSync` object from the cache.
217
- match storage
218
- . get_custom_value ( instance_storage_key. as_bytes ( ) )
219
- . await ?
220
- . map ( |custom_value| serde_json:: from_slice :: < FrozenSlidingSync > ( & custom_value) )
221
- {
222
- // `SlidingSync` has been found and successfully deserialized.
223
- Some ( Ok ( FrozenSlidingSync { to_device_since } ) ) => {
224
- trace ! ( "Successfully read the `SlidingSync` from the cache" ) ;
225
- // Only update the to-device token if we failed to read it from the crypto store
226
- // above.
227
- if restored_fields. to_device_token . is_none ( ) {
228
- restored_fields. to_device_token = to_device_since;
229
- }
230
-
231
- #[ cfg( feature = "e2e-encryption" ) ]
232
- {
233
- if let Some ( olm_machine) = & * client. olm_machine ( ) . await {
234
- if let Ok ( Some ( blob) ) =
235
- olm_machine. store ( ) . get_custom_value ( & instance_storage_key) . await
236
- {
237
- if let Ok ( frozen_pos) =
238
- serde_json:: from_slice :: < FrozenSlidingSyncPos > ( & blob)
239
- {
240
- trace ! ( "Successfully read the `Sliding Sync` pos from the crypto store cache" ) ;
241
- restored_fields. pos = frozen_pos. pos ;
242
- }
243
- }
244
- }
177
+ #[ cfg( feature = "e2e-encryption" ) ]
178
+ if let Some ( olm_machine) = & * client. olm_machine ( ) . await {
179
+ if let Ok ( Some ( blob) ) = olm_machine. store ( ) . get_custom_value ( & instance_storage_key) . await {
180
+ if let Ok ( frozen_pos) = serde_json:: from_slice :: < FrozenSlidingSyncPos > ( & blob) {
181
+ trace ! ( "Successfully read the `Sliding Sync` pos from the crypto store cache" ) ;
182
+ restored_fields. pos = frozen_pos. pos ;
245
183
}
246
184
}
247
-
248
- // `SlidingSync` has been found, but it wasn't possible to deserialize it. It's
249
- // declared as obsolete. The main reason might be that the internal
250
- // representation of a `SlidingSync` might have changed.
251
- // Instead of considering this as a strong error, we remove
252
- // the entry from the cache and keep `SlidingSync` in its initial
253
- // state.
254
- Some ( Err ( _) ) => {
255
- warn ! (
256
- "failed to deserialize `SlidingSync` from the cache, it is obsolete; removing the cache entry!"
257
- ) ;
258
-
259
- // Let's clear everything and stop here.
260
- clean_storage ( client, storage_key, lists) . await ;
261
-
262
- return Ok ( None ) ;
263
- }
264
-
265
- None => {
266
- trace ! ( "No Sliding Sync object in the cache" ) ;
267
- }
268
185
}
269
186
270
187
Ok ( Some ( restored_fields) )
@@ -274,13 +191,12 @@ pub(super) async fn restore_sliding_sync_state(
274
191
mod tests {
275
192
use std:: sync:: { Arc , RwLock } ;
276
193
277
- use assert_matches:: assert_matches;
278
194
use matrix_sdk_test:: async_test;
279
195
280
196
use super :: {
281
- clean_storage , format_storage_key_for_sliding_sync,
197
+ super :: SlidingSyncList , format_storage_key_for_sliding_sync,
282
198
format_storage_key_for_sliding_sync_list, format_storage_key_prefix,
283
- restore_sliding_sync_state, store_sliding_sync_state, SlidingSyncList ,
199
+ restore_sliding_sync_state, store_sliding_sync_state,
284
200
} ;
285
201
use crate :: { test_utils:: logged_in_client, Result } ;
286
202
@@ -291,30 +207,26 @@ mod tests {
291
207
292
208
let store = client. state_store ( ) ;
293
209
294
- // Store entries don't exist.
295
- assert ! ( store
296
- . get_custom_value( format_storage_key_for_sliding_sync( "hello" ) . as_bytes( ) )
297
- . await ?
298
- . is_none( ) ) ;
210
+ let sync_id = "test-sync-id" ;
211
+ let storage_key = format_storage_key_prefix ( sync_id, client. user_id ( ) . unwrap ( ) ) ;
299
212
213
+ // Store entries don't exist.
300
214
assert ! ( store
301
215
. get_custom_value(
302
- format_storage_key_for_sliding_sync_list( "hello" , "list_foo" ) . as_bytes( )
216
+ format_storage_key_for_sliding_sync_list( & storage_key , "list_foo" ) . as_bytes( )
303
217
)
304
218
. await ?
305
219
. is_none( ) ) ;
306
220
307
221
assert ! ( store
308
222
. get_custom_value(
309
- format_storage_key_for_sliding_sync_list( "hello" , "list_bar" ) . as_bytes( )
223
+ format_storage_key_for_sliding_sync_list( & storage_key , "list_bar" ) . as_bytes( )
310
224
)
311
225
. await ?
312
226
. is_none( ) ) ;
313
227
314
228
// Create a new `SlidingSync` instance, and store it.
315
229
let storage_key = {
316
- let sync_id = "test-sync-id" ;
317
- let storage_key = format_storage_key_prefix ( sync_id, client. user_id ( ) . unwrap ( ) ) ;
318
230
let sliding_sync = client
319
231
. sliding_sync ( sync_id) ?
320
232
. add_cached_list ( SlidingSyncList :: builder ( "list_foo" ) )
@@ -340,20 +252,15 @@ mod tests {
340
252
storage_key
341
253
} ;
342
254
343
- // Store entries now exist for the sliding sync object and list_foo.
344
- assert ! ( store
345
- . get_custom_value( format_storage_key_for_sliding_sync( & storage_key) . as_bytes( ) )
346
- . await ?
347
- . is_some( ) ) ;
348
-
255
+ // Store entries now exist for `list_foo`.
349
256
assert ! ( store
350
257
. get_custom_value(
351
258
format_storage_key_for_sliding_sync_list( & storage_key, "list_foo" ) . as_bytes( )
352
259
)
353
260
. await ?
354
261
. is_some( ) ) ;
355
262
356
- // But not for list_bar.
263
+ // But not for ` list_bar` .
357
264
assert ! ( store
358
265
. get_custom_value(
359
266
format_storage_key_for_sliding_sync_list( & storage_key, "list_bar" ) . as_bytes( )
@@ -362,83 +269,52 @@ mod tests {
362
269
. is_none( ) ) ;
363
270
364
271
// Create a new `SlidingSync`, and it should be read from the cache.
365
- let storage_key = {
366
- let sync_id = "test-sync-id" ;
367
- let storage_key = format_storage_key_prefix ( sync_id, client. user_id ( ) . unwrap ( ) ) ;
368
- let max_number_of_room_stream = Arc :: new ( RwLock :: new ( None ) ) ;
369
- let cloned_stream = max_number_of_room_stream. clone ( ) ;
370
- let sliding_sync = client
371
- . sliding_sync ( sync_id) ?
372
- . add_cached_list ( SlidingSyncList :: builder ( "list_foo" ) . once_built ( move |list| {
373
- // In the `once_built()` handler, nothing has been read from the cache yet.
374
- assert_eq ! ( list. maximum_number_of_rooms( ) , None ) ;
375
-
376
- let mut stream = cloned_stream. write ( ) . unwrap ( ) ;
377
- * stream = Some ( list. maximum_number_of_rooms_stream ( ) ) ;
378
- list
379
- } ) )
380
- . await ?
381
- . add_list ( SlidingSyncList :: builder ( "list_bar" ) )
382
- . build ( )
383
- . await ?;
384
-
385
- // Check the list' state.
386
- {
387
- let lists = sliding_sync. inner . lists . read ( ) . await ;
388
-
389
- // This one was cached.
390
- let list_foo = lists. get ( "list_foo" ) . unwrap ( ) ;
391
- assert_eq ! ( list_foo. maximum_number_of_rooms( ) , Some ( 42 ) ) ;
392
-
393
- // This one wasn't.
394
- let list_bar = lists. get ( "list_bar" ) . unwrap ( ) ;
395
- assert_eq ! ( list_bar. maximum_number_of_rooms( ) , None ) ;
396
- }
397
-
398
- // The maximum number of rooms reloaded from the cache should have been
399
- // published.
400
- {
401
- let mut stream =
402
- max_number_of_room_stream. write ( ) . unwrap ( ) . take ( ) . expect ( "stream must be set" ) ;
403
- let initial_max_number_of_rooms =
404
- stream. next ( ) . await . expect ( "stream must have emitted something" ) ;
405
- assert_eq ! ( initial_max_number_of_rooms, Some ( 42 ) ) ;
406
- }
272
+ let max_number_of_room_stream = Arc :: new ( RwLock :: new ( None ) ) ;
273
+ let cloned_stream = max_number_of_room_stream. clone ( ) ;
274
+ let sliding_sync = client
275
+ . sliding_sync ( sync_id) ?
276
+ . add_cached_list ( SlidingSyncList :: builder ( "list_foo" ) . once_built ( move |list| {
277
+ // In the `once_built()` handler, nothing has been read from the cache yet.
278
+ assert_eq ! ( list. maximum_number_of_rooms( ) , None ) ;
279
+
280
+ let mut stream = cloned_stream. write ( ) . unwrap ( ) ;
281
+ * stream = Some ( list. maximum_number_of_rooms_stream ( ) ) ;
282
+ list
283
+ } ) )
284
+ . await ?
285
+ . add_list ( SlidingSyncList :: builder ( "list_bar" ) )
286
+ . build ( )
287
+ . await ?;
407
288
408
- // Clean the cache.
289
+ // Check the list' state.
290
+ {
409
291
let lists = sliding_sync. inner . lists . read ( ) . await ;
410
- clean_storage ( & client, & storage_key, & lists) . await ;
411
- storage_key
412
- } ;
413
292
414
- // Store entries don't exist.
415
- assert ! ( store
416
- . get_custom_value( format_storage_key_for_sliding_sync( & storage_key) . as_bytes( ) )
417
- . await ?
418
- . is_none( ) ) ;
293
+ // This one was cached.
294
+ let list_foo = lists. get ( "list_foo" ) . unwrap ( ) ;
295
+ assert_eq ! ( list_foo. maximum_number_of_rooms( ) , Some ( 42 ) ) ;
419
296
420
- assert ! ( store
421
- . get_custom_value(
422
- format_storage_key_for_sliding_sync_list( & storage_key, "list_foo" ) . as_bytes( )
423
- )
424
- . await ?
425
- . is_none( ) ) ;
297
+ // This one wasn't.
298
+ let list_bar = lists. get ( "list_bar" ) . unwrap ( ) ;
299
+ assert_eq ! ( list_bar. maximum_number_of_rooms( ) , None ) ;
300
+ }
426
301
427
- assert ! ( store
428
- . get_custom_value(
429
- format_storage_key_for_sliding_sync_list( & storage_key, "list_bar" ) . as_bytes( )
430
- )
431
- . await ?
432
- . is_none( ) ) ;
302
+ // The maximum number of rooms reloaded from the cache should have been
303
+ // published.
304
+ {
305
+ let mut stream =
306
+ max_number_of_room_stream. write ( ) . unwrap ( ) . take ( ) . expect ( "stream must be set" ) ;
307
+ let initial_max_number_of_rooms =
308
+ stream. next ( ) . await . expect ( "stream must have emitted something" ) ;
309
+ assert_eq ! ( initial_max_number_of_rooms, Some ( 42 ) ) ;
310
+ }
433
311
434
312
Ok ( ( ) )
435
313
}
436
314
437
315
#[ cfg( feature = "e2e-encryption" ) ]
438
316
#[ async_test]
439
317
async fn test_sliding_sync_high_level_cache_and_restore ( ) -> Result < ( ) > {
440
- use crate :: sliding_sync:: FrozenSlidingSync ;
441
-
442
318
let client = logged_in_client ( Some ( "https://foo.bar" . to_owned ( ) ) ) . await ;
443
319
444
320
let sync_id = "test-sync-id" ;
@@ -465,21 +341,10 @@ mod tests {
465
341
store_sliding_sync_state ( & sliding_sync, & position_guard) . await ?;
466
342
}
467
343
468
- // The delta token has been correctly written to the state store (but not the
469
- // to_device_since, since it's in the other store).
470
- let state_store = client. state_store ( ) ;
471
- assert_matches ! (
472
- state_store. get_custom_value( full_storage_key. as_bytes( ) ) . await ?,
473
- Some ( bytes) => {
474
- let deserialized: FrozenSlidingSync = serde_json:: from_slice( & bytes) ?;
475
- assert!( deserialized. to_device_since. is_none( ) ) ;
476
- }
477
- ) ;
478
-
479
344
// Ok, forget about the sliding sync, let's recreate one from scratch.
480
345
drop ( sliding_sync) ;
481
346
482
- let restored_fields = restore_sliding_sync_state ( & client, & storage_key_prefix, & [ ] . into ( ) )
347
+ let restored_fields = restore_sliding_sync_state ( & client, & storage_key_prefix)
483
348
. await ?
484
349
. expect ( "must have restored sliding sync fields" ) ;
485
350
@@ -496,28 +361,6 @@ mod tests {
496
361
assert ! ( olm_machine. store( ) . next_batch_token( ) . await ?. is_none( ) ) ;
497
362
}
498
363
499
- let to_device_token = "to_device_token" . to_owned ( ) ;
500
-
501
- // Put that delta-token in the state store.
502
- let state_store = client. state_store ( ) ;
503
- state_store
504
- . set_custom_value (
505
- full_storage_key. as_bytes ( ) ,
506
- serde_json:: to_vec ( & FrozenSlidingSync {
507
- to_device_since : Some ( to_device_token. clone ( ) ) ,
508
- } ) ?,
509
- )
510
- . await ?;
511
-
512
- let restored_fields = restore_sliding_sync_state ( & client, & storage_key_prefix, & [ ] . into ( ) )
513
- . await ?
514
- . expect ( "must have restored fields" ) ;
515
-
516
- // After restoring, the to-device since token, and the stream position could be
517
- // read from the state store.
518
- assert_eq ! ( restored_fields. to_device_token. unwrap( ) , to_device_token) ;
519
- assert_eq ! ( restored_fields. pos. unwrap( ) , pos) ;
520
-
521
364
Ok ( ( ) )
522
365
}
523
366
}
0 commit comments