Skip to content

Commit 04046ca

Browse files
CrazyHZMchemicL
authored andcommitted
Optimize client nested streams in McpClientSession (#33)
Signed-off-by: JermaineHua <[email protected]>
1 parent 41c6bd9 commit 04046ca

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

+20-11
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,12 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
122122
// Observation associated with the individual message - it can be used to
123123
// create child Observation and emit it together with the message to the
124124
// consumer
125-
this.connection = this.transport.connect(mono -> mono.doOnNext(message -> {
125+
this.connection = this.transport.connect(mono -> mono.doOnNext(message -> handle(message).subscribe()))
126+
.subscribe();
127+
}
128+
129+
public Mono<Void> handle(McpSchema.JSONRPCMessage message) {
130+
return Mono.defer(() -> {
126131
if (message instanceof McpSchema.JSONRPCResponse response) {
127132
logger.debug("Received Response: {}", response);
128133
var sink = pendingResponses.remove(response.id());
@@ -132,23 +137,27 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
132137
else {
133138
sink.success(response);
134139
}
140+
return Mono.empty();
135141
}
136142
else if (message instanceof McpSchema.JSONRPCRequest request) {
137143
logger.debug("Received request: {}", request);
138-
handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(),
139-
error -> {
140-
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(),
141-
null, new McpSchema.JSONRPCResponse.JSONRPCError(
142-
McpSchema.ErrorCodes.INTERNAL_ERROR, error.getMessage(), null));
143-
transport.sendMessage(errorResponse).subscribe();
144-
});
144+
return handleIncomingRequest(request).onErrorResume(error -> {
145+
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null,
146+
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
147+
error.getMessage(), null));
148+
return this.transport.sendMessage(errorResponse).then(Mono.empty());
149+
}).flatMap(this.transport::sendMessage);
145150
}
146151
else if (message instanceof McpSchema.JSONRPCNotification notification) {
147152
logger.debug("Received notification: {}", notification);
148-
handleIncomingNotification(notification).subscribe(null,
149-
error -> logger.error("Error handling notification: {}", error.getMessage()));
153+
return handleIncomingNotification(notification)
154+
.doOnError(error -> logger.error("Error handling notification: {}", error.getMessage()));
150155
}
151-
})).subscribe();
156+
else {
157+
logger.warn("Received unknown message type: {}", message);
158+
return Mono.empty();
159+
}
160+
});
152161
}
153162

154163
/**

0 commit comments

Comments
 (0)