Skip to content

Commit 514074d

Browse files
committed
JSON-RPC: retrieve events via long polling
This way is more compatible to JSON-RPC libraries that do not support receiving notifications from the server and allows describing event types in the OpenRPC specification. Event thread converting events to notifications in the FFI is removed, so it is now possible to construct a dc_jsonrpc_instance_t while still retrieving events via dc_event_emitter_t.
1 parent e7aab5c commit 514074d

File tree

9 files changed

+121
-143
lines changed

9 files changed

+121
-143
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
## Unreleased
44

5+
### Changes
6+
- JSON-RPC: Use long polling instead of server-sent notifications to retrieve events.
7+
This better corresponds to JSON-RPC 2.0 server-client distinction
8+
and is expected to simplify writing new bindings
9+
because dispatching events can be done on higher level.
10+
511
### Fixes
612
- JSON-RPC: do not print to stdout on failure to find an account.
713

deltachat-ffi/src/lib.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4967,15 +4967,13 @@ pub unsafe extern "C" fn dc_accounts_get_event_emitter(
49674967
#[cfg(feature = "jsonrpc")]
49684968
mod jsonrpc {
49694969
use deltachat_jsonrpc::api::CommandApi;
4970-
use deltachat_jsonrpc::events::event_to_json_rpc_notification;
49714970
use deltachat_jsonrpc::yerpc::{OutReceiver, RpcClient, RpcSession};
49724971

49734972
use super::*;
49744973

49754974
pub struct dc_jsonrpc_instance_t {
49764975
receiver: OutReceiver,
49774976
handle: RpcSession<CommandApi>,
4978-
event_thread: JoinHandle<Result<(), anyhow::Error>>,
49794977
}
49804978

49814979
#[no_mangle]
@@ -4988,28 +4986,12 @@ mod jsonrpc {
49884986
}
49894987

49904988
let account_manager = &*account_manager;
4991-
let events = block_on(account_manager.read()).get_event_emitter();
49924989
let cmd_api = deltachat_jsonrpc::api::CommandApi::from_arc(account_manager.inner.clone());
49934990

49944991
let (request_handle, receiver) = RpcClient::new();
4995-
let handle = RpcSession::new(request_handle.clone(), cmd_api);
4996-
4997-
let event_thread = spawn(async move {
4998-
while let Some(event) = events.recv().await {
4999-
let event = event_to_json_rpc_notification(event);
5000-
request_handle
5001-
.send_notification("event", Some(event))
5002-
.await?;
5003-
}
5004-
let res: Result<(), anyhow::Error> = Ok(());
5005-
res
5006-
});
4992+
let handle = RpcSession::new(request_handle, cmd_api);
50074993

5008-
let instance = dc_jsonrpc_instance_t {
5009-
receiver,
5010-
handle,
5011-
event_thread,
5012-
};
4994+
let instance = dc_jsonrpc_instance_t { receiver, handle };
50134995

50144996
Box::into_raw(Box::new(instance))
50154997
}
@@ -5020,7 +5002,6 @@ mod jsonrpc {
50205002
eprintln!("ignoring careless call to dc_jsonrpc_unref()");
50215003
return;
50225004
}
5023-
(*jsonrpc_instance).event_thread.abort();
50245005
drop(Box::from_raw(jsonrpc_instance));
50255006
}
50265007

deltachat-jsonrpc/src/api/events.rs

Lines changed: 55 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
1-
use deltachat::{Event, EventType};
1+
use deltachat::{Event as CoreEvent, EventType as CoreEventType};
22
use serde::Serialize;
3-
use serde_json::{json, Value};
43
use typescript_type_def::TypeDef;
54

6-
pub fn event_to_json_rpc_notification(event: Event) -> Value {
7-
let id: JSONRPCEventType = event.typ.into();
8-
json!({
9-
"event": id,
10-
"contextId": event.id,
11-
})
5+
#[derive(Serialize, TypeDef)]
6+
pub struct Event {
7+
/// Event payload.
8+
event: EventType,
9+
10+
/// Account ID.
11+
context_id: u32,
12+
}
13+
14+
impl From<CoreEvent> for Event {
15+
fn from(event: CoreEvent) -> Self {
16+
Event {
17+
event: event.typ.into(),
18+
context_id: event.id,
19+
}
20+
}
1221
}
1322

1423
#[derive(Serialize, TypeDef)]
15-
#[serde(tag = "type", rename = "Event")]
16-
pub enum JSONRPCEventType {
24+
#[serde(tag = "type")]
25+
pub enum EventType {
1726
/// The library-user may write an informational string to the log.
1827
///
1928
/// This event should *not* be reported to the end-user using a popup or something like
@@ -286,27 +295,27 @@ pub enum JSONRPCEventType {
286295
},
287296
}
288297

289-
impl From<EventType> for JSONRPCEventType {
290-
fn from(event: EventType) -> Self {
291-
use JSONRPCEventType::*;
298+
impl From<CoreEventType> for EventType {
299+
fn from(event: CoreEventType) -> Self {
300+
use EventType::*;
292301
match event {
293-
EventType::Info(msg) => Info { msg },
294-
EventType::SmtpConnected(msg) => SmtpConnected { msg },
295-
EventType::ImapConnected(msg) => ImapConnected { msg },
296-
EventType::SmtpMessageSent(msg) => SmtpMessageSent { msg },
297-
EventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg },
298-
EventType::ImapMessageMoved(msg) => ImapMessageMoved { msg },
299-
EventType::ImapInboxIdle => ImapInboxIdle,
300-
EventType::NewBlobFile(file) => NewBlobFile { file },
301-
EventType::DeletedBlobFile(file) => DeletedBlobFile { file },
302-
EventType::Warning(msg) => Warning { msg },
303-
EventType::Error(msg) => Error { msg },
304-
EventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg },
305-
EventType::MsgsChanged { chat_id, msg_id } => MsgsChanged {
302+
CoreEventType::Info(msg) => Info { msg },
303+
CoreEventType::SmtpConnected(msg) => SmtpConnected { msg },
304+
CoreEventType::ImapConnected(msg) => ImapConnected { msg },
305+
CoreEventType::SmtpMessageSent(msg) => SmtpMessageSent { msg },
306+
CoreEventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg },
307+
CoreEventType::ImapMessageMoved(msg) => ImapMessageMoved { msg },
308+
CoreEventType::ImapInboxIdle => ImapInboxIdle,
309+
CoreEventType::NewBlobFile(file) => NewBlobFile { file },
310+
CoreEventType::DeletedBlobFile(file) => DeletedBlobFile { file },
311+
CoreEventType::Warning(msg) => Warning { msg },
312+
CoreEventType::Error(msg) => Error { msg },
313+
CoreEventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg },
314+
CoreEventType::MsgsChanged { chat_id, msg_id } => MsgsChanged {
306315
chat_id: chat_id.to_u32(),
307316
msg_id: msg_id.to_u32(),
308317
},
309-
EventType::ReactionsChanged {
318+
CoreEventType::ReactionsChanged {
310319
chat_id,
311320
msg_id,
312321
contact_id,
@@ -315,92 +324,76 @@ impl From<EventType> for JSONRPCEventType {
315324
msg_id: msg_id.to_u32(),
316325
contact_id: contact_id.to_u32(),
317326
},
318-
EventType::IncomingMsg { chat_id, msg_id } => IncomingMsg {
327+
CoreEventType::IncomingMsg { chat_id, msg_id } => IncomingMsg {
319328
chat_id: chat_id.to_u32(),
320329
msg_id: msg_id.to_u32(),
321330
},
322-
EventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch {
331+
CoreEventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch {
323332
msg_ids: msg_ids.into_iter().map(|id| id.to_u32()).collect(),
324333
},
325-
EventType::MsgsNoticed(chat_id) => MsgsNoticed {
334+
CoreEventType::MsgsNoticed(chat_id) => MsgsNoticed {
326335
chat_id: chat_id.to_u32(),
327336
},
328-
EventType::MsgDelivered { chat_id, msg_id } => MsgDelivered {
337+
CoreEventType::MsgDelivered { chat_id, msg_id } => MsgDelivered {
329338
chat_id: chat_id.to_u32(),
330339
msg_id: msg_id.to_u32(),
331340
},
332-
EventType::MsgFailed { chat_id, msg_id } => MsgFailed {
341+
CoreEventType::MsgFailed { chat_id, msg_id } => MsgFailed {
333342
chat_id: chat_id.to_u32(),
334343
msg_id: msg_id.to_u32(),
335344
},
336-
EventType::MsgRead { chat_id, msg_id } => MsgRead {
345+
CoreEventType::MsgRead { chat_id, msg_id } => MsgRead {
337346
chat_id: chat_id.to_u32(),
338347
msg_id: msg_id.to_u32(),
339348
},
340-
EventType::ChatModified(chat_id) => ChatModified {
349+
CoreEventType::ChatModified(chat_id) => ChatModified {
341350
chat_id: chat_id.to_u32(),
342351
},
343-
EventType::ChatEphemeralTimerModified { chat_id, timer } => {
352+
CoreEventType::ChatEphemeralTimerModified { chat_id, timer } => {
344353
ChatEphemeralTimerModified {
345354
chat_id: chat_id.to_u32(),
346355
timer: timer.to_u32(),
347356
}
348357
}
349-
EventType::ContactsChanged(contact) => ContactsChanged {
358+
CoreEventType::ContactsChanged(contact) => ContactsChanged {
350359
contact_id: contact.map(|c| c.to_u32()),
351360
},
352-
EventType::LocationChanged(contact) => LocationChanged {
361+
CoreEventType::LocationChanged(contact) => LocationChanged {
353362
contact_id: contact.map(|c| c.to_u32()),
354363
},
355-
EventType::ConfigureProgress { progress, comment } => {
364+
CoreEventType::ConfigureProgress { progress, comment } => {
356365
ConfigureProgress { progress, comment }
357366
}
358-
EventType::ImexProgress(progress) => ImexProgress { progress },
359-
EventType::ImexFileWritten(path) => ImexFileWritten {
367+
CoreEventType::ImexProgress(progress) => ImexProgress { progress },
368+
CoreEventType::ImexFileWritten(path) => ImexFileWritten {
360369
path: path.to_str().unwrap_or_default().to_owned(),
361370
},
362-
EventType::SecurejoinInviterProgress {
371+
CoreEventType::SecurejoinInviterProgress {
363372
contact_id,
364373
progress,
365374
} => SecurejoinInviterProgress {
366375
contact_id: contact_id.to_u32(),
367376
progress,
368377
},
369-
EventType::SecurejoinJoinerProgress {
378+
CoreEventType::SecurejoinJoinerProgress {
370379
contact_id,
371380
progress,
372381
} => SecurejoinJoinerProgress {
373382
contact_id: contact_id.to_u32(),
374383
progress,
375384
},
376-
EventType::ConnectivityChanged => ConnectivityChanged,
377-
EventType::SelfavatarChanged => SelfavatarChanged,
378-
EventType::WebxdcStatusUpdate {
385+
CoreEventType::ConnectivityChanged => ConnectivityChanged,
386+
CoreEventType::SelfavatarChanged => SelfavatarChanged,
387+
CoreEventType::WebxdcStatusUpdate {
379388
msg_id,
380389
status_update_serial,
381390
} => WebxdcStatusUpdate {
382391
msg_id: msg_id.to_u32(),
383392
status_update_serial: status_update_serial.to_u32(),
384393
},
385-
EventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted {
394+
CoreEventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted {
386395
msg_id: msg_id.to_u32(),
387396
},
388397
}
389398
}
390399
}
391-
392-
#[cfg(test)]
393-
#[test]
394-
fn generate_events_ts_types_definition() {
395-
let events = {
396-
let mut buf = Vec::new();
397-
let options = typescript_type_def::DefinitionFileOptions {
398-
root_namespace: None,
399-
..typescript_type_def::DefinitionFileOptions::default()
400-
};
401-
typescript_type_def::write_definition_file::<_, JSONRPCEventType>(&mut buf, options)
402-
.unwrap();
403-
String::from_utf8(buf).unwrap()
404-
};
405-
std::fs::write("typescript/generated/events.ts", events).unwrap();
406-
}

deltachat-jsonrpc/src/api/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use types::message::MessageObject;
4949
use types::provider_info::ProviderInfo;
5050
use types::webxdc::WebxdcMessageInfo;
5151

52+
use self::events::Event;
5253
use self::types::message::MessageLoadResult;
5354
use self::types::{
5455
chat::{BasicChat, JSONRPCChatVisibility, MuteDuration},
@@ -165,6 +166,16 @@ impl CommandApi {
165166
get_info()
166167
}
167168

169+
/// Get the next event.
170+
async fn get_next_event(&self) -> Result<Event> {
171+
let event_emitter = self.accounts.read().await.get_event_emitter();
172+
event_emitter
173+
.recv()
174+
.await
175+
.map(|event| event.into())
176+
.context("event channel is closed")
177+
}
178+
168179
// ---------------------------------------------
169180
// Account Management
170181
// ---------------------------------------------

deltachat-jsonrpc/src/webserver.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use yerpc::axum::handle_ws_rpc;
66
use yerpc::{RpcClient, RpcSession};
77

88
mod api;
9-
use api::events::event_to_json_rpc_notification;
109
use api::{Accounts, CommandApi};
1110

1211
const DEFAULT_PORT: u16 = 20808;
@@ -44,12 +43,5 @@ async fn main() -> Result<(), std::io::Error> {
4443
async fn handler(ws: WebSocketUpgrade, Extension(api): Extension<CommandApi>) -> Response {
4544
let (client, out_receiver) = RpcClient::new();
4645
let session = RpcSession::new(client.clone(), api.clone());
47-
tokio::spawn(async move {
48-
let events = api.accounts.read().await.get_event_emitter();
49-
while let Some(event) = events.recv().await {
50-
let event = event_to_json_rpc_notification(event);
51-
client.send_notification("event", Some(event)).await.ok();
52-
}
53-
});
5446
handle_ws_rpc(ws, out_receiver, session).await
5547
}

deltachat-jsonrpc/typescript/src/client.ts

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as T from "../generated/types.js";
22
import * as RPC from "../generated/jsonrpc.js";
33
import { RawClient } from "../generated/client.js";
4-
import { Event } from "../generated/events.js";
54
import { WebsocketTransport, BaseTransport, Request } from "yerpc";
65
import { TinyEmitter } from "@deltachat/tiny-emitter";
76

@@ -36,27 +35,30 @@ export class BaseDeltaChat<
3635
rpc: RawClient;
3736
account?: T.Account;
3837
private contextEmitters: { [key: number]: TinyEmitter<ContextEvents> } = {};
38+
39+
private eventTask: Promise<void>;
40+
3941
constructor(public transport: Transport) {
4042
super();
4143
this.rpc = new RawClient(this.transport);
42-
this.transport.on("request", (request: Request) => {
43-
const method = request.method;
44-
if (method === "event") {
45-
const event = request.params! as DCWireEvent<Event>;
46-
//@ts-ignore
47-
this.emit(event.event.type, event.contextId, event.event as any);
48-
this.emit("ALL", event.contextId, event.event as any);
49-
50-
if (this.contextEmitters[event.contextId]) {
51-
this.contextEmitters[event.contextId].emit(
52-
event.event.type,
53-
//@ts-ignore
54-
event.event as any
55-
);
56-
this.contextEmitters[event.contextId].emit("ALL", event.event);
57-
}
44+
this.eventTask = this.eventLoop();
45+
}
46+
47+
async eventLoop(): Promise<void> {
48+
while (true) {
49+
const event = await this.rpc.getNextEvent();
50+
this.emit(event.event.type, event.context_id, event.event as any);
51+
this.emit("ALL", event.context_id, event.event as any);
52+
53+
if (this.contextEmitters[event.context_id]) {
54+
this.contextEmitters[event.context_id].emit(
55+
event.event.type,
56+
//@ts-ignore
57+
event.event as any
58+
);
59+
this.contextEmitters[event.context_id].emit("ALL", event.event as any);
5860
}
59-
});
61+
}
6062
}
6163

6264
async listAccounts(): Promise<T.Account[]> {

deltachat-jsonrpc/typescript/src/lib.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
export * as RPC from "../generated/jsonrpc.js";
22
export * as T from "../generated/types.js";
3-
export * from "../generated/events.js";
43
export { RawClient } from "../generated/client.js";
54
export * from "./client.js";
65
export * as yerpc from "yerpc";

0 commit comments

Comments
 (0)