Skip to content

Commit 3381a59

Browse files
authored
Merge pull request #67 from webxdc/sk/realtime2
Feat: Add realtime API
2 parents 0b9fafe + 2a041ec commit 3381a59

File tree

7 files changed

+260
-17
lines changed

7 files changed

+260
-17
lines changed

backend/instance.ts

+21
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ type SendUpdateMessage = {
2121
update: ReceivedStatusUpdate<any>;
2222
};
2323

24+
type SendRealtimeMessage = {
25+
type: "sendRealtime";
26+
data: Uint8Array;
27+
};
28+
2429
type SetUpdateListenerMessage = {
2530
type: "setUpdateListener";
2631
serial: number;
@@ -125,6 +130,12 @@ export class Instances {
125130
// XXX should validate parsed
126131
if (isSendUpdateMessage(parsed)) {
127132
instance.webXdc.sendUpdate(parsed.update, "");
133+
} else if (isSendRealtimeMessage(parsed)) {
134+
instance.webXdc.sendRealtimeData(parsed.data);
135+
} else if (isSetRealtimeListenerMessage(parsed)) {
136+
instance.webXdc.connectRealtime((data) => {
137+
return broadcast(wss, JSON.stringify({ type: "realtime", data }));
138+
});
128139
} else if (isSetUpdateListenerMessage(parsed)) {
129140
instance.webXdc.connect(
130141
(updates) => {
@@ -216,6 +227,16 @@ function isSendUpdateMessage(value: any): value is SendUpdateMessage {
216227
return value.type === "sendUpdate";
217228
}
218229

230+
function isSendRealtimeMessage(value: any): value is SendRealtimeMessage {
231+
return value.type === "sendRealtime";
232+
}
233+
234+
function isSetRealtimeListenerMessage(
235+
value: any,
236+
): value is { type: "setRealtimeListener" } {
237+
return value.type === "setRealtimeListener";
238+
}
239+
219240
function isSetUpdateListenerMessage(
220241
value: any,
221242
): value is SetUpdateListenerMessage {

backend/message.test.ts

+27
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,33 @@ test("distribute to self", () => {
5353
]);
5454
});
5555

56+
test("Send realtime", () => {
57+
const [getMessages, onMessage] = track();
58+
const processor = createProcessor(onMessage);
59+
const client0 = processor.createClient("3001");
60+
const client1 = processor.createClient("3002");
61+
62+
const client0Heard: string[] = [];
63+
const client1Heard: string[] = [];
64+
65+
const decoder = new TextDecoder();
66+
client0.connectRealtime((data) => {
67+
client0Heard.push(decoder.decode(data));
68+
return true;
69+
});
70+
client1.connectRealtime((data) => {
71+
client1Heard.push(decoder.decode(data));
72+
return true;
73+
});
74+
75+
const encoder = new TextEncoder();
76+
77+
client0.sendRealtimeData(new Uint8Array(encoder.encode("hi")));
78+
79+
expect(client1Heard).toMatchObject(["hi"]);
80+
expect(client0Heard).toMatchObject([]);
81+
});
82+
5683
test("distribute to self and other", () => {
5784
const [getMessages, onMessage] = track();
5885
const processor = createProcessor(onMessage);

backend/message.ts

+57-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import type {
1+
import {
2+
RealtimeListener as WebxdcRealtimeListener,
23
ReceivedStatusUpdate,
34
SendingStatusUpdate,
45
Webxdc,
@@ -10,6 +11,7 @@ type UpdateListenerMulti = (updates: ReceivedStatusUpdate<any>[]) => boolean;
1011

1112
type ClearListener = () => boolean;
1213
type DeleteListener = () => boolean;
14+
type RTListener = (data: Uint8Array) => boolean;
1315

1416
type Connect = (
1517
updateListener: UpdateListenerMulti,
@@ -20,7 +22,9 @@ type Connect = (
2022

2123
export type WebXdcMulti = {
2224
connect: Connect;
25+
connectRealtime: (listener: RTListener) => void;
2326
sendUpdate: Webxdc<any>["sendUpdate"];
27+
sendRealtimeData: (data: Uint8Array) => void;
2428
};
2529

2630
export type OnMessage = (message: Message) => void;
@@ -33,6 +37,7 @@ export interface IProcessor {
3337

3438
class Client implements WebXdcMulti {
3539
updateListener: UpdateListenerMulti | null = null;
40+
realtimeListener: RTListener | null = null;
3641
clearListener: ClearListener | null = null;
3742
updateSerial: number | null = null;
3843
deleteListener: DeleteListener | null = null;
@@ -46,6 +51,35 @@ class Client implements WebXdcMulti {
4651
this.processor.distribute(this.id, update);
4752
}
4853

54+
sendRealtimeData(data: Uint8Array) {
55+
this.processor.distributeRealtime(this.id, data);
56+
}
57+
58+
connectRealtime(listener: RTListener) {
59+
this.processor.onMessage({
60+
type: "connect-realtime",
61+
instanceId: this.id,
62+
instanceColor: getColorForId(this.id),
63+
timestamp: Date.now(),
64+
});
65+
66+
const realtimeListener = (data: Uint8Array) => {
67+
const hasReceived = listener(data);
68+
if (hasReceived) {
69+
this.processor.onMessage({
70+
type: "realtime-received",
71+
data,
72+
instanceId: this.id,
73+
instanceColor: getColorForId(this.id),
74+
timestamp: Date.now(),
75+
});
76+
}
77+
return hasReceived;
78+
};
79+
80+
this.realtimeListener = realtimeListener;
81+
}
82+
4983
connect(
5084
listener: UpdateListenerMulti,
5185
serial: number,
@@ -108,6 +142,13 @@ class Client implements WebXdcMulti {
108142
this.updateListener([update]);
109143
}
110144

145+
receiveRealtime(data: Uint8Array) {
146+
if (this.realtimeListener == null) {
147+
return;
148+
}
149+
this.realtimeListener(data);
150+
}
151+
111152
clear() {
112153
if (
113154
this.clearListener == null ||
@@ -148,6 +189,21 @@ class Processor implements IProcessor {
148189
this.clients.splice(client_index, 1);
149190
}
150191

192+
distributeRealtime(instanceId: string, data: Uint8Array) {
193+
this.onMessage({
194+
type: "realtime-sent",
195+
instanceId: instanceId,
196+
instanceColor: getColorForId(instanceId),
197+
data,
198+
timestamp: Date.now(),
199+
});
200+
for (const client of this.clients) {
201+
if (client.id != instanceId) {
202+
client.receiveRealtime(data);
203+
}
204+
}
205+
}
206+
151207
distribute(instanceId: string, update: SendingStatusUpdate<any>) {
152208
this.currentSerial++;
153209
const receivedUpdate: ReceivedStatusUpdate<any> = {

0 commit comments

Comments
 (0)