Skip to content

Commit 3352db1

Browse files
committed
feat: add JSON-RPC method to ServerCallContext.state
* Added a `method` field to `ServerCallContext.state`, similar to `headers`. Issue: #352 Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent 8e83576 commit 3352db1

File tree

4 files changed

+87
-26
lines changed

4 files changed

+87
-26
lines changed

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package io.a2a.server.apps.quarkus;
22

3+
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.HEADERS_KEY;
4+
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY;
35
import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
46
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
7+
import static jakarta.ws.rs.core.MediaType.SERVER_SENT_EVENTS;
58

69
import java.util.HashMap;
710
import java.util.List;
@@ -91,14 +94,20 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
9194
JSONRPCResponse<?> nonStreamingResponse = null;
9295
Multi<? extends JSONRPCResponse<?>> streamingResponse = null;
9396
JSONRPCErrorResponse error = null;
94-
9597
try {
96-
if (isStreamingRequest(body)) {
97-
streaming = true;
98-
StreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.readValue(body, StreamingJSONRPCRequest.class);
98+
JsonNode node = Utils.OBJECT_MAPPER.readTree(body);
99+
JsonNode method = node != null ? node.get("method") : null;
100+
streaming = method != null && (SendStreamingMessageRequest.METHOD.equals(method.asText())
101+
|| TaskResubscriptionRequest.METHOD.equals(method.asText()));
102+
String methodName = (method != null && method.isTextual()) ? method.asText() : null;
103+
if (methodName != null) {
104+
context.getState().put(METHOD_NAME_KEY, methodName);
105+
}
106+
if (streaming) {
107+
StreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.treeToValue(node, StreamingJSONRPCRequest.class);
99108
streamingResponse = processStreamingRequest(request, context);
100109
} else {
101-
NonStreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.readValue(body, NonStreamingJSONRPCRequest.class);
110+
NonStreamingJSONRPCRequest<?> request = Utils.OBJECT_MAPPER.treeToValue(node, NonStreamingJSONRPCRequest.class);
102111
nonStreamingResponse = processNonStreamingRequest(request, context);
103112
}
104113
} catch (JsonProcessingException e) {
@@ -204,6 +213,7 @@ private JSONRPCResponse<?> generateErrorResponse(JSONRPCRequest<?> request, JSON
204213
private static boolean isStreamingRequest(String requestBody) {
205214
try {
206215
JsonNode node = Utils.OBJECT_MAPPER.readTree(requestBody);
216+
207217
JsonNode method = node != null ? node.get("method") : null;
208218
return method != null && (SendStreamingMessageRequest.METHOD.equals(method.asText())
209219
|| TaskResubscriptionRequest.METHOD.equals(method.asText()));
@@ -243,7 +253,7 @@ public String getUsername() {
243253
Map<String, String> headers = new HashMap<>();
244254
Set<String> headerNames = rc.request().headers().names();
245255
headerNames.forEach(name -> headers.put(name, rc.request().getHeader(name)));
246-
state.put("headers", headers);
256+
state.put(HEADERS_KEY, headers);
247257

248258
// Extract requested extensions from X-A2A-Extensions header
249259
List<String> extensionHeaderValues = rc.request().headers().getAll(A2AHeaders.X_A2A_EXTENSIONS);
@@ -266,8 +276,8 @@ private MultiSseSupport() {
266276
private static void initialize(HttpServerResponse response) {
267277
if (response.bytesWritten() == 0) {
268278
MultiMap headers = response.headers();
269-
if (headers.get("content-type") == null) {
270-
headers.set("content-type", "text/event-stream");
279+
if (headers.get(CONTENT_TYPE) == null) {
280+
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
271281
}
272282
response.setChunked(true);
273283
}
@@ -340,8 +350,8 @@ public Buffer apply(Object o) {
340350
private static void endOfStream(HttpServerResponse response) {
341351
if (response.bytesWritten() == 0) { // No item
342352
MultiMap headers = response.headers();
343-
if (headers.get("content-type") == null) {
344-
headers.set("content-type", "text/event-stream");
353+
if (headers.get(CONTENT_TYPE) == null) {
354+
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
345355
}
346356
}
347357
response.end();

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package io.a2a.server.rest.quarkus;
22

3+
import static io.a2a.transport.rest.context.RestContextKeys.HEADERS_KEY;
4+
import static io.a2a.transport.rest.context.RestContextKeys.METHOD_NAME_KEY;
35
import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
46
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
7+
import static jakarta.ws.rs.core.MediaType.SERVER_SENT_EVENTS;
58

69
import java.util.concurrent.Executor;
710
import java.util.concurrent.Flow;
@@ -65,7 +68,7 @@ public class A2AServerRoutes {
6568

6669
@Route(regex = "^/v1/message:send$", order = 1, methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
6770
public void sendMessage(@Body String body, RoutingContext rc) {
68-
ServerCallContext context = createCallContext(rc);
71+
ServerCallContext context = createCallContext(rc, "message/send");
6972
HTTPRestResponse response = null;
7073
try {
7174
response = jsonRestHandler.sendMessage(body, context);
@@ -78,7 +81,7 @@ public void sendMessage(@Body String body, RoutingContext rc) {
7881

7982
@Route(regex = "^/v1/message:stream$", order = 1, methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
8083
public void sendMessageStreaming(@Body String body, RoutingContext rc) {
81-
ServerCallContext context = createCallContext(rc);
84+
ServerCallContext context = createCallContext(rc, "message/stream");
8285
HTTPRestStreamingResponse streamingResponse = null;
8386
HTTPRestResponse error = null;
8487
try {
@@ -104,7 +107,7 @@ public void sendMessageStreaming(@Body String body, RoutingContext rc) {
104107
@Route(path = "/v1/tasks/:id", order = 1, methods = {Route.HttpMethod.GET}, type = Route.HandlerType.BLOCKING)
105108
public void getTask(RoutingContext rc) {
106109
String taskId = rc.pathParam("id");
107-
ServerCallContext context = createCallContext(rc);
110+
ServerCallContext context = createCallContext(rc, "task/get");
108111
HTTPRestResponse response = null;
109112
try {
110113
if (taskId == null || taskId.isEmpty()) {
@@ -128,7 +131,7 @@ public void getTask(RoutingContext rc) {
128131
@Route(regex = "^/v1/tasks/([^/]+):cancel$", order = 1, methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING)
129132
public void cancelTask(RoutingContext rc) {
130133
String taskId = rc.pathParam("param0");
131-
ServerCallContext context = createCallContext(rc);
134+
ServerCallContext context = createCallContext(rc, "tasks/cancel");
132135
HTTPRestResponse response = null;
133136
try {
134137
if (taskId == null || taskId.isEmpty()) {
@@ -161,7 +164,7 @@ private void sendResponse(RoutingContext rc, @Nullable HTTPRestResponse response
161164
@Route(regex = "^/v1/tasks/([^/]+):subscribe$", order = 1, methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING)
162165
public void resubscribeTask(RoutingContext rc) {
163166
String taskId = rc.pathParam("param0");
164-
ServerCallContext context = createCallContext(rc);
167+
ServerCallContext context = createCallContext(rc, "tasks/resubscribe");
165168
HTTPRestStreamingResponse streamingResponse = null;
166169
HTTPRestResponse error = null;
167170
try {
@@ -191,7 +194,7 @@ public void resubscribeTask(RoutingContext rc) {
191194
@Route(path = "/v1/tasks/:id/pushNotificationConfigs", order = 1, methods = {Route.HttpMethod.POST}, consumes = {APPLICATION_JSON}, type = Route.HandlerType.BLOCKING)
192195
public void setTaskPushNotificationConfiguration(@Body String body, RoutingContext rc) {
193196
String taskId = rc.pathParam("id");
194-
ServerCallContext context = createCallContext(rc);
197+
ServerCallContext context = createCallContext(rc, "tasks/pushNotificationConfig/set");
195198
HTTPRestResponse response = null;
196199
try {
197200
if (taskId == null || taskId.isEmpty()) {
@@ -210,7 +213,7 @@ public void setTaskPushNotificationConfiguration(@Body String body, RoutingConte
210213
public void getTaskPushNotificationConfiguration(RoutingContext rc) {
211214
String taskId = rc.pathParam("id");
212215
String configId = rc.pathParam("configId");
213-
ServerCallContext context = createCallContext(rc);
216+
ServerCallContext context = createCallContext(rc, "tasks/pushNotificationConfig/get");
214217
HTTPRestResponse response = null;
215218
try {
216219
if (taskId == null || taskId.isEmpty()) {
@@ -228,7 +231,7 @@ public void getTaskPushNotificationConfiguration(RoutingContext rc) {
228231
@Route(path = "/v1/tasks/:id/pushNotificationConfigs", order = 1, methods = {Route.HttpMethod.GET}, type = Route.HandlerType.BLOCKING)
229232
public void listTaskPushNotificationConfigurations(RoutingContext rc) {
230233
String taskId = rc.pathParam("id");
231-
ServerCallContext context = createCallContext(rc);
234+
ServerCallContext context = createCallContext(rc, "tasks/pushNotificationConfig/list");
232235
HTTPRestResponse response = null;
233236
try {
234237
if (taskId == null || taskId.isEmpty()) {
@@ -247,7 +250,7 @@ public void listTaskPushNotificationConfigurations(RoutingContext rc) {
247250
public void deleteTaskPushNotificationConfiguration(RoutingContext rc) {
248251
String taskId = rc.pathParam("id");
249252
String configId = rc.pathParam("configId");
250-
ServerCallContext context = createCallContext(rc);
253+
ServerCallContext context = createCallContext(rc, "tasks/pushNotificationConfig/delete");
251254
HTTPRestResponse response = null;
252255
try {
253256
if (taskId == null || taskId.isEmpty()) {
@@ -294,8 +297,7 @@ static void setStreamingMultiSseSupportSubscribedRunnable(Runnable runnable) {
294297
streamingMultiSseSupportSubscribedRunnable = runnable;
295298
}
296299

297-
private ServerCallContext createCallContext(RoutingContext rc) {
298-
300+
private ServerCallContext createCallContext(RoutingContext rc, String jsonRpcMethodName) {
299301
if (callContextFactory.isUnsatisfied()) {
300302
User user;
301303
if (rc.user() == null) {
@@ -328,7 +330,8 @@ String getUsername() {
328330
Map<String, String> headers = new HashMap<>();
329331
Set<String> headerNames = rc.request().headers().names();
330332
headerNames.forEach(name -> headers.put(name, rc.request().getHeader(name)));
331-
state.put("headers", headers);
333+
state.put(HEADERS_KEY, headers);
334+
state.put(METHOD_NAME_KEY, jsonRpcMethodName);
332335

333336
// Extract requested extensions from X-A2A-Extensions header
334337
List<String> extensionHeaderValues = rc.request().headers().getAll(A2AHeaders.X_A2A_EXTENSIONS);
@@ -351,8 +354,8 @@ private MultiSseSupport() {
351354
private static void initialize(HttpServerResponse response) {
352355
if (response.bytesWritten() == 0) {
353356
MultiMap headers = response.headers();
354-
if (headers.get("content-type") == null) {
355-
headers.set("content-type", "text/event-stream");
357+
if (headers.get(CONTENT_TYPE) == null) {
358+
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
356359
}
357360
response.setChunked(true);
358361
}
@@ -426,8 +429,8 @@ public Buffer apply(Object o) {
426429
private static void endOfStream(HttpServerResponse response) {
427430
if (response.bytesWritten() == 0) { // No item
428431
MultiMap headers = response.headers();
429-
if (headers.get("content-type") == null) {
430-
headers.set("content-type", "text/event-stream");
432+
if (headers.get(CONTENT_TYPE) == null) {
433+
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
431434
}
432435
}
433436
response.end();
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.a2a.transport.jsonrpc.context;
2+
3+
/**
4+
* Shared JSON-RPC context keys for A2A protocol data.
5+
*
6+
* These keys provide access to JSON-RPC context information,
7+
* enabling rich context access in service method implementations.
8+
*/
9+
public final class JSONRPCContextKeys {
10+
11+
/**
12+
* Context key for storing the headers.
13+
*/
14+
public static final String HEADERS_KEY = "headers";
15+
16+
/**
17+
* Context key for storing the method name being called.
18+
*/
19+
public static final String METHOD_NAME_KEY = "method";
20+
21+
private JSONRPCContextKeys() {
22+
// Utility class
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.a2a.transport.rest.context;
2+
3+
/**
4+
* Shared REST context keys for A2A protocol data.
5+
*
6+
* These keys provide access to REST context information,
7+
* enabling rich context access in service method implementations.
8+
*/
9+
public final class RestContextKeys {
10+
11+
/**
12+
* Context key for storing the headers.
13+
*/
14+
public static final String HEADERS_KEY = "headers";
15+
16+
/**
17+
* Context key for storing the method name being called.
18+
*/
19+
public static final String METHOD_NAME_KEY = "method";
20+
21+
private RestContextKeys() {
22+
// Utility class
23+
}
24+
}

0 commit comments

Comments
 (0)