Skip to content

StreamableHttp transport - backwards compatibility examples #347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions src/examples/client/streamableHttpWithSseFallbackClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import { Client } from '../../client/index.js';
import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js';
import { SSEClientTransport } from '../../client/sse.js';
import {
ListToolsRequest,
ListToolsResultSchema,
CallToolRequest,
CallToolResultSchema,
LoggingMessageNotificationSchema,
} from '../../types.js';

/**
* Simplified Backwards Compatible MCP Client
*
* This client demonstrates backward compatibility with both:
* 1. Modern servers using Streamable HTTP transport (protocol version 2025-03-26)
* 2. Older servers using HTTP+SSE transport (protocol version 2024-11-05)
*
* Following the MCP specification for backwards compatibility:
* - Attempts to POST an initialize request to the server URL first (modern transport)
* - If that fails with 4xx status, falls back to GET request for SSE stream (older transport)
*/

// Command line args processing
const args = process.argv.slice(2);
const serverUrl = args[0] || 'http://localhost:3000/mcp';

async function main(): Promise<void> {
console.log('MCP Backwards Compatible Client');
console.log('===============================');
console.log(`Connecting to server at: ${serverUrl}`);

let client: Client;
let transport: StreamableHTTPClientTransport | SSEClientTransport;

try {
// Try connecting with automatic transport detection
const connection = await connectWithBackwardsCompatibility(serverUrl);
client = connection.client;
transport = connection.transport;

// Set up notification handler
client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
console.log(`Notification: ${notification.params.level} - ${notification.params.data}`);
});

// DEMO WORKFLOW:
// 1. List available tools
console.log('\n=== Listing Available Tools ===');
await listTools(client);

// 2. Call the notification tool
console.log('\n=== Starting Notification Stream ===');
await startNotificationTool(client);

// 3. Wait for all notifications (5 seconds)
console.log('\n=== Waiting for all notifications ===');
await new Promise(resolve => setTimeout(resolve, 5000));

// 4. Disconnect
console.log('\n=== Disconnecting ===');
await transport.close();
console.log('Disconnected from MCP server');

} catch (error) {
console.error('Error running client:', error);
process.exit(1);
}
}

/**
* Connect to an MCP server with backwards compatibility
* Following the spec for client backward compatibility
*/
async function connectWithBackwardsCompatibility(url: string): Promise<{
client: Client,
transport: StreamableHTTPClientTransport | SSEClientTransport,
transportType: 'streamable-http' | 'sse'
}> {
console.log('1. Trying Streamable HTTP transport first...');

// Step 1: Try Streamable HTTP transport first
const client = new Client({
name: 'backwards-compatible-client',
version: '1.0.0'
});

client.onerror = (error) => {
console.error('Client error:', error);
};
const baseUrl = new URL(url);

try {
// Create modern transport
const streamableTransport = new StreamableHTTPClientTransport(baseUrl);
await client.connect(streamableTransport);

console.log('Successfully connected using modern Streamable HTTP transport.');
return {
client,
transport: streamableTransport,
transportType: 'streamable-http'
};
} catch (error) {
// Step 2: If transport fails, try the older SSE transport
console.log(`StreamableHttp transport connection failed: ${error}`);
console.log('2. Falling back to deprecated HTTP+SSE transport...');

try {
// Create SSE transport pointing to /sse endpoint
const sseTransport = new SSEClientTransport(baseUrl);
const sseClient = new Client({
name: 'backwards-compatible-client',
version: '1.0.0'
});
await sseClient.connect(sseTransport);

console.log('Successfully connected using deprecated HTTP+SSE transport.');
return {
client: sseClient,
transport: sseTransport,
transportType: 'sse'
};
} catch (sseError) {
console.error(`Failed to connect with either transport method:\n1. Streamable HTTP error: ${error}\n2. SSE error: ${sseError}`);
throw new Error('Could not connect to server with any available transport');
}
}
}

/**
* List available tools on the server
*/
async function listTools(client: Client): Promise<void> {
try {
const toolsRequest: ListToolsRequest = {
method: 'tools/list',
params: {}
};
const toolsResult = await client.request(toolsRequest, ListToolsResultSchema);

console.log('Available tools:');
if (toolsResult.tools.length === 0) {
console.log(' No tools available');
} else {
for (const tool of toolsResult.tools) {
console.log(` - ${tool.name}: ${tool.description}`);
}
}
} catch (error) {
console.log(`Tools not supported by this server: ${error}`);
}
}

/**
* Start a notification stream by calling the notification tool
*/
async function startNotificationTool(client: Client): Promise<void> {
try {
// Call the notification tool using reasonable defaults
const request: CallToolRequest = {
method: 'tools/call',
params: {
name: 'start-notification-stream',
arguments: {
interval: 1000, // 1 second between notifications
count: 5 // Send 5 notifications
}
}
};

console.log('Calling notification tool...');
const result = await client.request(request, CallToolResultSchema);

console.log('Tool result:');
result.content.forEach(item => {
if (item.type === 'text') {
console.log(` ${item.text}`);
} else {
console.log(` ${item.type} content:`, item);
}
});
} catch (error) {
console.log(`Error calling notification tool: ${error}`);
}
}

// Start the client
main().catch((error: unknown) => {
console.error('Error running MCP client:', error);
process.exit(1);
});
169 changes: 169 additions & 0 deletions src/examples/server/simpleSseServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import express, { Request, Response } from 'express';
import { McpServer } from '../../server/mcp.js';
import { SSEServerTransport } from '../../server/sse.js';
import { z } from 'zod';
import { CallToolResult } from '../../types.js';

/**
* This example server demonstrates the deprecated HTTP+SSE transport
* (protocol version 2024-11-05). It mainly used for testing backward compatible clients.
*
* The server exposes two endpoints:
* - /sse: For establishing the SSE stream (GET)
* - /messages: For receiving client messages (POST)
*
*/

// Create an MCP server instance
const server = new McpServer({
name: 'simple-sse-server',
version: '1.0.0',
}, { capabilities: { logging: {} } });

server.tool(
'start-notification-stream',
'Starts sending periodic notifications',
{
interval: z.number().describe('Interval in milliseconds between notifications').default(1000),
count: z.number().describe('Number of notifications to send').default(10),
},
async ({ interval, count }, { sendNotification }): Promise<CallToolResult> => {
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
let counter = 0;

// Send the initial notification
await sendNotification({
method: "notifications/message",
params: {
level: "info",
data: `Starting notification stream with ${count} messages every ${interval}ms`
}
});

// Send periodic notifications
while (counter < count) {
counter++;
await sleep(interval);

try {
await sendNotification({
method: "notifications/message",
params: {
level: "info",
data: `Notification #${counter} at ${new Date().toISOString()}`
}
});
}
catch (error) {
console.error("Error sending notification:", error);
}
}

return {
content: [
{
type: 'text',
text: `Completed sending ${count} notifications every ${interval}ms`,
}
],
};
}
);

const app = express();
app.use(express.json());

// Store transports by session ID
const transports: Record<string, SSEServerTransport> = {};

// SSE endpoint for establishing the stream
app.get('/mcp', async (req: Request, res: Response) => {
console.log('Received GET request to /sse (establishing SSE stream)');

try {
// Create a new SSE transport for the client
// The endpoint for POST messages is '/messages'
const transport = new SSEServerTransport('/messages', res);

// Store the transport by session ID
const sessionId = transport.sessionId;
transports[sessionId] = transport;

// Set up onclose handler to clean up transport when closed
transport.onclose = () => {
console.log(`SSE transport closed for session ${sessionId}`);
delete transports[sessionId];
};

// Connect the transport to the MCP server
await server.connect(transport);

// Start the SSE transport to begin streaming
// This sends an initial 'endpoint' event with the session ID in the URL
await transport.start();

console.log(`Established SSE stream with session ID: ${sessionId}`);
} catch (error) {
console.error('Error establishing SSE stream:', error);
if (!res.headersSent) {
res.status(500).send('Error establishing SSE stream');
}
}
});

// Messages endpoint for receiving client JSON-RPC requests
app.post('/messages', async (req: Request, res: Response) => {
console.log('Received POST request to /messages');

// Extract session ID from URL query parameter
// In the SSE protocol, this is added by the client based on the endpoint event
const sessionId = req.query.sessionId as string | undefined;

if (!sessionId) {
console.error('No session ID provided in request URL');
res.status(400).send('Missing sessionId parameter');
return;
}

const transport = transports[sessionId];
if (!transport) {
console.error(`No active transport found for session ID: ${sessionId}`);
res.status(404).send('Session not found');
return;
}

try {
// Handle the POST message with the transport
await transport.handlePostMessage(req, res, req.body);
} catch (error) {
console.error('Error handling request:', error);
if (!res.headersSent) {
res.status(500).send('Error handling request');
}
}
});

// Start the server
const PORT = 3000;
app.listen(PORT, () => {
console.log(`Simple SSE Server (deprecated protocol version 2024-11-05) listening on port ${PORT}`);
});

// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
await server.close();
console.log('Server shutdown complete');
process.exit(0);
});
Loading