diff --git a/src/examples/client/streamableHttpWithSseFallbackClient.ts b/src/examples/client/streamableHttpWithSseFallbackClient.ts new file mode 100644 index 00000000..7646f0f7 --- /dev/null +++ b/src/examples/client/streamableHttpWithSseFallbackClient.ts @@ -0,0 +1,192 @@ +import { Client } from '../../client/index.js'; +import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; +import { SSEClientTransport } from '../../client/sse.js'; +import { + ListToolsRequest, + ListToolsResultSchema, + CallToolRequest, + CallToolResultSchema, + LoggingMessageNotificationSchema, +} from '../../types.js'; + +/** + * Simplified Backwards Compatible MCP Client + * + * This client demonstrates backward compatibility with both: + * 1. Modern servers using Streamable HTTP transport (protocol version 2025-03-26) + * 2. Older servers using HTTP+SSE transport (protocol version 2024-11-05) + * + * Following the MCP specification for backwards compatibility: + * - Attempts to POST an initialize request to the server URL first (modern transport) + * - If that fails with 4xx status, falls back to GET request for SSE stream (older transport) + */ + +// Command line args processing +const args = process.argv.slice(2); +const serverUrl = args[0] || 'http://localhost:3000/mcp'; + +async function main(): Promise { + console.log('MCP Backwards Compatible Client'); + console.log('==============================='); + console.log(`Connecting to server at: ${serverUrl}`); + + let client: Client; + let transport: StreamableHTTPClientTransport | SSEClientTransport; + + try { + // Try connecting with automatic transport detection + const connection = await connectWithBackwardsCompatibility(serverUrl); + client = connection.client; + transport = connection.transport; + + // Set up notification handler + client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + console.log(`Notification: ${notification.params.level} - ${notification.params.data}`); + }); + + // DEMO WORKFLOW: + // 1. List available tools + console.log('\n=== Listing Available Tools ==='); + await listTools(client); + + // 2. Call the notification tool + console.log('\n=== Starting Notification Stream ==='); + await startNotificationTool(client); + + // 3. Wait for all notifications (5 seconds) + console.log('\n=== Waiting for all notifications ==='); + await new Promise(resolve => setTimeout(resolve, 5000)); + + // 4. Disconnect + console.log('\n=== Disconnecting ==='); + await transport.close(); + console.log('Disconnected from MCP server'); + + } catch (error) { + console.error('Error running client:', error); + process.exit(1); + } +} + +/** + * Connect to an MCP server with backwards compatibility + * Following the spec for client backward compatibility + */ +async function connectWithBackwardsCompatibility(url: string): Promise<{ + client: Client, + transport: StreamableHTTPClientTransport | SSEClientTransport, + transportType: 'streamable-http' | 'sse' +}> { + console.log('1. Trying Streamable HTTP transport first...'); + + // Step 1: Try Streamable HTTP transport first + const client = new Client({ + name: 'backwards-compatible-client', + version: '1.0.0' + }); + + client.onerror = (error) => { + console.error('Client error:', error); + }; + const baseUrl = new URL(url); + + try { + // Create modern transport + const streamableTransport = new StreamableHTTPClientTransport(baseUrl); + await client.connect(streamableTransport); + + console.log('Successfully connected using modern Streamable HTTP transport.'); + return { + client, + transport: streamableTransport, + transportType: 'streamable-http' + }; + } catch (error) { + // Step 2: If transport fails, try the older SSE transport + console.log(`StreamableHttp transport connection failed: ${error}`); + console.log('2. Falling back to deprecated HTTP+SSE transport...'); + + try { + // Create SSE transport pointing to /sse endpoint + const sseTransport = new SSEClientTransport(baseUrl); + const sseClient = new Client({ + name: 'backwards-compatible-client', + version: '1.0.0' + }); + await sseClient.connect(sseTransport); + + console.log('Successfully connected using deprecated HTTP+SSE transport.'); + return { + client: sseClient, + transport: sseTransport, + transportType: 'sse' + }; + } catch (sseError) { + console.error(`Failed to connect with either transport method:\n1. Streamable HTTP error: ${error}\n2. SSE error: ${sseError}`); + throw new Error('Could not connect to server with any available transport'); + } + } +} + +/** + * List available tools on the server + */ +async function listTools(client: Client): Promise { + try { + const toolsRequest: ListToolsRequest = { + method: 'tools/list', + params: {} + }; + const toolsResult = await client.request(toolsRequest, ListToolsResultSchema); + + console.log('Available tools:'); + if (toolsResult.tools.length === 0) { + console.log(' No tools available'); + } else { + for (const tool of toolsResult.tools) { + console.log(` - ${tool.name}: ${tool.description}`); + } + } + } catch (error) { + console.log(`Tools not supported by this server: ${error}`); + } +} + +/** + * Start a notification stream by calling the notification tool + */ +async function startNotificationTool(client: Client): Promise { + try { + // Call the notification tool using reasonable defaults + const request: CallToolRequest = { + method: 'tools/call', + params: { + name: 'start-notification-stream', + arguments: { + interval: 1000, // 1 second between notifications + count: 5 // Send 5 notifications + } + } + }; + + console.log('Calling notification tool...'); + const result = await client.request(request, CallToolResultSchema); + + console.log('Tool result:'); + result.content.forEach(item => { + if (item.type === 'text') { + console.log(` ${item.text}`); + } else { + console.log(` ${item.type} content:`, item); + } + }); + } catch (error) { + console.log(`Error calling notification tool: ${error}`); + } +} + +// Start the client +main().catch((error: unknown) => { + console.error('Error running MCP client:', error); + process.exit(1); +}); \ No newline at end of file diff --git a/src/examples/server/simpleSseServer.ts b/src/examples/server/simpleSseServer.ts new file mode 100644 index 00000000..4c5aa446 --- /dev/null +++ b/src/examples/server/simpleSseServer.ts @@ -0,0 +1,169 @@ +import express, { Request, Response } from 'express'; +import { McpServer } from '../../server/mcp.js'; +import { SSEServerTransport } from '../../server/sse.js'; +import { z } from 'zod'; +import { CallToolResult } from '../../types.js'; + +/** + * This example server demonstrates the deprecated HTTP+SSE transport + * (protocol version 2024-11-05). It mainly used for testing backward compatible clients. + * + * The server exposes two endpoints: + * - /sse: For establishing the SSE stream (GET) + * - /messages: For receiving client messages (POST) + * + */ + +// Create an MCP server instance +const server = new McpServer({ + name: 'simple-sse-server', + version: '1.0.0', +}, { capabilities: { logging: {} } }); + +server.tool( + 'start-notification-stream', + 'Starts sending periodic notifications', + { + interval: z.number().describe('Interval in milliseconds between notifications').default(1000), + count: z.number().describe('Number of notifications to send').default(10), + }, + async ({ interval, count }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + let counter = 0; + + // Send the initial notification + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Starting notification stream with ${count} messages every ${interval}ms` + } + }); + + // Send periodic notifications + while (counter < count) { + counter++; + await sleep(interval); + + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Notification #${counter} at ${new Date().toISOString()}` + } + }); + } + catch (error) { + console.error("Error sending notification:", error); + } + } + + return { + content: [ + { + type: 'text', + text: `Completed sending ${count} notifications every ${interval}ms`, + } + ], + }; + } +); + +const app = express(); +app.use(express.json()); + +// Store transports by session ID +const transports: Record = {}; + +// SSE endpoint for establishing the stream +app.get('/mcp', async (req: Request, res: Response) => { + console.log('Received GET request to /sse (establishing SSE stream)'); + + try { + // Create a new SSE transport for the client + // The endpoint for POST messages is '/messages' + const transport = new SSEServerTransport('/messages', res); + + // Store the transport by session ID + const sessionId = transport.sessionId; + transports[sessionId] = transport; + + // Set up onclose handler to clean up transport when closed + transport.onclose = () => { + console.log(`SSE transport closed for session ${sessionId}`); + delete transports[sessionId]; + }; + + // Connect the transport to the MCP server + await server.connect(transport); + + // Start the SSE transport to begin streaming + // This sends an initial 'endpoint' event with the session ID in the URL + await transport.start(); + + console.log(`Established SSE stream with session ID: ${sessionId}`); + } catch (error) { + console.error('Error establishing SSE stream:', error); + if (!res.headersSent) { + res.status(500).send('Error establishing SSE stream'); + } + } +}); + +// Messages endpoint for receiving client JSON-RPC requests +app.post('/messages', async (req: Request, res: Response) => { + console.log('Received POST request to /messages'); + + // Extract session ID from URL query parameter + // In the SSE protocol, this is added by the client based on the endpoint event + const sessionId = req.query.sessionId as string | undefined; + + if (!sessionId) { + console.error('No session ID provided in request URL'); + res.status(400).send('Missing sessionId parameter'); + return; + } + + const transport = transports[sessionId]; + if (!transport) { + console.error(`No active transport found for session ID: ${sessionId}`); + res.status(404).send('Session not found'); + return; + } + + try { + // Handle the POST message with the transport + await transport.handlePostMessage(req, res, req.body); + } catch (error) { + console.error('Error handling request:', error); + if (!res.headersSent) { + res.status(500).send('Error handling request'); + } + } +}); + +// Start the server +const PORT = 3000; +app.listen(PORT, () => { + console.log(`Simple SSE Server (deprecated protocol version 2024-11-05) listening on port ${PORT}`); +}); + +// Handle server shutdown +process.on('SIGINT', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId].close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + await server.close(); + console.log('Server shutdown complete'); + process.exit(0); +}); \ No newline at end of file diff --git a/src/examples/server/sseAndStreamableHttpCompatibleServer.ts b/src/examples/server/sseAndStreamableHttpCompatibleServer.ts new file mode 100644 index 00000000..e53fcf5f --- /dev/null +++ b/src/examples/server/sseAndStreamableHttpCompatibleServer.ts @@ -0,0 +1,316 @@ +import express, { Request, Response } from 'express'; +import { randomUUID } from "node:crypto"; +import { McpServer } from '../../server/mcp.js'; +import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { SSEServerTransport } from '../../server/sse.js'; +import { z } from 'zod'; +import { CallToolResult, isInitializeRequest, JSONRPCMessage } from '../../types.js'; + +/** + * This example server demonstrates backwards compatibility with both: + * 1. The deprecated HTTP+SSE transport (protocol version 2024-11-05) + * 2. The Streamable HTTP transport (protocol version 2025-03-26) + * + * It maintains a single MCP server instance but exposes two transport options: + * - /mcp: The new Streamable HTTP endpoint (supports GET/POST/DELETE) + * - /sse: The deprecated SSE endpoint for older clients (GET to establish stream) + * - /messages: The deprecated POST endpoint for older clients (POST to send messages) + */ + +// Simple in-memory event store for resumability +class InMemoryEventStore implements EventStore { + private events: Map = new Map(); + + /** + * Generates a unique event ID for a given stream ID + */ + private generateEventId(streamId: string): string { + return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; + } + + private getStreamIdFromEventId(eventId: string): string { + const parts = eventId.split('_'); + return parts.length > 0 ? parts[0] : ''; + } + + /** + * Stores an event with a generated event ID + * Implements EventStore.storeEvent + */ + async storeEvent(streamId: string, message: JSONRPCMessage): Promise { + const eventId = this.generateEventId(streamId); + console.log(`Storing event ${eventId} for stream ${streamId}`); + this.events.set(eventId, { streamId, message }); + return eventId; + } + + /** + * Replays events that occurred after a specific event ID + * Implements EventStore.replayEventsAfter + */ + async replayEventsAfter(lastEventId: string, + { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } + ): Promise { + if (!lastEventId || !this.events.has(lastEventId)) { + console.log(`No events found for lastEventId: ${lastEventId}`); + return ''; + } + + const streamId = this.getStreamIdFromEventId(lastEventId); + if (!streamId) { + console.log(`Could not extract streamId from lastEventId: ${lastEventId}`); + return ''; + } + + let foundLastEvent = false; + let eventCount = 0; + + // Sort events by eventId for chronological ordering + const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0])); + + for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { + // Only include events from the same stream + if (eventStreamId !== streamId) { + continue; + } + + // Start sending events after we find the lastEventId + if (eventId === lastEventId) { + foundLastEvent = true; + continue; + } + + if (foundLastEvent) { + await send(eventId, message); + eventCount++; + } + } + + console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`); + return streamId; + } +} + +// Create a shared MCP server instance +const server = new McpServer({ + name: 'backwards-compatible-server', + version: '1.0.0', +}, { capabilities: { logging: {} } }); + +// Register a simple tool that sends notifications over time +server.tool( + 'start-notification-stream', + 'Starts sending periodic notifications for testing resumability', + { + interval: z.number().describe('Interval in milliseconds between notifications').default(100), + count: z.number().describe('Number of notifications to send (0 for 100)').default(50), + }, + async ({ interval, count }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + let counter = 0; + + while (count === 0 || counter < count) { + counter++; + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Periodic notification #${counter} at ${new Date().toISOString()}` + } + }); + } + catch (error) { + console.error("Error sending notification:", error); + } + // Wait for the specified interval + await sleep(interval); + } + + return { + content: [ + { + type: 'text', + text: `Started sending periodic notifications every ${interval}ms`, + } + ], + }; + } +); + +// Create Express application +const app = express(); +app.use(express.json()); + +// Store transports by session ID +const transports: Record = {}; + +//============================================================================= +// STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26) +//============================================================================= + +// Handle all MCP Streamable HTTP requests (GET, POST, DELETE) on a single endpoint +app.all('/mcp', async (req: Request, res: Response) => { + console.log(`Received ${req.method} request to /mcp`); + + try { + // Check for existing session ID + const sessionId = req.headers['mcp-session-id'] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + // Check if the transport is of the correct type + const existingTransport = transports[sessionId]; + if (existingTransport instanceof StreamableHTTPServerTransport) { + // Reuse existing transport + transport = existingTransport; + } else { + // Transport exists but is not a StreamableHTTPServerTransport (could be SSEServerTransport) + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: Session exists but uses a different transport protocol', + }, + id: null, + }); + return; + } + } else if (!sessionId && req.method === 'POST' && isInitializeRequest(req.body)) { + const eventStore = new InMemoryEventStore(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore, // Enable resumability + onsessioninitialized: (sessionId) => { + // Store the transport by session ID when session is initialized + console.log(`StreamableHTTP session initialized with ID: ${sessionId}`); + transports[sessionId] = transport; + } + }); + + // Set up onclose handler to clean up transport when closed + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && transports[sid]) { + console.log(`Transport closed for session ${sid}, removing from transports map`); + delete transports[sid]; + } + }; + + // Connect the transport to the MCP server + await server.connect(transport); + } else { + // Invalid request - no session ID or not initialization request + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); + return; + } + + // Handle the request with the transport + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling MCP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Internal server error', + }, + id: null, + }); + } + } +}); + +//============================================================================= +// DEPRECATED HTTP+SSE TRANSPORT (PROTOCOL VERSION 2024-11-05) +//============================================================================= + +app.get('/sse', async (req: Request, res: Response) => { + console.log('Received GET request to /sse (deprecated SSE transport)'); + const transport = new SSEServerTransport('/messages', res); + transports[transport.sessionId] = transport; + res.on("close", () => { + delete transports[transport.sessionId]; + }); + await server.connect(transport); +}); + +app.post("/messages", async (req: Request, res: Response) => { + const sessionId = req.query.sessionId as string; + let transport: SSEServerTransport; + const existingTransport = transports[sessionId]; + if (existingTransport instanceof SSEServerTransport) { + // Reuse existing transport + transport = existingTransport; + } else { + // Transport exists but is not a SSEServerTransport (could be StreamableHTTPServerTransport) + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: Session exists but uses a different transport protocol', + }, + id: null, + }); + return; + } + if (transport) { + await transport.handlePostMessage(req, res); + } else { + res.status(400).send('No transport found for sessionId'); + } +}); + + +// Start the server +const PORT = 3000; +app.listen(PORT, () => { + console.log(`Backwards compatible MCP server listening on port ${PORT}`); + console.log(` +============================================== +SUPPORTED TRANSPORT OPTIONS: + +1. Streamable Http(Protocol version: 2025-03-26) + Endpoint: /mcp + Methods: GET, POST, DELETE + Usage: + - Initialize with POST to /mcp + - Establish SSE stream with GET to /mcp + - Send requests with POST to /mcp + - Terminate session with DELETE to /mcp + +2. Http + SSE (Protocol version: 2024-11-05) + Endpoints: /sse (GET) and /messages (POST) + Usage: + - Establish SSE stream with GET to /sse + - Send requests with POST to /messages?sessionId= +============================================== +`); +}); + +// Handle server shutdown +process.on('SIGINT', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId].close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + await server.close(); + console.log('Server shutdown complete'); + process.exit(0); +}); \ No newline at end of file