1
+ /*
2
+ * Copyright 2024-2024 the original author or authors.
3
+ */
4
+
5
+ package io .modelcontextprotocol .server .transport ;
6
+
7
+ import java .time .Duration ;
8
+ import java .util .Map ;
9
+ import java .util .List ;
10
+ import java .util .concurrent .CountDownLatch ;
11
+ import java .util .concurrent .TimeUnit ;
12
+ import java .util .concurrent .atomic .AtomicReference ;
13
+
14
+ import com .fasterxml .jackson .databind .ObjectMapper ;
15
+ import io .modelcontextprotocol .client .McpClient ;
16
+ import io .modelcontextprotocol .client .transport .WebClientStreamableHttpTransport ;
17
+ import io .modelcontextprotocol .server .McpAsyncStreamableHttpServer ;
18
+ import io .modelcontextprotocol .server .McpServerFeatures ;
19
+ import io .modelcontextprotocol .server .transport .StreamableHttpServerTransportProvider ;
20
+ import io .modelcontextprotocol .spec .McpSchema ;
21
+ import io .modelcontextprotocol .spec .McpSchema .CallToolResult ;
22
+ import io .modelcontextprotocol .spec .McpSchema .InitializeResult ;
23
+
24
+ import org .apache .catalina .LifecycleException ;
25
+ import org .apache .catalina .LifecycleState ;
26
+ import org .apache .catalina .startup .Tomcat ;
27
+ import org .junit .jupiter .api .AfterEach ;
28
+ import org .junit .jupiter .api .BeforeEach ;
29
+ import org .junit .jupiter .api .Disabled ;
30
+ import org .junit .jupiter .api .Test ;
31
+ import org .springframework .web .reactive .function .client .WebClient ;
32
+ import reactor .core .publisher .Flux ;
33
+ import reactor .core .publisher .Mono ;
34
+
35
+ import static org .assertj .core .api .Assertions .assertThat ;
36
+
37
+ /**
38
+ * Integration tests for @link{StreamableHttpServerTransportProvider} with
39
+ *
40
+ * @link{WebClientStreamableHttpTransport}.
41
+ */
42
+ class StreamableHttpTransportIntegrationTest {
43
+
44
+ private static final int PORT = TomcatTestUtil .findAvailablePort ();
45
+
46
+ private static final String ENDPOINT = "/mcp" ;
47
+
48
+ private StreamableHttpServerTransportProvider serverTransportProvider ;
49
+
50
+ private McpClient .AsyncSpec clientBuilder ;
51
+
52
+ private Tomcat tomcat ;
53
+
54
+ @ BeforeEach
55
+ void setUp () {
56
+ serverTransportProvider = new StreamableHttpServerTransportProvider (new ObjectMapper (), ENDPOINT , null );
57
+
58
+ // Set up session factory with proper server capabilities
59
+ McpSchema .ServerCapabilities serverCapabilities = new McpSchema .ServerCapabilities (null , null , null , null , null ,
60
+ null );
61
+ serverTransportProvider .setStreamableHttpSessionFactory (
62
+ sessionId -> new io .modelcontextprotocol .spec .McpStreamableHttpServerSession (sessionId ,
63
+ java .time .Duration .ofSeconds (30 ),
64
+ request -> reactor .core .publisher .Mono .just (new McpSchema .InitializeResult ("2025-06-18" ,
65
+ serverCapabilities , new McpSchema .Implementation ("Test Server" , "1.0.0" ), null )),
66
+ () -> reactor .core .publisher .Mono .empty (), java .util .Map .of (), java .util .Map .of ()));
67
+
68
+ tomcat = TomcatTestUtil .createTomcatServer ("" , PORT , serverTransportProvider );
69
+ try {
70
+ tomcat .start ();
71
+ assertThat (tomcat .getServer ().getState ()).isEqualTo (LifecycleState .STARTED );
72
+ }
73
+ catch (Exception e ) {
74
+ throw new RuntimeException ("Failed to start Tomcat" , e );
75
+ }
76
+
77
+ WebClientStreamableHttpTransport clientTransport = WebClientStreamableHttpTransport
78
+ .builder (WebClient .builder ().baseUrl ("http://localhost:" + PORT ))
79
+ .endpoint (ENDPOINT )
80
+ .objectMapper (new ObjectMapper ())
81
+ .build ();
82
+
83
+ clientBuilder = McpClient .async (clientTransport )
84
+ .clientInfo (new McpSchema .Implementation ("Test Client" , "1.0.0" ));
85
+ }
86
+
87
+ @ AfterEach
88
+ void tearDown () {
89
+ if (serverTransportProvider != null ) {
90
+ serverTransportProvider .closeGracefully ().block ();
91
+ }
92
+ if (tomcat != null ) {
93
+ try {
94
+ tomcat .stop ();
95
+ tomcat .destroy ();
96
+ }
97
+ catch (LifecycleException e ) {
98
+ throw new RuntimeException ("Failed to stop Tomcat" , e );
99
+ }
100
+ }
101
+ }
102
+
103
+ @ Test
104
+ void shouldInitializeSuccessfully () {
105
+ // The server is already configured via the session factory in setUp
106
+ var mcpClient = clientBuilder .build ();
107
+ try {
108
+ InitializeResult result = mcpClient .initialize ().block ();
109
+ assertThat (result ).isNotNull ();
110
+ assertThat (result .serverInfo ().name ()).isEqualTo ("Test Server" );
111
+ }
112
+ finally {
113
+ mcpClient .close ();
114
+ }
115
+ }
116
+
117
+ @ Test
118
+ void shouldCallImmediateToolSuccessfully () {
119
+ var callResponse = new CallToolResult (List .of (new McpSchema .TextContent ("Tool executed successfully" )), null );
120
+ String emptyJsonSchema = """
121
+ {
122
+ "$schema": "http://json-schema.org/draft-07/schema#",
123
+ "type": "object",
124
+ "properties": {}
125
+ }
126
+ """ ;
127
+ McpServerFeatures .AsyncToolSpecification tool = new McpServerFeatures .AsyncToolSpecification (
128
+ new McpSchema .Tool ("test-tool" , "Test tool description" , emptyJsonSchema ),
129
+ (exchange , request ) -> Mono .just (callResponse ));
130
+
131
+ // Configure session factory with tool handler
132
+ McpSchema .ServerCapabilities serverCapabilities = new McpSchema .ServerCapabilities (null , null , null , null , null ,
133
+ new McpSchema .ServerCapabilities .ToolCapabilities (true ));
134
+ serverTransportProvider
135
+ .setStreamableHttpSessionFactory (sessionId -> new io .modelcontextprotocol .spec .McpStreamableHttpServerSession (
136
+ sessionId , java .time .Duration .ofSeconds (30 ),
137
+ request -> reactor .core .publisher .Mono .just (new McpSchema .InitializeResult ("2025-06-18" ,
138
+ serverCapabilities , new McpSchema .Implementation ("Test Server" , "1.0.0" ), null )),
139
+ () -> reactor .core .publisher .Mono .empty (),
140
+ java .util .Map .of ("tools/call" ,
141
+ (io .modelcontextprotocol .spec .McpStreamableHttpServerSession .RequestHandler <CallToolResult >) (
142
+ exchange , params ) -> tool .call ().apply (exchange , (Map <String , Object >) params )),
143
+ java .util .Map .of ()));
144
+
145
+ var mcpClient = clientBuilder .build ();
146
+ try {
147
+ mcpClient .initialize ().block ();
148
+ CallToolResult result = mcpClient .callTool (new McpSchema .CallToolRequest ("test-tool" , Map .of ())).block ();
149
+ assertThat (result ).isNotNull ();
150
+ assertThat (result .content ()).hasSize (1 );
151
+ assertThat (((McpSchema .TextContent ) result .content ().get (0 )).text ())
152
+ .isEqualTo ("Tool executed successfully" );
153
+ }
154
+ finally {
155
+ mcpClient .close ();
156
+ }
157
+ }
158
+
159
+ @ Test
160
+ void shouldCallStreamingToolSuccessfully () {
161
+ String emptyJsonSchema = """
162
+ {
163
+ "$schema": "http://json-schema.org/draft-07/schema#",
164
+ "type": "object",
165
+ "properties": {}
166
+ }
167
+ """ ;
168
+ McpServerFeatures .AsyncStreamingToolSpecification streamingTool = new McpServerFeatures .AsyncStreamingToolSpecification (
169
+ new McpSchema .Tool ("streaming-tool" , "Streaming test tool" , emptyJsonSchema ),
170
+ (exchange , request ) -> Flux .range (1 , 3 )
171
+ .map (i -> new CallToolResult (List .of (new McpSchema .TextContent ("Step " + i )), null )));
172
+
173
+ // Configure session factory with streaming tool handler
174
+ McpSchema .ServerCapabilities serverCapabilities = new McpSchema .ServerCapabilities (null , null , null , null , null ,
175
+ new McpSchema .ServerCapabilities .ToolCapabilities (true ));
176
+ serverTransportProvider
177
+ .setStreamableHttpSessionFactory (sessionId -> new io .modelcontextprotocol .spec .McpStreamableHttpServerSession (
178
+ sessionId , java .time .Duration .ofSeconds (30 ),
179
+ request -> reactor .core .publisher .Mono .just (new McpSchema .InitializeResult ("2025-06-18" ,
180
+ serverCapabilities , new McpSchema .Implementation ("Test Server" , "1.0.0" ), null )),
181
+ () -> reactor .core .publisher .Mono .empty (), java .util .Map .of ("tools/call" ,
182
+ (io .modelcontextprotocol .spec .McpStreamableHttpServerSession .StreamingRequestHandler <CallToolResult >) new io .modelcontextprotocol .spec .McpStreamableHttpServerSession .StreamingRequestHandler <CallToolResult >() {
183
+ @ Override
184
+ public Mono <CallToolResult > handle (
185
+ io .modelcontextprotocol .server .McpAsyncServerExchange exchange , Object params ) {
186
+ return streamingTool .call ().apply (exchange , (Map <String , Object >) params ).next ();
187
+ }
188
+
189
+ @ Override
190
+ public Flux <CallToolResult > handleStreaming (
191
+ io .modelcontextprotocol .server .McpAsyncServerExchange exchange , Object params ) {
192
+ return streamingTool .call ().apply (exchange , (Map <String , Object >) params );
193
+ }
194
+ }),
195
+ java .util .Map .of ()));
196
+
197
+ var mcpClient = clientBuilder .build ();
198
+ try {
199
+ mcpClient .initialize ().block ();
200
+ CallToolResult result = mcpClient .callTool (new McpSchema .CallToolRequest ("streaming-tool" , Map .of ()))
201
+ .block ();
202
+ assertThat (result ).isNotNull ();
203
+ assertThat (result .content ()).hasSize (1 );
204
+ assertThat (((McpSchema .TextContent ) result .content ().get (0 )).text ()).startsWith ("Step" );
205
+ }
206
+ finally {
207
+ mcpClient .close ();
208
+ }
209
+ }
210
+
211
+ @ Test
212
+ void shouldReceiveNotificationThroughGetStream () throws InterruptedException {
213
+ CountDownLatch notificationLatch = new CountDownLatch (1 );
214
+ AtomicReference <String > receivedEvent = new AtomicReference <>();
215
+ AtomicReference <String > sessionId = new AtomicReference <>();
216
+
217
+ WebClient webClient = WebClient .create ("http://localhost:" + PORT );
218
+ String initMessage = "{\" jsonrpc\" :\" 2.0\" ,\" id\" :1,\" method\" :\" initialize\" ,\" params\" :{\" protocolVersion\" :\" 2025-06-18\" ,\" capabilities\" :{},\" clientInfo\" :{\" name\" :\" Test\" ,\" version\" :\" 1.0\" }}}" ;
219
+
220
+ // Initialize and get session ID
221
+ webClient .post ()
222
+ .uri (ENDPOINT )
223
+ .header ("Accept" , "application/json, text/event-stream" )
224
+ .header ("Content-Type" , "application/json" )
225
+ .bodyValue (initMessage )
226
+ .retrieve ()
227
+ .toBodilessEntity ()
228
+ .doOnNext (response -> sessionId .set (response .getHeaders ().getFirst ("Mcp-Session-Id" )))
229
+ .block ();
230
+
231
+ // Establish SSE stream
232
+ webClient .get ()
233
+ .uri (ENDPOINT )
234
+ .header ("Accept" , "text/event-stream" )
235
+ .header ("Mcp-Session-Id" , sessionId .get ())
236
+ .retrieve ()
237
+ .bodyToFlux (String .class )
238
+ .filter (line -> line .contains ("test/notification" ))
239
+ .doOnNext (event -> {
240
+ receivedEvent .set (event );
241
+ notificationLatch .countDown ();
242
+ })
243
+ .subscribe ();
244
+
245
+ // Send notification after delay
246
+ Mono .delay (Duration .ofMillis (200 ))
247
+ .then (serverTransportProvider .notifyClients ("test/notification" , "test message" ))
248
+ .subscribe ();
249
+
250
+ assertThat (notificationLatch .await (5 , TimeUnit .SECONDS )).isTrue ();
251
+ assertThat (receivedEvent .get ()).isNotNull ();
252
+ assertThat (receivedEvent .get ()).contains ("test/notification" );
253
+ }
254
+
255
+ }
0 commit comments