Skip to content

Commit d7c65f6

Browse files
committed
remove rpcService.onBeforeSend
1 parent 38b202b commit d7c65f6

File tree

9 files changed

+21
-34
lines changed

9 files changed

+21
-34
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
NOTES.txt
22
.nvmrc
33
*.swp
4+
*.txt
45
build/
56
dist/
67
node_modules/

src/rpc/service.ts

-15
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@ import { ITransport } from '../transport';
1212

1313
// Encodes, sends, decodes and receives RPC messages.
1414
export class RpcService implements IRpcService {
15-
private beforeSendHooks: Set<RpcBeforeSendHook>;
1615
private codec: ICodec;
1716
private receivers: Set<RpcReceiver>;
1817
private transport: ITransport;
1918

2019
constructor(options: IRpcServiceOptions = {}) {
21-
this.beforeSendHooks = new Set<RpcBeforeSendHook>();
2220
this.codec = options.codec;
2321
this.receivers = new Set<RpcReceiver>();
2422
this.transport = options.transport;
@@ -48,16 +46,6 @@ export class RpcService implements IRpcService {
4846
}
4947
}
5048

51-
// Register a hook to be invoked before a message is sent.
52-
public onBeforeSend(hook: RpcBeforeSendHook): IDetacher {
53-
this.beforeSendHooks.add(hook);
54-
return {
55-
detach: (): void => {
56-
this.beforeSendHooks.delete(hook);
57-
}
58-
};
59-
}
60-
6149
// Register a receiver for messages of any procedure and call type.
6250
public onReceive(receiver: RpcReceiver): IDetacher {
6351
this.receivers.add(receiver);
@@ -73,9 +61,6 @@ export class RpcService implements IRpcService {
7361
endpoint: IEndpoint,
7462
message: IRpcMessage
7563
): Promise<void> {
76-
for (const hook of this.beforeSendHooks) {
77-
await hook(endpoint, message);
78-
}
7964
const encoded = this.codec.encode(message);
8065
return await this.transport.send(endpoint, encoded);
8166
}

src/rpc/types.ts

-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ export type RpcReceiver = (endpoint: IEndpoint, message: IRpcMessage) => any;
1616
export interface IRpcService {
1717
close(): Promise<void>;
1818
listen(endpoint: IEndpoint): Promise<void>;
19-
onBeforeSend(hook: RpcBeforeSendHook): IDetacher;
2019
onReceive(receiver: RpcReceiver): IDetacher;
2120
send(
2221
endpoint: IEndpoint,

src/server.ts

+13-11
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class Server implements IServer {
3434
private lastApplied: number;
3535
public readonly log: ILog;
3636
public readonly logger: ILogger;
37-
public readonly rpcService: IRpcService;
37+
private readonly rpcService: IRpcService;
3838
private rpcServiceDetachers: Set<IDetacher>;
3939
private state: IState;
4040
public readonly stateMachine: IStateMachine;
@@ -119,6 +119,17 @@ export class Server implements IServer {
119119
await this.state.handleRpcMessage(endpoint, message);
120120
}
121121

122+
public async sendRpcMessage(endpoint: IEndpoint, message: IRpcMessage): Promise<void> {
123+
// Before responding to an RPC request, the recipient `Server`
124+
// updates persistent state on stable storage.
125+
// > *§5. "...(Updated on stable storage before responding)..."*
126+
// > *§5.6 "...Raft’s RPCs...require the recipient to persist..."*
127+
if (isRpcResponse(message)) {
128+
await this.updatePersistentState();
129+
}
130+
await this.rpcService.send(endpoint, message);
131+
}
132+
122133
// When the term is updated, it is not immediately
123134
// persisted because, as the Raft paper says, the
124135
// term is part of persistent state that is:
@@ -155,22 +166,13 @@ export class Server implements IServer {
155166
this.commitIndex = this.lastApplied = 0;
156167

157168
this.logger.debug('Loading persistent state');
169+
158170
// The current term is...
159171
// > *§5. "...initialized to zero on first boot..."*
160172
await this.currentTerm.readIfExistsElseSetAndWrite(0)
161173

162174
this.logger.debug('Starting RPC service');
163175

164-
this.rpcServiceDetachers.add(this.rpcService.onBeforeSend(async (endpoint, message) => {
165-
// Before responding to an RPC request, the recipient `Server`
166-
// updates persistent state on stable storage.
167-
// > *§5. "...(Updated on stable storage before responding)..."*
168-
// > *§5.6 "...Raft’s RPCs...require the recipient to persist..."*
169-
if (isRpcResponse(message)) {
170-
await this.updatePersistentState();
171-
}
172-
}));
173-
174176
this.rpcServiceDetachers.add(this.rpcService.onReceive(async (endpoint, message) => {
175177
await this.handleRpcMessage(endpoint, message);
176178
}));

src/state/candidate.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export class CandidateState implements IState {
108108
const lastLogIndex = this.server.log.getLastIndex();
109109

110110
for (const serverEndpoint of this.server.getServerEndpoints()) {
111-
this.server.rpcService.send(
111+
this.server.sendRpcMessage(
112112
serverEndpoint,
113113
createRequestVoteRpcRequest({
114114
candidateId: this.server.id,

src/state/follower.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ export class FollowerState implements IState {
9696
}
9797
}
9898

99-
await this.server.rpcService.send(
99+
await this.server.sendRpcMessage(
100100
endpoint,
101101
createAppendEntriesRpcResponse({
102102
// The followerCommit field is not part of the Raft spec. It is a
@@ -155,7 +155,7 @@ export class FollowerState implements IState {
155155
);
156156
}
157157

158-
await this.server.rpcService.send(
158+
await this.server.sendRpcMessage(
159159
endpoint,
160160
createRequestVoteRpcResponse({
161161
term: currentTerm,

src/state/leader.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ export class LeaderState implements IState {
8484
for (const serverId of this.server.getServerIds()) {
8585
if (this.server.log.getLastIndex() >= this.nextIndex[serverId]) {
8686
const serverEndpoint = this.server.getCluster().servers[serverId];
87-
this.server.rpcService.send(
87+
this.server.sendRpcMessage(
8888
serverEndpoint,
8989
createAppendEntriesRpcRequest({
9090
entries: this.server.log.slice(this.nextIndex[serverId]),
@@ -103,7 +103,7 @@ export class LeaderState implements IState {
103103

104104
private sendHeartbeats() {
105105
for (const serverEndpoint of this.server.getServerEndpoints()) {
106-
this.server.rpcService.send(
106+
this.server.sendRpcMessage(
107107
serverEndpoint,
108108
createAppendEntriesRpcRequest({
109109
entries: [],

src/types.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ export interface IServer {
2525
readonly id: ServerId;
2626
readonly log: ILog;
2727
readonly logger: ILogger;
28-
readonly rpcService: IRpcService;
2928
readonly stateMachine: IStateMachine;
3029
getCommitIndex(): number;
3130
getCluster(): ICluster;
@@ -36,6 +35,7 @@ export interface IServer {
3635
getState(): IState;
3736
getVotedFor(): ServerId;
3837
handleClientRequest(request: IClientRequest): Promise<IClientResponse>;
38+
sendRpcMessage(endpoint: IEndpoint, message: IRpcMessage): Promise<void>;
3939
setCurrentTerm(newTerm: number): void;
4040
setVotedFor(candidateId: ServerId): void;
4141
start(): Promise<void>;

test/server.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ describe('server', function() {
145145
spyLogWrite = sinon.spy(log, "write");
146146
spyTransportSend = sinon.spy(transport, "send");
147147
spyVotedForWrite = sinon.spy(votedFor, "write");
148-
return server.rpcService.send(peerEndpoint, message);
148+
return server.sendRpcMessage(peerEndpoint, message);
149149
});
150150

151151
if (isRpcRequest(message)) {

0 commit comments

Comments
 (0)