Skip to content

Exhaustive sync_events::Response conversion #209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 45 additions & 29 deletions matrix_sdk_base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,11 +682,35 @@ impl BaseClient {
&self,
response: api::sync::sync_events::Response,
) -> Result<SyncResponse> {
#[cfg(test)]
let api::sync::sync_events::Response {
next_batch,
rooms,
presence,
account_data,
to_device,
device_lists,
device_one_time_keys_count,
__test_exhaustive: _,
} = response;

#[cfg(not(test))]
let api::sync::sync_events::Response {
next_batch,
rooms,
presence,
account_data,
to_device,
device_lists,
device_one_time_keys_count,
..
} = response;

// The server might respond multiple times with the same sync token, in
// that case we already received this response and there's nothing to
// do.
if self.sync_token.read().await.as_ref() == Some(&response.next_batch) {
return Ok(SyncResponse::new(response.next_batch));
if self.sync_token.read().await.as_ref() == Some(&next_batch) {
return Ok(SyncResponse::new(next_batch));
}

let now = Instant::now();
Expand All @@ -700,15 +724,10 @@ impl BaseClient {
// decryptes to-device events, but leaves room events alone.
// This makes sure that we have the deryption keys for the room
// events at hand.
o.receive_sync_changes(
&response.to_device,
&response.device_lists,
&response.device_one_time_keys_count,
)
.await?
o.receive_sync_changes(&to_device, &device_lists, &device_one_time_keys_count)
.await?
} else {
response
.to_device
to_device
.events
.into_iter()
.filter_map(|e| e.deserialize().ok())
Expand All @@ -717,20 +736,19 @@ impl BaseClient {
}
};
#[cfg(not(feature = "encryption"))]
let to_device = response
.to_device
let to_device = to_device
.events
.into_iter()
.filter_map(|e| e.deserialize().ok())
.collect::<Vec<AnyToDeviceEvent>>()
.into();

let mut changes = StateChanges::new(response.next_batch.clone());
let mut changes = StateChanges::new(next_batch.clone());
let mut ambiguity_cache = AmbiguityCache::new(self.store.clone());

let mut rooms = Rooms::default();
let mut new_rooms = Rooms::default();

for (room_id, new_info) in response.rooms.join {
for (room_id, new_info) in rooms.join {
let room = self
.store
.get_or_create_room(&room_id, RoomType::Joined)
Expand Down Expand Up @@ -799,15 +817,15 @@ impl BaseClient {
.collect(),
};

rooms.join.insert(
new_rooms.join.insert(
room_id,
JoinedRoom::new(timeline, state, account_data, ephemeral, notification_count),
);

changes.add_room(room_info);
}

for (room_id, new_info) in response.rooms.leave {
for (room_id, new_info) in rooms.leave {
let room = self
.store
.get_or_create_room(&room_id, RoomType::Left)
Expand Down Expand Up @@ -840,12 +858,12 @@ impl BaseClient {
.await;

changes.add_room(room_info);
rooms
new_rooms
.leave
.insert(room_id, LeftRoom::new(timeline, state, account_data));
}

for (room_id, new_info) in response.rooms.invite {
for (room_id, new_info) in rooms.invite {
{
let room = self
.store
Expand All @@ -870,11 +888,10 @@ impl BaseClient {
invite_state: state,
};

rooms.invite.insert(room_id, room);
new_rooms.invite.insert(room_id, room);
}

let presence: BTreeMap<UserId, PresenceEvent> = response
.presence
let presence: BTreeMap<UserId, PresenceEvent> = presence
.events
.into_iter()
.filter_map(|e| {
Expand All @@ -885,30 +902,29 @@ impl BaseClient {

changes.presence = presence;

self.handle_account_data(response.account_data.events, &mut changes)
self.handle_account_data(account_data.events, &mut changes)
.await;

changes.ambiguity_maps = ambiguity_cache.cache;

self.store.save_changes(&changes).await?;
*self.sync_token.write().await = Some(response.next_batch.clone());
*self.sync_token.write().await = Some(next_batch.clone());
self.apply_changes(&changes).await;

info!("Processed a sync response in {:?}", now.elapsed());

let response = SyncResponse {
next_batch: response.next_batch,
rooms,
next_batch,
rooms: new_rooms,
presence: Presence {
events: changes.presence.into_iter().map(|(_, v)| v).collect(),
},
account_data: AccountData {
events: changes.account_data.into_iter().map(|(_, e)| e).collect(),
},
to_device,
device_lists: response.device_lists,
device_one_time_keys_count: response
.device_one_time_keys_count
device_lists,
device_one_time_keys_count: device_one_time_keys_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
Expand Down
6 changes: 3 additions & 3 deletions matrix_sdk_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ serde = "1.0.122"
async-trait = "0.1.42"

[dependencies.ruma]
version = "0.0.2"
version = "0.0.3"
git = "https://github.com/ruma/ruma"
rev = "47d6b458574247545f8836b9421800f0089f3008"
features = ["client-api", "compat", "unstable-pre-spec"]
rev = "c816630058ab625d93ebab294e9e6c02dd9d866c"
features = ["client-api-c", "compat", "unstable-pre-spec"]

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
uuid = { version = "0.8.2", default-features = false, features = ["v4", "serde"] }
Expand Down