From 958eb78b8af8e969a5eb251f9eaafdafad1763fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 1 Mar 2022 22:47:04 +0100 Subject: [PATCH] feat: Support emitting JSON to topics (#246) --- .../javascript/partials/topic-eventing.adoc | 24 +++++++ .../proto/com/example/json/json_api.proto | 32 ++++++++++ samples/js/js-doc-snippets/src/myservice.js | 63 +++++++++++++++++++ sdk/bin/download-protoc.js | 3 + sdk/src/action-support.js | 15 ++++- sdk/test/action-handler-test.js | 36 +++++++++++ sdk/test/example.proto | 2 + 7 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 samples/js/js-doc-snippets/proto/com/example/json/json_api.proto create mode 100644 samples/js/js-doc-snippets/src/myservice.js diff --git a/docs/src/modules/javascript/partials/topic-eventing.adoc b/docs/src/modules/javascript/partials/topic-eventing.adoc index 72a233ef..a55b5a89 100644 --- a/docs/src/modules/javascript/partials/topic-eventing.adoc +++ b/docs/src/modules/javascript/partials/topic-eventing.adoc @@ -43,3 +43,27 @@ service ShoppingCartTopicService { } } ---- + +== Publishing JSON messages to a topic + +By default when publishig a message to a topic, the protobuf message is serialized to bytes and published with the `content-type`/`ce-datacontenttype` `application/protobuf`, and will also contain the metadata entry `ce-type` specifying the specific protobuf message type. + +This is convenient when the consumer is another Akka Serverless service that can handle protobuf messages. + +In many cases the consumer may be an external service though, and in such a case another serialization format for the messages can make sense. For such a use case the Akka Serverless Javascript SDK supports emitting JSON messages. + +To publish JSON messages to a topic, mark the return type of the message as `google.protobuf.Any`. The object returned with `replies.message()` will be serialized to string format and published to the topic with content type `Content-Type` attribute stating `application/json`. + +[source,protobuf,indent=0] +---- +include::example$js-doc-snippets/proto/com/example/json/json_api.proto[tag=service] +---- +<1> return `google.protobuf.Any` from the method +<2> annotate the Protobuf rpc method with `(akkaserverless.method).eventing` +<3> use `out` and `topic` to publish to a topic + +In the service implementation, return an arbitrary object, it will be serialized to JSON for the topic: +[source,javascript,indent=0] +---- +include::example$js-doc-snippets/src/myservice.js[tag=produce] +---- diff --git a/samples/js/js-doc-snippets/proto/com/example/json/json_api.proto b/samples/js/js-doc-snippets/proto/com/example/json/json_api.proto new file mode 100644 index 00000000..5fc83312 --- /dev/null +++ b/samples/js/js-doc-snippets/proto/com/example/json/json_api.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; +package com.example.json; + +// tag::service[] +import "google/protobuf/any.proto"; +import "google/protobuf/empty.proto"; +import "akkaserverless/annotations.proto"; + +message KeyValue { + string key = 1; + int32 value = 2; +} + +service MyService { + option (akkaserverless.codegen) = { + action: {} + }; + + rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { + option (akkaserverless.method).eventing.in = { + topic: "notifications" + }; + } + + rpc Produce(KeyValue) returns (google.protobuf.Any) { // <1> + option (akkaserverless.method).eventing.out = { // <2> + topic: "notifications" // <3> + }; + } + +} +// end::service[] diff --git a/samples/js/js-doc-snippets/src/myservice.js b/samples/js/js-doc-snippets/src/myservice.js new file mode 100644 index 00000000..7ca45ee7 --- /dev/null +++ b/samples/js/js-doc-snippets/src/myservice.js @@ -0,0 +1,63 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This code was initialised by Akka Serverless tooling. + * As long as this file exists it will not be re-generated. + * You are free to make changes to this file. + */ + +import { Action } from "@lightbend/akkaserverless-javascript-sdk"; +import { replies } from '@lightbend/akkaserverless-javascript-sdk'; +import * as grpc from '@grpc/grpc-js'; + +/** + * Type definitions. + * These types have been generated based on your proto source. + * A TypeScript aware editor such as VS Code will be able to leverage them to provide hinting and validation. + * + * DelegatingService; a strongly typed extension of Action derived from your proto source + * @typedef { import("../lib/generated/json/myservice").MyService } MyService + */ + +/** + * @type MyService + */ +const action = new Action( + [ + "com/example/json/json_api.proto", + ], + "com.example.json.MyService", + { + includeDirs: ["./proto"] + } +); + +action.commandHandlers = { + async Consume(request) { + console.log(request); + return replies.noReply(); + }, + // tag::produce[] + async Produce(request) { + return replies.message({ // <2> + arbitrary: "json" + }) + } + // end::produce[] +}; + +export default action; + diff --git a/sdk/bin/download-protoc.js b/sdk/bin/download-protoc.js index cc9b9252..5b3c4ff5 100755 --- a/sdk/bin/download-protoc.js +++ b/sdk/bin/download-protoc.js @@ -41,6 +41,9 @@ function determineDownloadFile() { return makeDownloadFile('osx-x86_32'); case 'x64': return makeDownloadFile('osx-x86_64'); + case 'arm64': + // use rosetta for now + return makeDownloadFile('osx-x86_64'); } break; } diff --git a/sdk/src/action-support.js b/sdk/src/action-support.js index 3744176a..87356d54 100644 --- a/sdk/src/action-support.js +++ b/sdk/src/action-support.js @@ -319,9 +319,18 @@ class ActionHandler { this.streamDebug('Sending reply'); this.ctx.alreadyReplied = true; if (message != null) { - const messageProto = - this.grpcMethod.resolvedResponseType.create(message); - const replyPayload = AnySupport.serialize(messageProto, false, false); + let replyPayload; + if ( + this.grpcMethod.resolvedResponseType.fullName === + '.google.protobuf.Any' + ) { + // special handling to emit JSON to topics by defining return type as proto Any + replyPayload = AnySupport.serialize(message, false, true); + } else { + const messageProto = + this.grpcMethod.resolvedResponseType.create(message); + replyPayload = AnySupport.serialize(messageProto, false, false); + } let replyMetadata = null; if (metadata && metadata.entries) { replyMetadata = { diff --git a/sdk/test/action-handler-test.js b/sdk/test/action-handler-test.js index edcd03c6..664546fa 100644 --- a/sdk/test/action-handler-test.js +++ b/sdk/test/action-handler-test.js @@ -35,10 +35,12 @@ const anySupport = new AnySupport(root); const In = root.lookupType('com.example.In'); const Out = root.lookupType('com.example.Out'); +const Any = root.lookupType('google.protobuf.Any'); const ExampleServiceName = 'com.example.ExampleService'; const ExampleService = root.lookupService(ExampleServiceName); const replies = require('../src/reply'); +const stableJsonStringify = require('json-stable-stringify'); class MockUnaryCall { constructor(request) { @@ -90,6 +92,7 @@ function createAction(handler) { service: ExampleService, commandHandlers: { DoSomething: handler, + PublishJsonToTopic: handler, }, }, allComponents, @@ -109,6 +112,18 @@ function callDoSomething(action, message) { return call; } +function callPublishJsonToTopic(action, message) { + const command = { + serviceName: ExampleServiceName, + name: 'PublishJsonToTopic', + payload: AnySupport.serialize(In.create(message)), + }; + const call = new MockUnaryCall(command); + const callback = (error, value) => call.write(value); + action.handleUnary(call, callback); + return call; +} + function testActionHandler(value, handler) { return callDoSomething(createAction(handler), { field: value }); } @@ -562,4 +577,25 @@ describe('ActionHandler', () => { 'nested:promised:async:async:something', ); }); + + it('should reply with Akkaserverless JSON for unary methods returning Any', () => { + let expectedReply = { arbitrary: 'object' }; + return callPublishJsonToTopic( + createAction((message, context) => { + return replies.message(expectedReply); + }), + { field: 'whatever' }, + ) + .value.then((response) => { + const payload = response.reply.payload; + payload.should.have.property( + 'type_url', + 'json.akkaserverless.com/object', + ); + return JSON.parse( + AnySupport.deserializePrimitive(payload.value, 'string'), + ); + }) + .should.eventually.deep.equal(expectedReply); + }); }); diff --git a/sdk/test/example.proto b/sdk/test/example.proto index 1dbcae2d..527944fc 100644 --- a/sdk/test/example.proto +++ b/sdk/test/example.proto @@ -16,6 +16,7 @@ syntax = "proto3"; package com.example; +import "google/protobuf/any.proto"; import "akkaserverless/annotations.proto"; message Example { @@ -40,6 +41,7 @@ message Out { service ExampleService { rpc DoSomething(In) returns (Out); rpc StreamSomething(In) returns (stream Out); + rpc PublishJsonToTopic(In) returns (google.protobuf.Any); } service ExampleServiceTwo {