@@ -122,42 +122,38 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
122
122
// Observation associated with the individual message - it can be used to
123
123
// create child Observation and emit it together with the message to the
124
124
// consumer
125
- this .connection = this .transport .connect (mono -> mono .doOnNext (message -> handle (message ).subscribe ()))
126
- .subscribe ();
125
+ this .connection = this .transport .connect (mono -> mono .doOnNext (this ::handle )).subscribe ();
127
126
}
128
127
129
- public Mono <Void > handle (McpSchema .JSONRPCMessage message ) {
130
- return Mono .defer (() -> {
131
- if (message instanceof McpSchema .JSONRPCResponse response ) {
132
- logger .debug ("Received Response: {}" , response );
133
- var sink = pendingResponses .remove (response .id ());
134
- if (sink == null ) {
135
- logger .warn ("Unexpected response for unknown id {}" , response .id ());
136
- }
137
- else {
138
- sink .success (response );
139
- }
140
- return Mono .empty ();
141
- }
142
- else if (message instanceof McpSchema .JSONRPCRequest request ) {
143
- logger .debug ("Received request: {}" , request );
144
- return handleIncomingRequest (request ).onErrorResume (error -> {
145
- var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
146
- new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
147
- error .getMessage (), null ));
148
- return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
149
- }).flatMap (this .transport ::sendMessage );
150
- }
151
- else if (message instanceof McpSchema .JSONRPCNotification notification ) {
152
- logger .debug ("Received notification: {}" , notification );
153
- return handleIncomingNotification (notification )
154
- .doOnError (error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
128
+ private void handle (McpSchema .JSONRPCMessage message ) {
129
+ if (message instanceof McpSchema .JSONRPCResponse response ) {
130
+ logger .debug ("Received Response: {}" , response );
131
+ var sink = pendingResponses .remove (response .id ());
132
+ if (sink == null ) {
133
+ logger .warn ("Unexpected response for unknown id {}" , response .id ());
155
134
}
156
135
else {
157
- logger .warn ("Received unknown message type: {}" , message );
158
- return Mono .empty ();
136
+ sink .success (response );
159
137
}
160
- });
138
+ }
139
+ else if (message instanceof McpSchema .JSONRPCRequest request ) {
140
+ logger .debug ("Received request: {}" , request );
141
+ handleIncomingRequest (request ).onErrorResume (error -> {
142
+ var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
143
+ new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
144
+ error .getMessage (), null ));
145
+ return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
146
+ }).flatMap (this .transport ::sendMessage ).subscribe ();
147
+ }
148
+ else if (message instanceof McpSchema .JSONRPCNotification notification ) {
149
+ logger .debug ("Received notification: {}" , notification );
150
+ handleIncomingNotification (notification )
151
+ .doOnError (error -> logger .error ("Error handling notification: {}" , error .getMessage ()))
152
+ .subscribe ();
153
+ }
154
+ else {
155
+ logger .warn ("Received unknown message type: {}" , message );
156
+ }
161
157
}
162
158
163
159
/**
0 commit comments