Skip to content

Commit f5dac6d

Browse files
authored
Merge pull request #1496 from shivdeepak/main
add streamableHttp server support for everything server
2 parents a0c7a31 + 3020ae5 commit f5dac6d

File tree

4 files changed

+196
-7
lines changed

4 files changed

+196
-7
lines changed

package-lock.json

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/everything/README.md

+17-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ The server sends random-leveled log messages every 15 seconds, e.g.:
126126
}
127127
```
128128

129-
## Usage with Claude Desktop
129+
## Usage with Claude Desktop (uses [stdio Transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#stdio))
130130

131131
Add to your `claude_desktop_config.json`:
132132

@@ -172,3 +172,19 @@ Optionally, you can add it to a file called `.vscode/mcp.json` in your workspace
172172
}
173173
}
174174
```
175+
176+
## Run with [HTTP+SSE Transport](https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse) (deprecated as of [2025-03-26](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports))
177+
178+
```shell
179+
cd src/everything
180+
npm install
181+
npm run start:sse
182+
```
183+
184+
## Run with [Streamable HTTP Transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)
185+
186+
```shell
187+
cd src/everything
188+
npm install
189+
npm run start:streamableHttp
190+
```

src/everything/package.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
"prepare": "npm run build",
1919
"watch": "tsc --watch",
2020
"start": "node dist/index.js",
21-
"start:sse": "node dist/sse.js"
21+
"start:sse": "node dist/sse.js",
22+
"start:streamableHttp": "node dist/streamableHttp.js"
2223
},
2324
"dependencies": {
24-
"@modelcontextprotocol/sdk": "^1.9.0",
25+
"@modelcontextprotocol/sdk": "^1.10.1",
2526
"express": "^4.21.1",
2627
"zod": "^3.23.8",
2728
"zod-to-json-schema": "^3.23.5"

src/everything/streamableHttp.ts

+172
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
2+
import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js';
3+
import express, { Request, Response } from "express";
4+
import { createServer } from "./everything.js";
5+
import { randomUUID } from 'node:crypto';
6+
7+
const app = express();
8+
9+
const { server, cleanup } = createServer();
10+
11+
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
12+
13+
app.post('/mcp', async (req: Request, res: Response) => {
14+
console.log('Received MCP POST request');
15+
try {
16+
// Check for existing session ID
17+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
18+
let transport: StreamableHTTPServerTransport;
19+
20+
if (sessionId && transports[sessionId]) {
21+
// Reuse existing transport
22+
transport = transports[sessionId];
23+
} else if (!sessionId) {
24+
// New initialization request
25+
const eventStore = new InMemoryEventStore();
26+
transport = new StreamableHTTPServerTransport({
27+
sessionIdGenerator: () => randomUUID(),
28+
eventStore, // Enable resumability
29+
onsessioninitialized: (sessionId) => {
30+
// Store the transport by session ID when session is initialized
31+
// This avoids race conditions where requests might come in before the session is stored
32+
console.log(`Session initialized with ID: ${sessionId}`);
33+
transports[sessionId] = transport;
34+
}
35+
});
36+
37+
// Set up onclose handler to clean up transport when closed
38+
transport.onclose = () => {
39+
const sid = transport.sessionId;
40+
if (sid && transports[sid]) {
41+
console.log(`Transport closed for session ${sid}, removing from transports map`);
42+
delete transports[sid];
43+
}
44+
};
45+
46+
// Connect the transport to the MCP server BEFORE handling the request
47+
// so responses can flow back through the same transport
48+
await server.connect(transport);
49+
50+
await transport.handleRequest(req, res);
51+
return; // Already handled
52+
} else {
53+
// Invalid request - no session ID or not initialization request
54+
res.status(400).json({
55+
jsonrpc: '2.0',
56+
error: {
57+
code: -32000,
58+
message: 'Bad Request: No valid session ID provided',
59+
},
60+
id: req?.body?.id,
61+
});
62+
return;
63+
}
64+
65+
// Handle the request with existing transport - no need to reconnect
66+
// The existing transport is already connected to the server
67+
await transport.handleRequest(req, res);
68+
} catch (error) {
69+
console.error('Error handling MCP request:', error);
70+
if (!res.headersSent) {
71+
res.status(500).json({
72+
jsonrpc: '2.0',
73+
error: {
74+
code: -32603,
75+
message: 'Internal server error',
76+
},
77+
id: req?.body?.id,
78+
});
79+
return;
80+
}
81+
}
82+
});
83+
84+
// Handle GET requests for SSE streams (using built-in support from StreamableHTTP)
85+
app.get('/mcp', async (req: Request, res: Response) => {
86+
console.log('Received MCP GET request');
87+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
88+
if (!sessionId || !transports[sessionId]) {
89+
res.status(400).json({
90+
jsonrpc: '2.0',
91+
error: {
92+
code: -32000,
93+
message: 'Bad Request: No valid session ID provided',
94+
},
95+
id: req?.body?.id,
96+
});
97+
return;
98+
}
99+
100+
// Check for Last-Event-ID header for resumability
101+
const lastEventId = req.headers['last-event-id'] as string | undefined;
102+
if (lastEventId) {
103+
console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
104+
} else {
105+
console.log(`Establishing new SSE stream for session ${sessionId}`);
106+
}
107+
108+
const transport = transports[sessionId];
109+
await transport.handleRequest(req, res);
110+
});
111+
112+
// Handle DELETE requests for session termination (according to MCP spec)
113+
app.delete('/mcp', async (req: Request, res: Response) => {
114+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
115+
if (!sessionId || !transports[sessionId]) {
116+
res.status(400).json({
117+
jsonrpc: '2.0',
118+
error: {
119+
code: -32000,
120+
message: 'Bad Request: No valid session ID provided',
121+
},
122+
id: req?.body?.id,
123+
});
124+
return;
125+
}
126+
127+
console.log(`Received session termination request for session ${sessionId}`);
128+
129+
try {
130+
const transport = transports[sessionId];
131+
await transport.handleRequest(req, res);
132+
} catch (error) {
133+
console.error('Error handling session termination:', error);
134+
if (!res.headersSent) {
135+
res.status(500).json({
136+
jsonrpc: '2.0',
137+
error: {
138+
code: -32603,
139+
message: 'Error handling session termination',
140+
},
141+
id: req?.body?.id,
142+
});
143+
return;
144+
}
145+
}
146+
});
147+
148+
// Start the server
149+
const PORT = process.env.PORT || 3001;
150+
app.listen(PORT, () => {
151+
console.log(`MCP Streamable HTTP Server listening on port ${PORT}`);
152+
});
153+
154+
// Handle server shutdown
155+
process.on('SIGINT', async () => {
156+
console.log('Shutting down server...');
157+
158+
// Close all active transports to properly clean up resources
159+
for (const sessionId in transports) {
160+
try {
161+
console.log(`Closing transport for session ${sessionId}`);
162+
await transports[sessionId].close();
163+
delete transports[sessionId];
164+
} catch (error) {
165+
console.error(`Error closing transport for session ${sessionId}:`, error);
166+
}
167+
}
168+
await cleanup();
169+
await server.close();
170+
console.log('Server shutdown complete');
171+
process.exit(0);
172+
});

0 commit comments

Comments
 (0)