@@ -30,14 +30,7 @@ class LaravelStreamableHttpTransport implements ServerTransportInterface
30
30
public function __construct (
31
31
protected SessionManager $ sessionManager ,
32
32
protected ?EventStoreInterface $ eventStore = null
33
- ) {
34
- $ this ->on ('message ' , function (Message $ message , string $ sessionId ) {
35
- $ session = $ this ->sessionManager ->getSession ($ sessionId );
36
- if ($ session !== null ) {
37
- $ session ->save ();
38
- }
39
- });
40
- }
33
+ ) {}
41
34
42
35
protected function generateId (): string
43
36
{
@@ -59,14 +52,14 @@ public function sendMessage(Message $message, string $sessionId, array $context
59
52
60
53
$ eventId = null ;
61
54
if ($ this ->eventStore && isset ($ context ['type ' ]) && in_array ($ context ['type ' ], ['get_sse ' , 'post_sse ' ])) {
62
- $ streamKey = $ context ['type ' ] === ' get_sse ' ? " get_stream_ { $ sessionId }" : $ context [ ' streamId ' ] ?? " post_stream_ { $ sessionId }" ;
63
- $ eventId = $ this ->eventStore ->storeEvent ($ streamKey , $ rawMessage );
55
+ $ streamId = $ context ['streamId ' ];
56
+ $ eventId = $ this ->eventStore ->storeEvent ($ streamId , $ rawMessage );
64
57
}
65
58
66
59
$ messageData = [
67
60
'id ' => $ eventId ?? $ this ->generateId (),
68
61
'data ' => $ rawMessage ,
69
- 'context ' => $ context[ ' type ' ] ?? ' get_sse ' ,
62
+ 'context ' => $ context ,
70
63
'timestamp ' => time ()
71
64
];
72
65
@@ -161,35 +154,26 @@ protected function handleJsonResponse(Message $message, string $sessionId, array
161
154
$ context ['type ' ] = 'post_json ' ;
162
155
$ this ->emit ('message ' , [$ message , $ sessionId , $ context ]);
163
156
164
- $ maxWaitTime = config ('mcp.transports.http_integrated.json_response_timeout ' , 30 );
165
- $ pollInterval = 0.1 ; // 100ms
166
- $ waitedTime = 0 ;
167
-
168
- while ($ waitedTime < $ maxWaitTime ) {
169
- $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_json ' );
170
-
171
- if (!empty ($ messages )) {
172
- $ responseMessage = $ messages [0 ];
173
- $ data = $ responseMessage ['data ' ];
157
+ $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_json ' );
174
158
175
- $ headers = [
176
- ' Content-Type ' => ' application/json ' ,
177
- ... $ this ->getCorsHeaders ()
178
- ];
159
+ if ( empty ( $ messages )) {
160
+ $ error = Error:: forInternalError ( ' Internal error ' );
161
+ return response ()-> json ( $ error , 500 , $ this ->getCorsHeaders ());
162
+ }
179
163
180
- if ($ context ['is_initialize_request ' ] ?? false ) {
181
- $ headers ['Mcp-Session-Id ' ] = $ sessionId ;
182
- }
164
+ $ responseMessage = $ messages [0 ];
165
+ $ data = $ responseMessage ['data ' ];
183
166
184
- return response ()->make ($ data , 200 , $ headers );
185
- }
167
+ $ headers = [
168
+ 'Content-Type ' => 'application/json ' ,
169
+ ...$ this ->getCorsHeaders ()
170
+ ];
186
171
187
- usleep (( int )( $ pollInterval * 1000000 ));
188
- $ waitedTime + = $ pollInterval ;
172
+ if ( $ context [ ' is_initialize_request ' ] ?? false ) {
173
+ $ headers [ ' Mcp-Session-Id ' ] = $ sessionId ;
189
174
}
190
175
191
- $ error = Error::forInternalError ('Request timeout ' );
192
- return response ()->json ($ error , 504 , $ this ->getCorsHeaders ());
176
+ return response ()->make ($ data , 200 , $ headers );
193
177
} catch (Throwable $ e ) {
194
178
Log::error ('JSON response mode error ' , ['exception ' => $ e ]);
195
179
$ error = Error::forInternalError ('Internal error ' );
@@ -202,46 +186,29 @@ protected function handleJsonResponse(Message $message, string $sessionId, array
202
186
*/
203
187
protected function handleSseResponse (Message $ message , string $ sessionId , int $ nRequests , array $ context ): StreamedResponse
204
188
{
205
- $ streamId = $ this ->generateId ();
206
- $ context ['type ' ] = 'post_sse ' ;
207
- $ context ['streamId ' ] = $ streamId ;
208
- $ context ['nRequests ' ] = $ nRequests ;
209
-
210
- $ this ->emit ('message ' , [$ message , $ sessionId , $ context ]);
211
-
212
- return response ()->stream (function () use ($ sessionId , $ nRequests , $ streamId ) {
213
- $ responsesSent = 0 ;
214
- $ maxWaitTime = 30 ; // 30 seconds timeout
215
- $ pollInterval = 0.1 ; // 100ms
216
- $ waitedTime = 0 ;
189
+ $ headers = array_merge ([
190
+ 'Content-Type ' => 'text/event-stream ' ,
191
+ 'Cache-Control ' => 'no-cache ' ,
192
+ 'Connection ' => 'keep-alive ' ,
193
+ 'X-Accel-Buffering ' => 'no ' ,
194
+ ], $ this ->getCorsHeaders ());
217
195
218
- while ($ responsesSent < $ nRequests && $ waitedTime < $ maxWaitTime ) {
219
- if (connection_aborted ()) {
220
- break ;
221
- }
196
+ if ($ context ['is_initialize_request ' ] ?? false ) {
197
+ $ headers ['Mcp-Session-Id ' ] = $ sessionId ;
198
+ }
222
199
223
- $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_sse ' , $ streamId );
200
+ return response ()->stream (function () use ($ sessionId , $ nRequests , $ message , $ context ) {
201
+ $ streamId = $ this ->generateId ();
202
+ $ context ['type ' ] = 'post_sse ' ;
203
+ $ context ['streamId ' ] = $ streamId ;
204
+ $ context ['nRequests ' ] = $ nRequests ;
224
205
225
- foreach ($ messages as $ messageData ) {
226
- $ this ->sendSseEvent ($ messageData ['data ' ], $ messageData ['id ' ]);
227
- $ responsesSent ++;
206
+ $ this ->emit ('message ' , [$ message , $ sessionId , $ context ]);
228
207
229
- if ($ responsesSent >= $ nRequests ) {
230
- break ;
231
- }
232
- }
208
+ $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_sse ' , $ streamId );
233
209
234
- if ($ responsesSent < $ nRequests ) {
235
- usleep ((int )($ pollInterval * 1000000 ));
236
- $ waitedTime += $ pollInterval ;
237
- }
238
- }
239
- }, headers: array_merge ([
240
- 'Content-Type ' => 'text/event-stream ' ,
241
- 'Cache-Control ' => 'no-cache ' ,
242
- 'Connection ' => 'keep-alive ' ,
243
- 'X-Accel-Buffering ' => 'no ' ,
244
- ], $ this ->getCorsHeaders ()));
210
+ $ this ->sendSseEvent ($ messages [0 ]['data ' ], $ messages [0 ]['id ' ]);
211
+ }, headers: $ headers );
245
212
}
246
213
247
214
/**
@@ -325,20 +292,21 @@ public function handleDeleteRequest(Request $request): Response
325
292
/**
326
293
* Dequeue messages for specific context, requeue others
327
294
*/
328
- protected function dequeueMessagesForContext (string $ sessionId , string $ context , ?string $ streamId = null ): array
295
+ protected function dequeueMessagesForContext (string $ sessionId , string $ type , ?string $ streamId = null ): array
329
296
{
330
297
$ allMessages = $ this ->sessionManager ->dequeueMessages ($ sessionId );
331
298
$ contextMessages = [];
332
299
$ requeueMessages = [];
333
300
334
301
foreach ($ allMessages as $ rawMessage ) {
335
302
$ messageData = json_decode ($ rawMessage , true );
303
+ $ context = $ messageData ['context ' ] ?? [];
336
304
337
- if ($ messageData && isset ( $ messageData [ ' context ' ]) ) {
338
- $ matchesContext = $ messageData [ ' context ' ] === $ context ;
305
+ if ($ messageData ) {
306
+ $ matchesContext = $ context [ ' type ' ] === $ type ;
339
307
340
- if ($ context === 'post_sse ' && $ streamId ) {
341
- $ matchesContext = $ matchesContext && isset ($ messageData ['streamId ' ]) && $ messageData ['streamId ' ] === $ streamId ;
308
+ if ($ type === 'post_sse ' && $ streamId ) {
309
+ $ matchesContext = $ matchesContext && isset ($ context ['streamId ' ]) && $ context ['streamId ' ] === $ streamId ;
342
310
}
343
311
344
312
if ($ matchesContext ) {
@@ -412,6 +380,15 @@ private function flushOutput(): void
412
380
@flush ();
413
381
}
414
382
383
+ protected function collectSessionGarbage (): void
384
+ {
385
+ $ lottery = config ('mcp.session.lottery ' , [2 , 100 ]);
386
+
387
+ if (random_int (1 , $ lottery [1 ]) <= $ lottery [0 ]) {
388
+ $ this ->sessionManager ->gc ();
389
+ }
390
+ }
391
+
415
392
/**
416
393
* Get CORS headers
417
394
*/
0 commit comments