Skip to content

Commit ba7f9fe

Browse files
authored
feat: add JSON-RPC method to ServerCallContext.state (#353)
* Added a `method` field to `ServerCallContext.state`, similar to `headers`. Issue: #352 Fixes #352 🦕 Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 5f4a277 commit ba7f9fe

File tree

8 files changed

+864
-37
lines changed

8 files changed

+864
-37
lines changed

reference/jsonrpc/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,15 @@
7777
<artifactId>rest-assured</artifactId>
7878
<scope>test</scope>
7979
</dependency>
80+
<dependency>
81+
<groupId>org.mockito</groupId>
82+
<artifactId>mockito-core</artifactId>
83+
<scope>test</scope>
84+
</dependency>
85+
<dependency>
86+
<groupId>org.mockito</groupId>
87+
<artifactId>mockito-junit-jupiter</artifactId>
88+
<scope>test</scope>
89+
</dependency>
8090
</dependencies>
8191
</project>

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package io.a2a.server.apps.quarkus;
22

3+
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.HEADERS_KEY;
4+
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY;
35
import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
46
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
7+
import static jakarta.ws.rs.core.MediaType.SERVER_SENT_EVENTS;
58

69
import java.util.HashMap;
710
import java.util.List;
@@ -91,14 +94,20 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
9194
JSONRPCResponse<?> nonStreamingResponse = null;
9295
Multi<? extends JSONRPCResponse<?>> streamingResponse = null;
9396
JSONRPCErrorResponse error = null;
94-
9597
try {
96-
if (isStreamingRequest(body)) {
97-
streaming = true;
98-
StreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.readValue(body, StreamingJSONRPCRequest.class);
98+
JsonNode node = Utils.OBJECT_MAPPER.readTree(body);
99+
JsonNode method = node != null ? node.get("method") : null;
100+
streaming = method != null && (SendStreamingMessageRequest.METHOD.equals(method.asText())
101+
|| TaskResubscriptionRequest.METHOD.equals(method.asText()));
102+
String methodName = (method != null && method.isTextual()) ? method.asText() : null;
103+
if (methodName != null) {
104+
context.getState().put(METHOD_NAME_KEY, methodName);
105+
}
106+
if (streaming) {
107+
StreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.treeToValue(node, StreamingJSONRPCRequest.class);
99108
streamingResponse = processStreamingRequest(request, context);
100109
} else {
101-
NonStreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.readValue(body, NonStreamingJSONRPCRequest.class);
110+
NonStreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.treeToValue(node, NonStreamingJSONRPCRequest.class);
102111
nonStreamingResponse = processNonStreamingRequest(request, context);
103112
}
104113
} catch (JsonProcessingException e) {
@@ -201,17 +210,6 @@ private JSONRPCResponse<?> generateErrorResponse(JSONRPCRequest<?> request, JSON
201210
return new JSONRPCErrorResponse(request.getId(), error);
202211
}
203212

204-
private static boolean isStreamingRequest(String requestBody) {
205-
try {
206-
JsonNode node = Utils.OBJECT_MAPPER.readTree(requestBody);
207-
JsonNode method = node != null ? node.get("method") : null;
208-
return method != null && (SendStreamingMessageRequest.METHOD.equals(method.asText())
209-
|| TaskResubscriptionRequest.METHOD.equals(method.asText()));
210-
} catch (Exception e) {
211-
return false;
212-
}
213-
}
214-
215213
static void setStreamingMultiSseSupportSubscribedRunnable(Runnable runnable) {
216214
streamingMultiSseSupportSubscribedRunnable = runnable;
217215
}
@@ -243,7 +241,7 @@ public String getUsername() {
243241
Map<String, String> headers = new HashMap<>();
244242
Set<String> headerNames = rc.request().headers().names();
245243
headerNames.forEach(name -> headers.put(name, rc.request().getHeader(name)));
246-
state.put("headers", headers);
244+
state.put(HEADERS_KEY, headers);
247245

248246
// Extract requested extensions from X-A2A-Extensions header
249247
List<String> extensionHeaderValues = rc.request().headers().getAll(A2AHeaders.X_A2A_EXTENSIONS);
@@ -266,8 +264,8 @@ private MultiSseSupport() {
266264
private static void initialize(HttpServerResponse response) {
267265
if (response.bytesWritten() == 0) {
268266
MultiMap headers = response.headers();
269-
if (headers.get("content-type") == null) {
270-
headers.set("content-type", "text/event-stream");
267+
if (headers.get(CONTENT_TYPE) == null) {
268+
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
271269
}
272270
response.setChunked(true);
273271
}
@@ -340,8 +338,8 @@ public Buffer apply(Object o) {
340338
private static void endOfStream(HttpServerResponse response) {
341339
if (response.bytesWritten() == 0) { // No item
342340
MultiMap headers = response.headers();
343-
if (headers.get("content-type") == null) {
344-
headers.set("content-type", "text/event-stream");
341+
if (headers.get(CONTENT_TYPE) == null) {
342+
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
345343
}
346344
}
347345
response.end();

0 commit comments

Comments
 (0)