From 91166c0a547745e146d789fa18de4de37e7bcc83 Mon Sep 17 00:00:00 2001 From: Zijing Zhang <50045289+pluveto@users.noreply.github.com> Date: Sun, 20 Apr 2025 19:59:47 +0800 Subject: [PATCH 1/2] feat: add PseudoTransport for in-process MCP client-server communication --- src/client/pseudo.ts | 165 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 src/client/pseudo.ts diff --git a/src/client/pseudo.ts b/src/client/pseudo.ts new file mode 100644 index 00000000..0670c5c4 --- /dev/null +++ b/src/client/pseudo.ts @@ -0,0 +1,165 @@ +import { JSONRPCMessage } from "../types.js"; +import { Transport } from "../shared/transport.js"; +import { McpServer } from "src/server/mcp.js"; + +/** + * Pseudo-Transport for testing or embedding: connects an MCP client + * directly to an McpServer instance within the same process, bypassing + * actual network or stdio communication. + * + * Instantiate this with an McpServer, then pass this transport instance + * to your MCP client's connect method. + */ +export class PseudoTransport implements Transport { + private _started = false; + private _mcpServer: McpServer; + private _isServerConnected = false; + + private _clientMessageHandler?: (message: JSONRPCMessage) => void; + private _clientErrorHandler?: (error: Error) => void; + private _clientCloseHandler?: () => void; + + private _serverMessageHandler?: (message: JSONRPCMessage) => void; + + /** + * Creates a PseudoTransport instance linked to a specific McpServer. + * @param mcpServer The McpServer instance to communicate with. + */ + constructor(mcpServer: McpServer) { + if (!mcpServer) { + throw new Error("McpServer instance is required for PseudoTransport."); + } + this._mcpServer = mcpServer; + } + + set onmessage(handler: ((message: JSONRPCMessage) => void) | undefined) { + this._clientMessageHandler = handler; + } + get onmessage(): ((message: JSONRPCMessage) => void) | undefined { + return this._clientMessageHandler; + } + + set onerror(handler: ((error: Error) => void) | undefined) { + this._clientErrorHandler = handler; + } + get onerror(): ((error: Error) => void) | undefined { + return this._clientErrorHandler; + } + + set onclose(handler: (() => void) | undefined) { + this._clientCloseHandler = handler; + } + get onclose(): (() => void) | undefined { + return this._clientCloseHandler; + } + + /** + * Starts the pseudo-connection. This internally connects the transport + * to the provided McpServer instance and ensures handlers are set up + * before resolving. + */ + async start(): Promise { + if (this._started) { + throw new Error("PseudoTransport already started!"); + } + + const self = this; + const serverFacingTransportProxy: Transport = { + set onmessage(handler: ((message: JSONRPCMessage) => void) | undefined) { + self._serverMessageHandler = handler; + if (handler) { + self._isServerConnected = true; + } else { + self._isServerConnected = false; + } + }, + get onmessage(): ((message: JSONRPCMessage) => void) | undefined { + return self._serverMessageHandler; + }, + send: async (message: JSONRPCMessage): Promise => { + if (self._clientMessageHandler) { + setTimeout(() => { + try { + self._clientMessageHandler!(message); + } catch (error) { + self._clientErrorHandler?.(error instanceof Error ? error : new Error(String(error))); + } + }, 0); + } + }, + start: async (): Promise => {}, + close: async (): Promise => { + self._isServerConnected = false; + }, + onerror: undefined, + onclose: undefined, + }; + + try { + await this._mcpServer.connect(serverFacingTransportProxy); + if (!this._serverMessageHandler) { + await new Promise(resolve => setTimeout(resolve, 0)); + if (!this._serverMessageHandler) { + console.warn("[PseudoTransport] McpServer did NOT set a message handler after connect completed. Client requests might fail."); + } + } + } catch (error) { + const err = error instanceof Error ? error : new Error("Failed to connect internal McpServer"); + this._clientErrorHandler?.(err); + this._started = false; + throw err; + } + + this._started = true; + } + + /** + * Closes the pseudo-connection. Notifies the client. + */ + async close(): Promise { + if (!this._started) { + return Promise.resolve(); + } + this._started = false; + this._isServerConnected = false; + + const clientCloseHandler = this._clientCloseHandler; + this._serverMessageHandler = undefined; + this._clientMessageHandler = undefined; + this._clientErrorHandler = undefined; + this._clientCloseHandler = undefined; + + setTimeout(() => clientCloseHandler?.(), 0); + + return Promise.resolve(); + } + + /** + * Sends a message FROM the CLIENT TO the McpServer. + * This will invoke the message handler the McpServer registered. + * @param message The JSON-RPC message from the client. + */ + send(message: JSONRPCMessage): Promise { + if (!this._started) { + return Promise.reject(new Error("PseudoTransport is not started.")); + } + if (!this._serverMessageHandler || !this._isServerConnected) { + return Promise.reject(new Error("PseudoTransport: McpServer handler not available or server not connected.")); + } + + const serverHandler = this._serverMessageHandler; + + return new Promise((resolve, reject) => { + setTimeout(() => { + try { + serverHandler(message); + resolve(); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + this._clientErrorHandler?.(err); + reject(err); + } + }, 0); + }); + } +} From f6b8538297aefdefbe65b081af897c5153c10eb0 Mon Sep 17 00:00:00 2001 From: Zijing Zhang <50045289+pluveto@users.noreply.github.com> Date: Sun, 20 Apr 2025 20:04:46 +0800 Subject: [PATCH 2/2] tests: add test for PseudoTransport --- src/client/pseudo.test.ts | 124 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 src/client/pseudo.test.ts diff --git a/src/client/pseudo.test.ts b/src/client/pseudo.test.ts new file mode 100644 index 00000000..a486fcba --- /dev/null +++ b/src/client/pseudo.test.ts @@ -0,0 +1,124 @@ +import { McpServer } from "src/server/mcp.js"; +import { PseudoTransport } from "./pseudo.js"; +import { JSONRPCMessage } from "../types.js"; + +describe("PseudoTransport", () => { + let server: McpServer; + let transport: PseudoTransport; + + // Mock MCP server with minimal message handler logic + beforeEach(() => { + server = { + connect: jest.fn(async (t) => { + t.onmessage = (msg: JSONRPCMessage) => { + // Echo back with result if it's a request + if ("id" in msg && "method" in msg) { + t.send({ + jsonrpc: "2.0", + id: msg.id, + result: { echoed: msg.params }, + }); + } + }; + }), + } as unknown as McpServer; + transport = new PseudoTransport(server); + }); + + afterEach(async () => { + await transport.close(); + }); + + describe("connection handling", () => { + it("starts and connects to the server", async () => { + await expect(transport.start()).resolves.toBeUndefined(); + expect(server.connect).toHaveBeenCalled(); + }); + + it("throws if started twice", async () => { + await transport.start(); + await expect(transport.start()).rejects.toThrow(/already started/i); + }); + + it("calls onclose handler on close()", async () => { + const onclose = jest.fn(); + transport.onclose = onclose; + await transport.start(); + await transport.close(); + await new Promise((r) => setTimeout(r, 10)); + expect(onclose).toHaveBeenCalled(); + }); + }); + + describe("message handling", () => { + it("routes client message to server and receives response", async () => { + const received: JSONRPCMessage[] = []; + transport.onmessage = (msg) => received.push(msg); + await transport.start(); + + const request: JSONRPCMessage = { + jsonrpc: "2.0", + id: "1", + method: "echo", + params: { foo: "bar" }, + }; + + await transport.send(request); + await new Promise((r) => setTimeout(r, 10)); + expect(received).toHaveLength(1); + expect(received[0]).toMatchObject({ + id: "1", + result: { echoed: { foo: "bar" } }, + }); + }); + + it("calls onerror if server handler throws", async () => { + // Patch server to throw + server.connect = jest.fn(async (t) => { + t.onmessage = () => { + throw new Error("server fail"); + }; + }); + transport = new PseudoTransport(server); + + const errors: Error[] = []; + transport.onerror = (e) => errors.push(e); + + await transport.start(); + const req: JSONRPCMessage = { jsonrpc: "2.0", id: "2", method: "fail", params: {} }; + await expect(transport.send(req)).rejects.toThrow(/server fail/); + await new Promise((r) => setTimeout(r, 10)); + expect(errors[0].message).toMatch(/server fail/); + }); + + it("calls onerror if client handler throws", async () => { + await transport.start(); + transport.onmessage = () => { + throw new Error("client fail"); + }; + // Patch server to echo + (server.connect as jest.Mock).mock.calls[0][0].send({ + jsonrpc: "2.0", + id: "3", + result: "hi", + }); + const errors: Error[] = []; + transport.onerror = (e) => errors.push(e); + await new Promise((r) => setTimeout(r, 10)); + expect(errors[0].message).toMatch(/client fail/); + }); + + it("rejects send if not started", async () => { + const msg: JSONRPCMessage = { jsonrpc: "2.0", id: "4", method: "noop", params: {} }; + await expect(transport.send(msg)).rejects.toThrow(/not started/i); + }); + + it("rejects send if server not connected", async () => { + await transport.start(); + // Simulate server disconnect + (transport as any)._serverMessageHandler = undefined; + const msg: JSONRPCMessage = { jsonrpc: "2.0", id: "5", method: "noop", params: {} }; + await expect(transport.send(msg)).rejects.toThrow(/not available/i); + }); + }); +});