Skip to content

Commit 22f7ade

Browse files
committed
Change the backend and frontend interaction
The backend doesn't implement WebXdc anymore, but WebXdcMulti, which sends multiple messsages to the frontend at once in case of a (re)connection. We also introduce some tests for the webxdc frontend. To this end we've abstracted out a transport so we can put in a fake transport during tests.
1 parent 74316ab commit 22f7ade

File tree

8 files changed

+273
-96
lines changed

8 files changed

+273
-96
lines changed

backend/app.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import express, { Express } from "express";
22
import expressWs from "express-ws";
3-
import { createProcessor, IProcessor } from "./message";
4-
import { JsonValue, WebXdc, ReceivedUpdate } from "../types/webxdc-types";
3+
import { createProcessor, IProcessor, WebXdcMulti } from "./message";
4+
import { JsonValue, ReceivedUpdate } from "../types/webxdc-types";
55

66
export type WebXdcDescription = {
77
name: string;
@@ -58,7 +58,7 @@ export class Instance {
5858
constructor(
5959
public app: expressWs.Application,
6060
public port: number,
61-
public webXdc: WebXdc
61+
public webXdc: WebXdcMulti
6262
) {}
6363

6464
start() {
@@ -116,9 +116,9 @@ export class Instances {
116116
if (isSendUpdateMessage(parsed)) {
117117
instance.webXdc.sendUpdate(parsed.update, "update");
118118
} else if (isSetUpdateListenerMessage(parsed)) {
119-
instance.webXdc.setUpdateListener((update) => {
120-
console.log("gossip", update);
121-
ws.send(JSON.stringify(update));
119+
instance.webXdc.setUpdateListenerMulti((updates) => {
120+
console.log("gossip", updates);
121+
ws.send(JSON.stringify(updates));
122122
}, parsed.serial);
123123
} else {
124124
throw new Error(`Unknown message: ${JSON.stringify(parsed)}`);

backend/message.test.ts

+22-22
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ test("distribute to self", () => {
77

88
const client0Heard: ReceivedUpdate<string>[] = [];
99

10-
client0.setUpdateListener((update) => {
11-
client0Heard.push(update);
10+
client0.setUpdateListenerMulti((updates) => {
11+
client0Heard.push(...updates);
1212
}, 0);
1313

1414
client0.sendUpdate({ payload: "Hello" }, "update");
@@ -26,12 +26,12 @@ test("distribute to self and other", () => {
2626
const client0Heard: ReceivedUpdate<string>[] = [];
2727
const client1Heard: ReceivedUpdate<string>[] = [];
2828

29-
client0.setUpdateListener((update) => {
30-
client0Heard.push(update);
29+
client0.setUpdateListenerMulti((updates) => {
30+
client0Heard.push(...updates);
3131
}, 0);
3232

33-
client1.setUpdateListener((update) => {
34-
client1Heard.push(update);
33+
client1.setUpdateListenerMulti((updates) => {
34+
client1Heard.push(...updates);
3535
}, 0);
3636

3737
client0.sendUpdate({ payload: "Hello" }, "update");
@@ -54,12 +54,12 @@ test("setUpdateListener serial should skip older", () => {
5454
const client0Heard: ReceivedUpdate<string>[] = [];
5555
const client1Heard: ReceivedUpdate<string>[] = [];
5656

57-
client0.setUpdateListener((update) => {
58-
client0Heard.push(update);
57+
client0.setUpdateListenerMulti((updates) => {
58+
client0Heard.push(...updates);
5959
}, 0);
6060

61-
client1.setUpdateListener((update) => {
62-
client1Heard.push(update);
61+
client1.setUpdateListenerMulti((updates) => {
62+
client1Heard.push(...updates);
6363
}, 1);
6464

6565
client0.sendUpdate({ payload: "Hello" }, "update");
@@ -81,8 +81,8 @@ test("other starts listening later", () => {
8181
const client0Heard: ReceivedUpdate<string>[] = [];
8282
const client1Heard: ReceivedUpdate<string>[] = [];
8383

84-
client0.setUpdateListener((update) => {
85-
client0Heard.push(update);
84+
client0.setUpdateListenerMulti((updates) => {
85+
client0Heard.push(...updates);
8686
}, 0);
8787

8888
client0.sendUpdate({ payload: "Hello" }, "update");
@@ -95,8 +95,8 @@ test("other starts listening later", () => {
9595
// we only join later, so we haven't heard a thing yet
9696
expect(client1Heard).toMatchObject([]);
9797

98-
client1.setUpdateListener((update) => {
99-
client1Heard.push(update);
98+
client1.setUpdateListenerMulti((updates) => {
99+
client1Heard.push(...updates);
100100
}, 0);
101101

102102
expect(client0Heard).toMatchObject([
@@ -116,8 +116,8 @@ test("client is created later and needs to catch up", () => {
116116
const client0Heard: ReceivedUpdate<string>[] = [];
117117
const client1Heard: ReceivedUpdate<string>[] = [];
118118

119-
client0.setUpdateListener((update) => {
120-
client0Heard.push(update);
119+
client0.setUpdateListenerMulti((updates) => {
120+
client0Heard.push(...updates);
121121
}, 0);
122122

123123
client0.sendUpdate({ payload: "Hello" }, "update");
@@ -132,8 +132,8 @@ test("client is created later and needs to catch up", () => {
132132
expect(client1Heard).toMatchObject([]);
133133
const client1 = processor.createClient("3002");
134134

135-
client1.setUpdateListener((update) => {
136-
client1Heard.push(update);
135+
client1.setUpdateListenerMulti((updates) => {
136+
client1Heard.push(...updates);
137137
}, 0);
138138

139139
expect(client0Heard).toMatchObject([
@@ -154,8 +154,8 @@ test("other starts listening later but is partially caught up", () => {
154154
const client0Heard: ReceivedUpdate<string>[] = [];
155155
const client1Heard: ReceivedUpdate<string>[] = [];
156156

157-
client0.setUpdateListener((update) => {
158-
client0Heard.push(update);
157+
client0.setUpdateListenerMulti((updates) => {
158+
client0Heard.push(...updates);
159159
}, 0);
160160

161161
client0.sendUpdate({ payload: "Hello" }, "update");
@@ -168,8 +168,8 @@ test("other starts listening later but is partially caught up", () => {
168168
expect(client1Heard).toMatchObject([]);
169169

170170
// start at 1, as we're already partially caught up
171-
client1.setUpdateListener((update) => {
172-
client1Heard.push(update);
171+
client1.setUpdateListenerMulti((updates) => {
172+
client1Heard.push(...updates);
173173
}, 1);
174174

175175
expect(client0Heard).toMatchObject([

backend/message.ts

+27-24
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
import type {
2-
WebXdc,
32
Update,
4-
UpdateListener,
53
ReceivedUpdate,
64
JsonValue,
5+
SendUpdate,
76
} from "../types/webxdc-types";
87

8+
type UpdateListenerMulti<T> = (updates: ReceivedUpdate<T>[]) => void;
9+
10+
type SetUpdateListenerMulti<T> = (
11+
listener: UpdateListenerMulti<T>,
12+
serial: number
13+
) => Promise<void>;
14+
15+
export type WebXdcMulti<T = JsonValue> = {
16+
sendUpdate: SendUpdate<T>;
17+
setUpdateListenerMulti: SetUpdateListenerMulti<T>;
18+
};
19+
920
export interface IProcessor<T = JsonValue> {
10-
createClient(name: string): WebXdc<T>;
21+
createClient(name: string): WebXdcMulti<T>;
1122
}
1223

13-
class Client<T> implements WebXdc<T> {
14-
updateListener: UpdateListener<T> | null = null;
24+
class Client<T> implements WebXdcMulti<T> {
25+
updateListener: UpdateListenerMulti<T> | null = null;
1526
updateSerial: number | null = null;
1627

1728
constructor(public processor: Processor<T>, public name: string) {}
@@ -20,13 +31,14 @@ class Client<T> implements WebXdc<T> {
2031
this.processor.distribute(update, descr);
2132
}
2233

23-
async setUpdateListener(
24-
listener: UpdateListener<T>,
34+
async setUpdateListenerMulti(
35+
listener: UpdateListenerMulti<T>,
2536
serial: number
2637
): Promise<void> {
2738
this.updateListener = listener;
2839
this.updateSerial = serial;
2940
this.processor.catchUp(listener, serial);
41+
return Promise.resolve();
3042
}
3143

3244
receiveUpdate(update: ReceivedUpdate<T>) {
@@ -37,15 +49,7 @@ class Client<T> implements WebXdc<T> {
3749
if (update.serial <= this.updateSerial) {
3850
return;
3951
}
40-
this.updateListener(update);
41-
}
42-
43-
get selfAddr() {
44-
return this.name;
45-
}
46-
47-
get selfName() {
48-
return this.name;
52+
this.updateListener([update]);
4953
}
5054
}
5155

@@ -54,7 +58,7 @@ class Processor<T> implements IProcessor<T> {
5458
currentSerial: number = 0;
5559
updates: ReceivedUpdate<T>[] = [];
5660

57-
createClient(name: string): WebXdc<T> {
61+
createClient(name: string): WebXdcMulti<T> {
5862
const client = new Client(this, name);
5963
this.clients.push(client);
6064
return client;
@@ -73,14 +77,13 @@ class Processor<T> implements IProcessor<T> {
7377
}
7478
}
7579

76-
catchUp(updateListener: UpdateListener<T>, serial: number) {
80+
catchUp(updateListener: UpdateListenerMulti<T>, serial: number) {
7781
const updates = this.updates;
78-
for (const update of updates.slice(serial)) {
79-
updateListener({
80-
...update,
81-
max_serial: updates.length,
82-
});
83-
}
82+
updateListener(
83+
updates
84+
.slice(serial)
85+
.map((update) => ({ ...update, max_serial: updates.length }))
86+
);
8487
}
8588
}
8689

jest.config.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
module.exports = {
22
testEnvironment: "node",
33
testMatch: [
4-
"<rootDir>/backend/**/__tests__/**/*.{ts,tsx}",
54
"<rootDir>/backend/**/?(*.)(test).{ts,tsx}",
5+
"<rootDir>/sim/**/?(*.)(test).{ts,tsx}",
66
],
77
// SWC instead of ts-jest
88
transform: {

sim/create.ts

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { WebXdc, JsonValue, ReceivedUpdate } from "../types/webxdc-types";
2+
3+
export type TransportMessageCallback = (data: JsonValue) => void;
4+
export type TransportConnectCallback = () => void;
5+
6+
export type Transport = {
7+
send(data: JsonValue): void;
8+
onMessage(callback: TransportMessageCallback): void;
9+
onConnect(callback: TransportConnectCallback): void;
10+
address(): string;
11+
name(): string;
12+
};
13+
14+
export function createWebXdc(transport: Transport): WebXdc {
15+
let resolveUpdateListenerPromise: (() => void) | null = null;
16+
17+
const webXdc: WebXdc = {
18+
sendUpdate: (update, descr) => {
19+
transport.send({ type: "sendUpdate", update, descr });
20+
console.info("send", { update, descr });
21+
},
22+
setUpdateListener: (listener, serial = 0): Promise<void> => {
23+
transport.onMessage((data) => {
24+
const receivedUpdates: ReceivedUpdate<any>[] = data as any;
25+
console.info("recv", receivedUpdates);
26+
for (const update of receivedUpdates) {
27+
listener(update);
28+
}
29+
if (resolveUpdateListenerPromise != null) {
30+
resolveUpdateListenerPromise();
31+
resolveUpdateListenerPromise = null;
32+
}
33+
});
34+
transport.onConnect(() => {
35+
transport.send({ type: "setUpdateListener", serial });
36+
});
37+
const promise = new Promise<void>((resolve) => {
38+
resolveUpdateListenerPromise = resolve;
39+
});
40+
return promise;
41+
},
42+
selfAddr: transport.address(),
43+
selfName: transport.name(),
44+
};
45+
return webXdc;
46+
}

0 commit comments

Comments
 (0)