Skip to content

feat: add PseudoTransport for in-process MCP client-server communication #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions src/client/pseudo.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { McpServer } from "src/server/mcp.js";
import { PseudoTransport } from "./pseudo.js";
import { JSONRPCMessage } from "../types.js";

describe("PseudoTransport", () => {
let server: McpServer;
let transport: PseudoTransport;

// Mock MCP server with minimal message handler logic
beforeEach(() => {
server = {
connect: jest.fn(async (t) => {
t.onmessage = (msg: JSONRPCMessage) => {
// Echo back with result if it's a request
if ("id" in msg && "method" in msg) {
t.send({
jsonrpc: "2.0",
id: msg.id,
result: { echoed: msg.params },
});
}
};
}),
} as unknown as McpServer;
transport = new PseudoTransport(server);
});

afterEach(async () => {
await transport.close();
});

describe("connection handling", () => {
it("starts and connects to the server", async () => {
await expect(transport.start()).resolves.toBeUndefined();
expect(server.connect).toHaveBeenCalled();
});

it("throws if started twice", async () => {
await transport.start();
await expect(transport.start()).rejects.toThrow(/already started/i);
});

it("calls onclose handler on close()", async () => {
const onclose = jest.fn();
transport.onclose = onclose;
await transport.start();
await transport.close();
await new Promise((r) => setTimeout(r, 10));
expect(onclose).toHaveBeenCalled();
});
});

describe("message handling", () => {
it("routes client message to server and receives response", async () => {
const received: JSONRPCMessage[] = [];
transport.onmessage = (msg) => received.push(msg);
await transport.start();

const request: JSONRPCMessage = {
jsonrpc: "2.0",
id: "1",
method: "echo",
params: { foo: "bar" },
};

await transport.send(request);
await new Promise((r) => setTimeout(r, 10));
expect(received).toHaveLength(1);
expect(received[0]).toMatchObject({
id: "1",
result: { echoed: { foo: "bar" } },
});
});

it("calls onerror if server handler throws", async () => {
// Patch server to throw
server.connect = jest.fn(async (t) => {
t.onmessage = () => {
throw new Error("server fail");
};
});
transport = new PseudoTransport(server);

const errors: Error[] = [];
transport.onerror = (e) => errors.push(e);

await transport.start();
const req: JSONRPCMessage = { jsonrpc: "2.0", id: "2", method: "fail", params: {} };
await expect(transport.send(req)).rejects.toThrow(/server fail/);
await new Promise((r) => setTimeout(r, 10));
expect(errors[0].message).toMatch(/server fail/);
});

it("calls onerror if client handler throws", async () => {
await transport.start();
transport.onmessage = () => {
throw new Error("client fail");
};
// Patch server to echo
(server.connect as jest.Mock).mock.calls[0][0].send({
jsonrpc: "2.0",
id: "3",
result: "hi",
});
const errors: Error[] = [];
transport.onerror = (e) => errors.push(e);
await new Promise((r) => setTimeout(r, 10));
expect(errors[0].message).toMatch(/client fail/);
});

it("rejects send if not started", async () => {
const msg: JSONRPCMessage = { jsonrpc: "2.0", id: "4", method: "noop", params: {} };
await expect(transport.send(msg)).rejects.toThrow(/not started/i);
});

it("rejects send if server not connected", async () => {
await transport.start();
// Simulate server disconnect
(transport as any)._serverMessageHandler = undefined;
const msg: JSONRPCMessage = { jsonrpc: "2.0", id: "5", method: "noop", params: {} };
await expect(transport.send(msg)).rejects.toThrow(/not available/i);
});
});
});
165 changes: 165 additions & 0 deletions src/client/pseudo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { JSONRPCMessage } from "../types.js";
import { Transport } from "../shared/transport.js";
import { McpServer } from "src/server/mcp.js";

/**
* Pseudo-Transport for testing or embedding: connects an MCP client
* directly to an McpServer instance within the same process, bypassing
* actual network or stdio communication.
*
* Instantiate this with an McpServer, then pass this transport instance
* to your MCP client's connect method.
*/
export class PseudoTransport implements Transport {
private _started = false;
private _mcpServer: McpServer;
private _isServerConnected = false;

private _clientMessageHandler?: (message: JSONRPCMessage) => void;
private _clientErrorHandler?: (error: Error) => void;
private _clientCloseHandler?: () => void;

private _serverMessageHandler?: (message: JSONRPCMessage) => void;

/**
* Creates a PseudoTransport instance linked to a specific McpServer.
* @param mcpServer The McpServer instance to communicate with.
*/
constructor(mcpServer: McpServer) {
if (!mcpServer) {
throw new Error("McpServer instance is required for PseudoTransport.");
}
this._mcpServer = mcpServer;
}

set onmessage(handler: ((message: JSONRPCMessage) => void) | undefined) {
this._clientMessageHandler = handler;
}
get onmessage(): ((message: JSONRPCMessage) => void) | undefined {
return this._clientMessageHandler;
}

set onerror(handler: ((error: Error) => void) | undefined) {
this._clientErrorHandler = handler;
}
get onerror(): ((error: Error) => void) | undefined {
return this._clientErrorHandler;
}

set onclose(handler: (() => void) | undefined) {
this._clientCloseHandler = handler;
}
get onclose(): (() => void) | undefined {
return this._clientCloseHandler;
}

/**
* Starts the pseudo-connection. This internally connects the transport
* to the provided McpServer instance and ensures handlers are set up
* before resolving.
*/
async start(): Promise<void> {
if (this._started) {
throw new Error("PseudoTransport already started!");
}

const self = this;
const serverFacingTransportProxy: Transport = {
set onmessage(handler: ((message: JSONRPCMessage) => void) | undefined) {
self._serverMessageHandler = handler;
if (handler) {
self._isServerConnected = true;
} else {
self._isServerConnected = false;
}
},
get onmessage(): ((message: JSONRPCMessage) => void) | undefined {
return self._serverMessageHandler;
},
send: async (message: JSONRPCMessage): Promise<void> => {
if (self._clientMessageHandler) {
setTimeout(() => {
try {
self._clientMessageHandler!(message);
} catch (error) {
self._clientErrorHandler?.(error instanceof Error ? error : new Error(String(error)));
}
}, 0);
}
},
start: async (): Promise<void> => {},
close: async (): Promise<void> => {
self._isServerConnected = false;
},
onerror: undefined,
onclose: undefined,
};

try {
await this._mcpServer.connect(serverFacingTransportProxy);
if (!this._serverMessageHandler) {
await new Promise(resolve => setTimeout(resolve, 0));
if (!this._serverMessageHandler) {
console.warn("[PseudoTransport] McpServer did NOT set a message handler after connect completed. Client requests might fail.");
}
}
} catch (error) {
const err = error instanceof Error ? error : new Error("Failed to connect internal McpServer");
this._clientErrorHandler?.(err);
this._started = false;
throw err;
}

this._started = true;
}

/**
* Closes the pseudo-connection. Notifies the client.
*/
async close(): Promise<void> {
if (!this._started) {
return Promise.resolve();
}
this._started = false;
this._isServerConnected = false;

const clientCloseHandler = this._clientCloseHandler;
this._serverMessageHandler = undefined;
this._clientMessageHandler = undefined;
this._clientErrorHandler = undefined;
this._clientCloseHandler = undefined;

setTimeout(() => clientCloseHandler?.(), 0);

return Promise.resolve();
}

/**
* Sends a message FROM the CLIENT TO the McpServer.
* This will invoke the message handler the McpServer registered.
* @param message The JSON-RPC message from the client.
*/
send(message: JSONRPCMessage): Promise<void> {
if (!this._started) {
return Promise.reject(new Error("PseudoTransport is not started."));
}
if (!this._serverMessageHandler || !this._isServerConnected) {
return Promise.reject(new Error("PseudoTransport: McpServer handler not available or server not connected."));
}

const serverHandler = this._serverMessageHandler;

return new Promise((resolve, reject) => {
setTimeout(() => {
try {
serverHandler(message);
resolve();
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
this._clientErrorHandler?.(err);
reject(err);
}
}, 0);
});
}
}