Skip to content

Commit 4e3ddf4

Browse files
authored
Merge pull request #345 from modelcontextprotocol/ihrpr/session_management
StreamableHttp transport - session management
2 parents 09e5d5b + 5dba947 commit 4e3ddf4

10 files changed

+527
-76
lines changed

src/client/streamableHttp.test.ts

+71
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,77 @@ describe("StreamableHTTPClientTransport", () => {
101101
expect(lastCall[1].headers.get("mcp-session-id")).toBe("test-session-id");
102102
});
103103

104+
it("should terminate session with DELETE request", async () => {
105+
// First, simulate getting a session ID
106+
const message: JSONRPCMessage = {
107+
jsonrpc: "2.0",
108+
method: "initialize",
109+
params: {
110+
clientInfo: { name: "test-client", version: "1.0" },
111+
protocolVersion: "2025-03-26"
112+
},
113+
id: "init-id"
114+
};
115+
116+
(global.fetch as jest.Mock).mockResolvedValueOnce({
117+
ok: true,
118+
status: 200,
119+
headers: new Headers({ "content-type": "text/event-stream", "mcp-session-id": "test-session-id" }),
120+
});
121+
122+
await transport.send(message);
123+
expect(transport.sessionId).toBe("test-session-id");
124+
125+
// Now terminate the session
126+
(global.fetch as jest.Mock).mockResolvedValueOnce({
127+
ok: true,
128+
status: 200,
129+
headers: new Headers()
130+
});
131+
132+
await transport.terminateSession();
133+
134+
// Verify the DELETE request was sent with the session ID
135+
const calls = (global.fetch as jest.Mock).mock.calls;
136+
const lastCall = calls[calls.length - 1];
137+
expect(lastCall[1].method).toBe("DELETE");
138+
expect(lastCall[1].headers.get("mcp-session-id")).toBe("test-session-id");
139+
140+
// The session ID should be cleared after successful termination
141+
expect(transport.sessionId).toBeUndefined();
142+
});
143+
144+
it("should handle 405 response when server doesn't support session termination", async () => {
145+
// First, simulate getting a session ID
146+
const message: JSONRPCMessage = {
147+
jsonrpc: "2.0",
148+
method: "initialize",
149+
params: {
150+
clientInfo: { name: "test-client", version: "1.0" },
151+
protocolVersion: "2025-03-26"
152+
},
153+
id: "init-id"
154+
};
155+
156+
(global.fetch as jest.Mock).mockResolvedValueOnce({
157+
ok: true,
158+
status: 200,
159+
headers: new Headers({ "content-type": "text/event-stream", "mcp-session-id": "test-session-id" }),
160+
});
161+
162+
await transport.send(message);
163+
164+
// Now terminate the session, but server responds with 405
165+
(global.fetch as jest.Mock).mockResolvedValueOnce({
166+
ok: false,
167+
status: 405,
168+
statusText: "Method Not Allowed",
169+
headers: new Headers()
170+
});
171+
172+
await expect(transport.terminateSession()).resolves.not.toThrow();
173+
});
174+
104175
it("should handle 404 response when session expires", async () => {
105176
const message: JSONRPCMessage = {
106177
jsonrpc: "2.0",

src/client/streamableHttp.ts

+46-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Transport } from "../shared/transport.js";
2-
import { isJSONRPCNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
2+
import { isInitializedNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
33
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
44
import { EventSourceParserStream } from "eventsource-parser/stream";
55

@@ -420,7 +420,7 @@ export class StreamableHTTPClientTransport implements Transport {
420420
if (response.status === 202) {
421421
// if the accepted notification is initialized, we start the SSE stream
422422
// if it's supported by the server
423-
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
423+
if (isInitializedNotification(message)) {
424424
// Start without a lastEventId since this is a fresh connection
425425
this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
426426
}
@@ -467,4 +467,48 @@ export class StreamableHTTPClientTransport implements Transport {
467467
get sessionId(): string | undefined {
468468
return this._sessionId;
469469
}
470+
471+
/**
472+
* Terminates the current session by sending a DELETE request to the server.
473+
*
474+
* Clients that no longer need a particular session
475+
* (e.g., because the user is leaving the client application) SHOULD send an
476+
* HTTP DELETE to the MCP endpoint with the Mcp-Session-Id header to explicitly
477+
* terminate the session.
478+
*
479+
* The server MAY respond with HTTP 405 Method Not Allowed, indicating that
480+
* the server does not allow clients to terminate sessions.
481+
*/
482+
async terminateSession(): Promise<void> {
483+
if (!this._sessionId) {
484+
return; // No session to terminate
485+
}
486+
487+
try {
488+
const headers = await this._commonHeaders();
489+
490+
const init = {
491+
...this._requestInit,
492+
method: "DELETE",
493+
headers,
494+
signal: this._abortController?.signal,
495+
};
496+
497+
const response = await fetch(this._url, init);
498+
499+
// We specifically handle 405 as a valid response according to the spec,
500+
// meaning the server does not support explicit session termination
501+
if (!response.ok && response.status !== 405) {
502+
throw new StreamableHTTPError(
503+
response.status,
504+
`Failed to terminate session: ${response.statusText}`
505+
);
506+
}
507+
508+
this._sessionId = undefined;
509+
} catch (error) {
510+
this.onerror?.(error as Error);
511+
throw error;
512+
}
513+
}
470514
}

src/examples/client/simpleStreamableHttp.ts

+47-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ function printHelp(): void {
4848
console.log('\nAvailable commands:');
4949
console.log(' connect [url] - Connect to MCP server (default: http://localhost:3000/mcp)');
5050
console.log(' disconnect - Disconnect from server');
51+
console.log(' terminate-session - Terminate the current session');
5152
console.log(' reconnect - Reconnect to the server');
5253
console.log(' list-tools - List available tools');
5354
console.log(' call-tool <name> [args] - Call a tool with optional JSON arguments');
@@ -76,6 +77,10 @@ function commandLoop(): void {
7677
await disconnect();
7778
break;
7879

80+
case 'terminate-session':
81+
await terminateSession();
82+
break;
83+
7984
case 'reconnect':
8085
await reconnect();
8186
break;
@@ -249,6 +254,36 @@ async function disconnect(): Promise<void> {
249254
}
250255
}
251256

257+
async function terminateSession(): Promise<void> {
258+
if (!client || !transport) {
259+
console.log('Not connected.');
260+
return;
261+
}
262+
263+
try {
264+
console.log('Terminating session with ID:', transport.sessionId);
265+
await transport.terminateSession();
266+
console.log('Session terminated successfully');
267+
268+
// Check if sessionId was cleared after termination
269+
if (!transport.sessionId) {
270+
console.log('Session ID has been cleared');
271+
sessionId = undefined;
272+
273+
// Also close the transport and clear client objects
274+
await transport.close();
275+
console.log('Transport closed after session termination');
276+
client = null;
277+
transport = null;
278+
} else {
279+
console.log('Server responded with 405 Method Not Allowed (session termination not supported)');
280+
console.log('Session ID is still active:', transport.sessionId);
281+
}
282+
} catch (error) {
283+
console.error('Error terminating session:', error);
284+
}
285+
}
286+
252287
async function reconnect(): Promise<void> {
253288
if (client) {
254289
await disconnect();
@@ -411,13 +446,24 @@ async function listResources(): Promise<void> {
411446
async function cleanup(): Promise<void> {
412447
if (client && transport) {
413448
try {
449+
// First try to terminate the session gracefully
450+
if (transport.sessionId) {
451+
try {
452+
console.log('Terminating session before exit...');
453+
await transport.terminateSession();
454+
console.log('Session terminated successfully');
455+
} catch (error) {
456+
console.error('Error terminating session:', error);
457+
}
458+
}
459+
460+
// Then close the transport
414461
await transport.close();
415462
} catch (error) {
416463
console.error('Error closing transport:', error);
417464
}
418465
}
419466

420-
421467
process.stdin.setRawMode(false);
422468
readline.close();
423469
console.log('\nGoodbye!');

src/examples/server/jsonResponseStreamableHttp.ts

+7-16
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { randomUUID } from 'node:crypto';
33
import { McpServer } from '../../server/mcp.js';
44
import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
55
import { z } from 'zod';
6-
import { CallToolResult } from '../../types.js';
6+
import { CallToolResult, isInitializeRequest } from '../../types.js';
77

88
// Create an MCP server with implementation details
99
const server = new McpServer({
@@ -95,18 +95,17 @@ app.post('/mcp', async (req: Request, res: Response) => {
9595
transport = new StreamableHTTPServerTransport({
9696
sessionIdGenerator: () => randomUUID(),
9797
enableJsonResponse: true, // Enable JSON response mode
98+
onsessioninitialized: (sessionId) => {
99+
// Store the transport by session ID when session is initialized
100+
// This avoids race conditions where requests might come in before the session is stored
101+
console.log(`Session initialized with ID: ${sessionId}`);
102+
transports[sessionId] = transport;
103+
}
98104
});
99105

100106
// Connect the transport to the MCP server BEFORE handling the request
101107
await server.connect(transport);
102-
103-
// After handling the request, if we get a session ID back, store the transport
104108
await transport.handleRequest(req, res, req.body);
105-
106-
// Store the transport by session ID for future requests
107-
if (transport.sessionId) {
108-
transports[transport.sessionId] = transport;
109-
}
110109
return; // Already handled
111110
} else {
112111
// Invalid request - no session ID or not initialization request
@@ -145,14 +144,6 @@ app.get('/mcp', async (req: Request, res: Response) => {
145144
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
146145
});
147146

148-
// Helper function to detect initialize requests
149-
function isInitializeRequest(body: unknown): boolean {
150-
if (Array.isArray(body)) {
151-
return body.some(msg => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize');
152-
}
153-
return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize';
154-
}
155-
156147
// Start the server
157148
const PORT = 3000;
158149
app.listen(PORT, () => {

src/examples/server/simpleStreamableHttp.ts

+48-14
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { randomUUID } from 'node:crypto';
33
import { McpServer } from '../../server/mcp.js';
44
import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
55
import { z } from 'zod';
6-
import { CallToolResult, GetPromptResult, JSONRPCMessage, ReadResourceResult } from '../../types.js';
6+
import { CallToolResult, GetPromptResult, isInitializeRequest, JSONRPCMessage, ReadResourceResult } from '../../types.js';
77

88
// Create a simple in-memory EventStore for resumability
99
class InMemoryEventStore implements EventStore {
@@ -36,7 +36,7 @@ class InMemoryEventStore implements EventStore {
3636
* Replays events that occurred after a specific event ID
3737
* Implements EventStore.replayEventsAfter
3838
*/
39-
async replayEventsAfter(lastEventId: string,
39+
async replayEventsAfter(lastEventId: string,
4040
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
4141
): Promise<string> {
4242
if (!lastEventId || !this.events.has(lastEventId)) {
@@ -247,19 +247,28 @@ app.post('/mcp', async (req: Request, res: Response) => {
247247
transport = new StreamableHTTPServerTransport({
248248
sessionIdGenerator: () => randomUUID(),
249249
eventStore, // Enable resumability
250+
onsessioninitialized: (sessionId) => {
251+
// Store the transport by session ID when session is initialized
252+
// This avoids race conditions where requests might come in before the session is stored
253+
console.log(`Session initialized with ID: ${sessionId}`);
254+
transports[sessionId] = transport;
255+
}
250256
});
251257

258+
// Set up onclose handler to clean up transport when closed
259+
transport.onclose = () => {
260+
const sid = transport.sessionId;
261+
if (sid && transports[sid]) {
262+
console.log(`Transport closed for session ${sid}, removing from transports map`);
263+
delete transports[sid];
264+
}
265+
};
266+
252267
// Connect the transport to the MCP server BEFORE handling the request
253268
// so responses can flow back through the same transport
254269
await server.connect(transport);
255270

256-
// After handling the request, if we get a session ID back, store the transport
257271
await transport.handleRequest(req, res, req.body);
258-
259-
// Store the transport by session ID for future requests
260-
if (transport.sessionId) {
261-
transports[transport.sessionId] = transport;
262-
}
263272
return; // Already handled
264273
} else {
265274
// Invalid request - no session ID or not initialization request
@@ -312,13 +321,26 @@ app.get('/mcp', async (req: Request, res: Response) => {
312321
await transport.handleRequest(req, res);
313322
});
314323

315-
// Helper function to detect initialize requests
316-
function isInitializeRequest(body: unknown): boolean {
317-
if (Array.isArray(body)) {
318-
return body.some(msg => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize');
324+
// Handle DELETE requests for session termination (according to MCP spec)
325+
app.delete('/mcp', async (req: Request, res: Response) => {
326+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
327+
if (!sessionId || !transports[sessionId]) {
328+
res.status(400).send('Invalid or missing session ID');
329+
return;
319330
}
320-
return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize';
321-
}
331+
332+
console.log(`Received session termination request for session ${sessionId}`);
333+
334+
try {
335+
const transport = transports[sessionId];
336+
await transport.handleRequest(req, res);
337+
} catch (error) {
338+
console.error('Error handling session termination:', error);
339+
if (!res.headersSent) {
340+
res.status(500).send('Error processing session termination');
341+
}
342+
}
343+
});
322344

323345
// Start the server
324346
const PORT = 3000;
@@ -351,6 +373,18 @@ app.listen(PORT, () => {
351373
// Handle server shutdown
352374
process.on('SIGINT', async () => {
353375
console.log('Shutting down server...');
376+
377+
// Close all active transports to properly clean up resources
378+
for (const sessionId in transports) {
379+
try {
380+
console.log(`Closing transport for session ${sessionId}`);
381+
await transports[sessionId].close();
382+
delete transports[sessionId];
383+
} catch (error) {
384+
console.error(`Error closing transport for session ${sessionId}:`, error);
385+
}
386+
}
354387
await server.close();
388+
console.log('Server shutdown complete');
355389
process.exit(0);
356390
});

0 commit comments

Comments
 (0)