Skip to content

Stateless only #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions api/server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { z } from "zod";
import { initializeMcpApiHandler } from "../lib/mcp-api-handler";

const handler = initializeMcpApiHandler(
export default initializeMcpApiHandler(
(server) => {
// Add more tools, resources, and prompts here
server.tool("echo", { message: z.string() }, async ({ message }) => ({
Expand All @@ -18,5 +18,3 @@ const handler = initializeMcpApiHandler(
},
}
);

export default handler;
297 changes: 43 additions & 254 deletions lib/mcp-api-handler.ts
Original file line number Diff line number Diff line change
@@ -1,279 +1,68 @@
import getRawBody from "raw-body";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from "http";
import { createClient } from "redis";
import { Socket } from "net";
import { Readable } from "stream";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { IncomingMessage, ServerResponse } from "http";
import { ServerOptions } from "@modelcontextprotocol/sdk/server/index.js";
import vercelJson from "../vercel.json";

interface SerializedRequest {
requestId: string;
url: string;
method: string;
body: string;
headers: IncomingHttpHeaders;
}

export function initializeMcpApiHandler(
initializeServer: (server: McpServer) => void,
serverOptions: ServerOptions = {}
) {
const maxDuration =
vercelJson?.functions?.["api/server.ts"]?.maxDuration || 800;
const redisUrl = process.env.REDIS_URL || process.env.KV_URL;
if (!redisUrl) {
throw new Error("REDIS_URL environment variable is not set");
}
const redis = createClient({
url: redisUrl,
});
const redisPublisher = createClient({
url: redisUrl,
let statelessServer: McpServer;
const statelessTransport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});
redis.on("error", (err) => {
console.error("Redis error", err);
});
redisPublisher.on("error", (err) => {
console.error("Redis error", err);
});
const redisPromise = Promise.all([redis.connect(), redisPublisher.connect()]);

let servers: McpServer[] = [];

return async function mcpApiHandler(
req: IncomingMessage,
res: ServerResponse
) {
await redisPromise;
const url = new URL(req.url || "", "https://example.com");
if (url.pathname === "/sse") {
console.log("Got new SSE connection");

const transport = new SSEServerTransport("/message", res);
const sessionId = transport.sessionId;
const server = new McpServer(
{
name: "mcp-typescript server on vercel",
version: "0.1.0",
},
serverOptions
);
initializeServer(server);

servers.push(server);

server.server.onclose = () => {
console.log("SSE connection closed");
servers = servers.filter((s) => s !== server);
};

let logs: {
type: "log" | "error";
messages: string[];
}[] = [];
// This ensures that we logs in the context of the right invocation since the subscriber
// is not itself invoked in request context.
function logInContext(severity: "log" | "error", ...messages: string[]) {
logs.push({
type: severity,
messages,
});
}

// Handles messages originally received via /message
const handleMessage = async (message: string) => {
console.log("Received message from Redis", message);
logInContext("log", "Received message from Redis", message);
const request = JSON.parse(message) as SerializedRequest;

// Make in IncomingMessage object because that is what the SDK expects.
const req = createFakeIncomingMessage({
method: request.method,
url: request.url,
headers: request.headers,
body: request.body,
});
const syntheticRes = new ServerResponse(req);
let status = 100;
let body = "";
syntheticRes.writeHead = (statusCode: number) => {
status = statusCode;
return syntheticRes;
};
syntheticRes.end = (b: unknown) => {
body = b as string;
return syntheticRes;
};
await transport.handlePostMessage(req, syntheticRes);

await redisPublisher.publish(
`responses:${sessionId}:${request.requestId}`,
if (url.pathname === "/mcp") {
if (req.method === "GET") {
console.log("Received GET MCP request");
res.writeHead(405).end(
JSON.stringify({
status,
body,
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed.",
},
id: null,
})
);

if (status >= 200 && status < 300) {
logInContext(
"log",
`Request ${sessionId}:${request.requestId} succeeded: ${body}`
);
} else {
logInContext(
"error",
`Message for ${sessionId}:${request.requestId} failed with status ${status}: ${body}`
);
}
};

const interval = setInterval(() => {
for (const log of logs) {
console[log.type].call(console, ...log.messages);
}
logs = [];
}, 100);

await redis.subscribe(`requests:${sessionId}`, handleMessage);
console.log(`Subscribed to requests:${sessionId}`);

let timeout: NodeJS.Timeout;
let resolveTimeout: (value: unknown) => void;
const waitPromise = new Promise((resolve) => {
resolveTimeout = resolve;
timeout = setTimeout(() => {
resolve("max duration reached");
}, (maxDuration - 5) * 1000);
});

async function cleanup() {
clearTimeout(timeout);
clearInterval(interval);
await redis.unsubscribe(`requests:${sessionId}`, handleMessage);
console.log("Done");
res.statusCode = 200;
res.end();
return;
}
req.on("close", () => resolveTimeout("client hang up"));

await server.connect(transport);
const closeReason = await waitPromise;
console.log(closeReason);
await cleanup();
} else if (url.pathname === "/message") {
console.log("Received message");

const body = await getRawBody(req, {
length: req.headers["content-length"],
encoding: "utf-8",
});

const sessionId = url.searchParams.get("sessionId") || "";
if (!sessionId) {
res.statusCode = 400;
res.end("No sessionId provided");
if (req.method === "DELETE") {
console.log("Received DELETE MCP request");
res.writeHead(405).end(
JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed.",
},
id: null,
})
);
return;
}
const requestId = crypto.randomUUID();
const serializedRequest: SerializedRequest = {
requestId,
url: req.url || "",
method: req.method || "",
body: body,
headers: req.headers,
};

// Handles responses from the /sse endpoint.
await redis.subscribe(
`responses:${sessionId}:${requestId}`,
(message) => {
clearTimeout(timeout);
const response = JSON.parse(message) as {
status: number;
body: string;
};
res.statusCode = response.status;
res.end(response.body);
}
);

// Queue the request in Redis so that a subscriber can pick it up.
// One queue per session.
await redisPublisher.publish(
`requests:${sessionId}`,
JSON.stringify(serializedRequest)
);
console.log(`Published requests:${sessionId}`, serializedRequest);

let timeout = setTimeout(async () => {
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
res.statusCode = 408;
res.end("Request timed out");
}, 10 * 1000);
console.log("Got new MCP connection", req.url, req.method);

if (!statelessServer) {
statelessServer = new McpServer(
{
name: "mcp-typescript server on vercel",
version: "0.1.0",
},
serverOptions
);

res.on("close", async () => {
clearTimeout(timeout);
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
});
initializeServer(statelessServer);
await statelessServer.connect(statelessTransport);
}
await statelessTransport.handleRequest(req, res);
} else {
res.statusCode = 404;
res.end("Not found");
res.writeHead(404).end("Not found");
}
};
}

// Define the options interface
interface FakeIncomingMessageOptions {
method?: string;
url?: string;
headers?: IncomingHttpHeaders;
body?: string | Buffer | Record<string, any> | null;
socket?: Socket;
}

// Create a fake IncomingMessage
function createFakeIncomingMessage(
options: FakeIncomingMessageOptions = {}
): IncomingMessage {
const {
method = "GET",
url = "/",
headers = {},
body = null,
socket = new Socket(),
} = options;

// Create a readable stream that will be used as the base for IncomingMessage
const readable = new Readable();
readable._read = (): void => {}; // Required implementation

// Add the body content if provided
if (body) {
if (typeof body === "string") {
readable.push(body);
} else if (Buffer.isBuffer(body)) {
readable.push(body);
} else {
readable.push(JSON.stringify(body));
}
readable.push(null); // Signal the end of the stream
}

// Create the IncomingMessage instance
const req = new IncomingMessage(socket);

// Set the properties
req.method = method;
req.url = url;
req.headers = headers;

// Copy over the stream methods
req.push = readable.push.bind(readable);
req.read = readable.read.bind(readable);
req.on = readable.on.bind(readable);
req.pipe = readable.pipe.bind(readable);

return req;
}
5 changes: 1 addition & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
"license": "ISC",
"packageManager": "[email protected]+sha512.c85cd21b6da10332156b1ca2aa79c0a61ee7ad2eb0453b88ab299289e9e8ca93e6091232b25c07cbf61f6df77128d9c849e5c9ac6e44854dbd211c49f3a67adc",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.6.1",
"content-type": "^1.0.5",
"raw-body": "^3.0.0",
"redis": "^4.7.0",
"@modelcontextprotocol/sdk": "^1.10.2",
"zod": "^3.24.2"
},
"devDependencies": {
Expand Down
Loading