From e218d9bf0412a943c122c710688130a842a3941b Mon Sep 17 00:00:00 2001 From: noear Date: Sat, 29 Mar 2025 18:24:19 +0800 Subject: [PATCH 1/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-bom/pom.xml | 11 +- mcp-solon/mcp-solon-webrx/pom.xml | 132 +++++ .../transport/WebRxSseClientTransport.java | 385 ++++++++++++++ .../WebRxSseServerTransportProvider.java | 408 ++++++++++++++ .../WebRxSseIntegrationTests.java | 499 ++++++++++++++++++ .../client/WebRxSseMcpAsyncClientTests.java | 52 ++ .../client/WebRxSseMcpSyncClientTests.java | 53 ++ .../WebRxSseClientTransportTests.java | 334 ++++++++++++ .../server/WebRxSseMcpServerTests.java | 50 ++ .../server/transport/BlockingInputStream.java | 69 +++ mcp-spring/mcp-spring-webflux/pom.xml | 2 +- pom.xml | 37 +- 12 files changed, 2015 insertions(+), 17 deletions(-) create mode 100644 mcp-solon/mcp-solon-webrx/pom.xml create mode 100644 mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java create mode 100644 mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpServerTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/transport/BlockingInputStream.java diff --git a/mcp-bom/pom.xml b/mcp-bom/pom.xml index 77d55da3..1d5f6d58 100644 --- a/mcp-bom/pom.xml +++ b/mcp-bom/pom.xml @@ -40,20 +40,27 @@ ${project.version} - + io.modelcontextprotocol.sdk mcp-spring-webflux ${project.version} - + io.modelcontextprotocol.sdk mcp-spring-webmvc ${project.version} + + + io.modelcontextprotocol.sdk + mcp-solon-webrx + ${project.version} + + diff --git a/mcp-solon/mcp-solon-webrx/pom.xml b/mcp-solon/mcp-solon-webrx/pom.xml new file mode 100644 index 00000000..dae892d9 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/pom.xml @@ -0,0 +1,132 @@ + + + 4.0.0 + + io.modelcontextprotocol.sdk + mcp-parent + 0.9.0-SNAPSHOT + ../../pom.xml + + mcp-solon-webrx + jar + Solon WebRx implementation of the Java MCP SSE transport + + https://github.com/modelcontextprotocol/java-sdk + + + https://github.com/modelcontextprotocol/java-sdk + git://github.com/modelcontextprotocol/java-sdk.git + git@github.com/modelcontextprotocol/java-sdk.git + + + + + io.modelcontextprotocol.sdk + mcp + 0.9.0-SNAPSHOT + + + + io.modelcontextprotocol.sdk + mcp-test + 0.9.0-SNAPSHOT + test + + + + org.noear + solon-web-rx + ${solon.version} + + + + org.noear + solon-web-sse + ${solon.version} + + + + org.noear + solon-net-httputils + ${solon.version} + + + + org.noear + solon-logging-simple + ${solon.version} + test + + + + org.noear + solon-web + ${solon.version} + test + + + + org.noear + solon-test + ${solon.version} + test + + + + org.assertj + assertj-core + ${assert4j.version} + test + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + org.mockito + mockito-core + ${mockito.version} + test + + + + io.projectreactor + reactor-test + test + + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + + org.awaitility + awaitility + ${awaitility.version} + test + + + + ch.qos.logback + logback-classic + ${logback.version} + test + + + + org.junit.jupiter + junit-jupiter-params + ${junit-jupiter.version} + test + + + diff --git a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java new file mode 100644 index 00000000..5e89dace --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java @@ -0,0 +1,385 @@ +/* + * Copyright 2025 - 2025 the original author or authors. + */ +package io.modelcontextprotocol.client.transport; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage; +import io.modelcontextprotocol.util.Assert; +import org.noear.solon.core.util.MimeType; +import org.noear.solon.net.http.HttpUtils; +import org.noear.solon.net.http.textstream.ServerSentEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.SynchronousSink; +import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; +import reactor.util.retry.Retry.RetrySignal; + +import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** + * Server-Sent Events (SSE) implementation of the + * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE + * transport specification. + * + *

+ * This transport establishes a bidirectional communication channel where: + *

+ * + *

+ * The message flow follows these steps: + *

    + *
  1. The client establishes an SSE connection to the server's /sse endpoint
  2. + *
  3. The server sends an 'endpoint' event containing the URI for sending messages
  4. + *
+ * + * This implementation uses {@link org.noear.solon.net.http.HttpUtils} for HTTP communications and supports JSON + * serialization/deserialization of messages. + * + * @author Christian Tzolov + * @author noear + * @see MCP + * HTTP with SSE Transport Specification + */ +public class WebRxSseClientTransport implements McpClientTransport { + + private static final Logger logger = LoggerFactory.getLogger(WebRxSseClientTransport.class); + + /** + * Event type for JSON-RPC messages received through the SSE connection. The server + * sends messages with this event type to transmit JSON-RPC protocol data. + */ + private static final String MESSAGE_EVENT_TYPE = "message"; + + /** + * Event type for receiving the message endpoint URI from the server. The server MUST + * send this event when a client connects, providing the URI where the client should + * send its messages via HTTP POST. + */ + private static final String ENDPOINT_EVENT_TYPE = "endpoint"; + + /** + * Default SSE endpoint path as specified by the MCP transport specification. This + * endpoint is used to establish the SSE connection with the server. + */ + private static final String DEFAULT_SSE_ENDPOINT = "/sse"; + + /** + * ObjectMapper for serializing outbound messages and deserializing inbound messages. + * Handles conversion between JSON-RPC messages and their string representation. + */ + protected ObjectMapper objectMapper; + + /** + * Subscription for the SSE connection handling inbound messages. Used for cleanup + * during transport shutdown. + */ + private Disposable inboundSubscription; + + /** + * Flag indicating if the transport is in the process of shutting down. Used to + * prevent new operations during shutdown and handle cleanup gracefully. + */ + private volatile boolean isClosing = false; + + /** + * Sink for managing the message endpoint URI provided by the server. Stores the most + * recent endpoint URI and makes it available for outbound message processing. + */ + protected final Sinks.One messageEndpointSink = Sinks.one(); + + /** + * The SSE service base URL provided by the server. Used for sending outbound messages via + * HTTP POST requests. + */ + private String baseUrl; + + /** + * The SSE endpoint URI provided by the server. Used for sending outbound messages via + * HTTP POST requests. + */ + private String sseEndpoint; + + /** + * Constructs a new SseClientTransport with the specified WebClient builder. Uses a + * default ObjectMapper instance for JSON processing. + * instance + * @param baseUrl the SSE service base URL + * @throws IllegalArgumentException if webClientBuilder is null + */ + public WebRxSseClientTransport(String baseUrl) { + this(baseUrl,new ObjectMapper()); + } + + /** + * Constructs a new SseClientTransport with the specified WebClient builder and + * ObjectMapper. Initializes both inbound and outbound message processing pipelines. + * instance + * @param baseUrl the SSE service base URL + * @param objectMapper the ObjectMapper to use for JSON processing + * @throws IllegalArgumentException if either parameter is null + */ + public WebRxSseClientTransport(String baseUrl,ObjectMapper objectMapper) { + this(baseUrl, objectMapper, DEFAULT_SSE_ENDPOINT); + } + + /** + * Constructs a new SseClientTransport with the specified WebClient builder and + * ObjectMapper. Initializes both inbound and outbound message processing pipelines. + * instance + * @param baseUrl the SSE service base URL + * @param objectMapper the ObjectMapper to use for JSON processing + * @param sseEndpoint the SSE endpoint URI to use for establishing the connection + * @throws IllegalArgumentException if either parameter is null + */ + public WebRxSseClientTransport(String baseUrl,ObjectMapper objectMapper, + String sseEndpoint) { + Assert.notNull(objectMapper, "ObjectMapper must not be null"); + Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty"); + + this.baseUrl = baseUrl; + this.objectMapper = objectMapper; + this.sseEndpoint = sseEndpoint; + } + + /** + * Establishes a connection to the MCP server using Server-Sent Events (SSE). This + * method initiates the SSE connection and sets up the message processing pipeline. + * + *

+ * The connection process follows these steps: + *

    + *
  1. Establishes an SSE connection to the server's /sse endpoint
  2. + *
  3. Waits for the server to send an 'endpoint' event with the message posting + * URI
  4. + *
  5. Sets up message handling for incoming JSON-RPC messages
  6. + *
+ * + *

+ * The connection is considered established only after receiving the endpoint event + * from the server. + * @param handler a function that processes incoming JSON-RPC messages and returns + * responses + * @return a Mono that completes when the connection is fully established + * @throws McpError if there's an error processing SSE events or if an unrecognized + * event type is received + */ + @Override + public Mono connect(Function, Mono> handler) { + Flux events = eventStream(); + this.inboundSubscription = events.concatMap(event -> Mono.just(event).handle((e, s) -> { + if (ENDPOINT_EVENT_TYPE.equals(event.event())) { + String messageEndpointUri = event.data(); + if (messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) { + s.complete(); + } + else { + // TODO: clarify with the spec if multiple events can be + // received + s.error(new McpError("Failed to handle SSE endpoint event")); + } + } + else if (MESSAGE_EVENT_TYPE.equals(event.event())) { + try { + JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data()); + s.next(message); + } + catch (IOException ioException) { + s.error(ioException); + } + } + else { + s.error(new McpError("Received unrecognized SSE event type: " + event.event())); + } + }).transform(handler)).subscribe(); + + // The connection is established once the server sends the endpoint event + return messageEndpointSink.asMono().then(); + } + + /** + * Sends a JSON-RPC message to the server using the endpoint provided during + * connection. + * + *

+ * Messages are sent via HTTP POST requests to the server-provided endpoint URI. The + * message is serialized to JSON before transmission. If the transport is in the + * process of closing, the message send operation is skipped gracefully. + * @param message the JSON-RPC message to send + * @return a Mono that completes when the message has been sent successfully + * @throws RuntimeException if message serialization fails + */ + @Override + public Mono sendMessage(JSONRPCMessage message) { + // The messageEndpoint is the endpoint URI to send the messages + // It is provided by the server as part of the endpoint event + return messageEndpointSink.asMono().flatMap(messageEndpointUri -> { + if (isClosing) { + return Mono.empty(); + } + + try { + String jsonText = this.objectMapper.writeValueAsString(message); + + HttpUtils.http(baseUrl + messageEndpointUri) + .contentType(MimeType.APPLICATION_JSON_VALUE) + .bodyOfJson(jsonText) + .post(); + + logger.debug("Message sent successfully"); + } catch (Exception error) { + if (!isClosing) { + logger.error("Error sending message: {}", error.getMessage()); + } + } + + return Mono.empty(); + }).then(); // TODO: Consider non-200-ok response + } + + /** + * Initializes and starts the inbound SSE event processing. Establishes the SSE + * connection and sets up event handling for both message and endpoint events. + * Includes automatic retry logic for handling transient connection failures. + */ + // visible for tests + protected Flux eventStream() {// @formatter:off + return Flux.from(HttpUtils.http(baseUrl + this.sseEndpoint) + .accept(MimeType.TEXT_EVENT_STREAM_VALUE) + .execAsEventStream("GET")) + .retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler))); + } // @formatter:on + + /** + * Retry handler for the inbound SSE stream. Implements the retry logic for handling + * connection failures and other errors. + */ + private BiConsumer> inboundRetryHandler = (retrySpec, sink) -> { + if (isClosing) { + logger.debug("SSE connection closed during shutdown"); + sink.error(retrySpec.failure()); + return; + } + if (retrySpec.failure() instanceof IOException) { + logger.debug("Retrying SSE connection after IO error"); + sink.next(retrySpec); + return; + } + logger.error("Fatal SSE error, not retrying: {}", retrySpec.failure().getMessage()); + sink.error(retrySpec.failure()); + }; + + /** + * Implements graceful shutdown of the transport. Cleans up all resources including + * subscriptions and schedulers. Ensures orderly shutdown of both inbound and outbound + * message processing. + * @return a Mono that completes when shutdown is finished + */ + @Override + public Mono closeGracefully() { // @formatter:off + return Mono.fromRunnable(() -> { + isClosing = true; + + // Dispose of subscriptions + + if (inboundSubscription != null) { + inboundSubscription.dispose(); + } + + }) + .then() + .subscribeOn(Schedulers.boundedElastic()); + } // @formatter:on + + /** + * Unmarshalls data from a generic Object into the specified type using the configured + * ObjectMapper. + * + *

+ * This method is particularly useful when working with JSON-RPC parameters or result + * objects that need to be converted to specific Java types. It leverages Jackson's + * type conversion capabilities to handle complex object structures. + * @param the target type to convert the data into + * @param data the source object to convert + * @param typeRef the TypeReference describing the target type + * @return the unmarshalled object of type T + * @throws IllegalArgumentException if the conversion cannot be performed + */ + @Override + public T unmarshalFrom(Object data, TypeReference typeRef) { + return this.objectMapper.convertValue(data, typeRef); + } + + /** + * Creates a new builder for {@link WebRxSseClientTransport}. + * instance + * @return a new builder instance + */ + public static Builder builder(String baseUrl) { + return new Builder(baseUrl); + } + + /** + * Builder for {@link WebRxSseClientTransport}. + */ + public static class Builder { + private String baseUrl; + + private String sseEndpoint = DEFAULT_SSE_ENDPOINT; + + private ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Creates a new builder with the specified WebClient.Builder. + */ + public Builder(String baseUrl) { + this.baseUrl = baseUrl; + } + + /** + * Sets the SSE endpoint path. + * @param sseEndpoint the SSE endpoint path + * @return this builder + */ + public Builder sseEndpoint(String sseEndpoint) { + Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); + this.sseEndpoint = sseEndpoint; + return this; + } + + /** + * Sets the object mapper for JSON serialization/deserialization. + * @param objectMapper the object mapper + * @return this builder + */ + public Builder objectMapper(ObjectMapper objectMapper) { + Assert.notNull(objectMapper, "objectMapper must not be null"); + this.objectMapper = objectMapper; + return this; + } + + /** + * Builds a new {@link WebRxSseClientTransport} instance. + * @return a new transport instance + */ + public WebRxSseClientTransport build() { + return new WebRxSseClientTransport(baseUrl,objectMapper, sseEndpoint); + } + } +} \ No newline at end of file diff --git a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java new file mode 100644 index 00000000..2746445f --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java @@ -0,0 +1,408 @@ +/* + * Copyright 2025 - 2025 the original author or authors. + */ +package io.modelcontextprotocol.server.transport; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.*; +import io.modelcontextprotocol.util.Assert; +import org.noear.solon.SolonApp; +import org.noear.solon.Utils; +import org.noear.solon.core.handle.Context; +import org.noear.solon.core.handle.Entity; +import org.noear.solon.core.util.MimeType; +import org.noear.solon.web.sse.SseEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Server-side implementation of the MCP (Model Context Protocol) HTTP transport using + * Server-Sent Events (SSE). This implementation provides a bidirectional communication + * channel between MCP clients and servers using HTTP POST for client-to-server messages + * and SSE for server-to-client messages. + * + *

+ * Key features: + *

+ * + *

+ * The transport sets up two main endpoints: + *

+ * + *

+ * This implementation is thread-safe and can handle multiple concurrent client + * connections. It uses {@link ConcurrentHashMap} for session management and Project + * Reactor's non-blocking APIs for message processing and delivery. + * + * @author Christian Tzolov + * @author Alexandros Pappas + * @author Dariusz Jędrzejczyk + * @author noear + * @see McpServerTransport + * @see SseEvent + */ +public class WebRxSseServerTransportProvider implements McpServerTransportProvider { + + private static final Logger logger = LoggerFactory.getLogger(WebRxSseServerTransportProvider.class); + + /** + * Event type for JSON-RPC messages sent through the SSE connection. + */ + public static final String MESSAGE_EVENT_TYPE = "message"; + + /** + * Event type for sending the message endpoint URI to clients. + */ + public static final String ENDPOINT_EVENT_TYPE = "endpoint"; + + /** + * Default SSE endpoint path as specified by the MCP transport specification. + */ + public static final String DEFAULT_SSE_ENDPOINT = "/sse"; + + private final ObjectMapper objectMapper; + + private final String messageEndpoint; + + private final String sseEndpoint; + + private McpServerSession.Factory sessionFactory; + + /** + * Map of active client sessions, keyed by session ID. + */ + private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); + + /** + * Flag indicating if the transport is shutting down. + */ + private volatile boolean isClosing = false; + + /** + * Constructs a new WebFlux SSE server transport provider instance. + * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization + * of MCP messages. Must not be null. + * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC + * messages. This endpoint will be communicated to clients during SSE connection + * setup. Must not be null. + * @throws IllegalArgumentException if either parameter is null + */ + public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) { + Assert.notNull(objectMapper, "ObjectMapper must not be null"); + Assert.notNull(messageEndpoint, "Message endpoint must not be null"); + Assert.notNull(sseEndpoint, "SSE endpoint must not be null"); + + this.objectMapper = objectMapper; + this.messageEndpoint = messageEndpoint; + this.sseEndpoint = sseEndpoint; + } + + public void toHttpHandler(SolonApp app) { + app.get(this.sseEndpoint, this::handleSseConnection); + app.post(this.messageEndpoint, this::handleMessage); + } + + /** + * Constructs a new WebFlux SSE server transport provider instance with the default + * SSE endpoint. + * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization + * of MCP messages. Must not be null. + * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC + * messages. This endpoint will be communicated to clients during SSE connection + * setup. Must not be null. + * @throws IllegalArgumentException if either parameter is null + */ + public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) { + this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT); + } + + @Override + public void setSessionFactory(McpServerSession.Factory sessionFactory) { + this.sessionFactory = sessionFactory; + } + + /** + * Broadcasts a JSON-RPC message to all connected clients through their SSE + * connections. The message is serialized to JSON and sent as a server-sent event to + * each active session. + * + *

+ * The method: + *

+ * @param method The JSON-RPC method to send to clients + * @param params The method parameters to send to clients + * @return A Mono that completes when the message has been sent to all sessions, or + * errors if any session fails to receive the message + */ + @Override + public Mono notifyClients(String method, Map params) { + if (sessions.isEmpty()) { + logger.debug("No active sessions to broadcast message to"); + return Mono.empty(); + } + + logger.debug("Attempting to broadcast message to {} active sessions", sessions.size()); + + return Flux.fromStream(sessions.values().stream()) + .flatMap(session -> session.sendNotification(method, params) + .doOnError(e -> logger.error("Failed to " + "send message to session " + "{}: {}", session.getId(), + e.getMessage())) + .onErrorComplete()) + .then(); + } + + // FIXME: This javadoc makes claims about using isClosing flag but it's not actually + // doing that. + /** + * Initiates a graceful shutdown of all the sessions. This method ensures all active + * sessions are properly closed and cleaned up. + * + *

+ * The shutdown process: + *

    + *
  • Marks the transport as closing to prevent new connections
  • + *
  • Closes each active session
  • + *
  • Removes closed sessions from the sessions map
  • + *
  • Times out after 5 seconds if shutdown takes too long
  • + *
+ * @return A Mono that completes when all sessions have been closed + */ + @Override + public Mono closeGracefully() { + return Flux.fromIterable(sessions.values()) + .doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size())) + .flatMap(McpServerSession::closeGracefully) + .then(); + } + + /** + * Handles new SSE connection requests from clients. Creates a new session for each + * connection and sets up the SSE event stream. + * @param ctx The incoming server context + * @return A Mono which emits a response with the SSE event stream + */ + private void handleSseConnection(Context ctx) throws Throwable{ + if (isClosing) { + ctx.status(503); + ctx.output("Server is shutting down"); + return; + } + + Flux publisher = Flux.create(sink -> { + WebRxMcpSessionTransport sessionTransport = new WebRxMcpSessionTransport(sink); + + McpServerSession session = sessionFactory.create(sessionTransport); + String sessionId = session.getId(); + + logger.debug("Created new SSE connection for session: {}", sessionId); + sessions.put(sessionId, session); + + // Send initial endpoint event + logger.debug("Sending initial endpoint event to session: {}", sessionId); + sink.next(new SseEvent() + .name(ENDPOINT_EVENT_TYPE) + .data(messageEndpoint + "?sessionId=" + sessionId)); + sink.onCancel(() -> { + logger.debug("Session {} cancelled", sessionId); + sessions.remove(sessionId); + }); + }); + + ctx.contentType(MimeType.TEXT_EVENT_STREAM_VALUE); + ctx.returnValue(publisher); + } + + /** + * Handles incoming JSON-RPC messages from clients. Deserializes the message and + * processes it through the configured message handler. + * + *

+ * The handler: + *

    + *
  • Deserializes the incoming JSON-RPC message
  • + *
  • Passes it through the message handler chain
  • + *
  • Returns appropriate HTTP responses based on processing results
  • + *
  • Handles various error conditions with appropriate error responses
  • + *
+ * @param request The incoming server request containing the JSON-RPC message + * @return A Mono emitting the response indicating the message processing result + */ + private void handleMessage(Context request) throws Throwable { + if (isClosing) { + request.status(503); + request.output("Server is shutting down"); + return; + } + + if (Utils.isEmpty(request.param("sessionId"))) { + request.status(404); + request.render(new McpError("Session ID missing in message endpoint")); + return; + } + + McpServerSession session = sessions.get(request.param("sessionId")); + + String body = request.body(); + try { + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); + + Mono mono = session.handle(message) + .flatMap(response -> { + return Mono.just(new Entity()); + }) + .onErrorResume(error -> { + logger.error("Error processing message: {}", error.getMessage()); + // TODO: instead of signalling the error, just respond with 200 OK + // - the error is signalled on the SSE connection + // return ServerResponse.ok().build(); + return Mono.just(new Entity().status(500).body(new McpError(error.getMessage()))); + }); + + request.returnValue(mono); + } catch (IllegalArgumentException | IOException e) { + logger.error("Failed to deserialize message: {}", e.getMessage()); + request.status(400); + request.render(new McpError("Invalid message format")); + } + } + + private class WebRxMcpSessionTransport implements McpServerTransport { + + private final FluxSink sink; + + public WebRxMcpSessionTransport(FluxSink sink) { + this.sink = sink; + } + + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + return Mono.fromSupplier(() -> { + try { + return objectMapper.writeValueAsString(message); + } + catch (IOException e) { + throw Exceptions.propagate(e); + } + }).doOnNext(jsonText -> { + SseEvent event = new SseEvent() + .name(MESSAGE_EVENT_TYPE) + .data(jsonText); + sink.next(event); + }).doOnError(e -> { + // TODO log with sessionid + Throwable exception = Exceptions.unwrap(e); + sink.error(exception); + }).then(); + } + + @Override + public T unmarshalFrom(Object data, TypeReference typeRef) { + return objectMapper.convertValue(data, typeRef); + } + + @Override + public Mono closeGracefully() { + return Mono.fromRunnable(sink::complete); + } + + @Override + public void close() { + sink.complete(); + } + + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for creating instances of {@link WebRxSseServerTransportProvider}. + *

+ * This builder provides a fluent API for configuring and creating instances of + * WebFluxSseServerTransportProvider with custom settings. + */ + public static class Builder { + + private ObjectMapper objectMapper; + + private String messageEndpoint; + + private String sseEndpoint = DEFAULT_SSE_ENDPOINT; + + /** + * Sets the ObjectMapper to use for JSON serialization/deserialization of MCP + * messages. + * @param objectMapper The ObjectMapper instance. Must not be null. + * @return this builder instance + * @throws IllegalArgumentException if objectMapper is null + */ + public Builder objectMapper(ObjectMapper objectMapper) { + Assert.notNull(objectMapper, "ObjectMapper must not be null"); + this.objectMapper = objectMapper; + return this; + } + + /** + * Sets the endpoint URI where clients should send their JSON-RPC messages. + * @param messageEndpoint The message endpoint URI. Must not be null. + * @return this builder instance + * @throws IllegalArgumentException if messageEndpoint is null + */ + public Builder messageEndpoint(String messageEndpoint) { + Assert.notNull(messageEndpoint, "Message endpoint must not be null"); + this.messageEndpoint = messageEndpoint; + return this; + } + + /** + * Sets the SSE endpoint path. + * @param sseEndpoint The SSE endpoint path. Must not be null. + * @return this builder instance + * @throws IllegalArgumentException if sseEndpoint is null + */ + public Builder sseEndpoint(String sseEndpoint) { + Assert.notNull(sseEndpoint, "SSE endpoint must not be null"); + this.sseEndpoint = sseEndpoint; + return this; + } + + /** + * Builds a new instance of {@link WebRxSseServerTransportProvider} with the + * configured settings. + * @return A new WebFluxSseServerTransportProvider instance + * @throws IllegalStateException if required parameters are not set + */ + public WebRxSseServerTransportProvider build() { + Assert.notNull(objectMapper, "ObjectMapper must be set"); + Assert.notNull(messageEndpoint, "Message endpoint must be set"); + + return new WebRxSseServerTransportProvider(objectMapper, messageEndpoint, sseEndpoint); + } + } +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java new file mode 100644 index 00000000..6fac1f60 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java @@ -0,0 +1,499 @@ +/* + * Copyright 2024 - 2024 the original author or authors. + */ +package io.modelcontextprotocol; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.client.McpClient; +import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; +import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpServerFeatures; +import io.modelcontextprotocol.server.transport.WebRxSseServerTransportProvider; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.CallToolResult; +import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; +import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest; +import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult; +import io.modelcontextprotocol.spec.McpSchema.InitializeResult; +import io.modelcontextprotocol.spec.McpSchema.ModelPreferences; +import io.modelcontextprotocol.spec.McpSchema.Role; +import io.modelcontextprotocol.spec.McpSchema.Root; +import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities; +import io.modelcontextprotocol.spec.McpSchema.Tool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.noear.solon.Solon; +import org.noear.solon.net.http.HttpUtils; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; + +public class WebRxSseIntegrationTests { + + private static final int PORT = 8182; + + // private static final String MESSAGE_ENDPOINT = "/mcp/message"; + + private static final String CUSTOM_SSE_ENDPOINT = "/somePath/sse"; + + private static final String CUSTOM_MESSAGE_ENDPOINT = "/otherPath/mcp/message"; + + + private WebRxSseServerTransportProvider mcpServerTransportProvider; + + ConcurrentHashMap clientBulders = new ConcurrentHashMap<>(); + + @BeforeEach + public void before() { + + this.mcpServerTransportProvider = new WebRxSseServerTransportProvider.Builder() + .objectMapper(new ObjectMapper()) + .messageEndpoint(CUSTOM_MESSAGE_ENDPOINT) + .sseEndpoint(CUSTOM_SSE_ENDPOINT) + .build(); + + Solon.start(WebRxSseIntegrationTests.class, new String[]{"server.port=" + PORT}, app -> { + mcpServerTransportProvider.toHttpHandler(app); + }); + + clientBulders.put("httpclient", + McpClient.sync(HttpClientSseClientTransport.builder("http://localhost:" + PORT) + .sseEndpoint(CUSTOM_SSE_ENDPOINT) + .build())); + clientBulders.put("webflux", + McpClient + .sync(WebRxSseClientTransport.builder("http://localhost:" + PORT) + .sseEndpoint(CUSTOM_SSE_ENDPOINT) + .build())); + + } + + @AfterEach + public void after() { + if (Solon.app() != null) { + Solon.stopBlock(); + } + } + + // --------------------------------------- + // Sampling Tests + // --------------------------------------- + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testCreateMessageWithoutSamplingCapabilities(String clientType) { + + var clientBuilder = clientBulders.get(clientType); + + McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( + new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> { + + exchange.createMessage(mock(McpSchema.CreateMessageRequest.class)).block(); + + return Mono.just(mock(CallToolResult.class)); + }); + + McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build(); + + // Create client without sampling capabilities + var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")).build(); + + assertThat(client.initialize()).isNotNull(); + + try { + client.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class) + .hasMessage("Client must be configured with sampling capabilities"); + } + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testCreateMessageSuccess(String clientType) throws InterruptedException { + + // Client + var clientBuilder = clientBulders.get(clientType); + + Function samplingHandler = request -> { + assertThat(request.messages()).hasSize(1); + assertThat(request.messages().get(0).content()).isInstanceOf(McpSchema.TextContent.class); + + return new CreateMessageResult(Role.USER, new McpSchema.TextContent("Test message"), "MockModelName", + CreateMessageResult.StopReason.STOP_SEQUENCE); + }; + + var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) + .capabilities(ClientCapabilities.builder().sampling().build()) + .sampling(samplingHandler) + .build(); + + // Server + + CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), + null); + + McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( + new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> { + + var craeteMessageRequest = McpSchema.CreateMessageRequest.builder() + .messages(List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, + new McpSchema.TextContent("Test message")))) + .modelPreferences(ModelPreferences.builder() + .hints(List.of()) + .costPriority(1.0) + .speedPriority(1.0) + .intelligencePriority(1.0) + .build()) + .build(); + + StepVerifier.create(exchange.createMessage(craeteMessageRequest)).consumeNextWith(result -> { + assertThat(result).isNotNull(); + assertThat(result.role()).isEqualTo(Role.USER); + assertThat(result.content()).isInstanceOf(McpSchema.TextContent.class); + assertThat(((McpSchema.TextContent) result.content()).text()).isEqualTo("Test message"); + assertThat(result.model()).isEqualTo("MockModelName"); + assertThat(result.stopReason()).isEqualTo(CreateMessageResult.StopReason.STOP_SEQUENCE); + }).verifyComplete(); + + return Mono.just(callResponse); + }); + + var mcpServer = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .tools(tool) + .build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + + assertThat(response).isNotNull(); + assertThat(response).isEqualTo(callResponse); + + mcpClient.close(); + mcpServer.close(); + } + + // --------------------------------------- + // Roots Tests + // --------------------------------------- + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testRootsSuccess(String clientType) { + var clientBuilder = clientBulders.get(clientType); + + List roots = List.of(new Root("uri1://", "root1"), new Root("uri2://", "root2")); + + AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) + .build(); + + var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + .roots(roots) + .build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + assertThat(rootsRef.get()).isNull(); + + mcpClient.rootsListChangedNotification(); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); + + // Remove a root + mcpClient.removeRoot(roots.get(0).uri()); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1))); + }); + + // Add a new root + var root3 = new Root("uri3://", "root3"); + mcpClient.addRoot(root3); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1), root3)); + }); + + mcpClient.close(); + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testRootsWithoutCapability(String clientType) { + + var clientBuilder = clientBulders.get(clientType); + + McpServerFeatures.SyncToolSpecification tool = new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> { + + exchange.listRoots(); // try to list roots + + return mock(CallToolResult.class); + }); + + var mcpServer = McpServer.sync(mcpServerTransportProvider).rootsChangeHandler((exchange, rootsUpdate) -> { + }).tools(tool).build(); + + // Create client without roots capability + // No roots capability + var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()).build(); + + assertThat(mcpClient.initialize()).isNotNull(); + + // Attempt to list roots should fail + try { + mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class).hasMessage("Roots not supported"); + } + + mcpClient.close(); + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testRootsNotifciationWithEmptyRootsList(String clientType) { + var clientBuilder = clientBulders.get(clientType); + + AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) + .build(); + + var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + .roots(List.of()) // Empty roots list + .build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + mcpClient.rootsListChangedNotification(); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); + + mcpClient.close(); + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testRootsWithMultipleHandlers(String clientType) { + var clientBuilder = clientBulders.get(clientType); + + List roots = List.of(new Root("uri1://", "root1")); + + AtomicReference> rootsRef1 = new AtomicReference<>(); + AtomicReference> rootsRef2 = new AtomicReference<>(); + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef1.set(rootsUpdate)) + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef2.set(rootsUpdate)) + .build(); + + var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + .roots(roots) + .build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + mcpClient.rootsListChangedNotification(); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef1.get()).containsAll(roots); + assertThat(rootsRef2.get()).containsAll(roots); + }); + + mcpClient.close(); + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testRootsServerCloseWithActiveSubscription(String clientType) { + + var clientBuilder = clientBulders.get(clientType); + + List roots = List.of(new Root("uri1://", "root1")); + + AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) + .build(); + + var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + .roots(roots) + .build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + mcpClient.rootsListChangedNotification(); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); + + // Close server while subscription is active + mcpServer.close(); + + // Verify client can handle server closure gracefully + mcpClient.close(); + } + + // --------------------------------------- + // Tools Tests + // --------------------------------------- + + String emptyJsonSchema = """ + { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + } + """; + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testToolCallSuccess(String clientType) { + + var clientBuilder = clientBulders.get(clientType); + + var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); + McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> { + // perform a blocking call to a remote service + String response = HttpUtils.http("https://github.com/modelcontextprotocol/specification/blob/main/README.md") + .get(); + assertThat(response).isNotBlank(); + return callResponse; + }); + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool1) + .build(); + + var mcpClient = clientBuilder.build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + + assertThat(response).isNotNull(); + assertThat(response).isEqualTo(callResponse); + + mcpClient.close(); + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testToolListChangeHandlingSuccess(String clientType) { + + var clientBuilder = clientBulders.get(clientType); + + var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); + McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> { + // perform a blocking call to a remote service + String response = HttpUtils + .http("https://github.com/modelcontextprotocol/specification/blob/main/README.md") + .get(); + assertThat(response).isNotBlank(); + return callResponse; + }); + + var mcpServer = McpServer.sync(mcpServerTransportProvider) + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool1) + .build(); + + AtomicReference> rootsRef = new AtomicReference<>(); + var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { + // perform a blocking call to a remote service + String response = HttpUtils + .http("https://github.com/modelcontextprotocol/specification/blob/main/README.md") + .get(); + assertThat(response).isNotBlank(); + rootsRef.set(toolsUpdate); + }).build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + assertThat(rootsRef.get()).isNull(); + + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + + mcpServer.notifyToolsListChanged(); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool1.tool())); + }); + + // Remove a tool + mcpServer.removeTool("tool1"); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); + + // Add a new tool + McpServerFeatures.SyncToolSpecification tool2 = new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool2", "tool2 description", emptyJsonSchema), (exchange, request) -> callResponse); + + mcpServer.addTool(tool2); + + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool2.tool())); + }); + + mcpClient.close(); + mcpServer.close(); + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = {"httpclient", "webflux"}) + void testInitialize(String clientType) { + + var clientBuilder = clientBulders.get(clientType); + + var mcpServer = McpServer.sync(mcpServerTransportProvider).build(); + + var mcpClient = clientBuilder.build(); + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + mcpClient.close(); + mcpServer.close(); + } +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java new file mode 100644 index 00000000..b92ee0c2 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import java.time.Duration; + +import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; +import io.modelcontextprotocol.spec.McpClientTransport; +import org.junit.jupiter.api.Timeout; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +/** + * Tests for the {@link McpAsyncClient} with {@link WebRxSseClientTransport}. + * + * @author Christian Tzolov + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebRxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests { + + static String host = "http://localhost:3001"; + + // Uses the https://github.com/tzolov/mcp-everything-server-docker-image + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + @Override + protected McpClientTransport createMcpTransport() { + return WebRxSseClientTransport.builder(host).build(); + } + + @Override + protected void onStart() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @Override + public void onClose() { + container.stop(); + } + + protected Duration getInitializationTimeout() { + return Duration.ofSeconds(1); + } +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java new file mode 100644 index 00000000..05478290 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java @@ -0,0 +1,53 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import java.time.Duration; + +import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; +import io.modelcontextprotocol.spec.McpClientTransport; +import org.junit.jupiter.api.Timeout; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +/** + * Tests for the {@link McpSyncClient} with {@link WebRxSseClientTransport}. + * + * @author Christian Tzolov + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebRxSseMcpSyncClientTests extends AbstractMcpSyncClientTests { + + static String host = "http://localhost:3001"; + + // Uses the https://github.com/tzolov/mcp-everything-server-docker-image + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + @Override + protected McpClientTransport createMcpTransport() { + return WebRxSseClientTransport.builder(host).build(); + } + + @Override + protected void onStart() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @Override + protected void onClose() { + container.stop(); + } + + protected Duration getInitializationTimeout() { + return Duration.ofSeconds(1); + } + +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java new file mode 100644 index 00000000..16aff87d --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java @@ -0,0 +1,334 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.noear.solon.net.http.textstream.ServerSentEvent; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.test.StepVerifier; + + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for the {@link WebRxSseClientTransport} class. + * + * @author Christian Tzolov + */ +@Timeout(15) +class WebRxSseClientTransportTests { + + static String host = "http://localhost:3001"; + + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + private TestSseClientTransport transport; + + private ObjectMapper objectMapper; + + // Test class to access protected methods + static class TestSseClientTransport extends WebRxSseClientTransport { + + private final AtomicInteger inboundMessageCount = new AtomicInteger(0); + + private Sinks.Many events = Sinks.many().unicast().onBackpressureBuffer(); + + public TestSseClientTransport(String baseUrl, ObjectMapper objectMapper) { + super(baseUrl, objectMapper); + } + + @Override + protected Flux eventStream() { + return super.eventStream().mergeWith(events.asFlux()); + } + + public String getLastEndpoint() { + return messageEndpointSink.asMono().block(); + } + + public int getInboundMessageCount() { + return inboundMessageCount.get(); + } + + public void simulateEndpointEvent(String jsonMessage) { + events.tryEmitNext(new ServerSentEvent(null, "endpoint", jsonMessage, null)); + inboundMessageCount.incrementAndGet(); + } + + public void simulateMessageEvent(String jsonMessage) { + events.tryEmitNext(new ServerSentEvent(null, "message", jsonMessage, null)); + inboundMessageCount.incrementAndGet(); + } + + } + + void startContainer() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @BeforeEach + void setUp() { + startContainer(); + objectMapper = new ObjectMapper(); + transport = new TestSseClientTransport(host, objectMapper); + transport.connect(Function.identity()).block(); + } + + @AfterEach + void afterEach() { + if (transport != null) { + assertThatCode(() -> transport.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); + } + cleanup(); + } + + void cleanup() { + container.stop(); + } + + @Test + void testEndpointEventHandling() { + assertThat(transport.getLastEndpoint()).startsWith("/message?"); + } + + @Test + void constructorValidation() { + assertThatThrownBy(() -> new WebRxSseClientTransport(null)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("WebClient.Builder must not be null"); + + assertThatThrownBy(() -> new WebRxSseClientTransport(host, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ObjectMapper must not be null"); + } + + @Test + void testBuilderPattern() { + // Test default builder + WebRxSseClientTransport transport1 = WebRxSseClientTransport.builder(host).build(); + assertThatCode(() -> transport1.closeGracefully().block()).doesNotThrowAnyException(); + + // Test builder with custom ObjectMapper + ObjectMapper customMapper = new ObjectMapper(); + WebRxSseClientTransport transport2 = WebRxSseClientTransport.builder(host) + .objectMapper(customMapper) + .build(); + assertThatCode(() -> transport2.closeGracefully().block()).doesNotThrowAnyException(); + + // Test builder with custom SSE endpoint + WebRxSseClientTransport transport3 = WebRxSseClientTransport.builder(host) + .sseEndpoint("/custom-sse") + .build(); + assertThatCode(() -> transport3.closeGracefully().block()).doesNotThrowAnyException(); + + // Test builder with all custom parameters + WebRxSseClientTransport transport4 = WebRxSseClientTransport.builder(host) + .objectMapper(customMapper) + .sseEndpoint("/custom-sse") + .build(); + assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException(); + } + + @Test + void testMessageProcessing() { + // Create a test message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Simulate receiving the message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "test-method", + "id": "test-id", + "params": {"key": "value"} + } + """); + + // Subscribe to messages and verify + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testResponseMessageProcessing() { + // Simulate receiving a response message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "id": "test-id", + "result": {"status": "success"} + } + """); + + // Create and send a request message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message handling + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testErrorMessageProcessing() { + // Simulate receiving an error message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "id": "test-id", + "error": { + "code": -32600, + "message": "Invalid Request" + } + } + """); + + // Create and send a request message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message handling + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testNotificationMessageProcessing() { + // Simulate receiving a notification message (no id) + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "update", + "params": {"status": "processing"} + } + """); + + // Verify the notification was processed + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testGracefulShutdown() { + // Test graceful shutdown + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + + // Create a test message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message is not processed after shutdown + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + // Message count should remain 0 after shutdown + assertThat(transport.getInboundMessageCount()).isEqualTo(0); + } + + @Test + void testRetryBehavior() { + // Create a WebClient that simulates connection failures + WebRxSseClientTransport failingTransport = WebRxSseClientTransport.builder("http://non-existent-host").build(); + + // Verify that the transport attempts to reconnect + StepVerifier.create(Mono.delay(Duration.ofSeconds(2))).expectNextCount(1).verifyComplete(); + + // Clean up + failingTransport.closeGracefully().block(); + } + + @Test + void testMultipleMessageProcessing() { + // Simulate receiving multiple messages in sequence + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "method1", + "id": "id1", + "params": {"key": "value1"} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "method2", + "id": "id2", + "params": {"key": "value2"} + } + """); + + // Create and send corresponding messages + JSONRPCRequest message1 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method1", "id1", + Map.of("key", "value1")); + + JSONRPCRequest message2 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method2", "id2", + Map.of("key", "value2")); + + // Verify both messages are processed + StepVerifier.create(transport.sendMessage(message1).then(transport.sendMessage(message2))).verifyComplete(); + + // Verify message count + assertThat(transport.getInboundMessageCount()).isEqualTo(2); + } + + @Test + void testMessageOrderPreservation() { + // Simulate receiving messages in a specific order + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "first", + "id": "1", + "params": {"sequence": 1} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "second", + "id": "2", + "params": {"sequence": 2} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "third", + "id": "3", + "params": {"sequence": 3} + } + """); + + // Verify message count and order + assertThat(transport.getInboundMessageCount()).isEqualTo(3); + } +} \ No newline at end of file diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpServerTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpServerTests.java new file mode 100644 index 00000000..88be75fc --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpServerTests.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.server.transport.WebRxSseServerTransportProvider; +import io.modelcontextprotocol.spec.McpServerTransportProvider; +import org.junit.jupiter.api.Timeout; +import org.noear.solon.Solon; + +/** + * Tests for {@link McpSyncServer} using {@link WebRxSseServerTransportProvider}. + * + * @author Christian Tzolov + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebRxSseMcpServerTests extends AbstractMcpSyncServerTests { + + private static final int PORT = 8182; + + private static final String MESSAGE_ENDPOINT = "/mcp/message"; + + private WebRxSseServerTransportProvider transportProvider; + + @Override + protected McpServerTransportProvider createMcpTransportProvider() { + transportProvider = new WebRxSseServerTransportProvider.Builder().objectMapper(new ObjectMapper()) + .messageEndpoint(MESSAGE_ENDPOINT) + .build(); + + Solon.start(WebRxSseMcpServerTests.class, new String[]{"-server.port=" + PORT}, app -> { + transportProvider.toHttpHandler(app); + }); + + return transportProvider; + } + + @Override + protected void onStart() { + } + + @Override + protected void onClose() { + if (Solon.app() != null) { + Solon.stopBlock(); + } + } +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/transport/BlockingInputStream.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/transport/BlockingInputStream.java new file mode 100644 index 00000000..0ab72a99 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/transport/BlockingInputStream.java @@ -0,0 +1,69 @@ +/* +* Copyright 2024 - 2024 the original author or authors. +*/ +package io.modelcontextprotocol.server.transport; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class BlockingInputStream extends InputStream { + + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + + private volatile boolean completed = false; + + private volatile boolean closed = false; + + @Override + public int read() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + + try { + Integer value = queue.poll(); + if (value == null) { + if (completed) { + return -1; + } + value = queue.take(); // Blocks until data is available + if (value == null && completed) { + return -1; + } + } + return value; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Read interrupted", e); + } + } + + public void write(int b) { + if (!closed && !completed) { + queue.offer(b); + } + } + + public void write(byte[] data) { + if (!closed && !completed) { + for (byte b : data) { + queue.offer((int) b & 0xFF); + } + } + } + + public void complete() { + this.completed = true; + } + + @Override + public void close() { + this.closed = true; + this.completed = true; + this.queue.clear(); + } + +} \ No newline at end of file diff --git a/mcp-spring/mcp-spring-webflux/pom.xml b/mcp-spring/mcp-spring-webflux/pom.xml index 186ade79..57d52e5c 100644 --- a/mcp-spring/mcp-spring-webflux/pom.xml +++ b/mcp-spring/mcp-spring-webflux/pom.xml @@ -11,7 +11,7 @@ mcp-spring-webflux jar - WebFlux implementation of the Java MCP SSE transport + Spring WebFlux implementation of the Java MCP SSE transport https://github.com/modelcontextprotocol/java-sdk diff --git a/pom.xml b/pom.xml index 8e7cca2a..8ad67981 100644 --- a/pom.xml +++ b/pom.xml @@ -93,12 +93,14 @@ 4.2.0 7.1.0 4.1.0 + 3.1.2-SNAPSHOT mcp-bom mcp + mcp-solon/mcp-solon-webrx mcp-spring/mcp-spring-webflux mcp-spring/mcp-spring-webmvc mcp-test @@ -106,20 +108,6 @@ - - io.spring.javaformat - spring-javaformat-maven-plugin - ${spring-javaformat-maven-plugin.version} - - - validate - true - - validate - - - - org.apache.maven.plugins maven-site-plugin @@ -328,6 +316,27 @@ + + spring-javaformat + + + + io.spring.javaformat + spring-javaformat-maven-plugin + ${spring-javaformat-maven-plugin.version} + + + validate + true + + validate + + + + + + + From a08ff405883f8ade996beb668d50e9ba53c86d2e Mon Sep 17 00:00:00 2001 From: noear Date: Tue, 1 Apr 2025 22:04:18 +0800 Subject: [PATCH 2/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/mcp-solon-webrx/pom.xml | 23 +- .../transport/WebRxSseClientTransport.java | 385 ------------------ .../WebRxSseIntegrationTests.java | 30 +- .../client/WebRxSseMcpAsyncClientTests.java | 52 --- .../client/WebRxSseMcpSyncClientTests.java | 53 --- .../WebRxSseClientTransportTests.java | 334 --------------- .../server/WebRxSseMcpAsyncServerTests.java | 55 +++ ...s.java => WebRxSseMcpSyncServerTests.java} | 11 +- 8 files changed, 99 insertions(+), 844 deletions(-) delete mode 100644 mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java delete mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java delete mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java delete mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpAsyncServerTests.java rename mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/{WebRxSseMcpServerTests.java => WebRxSseMcpSyncServerTests.java} (76%) diff --git a/mcp-solon/mcp-solon-webrx/pom.xml b/mcp-solon/mcp-solon-webrx/pom.xml index dae892d9..3cb16597 100644 --- a/mcp-solon/mcp-solon-webrx/pom.xml +++ b/mcp-solon/mcp-solon-webrx/pom.xml @@ -69,7 +69,7 @@ org.noear - solon-test + solon-boot-jetty ${solon.version} test @@ -129,4 +129,25 @@ test + + + + sonatype-nexus-snapshots + Sonatype Nexus Snapshots + https://oss.sonatype.org/content/repositories/snapshots + + false + + + + + + sonatype-nexus-snapshots + Sonatype Nexus Snapshots + https://oss.sonatype.org/content/repositories/snapshots + + false + + + diff --git a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java deleted file mode 100644 index 5e89dace..00000000 --- a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Copyright 2025 - 2025 the original author or authors. - */ -package io.modelcontextprotocol.client.transport; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpError; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage; -import io.modelcontextprotocol.util.Assert; -import org.noear.solon.core.util.MimeType; -import org.noear.solon.net.http.HttpUtils; -import org.noear.solon.net.http.textstream.ServerSentEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.SynchronousSink; -import reactor.core.scheduler.Schedulers; -import reactor.util.retry.Retry; -import reactor.util.retry.Retry.RetrySignal; - -import java.io.IOException; -import java.util.function.BiConsumer; -import java.util.function.Function; - -/** - * Server-Sent Events (SSE) implementation of the - * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE - * transport specification. - * - *

- * This transport establishes a bidirectional communication channel where: - *

    - *
  • Inbound messages are received through an SSE connection from the server
  • - *
  • Outbound messages are sent via HTTP POST requests to a server-provided - * endpoint
  • - *
- * - *

- * The message flow follows these steps: - *

    - *
  1. The client establishes an SSE connection to the server's /sse endpoint
  2. - *
  3. The server sends an 'endpoint' event containing the URI for sending messages
  4. - *
- * - * This implementation uses {@link org.noear.solon.net.http.HttpUtils} for HTTP communications and supports JSON - * serialization/deserialization of messages. - * - * @author Christian Tzolov - * @author noear - * @see MCP - * HTTP with SSE Transport Specification - */ -public class WebRxSseClientTransport implements McpClientTransport { - - private static final Logger logger = LoggerFactory.getLogger(WebRxSseClientTransport.class); - - /** - * Event type for JSON-RPC messages received through the SSE connection. The server - * sends messages with this event type to transmit JSON-RPC protocol data. - */ - private static final String MESSAGE_EVENT_TYPE = "message"; - - /** - * Event type for receiving the message endpoint URI from the server. The server MUST - * send this event when a client connects, providing the URI where the client should - * send its messages via HTTP POST. - */ - private static final String ENDPOINT_EVENT_TYPE = "endpoint"; - - /** - * Default SSE endpoint path as specified by the MCP transport specification. This - * endpoint is used to establish the SSE connection with the server. - */ - private static final String DEFAULT_SSE_ENDPOINT = "/sse"; - - /** - * ObjectMapper for serializing outbound messages and deserializing inbound messages. - * Handles conversion between JSON-RPC messages and their string representation. - */ - protected ObjectMapper objectMapper; - - /** - * Subscription for the SSE connection handling inbound messages. Used for cleanup - * during transport shutdown. - */ - private Disposable inboundSubscription; - - /** - * Flag indicating if the transport is in the process of shutting down. Used to - * prevent new operations during shutdown and handle cleanup gracefully. - */ - private volatile boolean isClosing = false; - - /** - * Sink for managing the message endpoint URI provided by the server. Stores the most - * recent endpoint URI and makes it available for outbound message processing. - */ - protected final Sinks.One messageEndpointSink = Sinks.one(); - - /** - * The SSE service base URL provided by the server. Used for sending outbound messages via - * HTTP POST requests. - */ - private String baseUrl; - - /** - * The SSE endpoint URI provided by the server. Used for sending outbound messages via - * HTTP POST requests. - */ - private String sseEndpoint; - - /** - * Constructs a new SseClientTransport with the specified WebClient builder. Uses a - * default ObjectMapper instance for JSON processing. - * instance - * @param baseUrl the SSE service base URL - * @throws IllegalArgumentException if webClientBuilder is null - */ - public WebRxSseClientTransport(String baseUrl) { - this(baseUrl,new ObjectMapper()); - } - - /** - * Constructs a new SseClientTransport with the specified WebClient builder and - * ObjectMapper. Initializes both inbound and outbound message processing pipelines. - * instance - * @param baseUrl the SSE service base URL - * @param objectMapper the ObjectMapper to use for JSON processing - * @throws IllegalArgumentException if either parameter is null - */ - public WebRxSseClientTransport(String baseUrl,ObjectMapper objectMapper) { - this(baseUrl, objectMapper, DEFAULT_SSE_ENDPOINT); - } - - /** - * Constructs a new SseClientTransport with the specified WebClient builder and - * ObjectMapper. Initializes both inbound and outbound message processing pipelines. - * instance - * @param baseUrl the SSE service base URL - * @param objectMapper the ObjectMapper to use for JSON processing - * @param sseEndpoint the SSE endpoint URI to use for establishing the connection - * @throws IllegalArgumentException if either parameter is null - */ - public WebRxSseClientTransport(String baseUrl,ObjectMapper objectMapper, - String sseEndpoint) { - Assert.notNull(objectMapper, "ObjectMapper must not be null"); - Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty"); - - this.baseUrl = baseUrl; - this.objectMapper = objectMapper; - this.sseEndpoint = sseEndpoint; - } - - /** - * Establishes a connection to the MCP server using Server-Sent Events (SSE). This - * method initiates the SSE connection and sets up the message processing pipeline. - * - *

- * The connection process follows these steps: - *

    - *
  1. Establishes an SSE connection to the server's /sse endpoint
  2. - *
  3. Waits for the server to send an 'endpoint' event with the message posting - * URI
  4. - *
  5. Sets up message handling for incoming JSON-RPC messages
  6. - *
- * - *

- * The connection is considered established only after receiving the endpoint event - * from the server. - * @param handler a function that processes incoming JSON-RPC messages and returns - * responses - * @return a Mono that completes when the connection is fully established - * @throws McpError if there's an error processing SSE events or if an unrecognized - * event type is received - */ - @Override - public Mono connect(Function, Mono> handler) { - Flux events = eventStream(); - this.inboundSubscription = events.concatMap(event -> Mono.just(event).handle((e, s) -> { - if (ENDPOINT_EVENT_TYPE.equals(event.event())) { - String messageEndpointUri = event.data(); - if (messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) { - s.complete(); - } - else { - // TODO: clarify with the spec if multiple events can be - // received - s.error(new McpError("Failed to handle SSE endpoint event")); - } - } - else if (MESSAGE_EVENT_TYPE.equals(event.event())) { - try { - JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data()); - s.next(message); - } - catch (IOException ioException) { - s.error(ioException); - } - } - else { - s.error(new McpError("Received unrecognized SSE event type: " + event.event())); - } - }).transform(handler)).subscribe(); - - // The connection is established once the server sends the endpoint event - return messageEndpointSink.asMono().then(); - } - - /** - * Sends a JSON-RPC message to the server using the endpoint provided during - * connection. - * - *

- * Messages are sent via HTTP POST requests to the server-provided endpoint URI. The - * message is serialized to JSON before transmission. If the transport is in the - * process of closing, the message send operation is skipped gracefully. - * @param message the JSON-RPC message to send - * @return a Mono that completes when the message has been sent successfully - * @throws RuntimeException if message serialization fails - */ - @Override - public Mono sendMessage(JSONRPCMessage message) { - // The messageEndpoint is the endpoint URI to send the messages - // It is provided by the server as part of the endpoint event - return messageEndpointSink.asMono().flatMap(messageEndpointUri -> { - if (isClosing) { - return Mono.empty(); - } - - try { - String jsonText = this.objectMapper.writeValueAsString(message); - - HttpUtils.http(baseUrl + messageEndpointUri) - .contentType(MimeType.APPLICATION_JSON_VALUE) - .bodyOfJson(jsonText) - .post(); - - logger.debug("Message sent successfully"); - } catch (Exception error) { - if (!isClosing) { - logger.error("Error sending message: {}", error.getMessage()); - } - } - - return Mono.empty(); - }).then(); // TODO: Consider non-200-ok response - } - - /** - * Initializes and starts the inbound SSE event processing. Establishes the SSE - * connection and sets up event handling for both message and endpoint events. - * Includes automatic retry logic for handling transient connection failures. - */ - // visible for tests - protected Flux eventStream() {// @formatter:off - return Flux.from(HttpUtils.http(baseUrl + this.sseEndpoint) - .accept(MimeType.TEXT_EVENT_STREAM_VALUE) - .execAsEventStream("GET")) - .retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler))); - } // @formatter:on - - /** - * Retry handler for the inbound SSE stream. Implements the retry logic for handling - * connection failures and other errors. - */ - private BiConsumer> inboundRetryHandler = (retrySpec, sink) -> { - if (isClosing) { - logger.debug("SSE connection closed during shutdown"); - sink.error(retrySpec.failure()); - return; - } - if (retrySpec.failure() instanceof IOException) { - logger.debug("Retrying SSE connection after IO error"); - sink.next(retrySpec); - return; - } - logger.error("Fatal SSE error, not retrying: {}", retrySpec.failure().getMessage()); - sink.error(retrySpec.failure()); - }; - - /** - * Implements graceful shutdown of the transport. Cleans up all resources including - * subscriptions and schedulers. Ensures orderly shutdown of both inbound and outbound - * message processing. - * @return a Mono that completes when shutdown is finished - */ - @Override - public Mono closeGracefully() { // @formatter:off - return Mono.fromRunnable(() -> { - isClosing = true; - - // Dispose of subscriptions - - if (inboundSubscription != null) { - inboundSubscription.dispose(); - } - - }) - .then() - .subscribeOn(Schedulers.boundedElastic()); - } // @formatter:on - - /** - * Unmarshalls data from a generic Object into the specified type using the configured - * ObjectMapper. - * - *

- * This method is particularly useful when working with JSON-RPC parameters or result - * objects that need to be converted to specific Java types. It leverages Jackson's - * type conversion capabilities to handle complex object structures. - * @param the target type to convert the data into - * @param data the source object to convert - * @param typeRef the TypeReference describing the target type - * @return the unmarshalled object of type T - * @throws IllegalArgumentException if the conversion cannot be performed - */ - @Override - public T unmarshalFrom(Object data, TypeReference typeRef) { - return this.objectMapper.convertValue(data, typeRef); - } - - /** - * Creates a new builder for {@link WebRxSseClientTransport}. - * instance - * @return a new builder instance - */ - public static Builder builder(String baseUrl) { - return new Builder(baseUrl); - } - - /** - * Builder for {@link WebRxSseClientTransport}. - */ - public static class Builder { - private String baseUrl; - - private String sseEndpoint = DEFAULT_SSE_ENDPOINT; - - private ObjectMapper objectMapper = new ObjectMapper(); - - /** - * Creates a new builder with the specified WebClient.Builder. - */ - public Builder(String baseUrl) { - this.baseUrl = baseUrl; - } - - /** - * Sets the SSE endpoint path. - * @param sseEndpoint the SSE endpoint path - * @return this builder - */ - public Builder sseEndpoint(String sseEndpoint) { - Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); - this.sseEndpoint = sseEndpoint; - return this; - } - - /** - * Sets the object mapper for JSON serialization/deserialization. - * @param objectMapper the object mapper - * @return this builder - */ - public Builder objectMapper(ObjectMapper objectMapper) { - Assert.notNull(objectMapper, "objectMapper must not be null"); - this.objectMapper = objectMapper; - return this; - } - - /** - * Builds a new {@link WebRxSseClientTransport} instance. - * @return a new transport instance - */ - public WebRxSseClientTransport build() { - return new WebRxSseClientTransport(baseUrl,objectMapper, sseEndpoint); - } - } -} \ No newline at end of file diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java index 6fac1f60..e9f7dec4 100644 --- a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java @@ -13,7 +13,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; -import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; import io.modelcontextprotocol.server.McpServer; import io.modelcontextprotocol.server.McpServerFeatures; import io.modelcontextprotocol.server.transport.WebRxSseServerTransportProvider; @@ -34,6 +33,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.noear.solon.Solon; +import org.noear.solon.boot.http.HttpServerConfigure; import org.noear.solon.net.http.HttpUtils; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -68,17 +68,15 @@ public void before() { Solon.start(WebRxSseIntegrationTests.class, new String[]{"server.port=" + PORT}, app -> { mcpServerTransportProvider.toHttpHandler(app); + app.onEvent(HttpServerConfigure.class, event -> { + event.enableDebug(true); + }); }); clientBulders.put("httpclient", McpClient.sync(HttpClientSseClientTransport.builder("http://localhost:" + PORT) .sseEndpoint(CUSTOM_SSE_ENDPOINT) .build())); - clientBulders.put("webflux", - McpClient - .sync(WebRxSseClientTransport.builder("http://localhost:" + PORT) - .sseEndpoint(CUSTOM_SSE_ENDPOINT) - .build())); } @@ -93,7 +91,7 @@ public void after() { // Sampling Tests // --------------------------------------- @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testCreateMessageWithoutSamplingCapabilities(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -122,7 +120,7 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) { } @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testCreateMessageSuccess(String clientType) throws InterruptedException { // Client @@ -193,7 +191,7 @@ void testCreateMessageSuccess(String clientType) throws InterruptedException { // Roots Tests // --------------------------------------- @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testRootsSuccess(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -239,7 +237,7 @@ void testRootsSuccess(String clientType) { } @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testRootsWithoutCapability(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -273,7 +271,7 @@ void testRootsWithoutCapability(String clientType) { } @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testRootsNotifciationWithEmptyRootsList(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -300,7 +298,7 @@ void testRootsNotifciationWithEmptyRootsList(String clientType) { } @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testRootsWithMultipleHandlers(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -333,7 +331,7 @@ void testRootsWithMultipleHandlers(String clientType) { } @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testRootsServerCloseWithActiveSubscription(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -378,7 +376,7 @@ void testRootsServerCloseWithActiveSubscription(String clientType) { """; @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testToolCallSuccess(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -415,7 +413,7 @@ void testToolCallSuccess(String clientType) { } @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testToolListChangeHandlingSuccess(String clientType) { var clientBuilder = clientBulders.get(clientType); @@ -481,7 +479,7 @@ void testToolListChangeHandlingSuccess(String clientType) { } @ParameterizedTest(name = "{0} : {displayName} ") - @ValueSource(strings = {"httpclient", "webflux"}) + @ValueSource(strings = {"httpclient"}) void testInitialize(String clientType) { var clientBuilder = clientBulders.get(clientType); diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java deleted file mode 100644 index b92ee0c2..00000000 --- a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2024-2024 the original author or authors. - */ - -package io.modelcontextprotocol.client; - -import java.time.Duration; - -import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; -import io.modelcontextprotocol.spec.McpClientTransport; -import org.junit.jupiter.api.Timeout; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; - -/** - * Tests for the {@link McpAsyncClient} with {@link WebRxSseClientTransport}. - * - * @author Christian Tzolov - */ -@Timeout(15) // Giving extra time beyond the client timeout -class WebRxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests { - - static String host = "http://localhost:3001"; - - // Uses the https://github.com/tzolov/mcp-everything-server-docker-image - @SuppressWarnings("resource") - GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") - .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) - .withExposedPorts(3001) - .waitingFor(Wait.forHttp("/").forStatusCode(404)); - - @Override - protected McpClientTransport createMcpTransport() { - return WebRxSseClientTransport.builder(host).build(); - } - - @Override - protected void onStart() { - container.start(); - int port = container.getMappedPort(3001); - host = "http://" + container.getHost() + ":" + port; - } - - @Override - public void onClose() { - container.stop(); - } - - protected Duration getInitializationTimeout() { - return Duration.ofSeconds(1); - } -} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java deleted file mode 100644 index 05478290..00000000 --- a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2024-2024 the original author or authors. - */ - -package io.modelcontextprotocol.client; - -import java.time.Duration; - -import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; -import io.modelcontextprotocol.spec.McpClientTransport; -import org.junit.jupiter.api.Timeout; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; - -/** - * Tests for the {@link McpSyncClient} with {@link WebRxSseClientTransport}. - * - * @author Christian Tzolov - */ -@Timeout(15) // Giving extra time beyond the client timeout -class WebRxSseMcpSyncClientTests extends AbstractMcpSyncClientTests { - - static String host = "http://localhost:3001"; - - // Uses the https://github.com/tzolov/mcp-everything-server-docker-image - @SuppressWarnings("resource") - GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") - .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) - .withExposedPorts(3001) - .waitingFor(Wait.forHttp("/").forStatusCode(404)); - - @Override - protected McpClientTransport createMcpTransport() { - return WebRxSseClientTransport.builder(host).build(); - } - - @Override - protected void onStart() { - container.start(); - int port = container.getMappedPort(3001); - host = "http://" + container.getHost() + ":" + port; - } - - @Override - protected void onClose() { - container.stop(); - } - - protected Duration getInitializationTimeout() { - return Duration.ofSeconds(1); - } - -} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java deleted file mode 100644 index 16aff87d..00000000 --- a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Copyright 2024-2024 the original author or authors. - */ - -package io.modelcontextprotocol.client.transport; - -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.noear.solon.net.http.textstream.ServerSentEvent; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.test.StepVerifier; - - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** - * Tests for the {@link WebRxSseClientTransport} class. - * - * @author Christian Tzolov - */ -@Timeout(15) -class WebRxSseClientTransportTests { - - static String host = "http://localhost:3001"; - - @SuppressWarnings("resource") - GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") - .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) - .withExposedPorts(3001) - .waitingFor(Wait.forHttp("/").forStatusCode(404)); - - private TestSseClientTransport transport; - - private ObjectMapper objectMapper; - - // Test class to access protected methods - static class TestSseClientTransport extends WebRxSseClientTransport { - - private final AtomicInteger inboundMessageCount = new AtomicInteger(0); - - private Sinks.Many events = Sinks.many().unicast().onBackpressureBuffer(); - - public TestSseClientTransport(String baseUrl, ObjectMapper objectMapper) { - super(baseUrl, objectMapper); - } - - @Override - protected Flux eventStream() { - return super.eventStream().mergeWith(events.asFlux()); - } - - public String getLastEndpoint() { - return messageEndpointSink.asMono().block(); - } - - public int getInboundMessageCount() { - return inboundMessageCount.get(); - } - - public void simulateEndpointEvent(String jsonMessage) { - events.tryEmitNext(new ServerSentEvent(null, "endpoint", jsonMessage, null)); - inboundMessageCount.incrementAndGet(); - } - - public void simulateMessageEvent(String jsonMessage) { - events.tryEmitNext(new ServerSentEvent(null, "message", jsonMessage, null)); - inboundMessageCount.incrementAndGet(); - } - - } - - void startContainer() { - container.start(); - int port = container.getMappedPort(3001); - host = "http://" + container.getHost() + ":" + port; - } - - @BeforeEach - void setUp() { - startContainer(); - objectMapper = new ObjectMapper(); - transport = new TestSseClientTransport(host, objectMapper); - transport.connect(Function.identity()).block(); - } - - @AfterEach - void afterEach() { - if (transport != null) { - assertThatCode(() -> transport.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); - } - cleanup(); - } - - void cleanup() { - container.stop(); - } - - @Test - void testEndpointEventHandling() { - assertThat(transport.getLastEndpoint()).startsWith("/message?"); - } - - @Test - void constructorValidation() { - assertThatThrownBy(() -> new WebRxSseClientTransport(null)).isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("WebClient.Builder must not be null"); - - assertThatThrownBy(() -> new WebRxSseClientTransport(host, null)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("ObjectMapper must not be null"); - } - - @Test - void testBuilderPattern() { - // Test default builder - WebRxSseClientTransport transport1 = WebRxSseClientTransport.builder(host).build(); - assertThatCode(() -> transport1.closeGracefully().block()).doesNotThrowAnyException(); - - // Test builder with custom ObjectMapper - ObjectMapper customMapper = new ObjectMapper(); - WebRxSseClientTransport transport2 = WebRxSseClientTransport.builder(host) - .objectMapper(customMapper) - .build(); - assertThatCode(() -> transport2.closeGracefully().block()).doesNotThrowAnyException(); - - // Test builder with custom SSE endpoint - WebRxSseClientTransport transport3 = WebRxSseClientTransport.builder(host) - .sseEndpoint("/custom-sse") - .build(); - assertThatCode(() -> transport3.closeGracefully().block()).doesNotThrowAnyException(); - - // Test builder with all custom parameters - WebRxSseClientTransport transport4 = WebRxSseClientTransport.builder(host) - .objectMapper(customMapper) - .sseEndpoint("/custom-sse") - .build(); - assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException(); - } - - @Test - void testMessageProcessing() { - // Create a test message - JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", - Map.of("key", "value")); - - // Simulate receiving the message - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "method": "test-method", - "id": "test-id", - "params": {"key": "value"} - } - """); - - // Subscribe to messages and verify - StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); - - assertThat(transport.getInboundMessageCount()).isEqualTo(1); - } - - @Test - void testResponseMessageProcessing() { - // Simulate receiving a response message - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "id": "test-id", - "result": {"status": "success"} - } - """); - - // Create and send a request message - JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", - Map.of("key", "value")); - - // Verify message handling - StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); - - assertThat(transport.getInboundMessageCount()).isEqualTo(1); - } - - @Test - void testErrorMessageProcessing() { - // Simulate receiving an error message - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "id": "test-id", - "error": { - "code": -32600, - "message": "Invalid Request" - } - } - """); - - // Create and send a request message - JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", - Map.of("key", "value")); - - // Verify message handling - StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); - - assertThat(transport.getInboundMessageCount()).isEqualTo(1); - } - - @Test - void testNotificationMessageProcessing() { - // Simulate receiving a notification message (no id) - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "method": "update", - "params": {"status": "processing"} - } - """); - - // Verify the notification was processed - assertThat(transport.getInboundMessageCount()).isEqualTo(1); - } - - @Test - void testGracefulShutdown() { - // Test graceful shutdown - StepVerifier.create(transport.closeGracefully()).verifyComplete(); - - // Create a test message - JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", - Map.of("key", "value")); - - // Verify message is not processed after shutdown - StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); - - // Message count should remain 0 after shutdown - assertThat(transport.getInboundMessageCount()).isEqualTo(0); - } - - @Test - void testRetryBehavior() { - // Create a WebClient that simulates connection failures - WebRxSseClientTransport failingTransport = WebRxSseClientTransport.builder("http://non-existent-host").build(); - - // Verify that the transport attempts to reconnect - StepVerifier.create(Mono.delay(Duration.ofSeconds(2))).expectNextCount(1).verifyComplete(); - - // Clean up - failingTransport.closeGracefully().block(); - } - - @Test - void testMultipleMessageProcessing() { - // Simulate receiving multiple messages in sequence - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "method": "method1", - "id": "id1", - "params": {"key": "value1"} - } - """); - - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "method": "method2", - "id": "id2", - "params": {"key": "value2"} - } - """); - - // Create and send corresponding messages - JSONRPCRequest message1 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method1", "id1", - Map.of("key", "value1")); - - JSONRPCRequest message2 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method2", "id2", - Map.of("key", "value2")); - - // Verify both messages are processed - StepVerifier.create(transport.sendMessage(message1).then(transport.sendMessage(message2))).verifyComplete(); - - // Verify message count - assertThat(transport.getInboundMessageCount()).isEqualTo(2); - } - - @Test - void testMessageOrderPreservation() { - // Simulate receiving messages in a specific order - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "method": "first", - "id": "1", - "params": {"sequence": 1} - } - """); - - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "method": "second", - "id": "2", - "params": {"sequence": 2} - } - """); - - transport.simulateMessageEvent(""" - { - "jsonrpc": "2.0", - "method": "third", - "id": "3", - "params": {"sequence": 3} - } - """); - - // Verify message count and order - assertThat(transport.getInboundMessageCount()).isEqualTo(3); - } -} \ No newline at end of file diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpAsyncServerTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpAsyncServerTests.java new file mode 100644 index 00000000..6950e20a --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpAsyncServerTests.java @@ -0,0 +1,55 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.server.transport.WebRxSseServerTransportProvider; +import io.modelcontextprotocol.spec.McpServerTransportProvider; +import org.junit.jupiter.api.Timeout; +import org.noear.solon.Solon; +import org.noear.solon.boot.http.HttpServerConfigure; + +/** + * Tests for {@link McpSyncServer} using {@link WebRxSseServerTransportProvider}. + * + * @author Christian Tzolov + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebRxSseMcpAsyncServerTests extends AbstractMcpAsyncServerTests { + + private static final int PORT = 8181; + + private static final String MESSAGE_ENDPOINT = "/mcp/message"; + + private WebRxSseServerTransportProvider transportProvider; + + @Override + protected McpServerTransportProvider createMcpTransportProvider() { + transportProvider = new WebRxSseServerTransportProvider.Builder() + .objectMapper(new ObjectMapper()) + .messageEndpoint(MESSAGE_ENDPOINT) + .build(); + + Solon.start(WebRxSseMcpAsyncServerTests.class, new String[]{"-server.port=" + PORT}, app -> { + transportProvider.toHttpHandler(app); + app.onEvent(HttpServerConfigure.class, event -> { + event.enableDebug(true); + }); + }); + + return transportProvider; + } + + @Override + protected void onStart() { + } + + @Override + protected void onClose() { + if (Solon.app() != null) { + Solon.stopBlock(); + } + } +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpServerTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpSyncServerTests.java similarity index 76% rename from mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpServerTests.java rename to mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpSyncServerTests.java index 88be75fc..46c04f6f 100644 --- a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpServerTests.java +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/server/WebRxSseMcpSyncServerTests.java @@ -9,6 +9,7 @@ import io.modelcontextprotocol.spec.McpServerTransportProvider; import org.junit.jupiter.api.Timeout; import org.noear.solon.Solon; +import org.noear.solon.boot.http.HttpServerConfigure; /** * Tests for {@link McpSyncServer} using {@link WebRxSseServerTransportProvider}. @@ -16,7 +17,7 @@ * @author Christian Tzolov */ @Timeout(15) // Giving extra time beyond the client timeout -class WebRxSseMcpServerTests extends AbstractMcpSyncServerTests { +class WebRxSseMcpSyncServerTests extends AbstractMcpSyncServerTests { private static final int PORT = 8182; @@ -26,12 +27,16 @@ class WebRxSseMcpServerTests extends AbstractMcpSyncServerTests { @Override protected McpServerTransportProvider createMcpTransportProvider() { - transportProvider = new WebRxSseServerTransportProvider.Builder().objectMapper(new ObjectMapper()) + transportProvider = new WebRxSseServerTransportProvider.Builder() + .objectMapper(new ObjectMapper()) .messageEndpoint(MESSAGE_ENDPOINT) .build(); - Solon.start(WebRxSseMcpServerTests.class, new String[]{"-server.port=" + PORT}, app -> { + Solon.start(WebRxSseMcpSyncServerTests.class, new String[]{"-server.port=" + PORT}, app -> { transportProvider.toHttpHandler(app); + app.onEvent(HttpServerConfigure.class, event -> { + event.enableDebug(true); + }); }); return transportProvider; From 63f2653f68686991f64f0038fa50fd8a205ee5ef Mon Sep 17 00:00:00 2001 From: noear Date: Sun, 6 Apr 2025 10:57:24 +0800 Subject: [PATCH 3/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/mcp-solon-webrx/pom.xml | 10 ++-------- pom.xml | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/mcp-solon/mcp-solon-webrx/pom.xml b/mcp-solon/mcp-solon-webrx/pom.xml index 3cb16597..d3dd697c 100644 --- a/mcp-solon/mcp-solon-webrx/pom.xml +++ b/mcp-solon/mcp-solon-webrx/pom.xml @@ -51,6 +51,7 @@ org.noear solon-net-httputils ${solon.version} + test @@ -62,7 +63,7 @@ org.noear - solon-web + solon-lib ${solon.version} test @@ -115,13 +116,6 @@ test - - ch.qos.logback - logback-classic - ${logback.version} - test - - org.junit.jupiter junit-jupiter-params diff --git a/pom.xml b/pom.xml index 8ad67981..78f99b76 100644 --- a/pom.xml +++ b/pom.xml @@ -93,7 +93,7 @@ 4.2.0 7.1.0 4.1.0 - 3.1.2-SNAPSHOT + 3.1.2 From a4e1af5929bd0b7a9f73567c8c9a369811bc70c4 Mon Sep 17 00:00:00 2001 From: noear Date: Sun, 6 Apr 2025 11:06:14 +0800 Subject: [PATCH 4/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 mcp-solon/README.md diff --git a/mcp-solon/README.md b/mcp-solon/README.md new file mode 100644 index 00000000..0d6c40f4 --- /dev/null +++ b/mcp-solon/README.md @@ -0,0 +1,14 @@ +## solon is a java enterprise application development framework similar to spring. + +There is no java-ee. Compared to the spring framework: + +* more concurrency (700%), +* less memory (50%), +* faster startup (1000%), +* smaller packaging (10%), +* support for java8 ~ java24, native runtime. + + +github: + +* https://github.com/opensolon/solon \ No newline at end of file From b1b03efc3b3ed27028466b22531edc294654d6df Mon Sep 17 00:00:00 2001 From: noear Date: Sun, 6 Apr 2025 11:25:06 +0800 Subject: [PATCH 5/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/mcp-solon-webrx/README.md | 30 +++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 mcp-solon/mcp-solon-webrx/README.md diff --git a/mcp-solon/mcp-solon-webrx/README.md b/mcp-solon/mcp-solon-webrx/README.md new file mode 100644 index 00000000..3af5f057 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/README.md @@ -0,0 +1,30 @@ +# Solon WebRx SSE Server Transport + +```xml + + io.modelcontextprotocol.sdk + mcp-solon-webrx + +``` + + +```java +String MESSAGE_ENDPOINT = "/mcp/message"; + +@Configuration +static class MyConfig { + + @Bean + public WebRxSseServerTransportProvider webMvcSseServerTransport() { + return WebRxSseServerTransportProvider.builder() + .objectMapper(new ObjectMapper()) + .messageEndpoint(MESSAGE_ENDPOINT) + .build(); + } + + @Bean + public void routerFunction(WebRxSseServerTransportProvider transport, AppContext context) { + transport.toHttpHandler(context.app()); + } +} +``` From b6ec30c013b67e7f72f06115e20e4e7abf5be8a6 Mon Sep 17 00:00:00 2001 From: noear Date: Sun, 6 Apr 2025 12:13:36 +0800 Subject: [PATCH 6/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp-solon/README.md b/mcp-solon/README.md index 0d6c40f4..3c1df793 100644 --- a/mcp-solon/README.md +++ b/mcp-solon/README.md @@ -2,7 +2,7 @@ There is no java-ee. Compared to the spring framework: -* more concurrency (700%), +* more concurrency (300%), * less memory (50%), * faster startup (1000%), * smaller packaging (10%), From b79fd7421b82d344e83ccdc3c8e124df09def45a Mon Sep 17 00:00:00 2001 From: noear Date: Thu, 17 Apr 2025 17:16:32 +0800 Subject: [PATCH 7/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/mcp-solon-webrx/pom.xml | 3 +- .../transport/WebRxSseClientTransport.java | 319 ++++++++++++++++++ .../WebRxSseServerTransportProvider.java | 92 +++-- .../WebRxSseIntegrationTests.java | 5 +- .../client/WebRxSseMcpAsyncClientTests.java | 54 +++ .../client/WebRxSseMcpSyncClientTests.java | 54 +++ .../WebRxSseClientTransportTests.java | 278 +++++++++++++++ pom.xml | 2 +- 8 files changed, 767 insertions(+), 40 deletions(-) create mode 100644 mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java create mode 100644 mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java diff --git a/mcp-solon/mcp-solon-webrx/pom.xml b/mcp-solon/mcp-solon-webrx/pom.xml index d3dd697c..dd52ba1d 100644 --- a/mcp-solon/mcp-solon-webrx/pom.xml +++ b/mcp-solon/mcp-solon-webrx/pom.xml @@ -51,7 +51,6 @@ org.noear solon-net-httputils ${solon.version} - test @@ -70,7 +69,7 @@ org.noear - solon-boot-jetty + solon-boot-smarthttp ${solon.version} test diff --git a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java new file mode 100644 index 00000000..200d360f --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransport.java @@ -0,0 +1,319 @@ +/* + * Copyright 2024 - 2024 the original author or authors. + */ +package io.modelcontextprotocol.client.transport; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage; +import io.modelcontextprotocol.util.Assert; +import org.noear.solon.net.http.HttpResponse; +import org.noear.solon.net.http.HttpUtilsBuilder; +import org.noear.solon.net.http.textstream.ServerSentEvent; +import org.noear.solon.rx.SimpleSubscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +/** + * Server-Sent Events (SSE) implementation of the + * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE + * transport specification. + * + *

+ * This transport establishes a bidirectional communication channel where: + *

    + *
  • Inbound messages are received through an SSE connection from the server
  • + *
  • Outbound messages are sent via HTTP POST requests to a server-provided + * endpoint
  • + *
+ * + *

+ * The message flow follows these steps: + *

    + *
  1. The client establishes an SSE connection to the server's /sse endpoint
  2. + *
  3. The server sends an 'endpoint' event containing the URI for sending messages
  4. + *
+ * + * This implementation uses {@link HttpUtilsBuilder} for HTTP communications and supports JSON. and base JDK8 + * serialization/deserialization of messages. + * + * @author Christian Tzolov + * @author noear + * @see MCP + * HTTP with SSE Transport Specification + */ +public class WebRxSseClientTransport implements McpClientTransport { + + private static final Logger logger = LoggerFactory.getLogger(WebRxSseClientTransport.class); + + /** SSE event type for JSON-RPC messages */ + private static final String MESSAGE_EVENT_TYPE = "message"; + + /** SSE event type for endpoint discovery */ + private static final String ENDPOINT_EVENT_TYPE = "endpoint"; + + /** Default SSE endpoint path */ + private static final String DEFAULT_SSE_ENDPOINT = "/sse"; + + /** HttpUtils instance builder */ + private final HttpUtilsBuilder webBuilder; + + /** SSE endpoint path */ + private final String sseEndpoint; + + /** JSON object mapper for message serialization/deserialization */ + protected ObjectMapper objectMapper; + + /** Flag indicating if the transport is in closing state */ + private volatile boolean isClosing = false; + + /** Latch for coordinating endpoint discovery */ + private final CountDownLatch closeLatch = new CountDownLatch(1); + + /** Holds the discovered message endpoint URL */ + private final AtomicReference messageEndpoint = new AtomicReference<>(); + + /** Holds the SSE connection future */ + private final AtomicReference> connectionFuture = new AtomicReference<>(); + + /** + * Creates a new transport instance with default HTTP client and object mapper. + * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance + */ + public WebRxSseClientTransport(HttpUtilsBuilder webBuilder) { + this(webBuilder, new ObjectMapper()); + } + + /** + * Creates a new transport instance with custom HTTP client builder and object mapper. + * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance + * @param objectMapper the object mapper for JSON serialization/deserialization + * @throws IllegalArgumentException if objectMapper or clientBuilder is null + */ + public WebRxSseClientTransport(HttpUtilsBuilder webBuilder, ObjectMapper objectMapper) { + this(webBuilder, DEFAULT_SSE_ENDPOINT, objectMapper); + } + + /** + * Creates a new transport instance with custom HTTP client builder and object mapper. + * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance + * @param sseEndpoint the SSE endpoint path + * @param objectMapper the object mapper for JSON serialization/deserialization + * @throws IllegalArgumentException if objectMapper or clientBuilder is null + */ + public WebRxSseClientTransport(HttpUtilsBuilder webBuilder, String sseEndpoint, + ObjectMapper objectMapper) { + Assert.notNull(objectMapper, "ObjectMapper must not be null"); + Assert.notNull(webBuilder, "baseUri must not be empty"); + Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); + this.webBuilder = webBuilder; + this.sseEndpoint = sseEndpoint; + this.objectMapper = objectMapper; + } + + /** + * Creates a new builder for {@link WebRxSseClientTransport}. + * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance + * @return a new builder instance + */ + public static Builder builder(HttpUtilsBuilder webBuilder) { + return new Builder(webBuilder); + } + + /** + * Builder for {@link WebRxSseClientTransport}. + */ + public static class Builder { + + private final HttpUtilsBuilder webBuilder; + + private String sseEndpoint = DEFAULT_SSE_ENDPOINT; + + private ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Creates a new builder with the specified base URI. + * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance + */ + public Builder(HttpUtilsBuilder webBuilder) { + Assert.notNull(webBuilder, "webBuilder must not be empty"); + this.webBuilder = webBuilder; + } + + /** + * Sets the SSE endpoint path. + * @param sseEndpoint the SSE endpoint path + * @return this builder + */ + public Builder sseEndpoint(String sseEndpoint) { + Assert.hasText(sseEndpoint, "sseEndpoint must not be null"); + this.sseEndpoint = sseEndpoint; + return this; + } + + /** + * Sets the object mapper for JSON serialization/deserialization. + * @param objectMapper the object mapper + * @return this builder + */ + public Builder objectMapper(ObjectMapper objectMapper) { + Assert.notNull(objectMapper, "objectMapper must not be null"); + this.objectMapper = objectMapper; + return this; + } + + /** + * Builds a new {@link WebRxSseClientTransport} instance. + * @return a new transport instance + */ + public WebRxSseClientTransport build() { + return new WebRxSseClientTransport(webBuilder, sseEndpoint, objectMapper); + } + + } + + /** + * Establishes the SSE connection with the server and sets up message handling. + * + *

+ * This method: + *

    + *
  • Initiates the SSE connection
  • + *
  • Handles endpoint discovery events
  • + *
  • Processes incoming JSON-RPC messages
  • + *
+ * @param handler the function to process received JSON-RPC messages + * @return a Mono that completes when the connection is established + */ + @Override + public Mono connect(Function, Mono> handler) { + CompletableFuture future = new CompletableFuture<>(); + connectionFuture.set(future); + + webBuilder.build(this.sseEndpoint) + .execAsSseStream("GET") + .subscribe(new SimpleSubscriber() + .doOnNext(event -> { + if (isClosing) { + return; + } + + try { + if (ENDPOINT_EVENT_TYPE.equals(event.getEvent())) { + String endpoint = event.data(); + messageEndpoint.set(endpoint); + closeLatch.countDown(); + future.complete(null); + } else if (MESSAGE_EVENT_TYPE.equals(event.getEvent())) { + JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, event.data()); + handler.apply(Mono.just(message)).subscribe(); + } else { + logger.error("Received unrecognized SSE event type: {}", event.getEvent()); + } + } catch (IOException e) { + logger.error("Error processing SSE event", e); + future.completeExceptionally(e); + } + }).doOnError(error -> { + if (!isClosing) { + logger.warn("SSE connection error", error); + future.completeExceptionally(error); + } + })); + + return Mono.fromFuture(future); + } + + /** + * Sends a JSON-RPC message to the server. + * + *

+ * This method waits for the message endpoint to be discovered before sending the + * message. The message is serialized to JSON and sent as an HTTP POST request. + * @param message the JSON-RPC message to send + * @return a Mono that completes when the message is sent + * @throws McpError if the message endpoint is not available or the wait times out + */ + @Override + public Mono sendMessage(JSONRPCMessage message) { + if (isClosing) { + return Mono.empty(); + } + + try { + if (!closeLatch.await(10, TimeUnit.SECONDS)) { + return Mono.error(new McpError("Failed to wait for the message endpoint")); + } + } catch (InterruptedException e) { + return Mono.error(new McpError("Failed to wait for the message endpoint")); + } + + String endpoint = messageEndpoint.get(); + if (endpoint == null) { + return Mono.error(new McpError("No message endpoint available")); + } + + try { + String jsonText = this.objectMapper.writeValueAsString(message); + CompletableFuture future = webBuilder.build(endpoint) + .header("Content-Type", "application/json") + .bodyOfJson(jsonText) + .execAsync("POST"); + + return Mono.fromFuture(future.thenAccept(response -> { + if (response.code() != 200 && response.code() != 201 && response.code() != 202 + && response.code() != 206) { + logger.error("Error sending message: {}", response.code()); + } + })); + } catch (IOException e) { + if (!isClosing) { + return Mono.error(new RuntimeException("Failed to serialize message", e)); + } + return Mono.empty(); + } + } + + /** + * Gracefully closes the transport connection. + * + *

+ * Sets the closing flag and cancels any pending connection future. This prevents new + * messages from being sent and allows ongoing operations to complete. + * @return a Mono that completes when the closing process is initiated + */ + @Override + public Mono closeGracefully() { + return Mono.fromRunnable(() -> { + isClosing = true; + CompletableFuture future = connectionFuture.get(); + if (future != null && !future.isDone()) { + future.cancel(true); + } + }); + } + + /** + * Unmarshals data to the specified type using the configured object mapper. + * @param data the data to unmarshal + * @param typeRef the type reference for the target type + * @param the target type + * @return the unmarshalled object + */ + @Override + public T unmarshalFrom(Object data, TypeReference typeRef) { + return this.objectMapper.convertValue(data, typeRef); + } +} \ No newline at end of file diff --git a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java index 2746445f..6a2ab8d3 100644 --- a/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java +++ b/mcp-solon/mcp-solon-webrx/src/main/java/io/modelcontextprotocol/server/transport/WebRxSseServerTransportProvider.java @@ -52,7 +52,7 @@ *

* This implementation is thread-safe and can handle multiple concurrent client * connections. It uses {@link ConcurrentHashMap} for session management and Project - * Reactor's non-blocking APIs for message processing and delivery. + * Reactor's non-blocking APIs for message processing and delivery. and base JDK8 * * @author Christian Tzolov * @author Alexandros Pappas @@ -92,7 +92,7 @@ public class WebRxSseServerTransportProvider implements McpServerTransportProvid * Map of active client sessions, keyed by session ID. */ private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); - + private final ConcurrentHashMap sessionTransports = new ConcurrentHashMap<>(); /** * Flag indicating if the transport is shutting down. */ @@ -117,9 +117,25 @@ public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String message this.sseEndpoint = sseEndpoint; } + public void sendHeartbeat(){ + for (WebRxMcpSessionTransport transport : sessionTransports.values()) { + transport.sendHeartbeat(); + } + } + public void toHttpHandler(SolonApp app) { - app.get(this.sseEndpoint, this::handleSseConnection); - app.post(this.messageEndpoint, this::handleMessage); + if (app != null) { + app.get(this.sseEndpoint, this::handleSseConnection); + app.post(this.messageEndpoint, this::handleMessage); + } + } + + public String getSseEndpoint() { + return sseEndpoint; + } + + public String getMessageEndpoint() { + return messageEndpoint; } /** @@ -160,7 +176,7 @@ public void setSessionFactory(McpServerSession.Factory sessionFactory) { * errors if any session fails to receive the message */ @Override - public Mono notifyClients(String method, Map params) { + public Mono notifyClients(String method, Map params) { if (sessions.isEmpty()) { logger.debug("No active sessions to broadcast message to"); return Mono.empty(); @@ -169,11 +185,11 @@ public Mono notifyClients(String method, Map params) { logger.debug("Attempting to broadcast message to {} active sessions", sessions.size()); return Flux.fromStream(sessions.values().stream()) - .flatMap(session -> session.sendNotification(method, params) - .doOnError(e -> logger.error("Failed to " + "send message to session " + "{}: {}", session.getId(), - e.getMessage())) - .onErrorComplete()) - .then(); + .flatMap(session -> session.sendNotification(method, params) + .doOnError(e -> logger.error("Failed to " + "send message to session " + "{}: {}", session.getId(), + e.getMessage())) + .onErrorComplete()) + .then(); } // FIXME: This javadoc makes claims about using isClosing flag but it's not actually @@ -195,9 +211,9 @@ public Mono notifyClients(String method, Map params) { @Override public Mono closeGracefully() { return Flux.fromIterable(sessions.values()) - .doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size())) - .flatMap(McpServerSession::closeGracefully) - .then(); + .doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size())) + .flatMap(McpServerSession::closeGracefully) + .then(); } /** @@ -206,7 +222,7 @@ public Mono closeGracefully() { * @param ctx The incoming server context * @return A Mono which emits a response with the SSE event stream */ - private void handleSseConnection(Context ctx) throws Throwable{ + public void handleSseConnection(Context ctx) throws Throwable{ if (isClosing) { ctx.status(503); ctx.output("Server is shutting down"); @@ -221,6 +237,7 @@ private void handleSseConnection(Context ctx) throws Throwable{ logger.debug("Created new SSE connection for session: {}", sessionId); sessions.put(sessionId, session); + sessionTransports.put(sessionId, sessionTransport); // Send initial endpoint event logger.debug("Sending initial endpoint event to session: {}", sessionId); @@ -230,6 +247,7 @@ private void handleSseConnection(Context ctx) throws Throwable{ sink.onCancel(() -> { logger.debug("Session {} cancelled", sessionId); sessions.remove(sessionId); + sessionTransports.remove(sessionId); }); }); @@ -249,25 +267,25 @@ private void handleSseConnection(Context ctx) throws Throwable{ *

  • Returns appropriate HTTP responses based on processing results
  • *
  • Handles various error conditions with appropriate error responses
  • * - * @param request The incoming server request containing the JSON-RPC message + * @param ctx The incoming server request context containing the JSON-RPC message * @return A Mono emitting the response indicating the message processing result */ - private void handleMessage(Context request) throws Throwable { + public void handleMessage(Context ctx) throws Throwable { if (isClosing) { - request.status(503); - request.output("Server is shutting down"); + ctx.status(503); + ctx.output("Server is shutting down"); return; } - if (Utils.isEmpty(request.param("sessionId"))) { - request.status(404); - request.render(new McpError("Session ID missing in message endpoint")); + if (Utils.isEmpty(ctx.param("sessionId"))) { + ctx.status(404); + ctx.render(new McpError("Session ID missing in message endpoint")); return; } - McpServerSession session = sessions.get(request.param("sessionId")); + McpServerSession session = sessions.get(ctx.param("sessionId")); - String body = request.body(); + String body = ctx.body(); try { McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); @@ -276,18 +294,18 @@ private void handleMessage(Context request) throws Throwable { return Mono.just(new Entity()); }) .onErrorResume(error -> { - logger.error("Error processing message: {}", error.getMessage()); - // TODO: instead of signalling the error, just respond with 200 OK - // - the error is signalled on the SSE connection - // return ServerResponse.ok().build(); - return Mono.just(new Entity().status(500).body(new McpError(error.getMessage()))); - }); - - request.returnValue(mono); + logger.error("Error processing message: {}", error.getMessage()); + // TODO: instead of signalling the error, just respond with 200 OK + // - the error is signalled on the SSE connection + // return ServerResponse.ok().build(); + return Mono.just(new Entity().status(500).body(new McpError(error.getMessage()))); + }); + + ctx.returnValue(mono); } catch (IllegalArgumentException | IOException e) { logger.error("Failed to deserialize message: {}", e.getMessage()); - request.status(400); - request.render(new McpError("Invalid message format")); + ctx.status(400); + ctx.render(new McpError("Invalid message format")); } } @@ -299,6 +317,10 @@ public WebRxMcpSessionTransport(FluxSink sink) { this.sink = sink; } + public void sendHeartbeat() { + sink.next(new SseEvent().comment("heartbeat")); + } + @Override public Mono sendMessage(McpSchema.JSONRPCMessage message) { return Mono.fromSupplier(() -> { @@ -310,8 +332,8 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { } }).doOnNext(jsonText -> { SseEvent event = new SseEvent() - .name(MESSAGE_EVENT_TYPE) - .data(jsonText); + .name(MESSAGE_EVENT_TYPE) + .data(jsonText); sink.next(event); }).doOnError(e -> { // TODO log with sessionid diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java index e9f7dec4..91178bee 100644 --- a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/WebRxSseIntegrationTests.java @@ -12,7 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.client.McpClient; -import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; +import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; import io.modelcontextprotocol.server.McpServer; import io.modelcontextprotocol.server.McpServerFeatures; import io.modelcontextprotocol.server.transport.WebRxSseServerTransportProvider; @@ -35,6 +35,7 @@ import org.noear.solon.Solon; import org.noear.solon.boot.http.HttpServerConfigure; import org.noear.solon.net.http.HttpUtils; +import org.noear.solon.net.http.HttpUtilsBuilder; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -74,7 +75,7 @@ public void before() { }); clientBulders.put("httpclient", - McpClient.sync(HttpClientSseClientTransport.builder("http://localhost:" + PORT) + McpClient.sync(WebRxSseClientTransport.builder(new HttpUtilsBuilder().baseUri("http://localhost:" + PORT)) .sseEndpoint(CUSTOM_SSE_ENDPOINT) .build())); diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java new file mode 100644 index 00000000..7cb16567 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpAsyncClientTests.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; +import io.modelcontextprotocol.spec.McpClientTransport; +import org.junit.jupiter.api.Timeout; +import org.noear.solon.net.http.HttpUtilsBuilder; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.time.Duration; + +/** + * Tests for the {@link McpAsyncClient} with {@link WebRxSseClientTransport}. + * + * @author Christian Tzolov + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebRxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests { + + static String host = "http://localhost:3001"; + + // Uses the https://github.com/tzolov/mcp-everything-server-docker-image + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + @Override + protected McpClientTransport createMcpTransport() { + return WebRxSseClientTransport.builder(new HttpUtilsBuilder().baseUri(host)).build(); + } + + @Override + protected void onStart() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @Override + public void onClose() { + container.stop(); + } + + protected Duration getInitializationTimeout() { + return Duration.ofSeconds(1); + } + +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java new file mode 100644 index 00000000..feb2a6f2 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/WebRxSseMcpSyncClientTests.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import io.modelcontextprotocol.client.transport.WebRxSseClientTransport; +import io.modelcontextprotocol.spec.McpClientTransport; +import org.junit.jupiter.api.Timeout; +import org.noear.solon.net.http.HttpUtilsBuilder; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.time.Duration; + +/** + * Tests for the {@link McpSyncClient} with {@link WebRxSseClientTransport}. + * + * @author Christian Tzolov + */ +@Timeout(15) // Giving extra time beyond the client timeout +class WebRxSseMcpSyncClientTests extends AbstractMcpSyncClientTests { + + static String host = "http://localhost:3001"; + + // Uses the https://github.com/tzolov/mcp-everything-server-docker-image + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + @Override + protected McpClientTransport createMcpTransport() { + return WebRxSseClientTransport.builder(new HttpUtilsBuilder().baseUri(host)).build(); + } + + @Override + protected void onStart() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @Override + protected void onClose() { + container.stop(); + } + + protected Duration getInitializationTimeout() { + return Duration.ofSeconds(1); + } + +} diff --git a/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java new file mode 100644 index 00000000..83ec80f0 --- /dev/null +++ b/mcp-solon/mcp-solon-webrx/src/test/java/io/modelcontextprotocol/client/transport/WebRxSseClientTransportTests.java @@ -0,0 +1,278 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport; + +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.noear.solon.net.http.HttpUtilsBuilder; +import org.noear.solon.net.http.textstream.ServerSentEvent; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** + * Tests for the {@link HttpClientSseClientTransport} class. + * + * @author Christian Tzolov + */ +@Timeout(15) +class WebRxSseClientTransportTests { + + static String host = "http://localhost:3001"; + + @SuppressWarnings("resource") + GenericContainer container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1") + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withExposedPorts(3001) + .waitingFor(Wait.forHttp("/").forStatusCode(404)); + + private TestWebRxSseClientTransport transport; + + // Test class to access protected methods + static class TestWebRxSseClientTransport extends WebRxSseClientTransport { + + private final AtomicInteger inboundMessageCount = new AtomicInteger(0); + + private Sinks.Many events = Sinks.many().unicast().onBackpressureBuffer(); + + public TestWebRxSseClientTransport(String baseUri) { + super(new HttpUtilsBuilder().baseUri(baseUri)); + } + + public int getInboundMessageCount() { + return inboundMessageCount.get(); + } + + public void simulateEndpointEvent(String jsonMessage) { + events.tryEmitNext(new ServerSentEvent(null,"endpoint",jsonMessage,null)); + inboundMessageCount.incrementAndGet(); + } + + public void simulateMessageEvent(String jsonMessage) { + events.tryEmitNext(new ServerSentEvent(null,"message",jsonMessage,null)); + inboundMessageCount.incrementAndGet(); + } + + } + + void startContainer() { + container.start(); + int port = container.getMappedPort(3001); + host = "http://" + container.getHost() + ":" + port; + } + + @BeforeEach + void setUp() { + startContainer(); + transport = new TestWebRxSseClientTransport(host); + transport.connect(Function.identity()).block(); + } + + @AfterEach + void afterEach() { + if (transport != null) { + assertThatCode(() -> transport.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException(); + } + cleanup(); + } + + void cleanup() { + container.stop(); + } + + @Test + void testMessageProcessing() { + // Create a test message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Simulate receiving the message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "test-method", + "id": "test-id", + "params": {"key": "value"} + } + """); + + // Subscribe to messages and verify + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testResponseMessageProcessing() { + // Simulate receiving a response message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "id": "test-id", + "result": {"status": "success"} + } + """); + + // Create and send a request message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message handling + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testErrorMessageProcessing() { + // Simulate receiving an error message + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "id": "test-id", + "error": { + "code": -32600, + "message": "Invalid Request" + } + } + """); + + // Create and send a request message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message handling + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testNotificationMessageProcessing() { + // Simulate receiving a notification message (no id) + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "update", + "params": {"status": "processing"} + } + """); + + // Verify the notification was processed + assertThat(transport.getInboundMessageCount()).isEqualTo(1); + } + + @Test + void testGracefulShutdown() { + // Test graceful shutdown + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + + // Create a test message + JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id", + Map.of("key", "value")); + + // Verify message is not processed after shutdown + StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete(); + + // Message count should remain 0 after shutdown + assertThat(transport.getInboundMessageCount()).isEqualTo(0); + } + + @Test + void testRetryBehavior() { + // Create a client that simulates connection failures + HttpClientSseClientTransport failingTransport = new HttpClientSseClientTransport("http://non-existent-host"); + + // Verify that the transport attempts to reconnect + StepVerifier.create(Mono.delay(Duration.ofSeconds(2))).expectNextCount(1).verifyComplete(); + + // Clean up + failingTransport.closeGracefully().block(); + } + + @Test + void testMultipleMessageProcessing() { + // Simulate receiving multiple messages in sequence + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "method1", + "id": "id1", + "params": {"key": "value1"} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "method2", + "id": "id2", + "params": {"key": "value2"} + } + """); + + // Create and send corresponding messages + JSONRPCRequest message1 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method1", "id1", + Map.of("key", "value1")); + + JSONRPCRequest message2 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method2", "id2", + Map.of("key", "value2")); + + // Verify both messages are processed + StepVerifier.create(transport.sendMessage(message1).then(transport.sendMessage(message2))).verifyComplete(); + + // Verify message count + assertThat(transport.getInboundMessageCount()).isEqualTo(2); + } + + @Test + void testMessageOrderPreservation() { + // Simulate receiving messages in a specific order + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "first", + "id": "1", + "params": {"sequence": 1} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "second", + "id": "2", + "params": {"sequence": 2} + } + """); + + transport.simulateMessageEvent(""" + { + "jsonrpc": "2.0", + "method": "third", + "id": "3", + "params": {"sequence": 3} + } + """); + + // Verify message count and order + assertThat(transport.getInboundMessageCount()).isEqualTo(3); + } + +} diff --git a/pom.xml b/pom.xml index 78f99b76..6f5da1ec 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,7 @@ 1.5.15 2.17.0 6.2.1 + 3.2.0 3.11.0 @@ -93,7 +94,6 @@ 4.2.0 7.1.0 4.1.0 - 3.1.2 From 8c66ba9f7d0096614c1528e0cffea5a8a66eca6c Mon Sep 17 00:00:00 2001 From: noear Date: Sat, 26 Apr 2025 16:49:56 +0800 Subject: [PATCH 8/8] feat(solon-webrx): Add solon-webrx adaptive --- mcp-solon/mcp-solon-webrx/pom.xml | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mcp-solon/mcp-solon-webrx/pom.xml b/mcp-solon/mcp-solon-webrx/pom.xml index dd52ba1d..4850b0cd 100644 --- a/mcp-solon/mcp-solon-webrx/pom.xml +++ b/mcp-solon/mcp-solon-webrx/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 0.9.0-SNAPSHOT + 0.10.0-SNAPSHOT ../../pom.xml mcp-solon-webrx diff --git a/pom.xml b/pom.xml index ba4875a3..08c3bf78 100644 --- a/pom.xml +++ b/pom.xml @@ -68,7 +68,7 @@ 1.5.15 2.17.0 6.2.1 - 3.2.0 + 3.2.1 3.11.0