From 28594fbd5c34da0c6eab32a37df5c90b333765be Mon Sep 17 00:00:00 2001 From: JermaineHua Date: Fri, 7 Mar 2025 22:38:37 +0800 Subject: [PATCH] Optimize nested streams --- .../spec/DefaultMcpSession.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpSession.java index e2d354f..10c893b 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpSession.java @@ -17,6 +17,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.core.scheduler.Schedulers; /** * Default implementation of the MCP (Model Context Protocol) session that manages @@ -135,13 +136,13 @@ public DefaultMcpSession(Duration requestTimeout, McpTransport transport, } else if (message instanceof McpSchema.JSONRPCRequest request) { logger.debug("Received request: {}", request); - handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(), - error -> { - var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), - null, new McpSchema.JSONRPCResponse.JSONRPCError( - McpSchema.ErrorCodes.INTERNAL_ERROR, error.getMessage(), null)); - transport.sendMessage(errorResponse).subscribe(); - }); + handleIncomingRequest(request).flatMap(transport::sendMessage).onErrorResume(error -> { + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null, + new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, + error.getMessage(), null)); + return transport.sendMessage(errorResponse); + }).subscribe(); + } else if (message instanceof McpSchema.JSONRPCNotification notification) { logger.debug("Received notification: {}", notification);