Skip to content

Commit

Permalink
Fix subscribe in non-blocking context
Browse files Browse the repository at this point in the history
Signed-off-by: JermaineHua <[email protected]>
  • Loading branch information
CrazyHZM committed Mar 7, 2025
1 parent 1bee8d4 commit 7893d2b
Showing 1 changed file with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;

/**
* Default implementation of the MCP (Model Context Protocol) session that manages
Expand Down Expand Up @@ -135,18 +136,18 @@ public DefaultMcpSession(Duration requestTimeout, McpTransport transport,
}
else if (message instanceof McpSchema.JSONRPCRequest request) {
logger.debug("Received request: {}", request);
handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(),
error -> {
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(),
null, new McpSchema.JSONRPCResponse.JSONRPCError(
McpSchema.ErrorCodes.INTERNAL_ERROR, error.getMessage(), null));
transport.sendMessage(errorResponse).subscribe();
});
handleIncomingRequest(request).subscribeOn(Schedulers.boundedElastic())
.subscribe(response -> transport.sendMessage(response).subscribe(), error -> {
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null,
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
error.getMessage(), null));
transport.sendMessage(errorResponse).subscribe();
});
}
else if (message instanceof McpSchema.JSONRPCNotification notification) {
logger.debug("Received notification: {}", notification);
handleIncomingNotification(notification).subscribe(null,
error -> logger.error("Error handling notification: {}", error.getMessage()));
handleIncomingNotification(notification).subscribeOn(Schedulers.boundedElastic())
.subscribe(null, error -> logger.error("Error handling notification: {}", error.getMessage()));
}
})).subscribe();
}
Expand Down

0 comments on commit 7893d2b

Please sign in to comment.