Skip to content

Commit

Permalink
feat: Support emitting JSON to topics (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Mar 1, 2022
1 parent d890be6 commit 958eb78
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 3 deletions.
24 changes: 24 additions & 0 deletions docs/src/modules/javascript/partials/topic-eventing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,27 @@ service ShoppingCartTopicService {
}
}
----

== Publishing JSON messages to a topic

By default when publishig a message to a topic, the protobuf message is serialized to bytes and published with the `content-type`/`ce-datacontenttype` `application/protobuf`, and will also contain the metadata entry `ce-type` specifying the specific protobuf message type.

This is convenient when the consumer is another Akka Serverless service that can handle protobuf messages.

In many cases the consumer may be an external service though, and in such a case another serialization format for the messages can make sense. For such a use case the Akka Serverless Javascript SDK supports emitting JSON messages.

To publish JSON messages to a topic, mark the return type of the message as `google.protobuf.Any`. The object returned with `replies.message()` will be serialized to string format and published to the topic with content type `Content-Type` attribute stating `application/json`.

[source,protobuf,indent=0]
----
include::example$js-doc-snippets/proto/com/example/json/json_api.proto[tag=service]
----
<1> return `google.protobuf.Any` from the method
<2> annotate the Protobuf rpc method with `(akkaserverless.method).eventing`
<3> use `out` and `topic` to publish to a topic

In the service implementation, return an arbitrary object, it will be serialized to JSON for the topic:
[source,javascript,indent=0]
----
include::example$js-doc-snippets/src/myservice.js[tag=produce]
----
32 changes: 32 additions & 0 deletions samples/js/js-doc-snippets/proto/com/example/json/json_api.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
syntax = "proto3";
package com.example.json;

// tag::service[]
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "akkaserverless/annotations.proto";

message KeyValue {
string key = 1;
int32 value = 2;
}

service MyService {
option (akkaserverless.codegen) = {
action: {}
};

rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) {
option (akkaserverless.method).eventing.in = {
topic: "notifications"
};
}

rpc Produce(KeyValue) returns (google.protobuf.Any) { // <1>
option (akkaserverless.method).eventing.out = { // <2>
topic: "notifications" // <3>
};
}

}
// end::service[]
63 changes: 63 additions & 0 deletions samples/js/js-doc-snippets/src/myservice.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* This code was initialised by Akka Serverless tooling.
* As long as this file exists it will not be re-generated.
* You are free to make changes to this file.
*/

import { Action } from "@lightbend/akkaserverless-javascript-sdk";
import { replies } from '@lightbend/akkaserverless-javascript-sdk';
import * as grpc from '@grpc/grpc-js';

/**
* Type definitions.
* These types have been generated based on your proto source.
* A TypeScript aware editor such as VS Code will be able to leverage them to provide hinting and validation.
*
* DelegatingService; a strongly typed extension of Action derived from your proto source
* @typedef { import("../lib/generated/json/myservice").MyService } MyService
*/

/**
* @type MyService
*/
const action = new Action(
[
"com/example/json/json_api.proto",
],
"com.example.json.MyService",
{
includeDirs: ["./proto"]
}
);

action.commandHandlers = {
async Consume(request) {
console.log(request);
return replies.noReply();
},
// tag::produce[]
async Produce(request) {
return replies.message({ // <2>
arbitrary: "json"
})
}
// end::produce[]
};

export default action;

3 changes: 3 additions & 0 deletions sdk/bin/download-protoc.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ function determineDownloadFile() {
return makeDownloadFile('osx-x86_32');
case 'x64':
return makeDownloadFile('osx-x86_64');
case 'arm64':
// use rosetta for now
return makeDownloadFile('osx-x86_64');
}
break;
}
Expand Down
15 changes: 12 additions & 3 deletions sdk/src/action-support.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,18 @@ class ActionHandler {
this.streamDebug('Sending reply');
this.ctx.alreadyReplied = true;
if (message != null) {
const messageProto =
this.grpcMethod.resolvedResponseType.create(message);
const replyPayload = AnySupport.serialize(messageProto, false, false);
let replyPayload;
if (
this.grpcMethod.resolvedResponseType.fullName ===
'.google.protobuf.Any'
) {
// special handling to emit JSON to topics by defining return type as proto Any
replyPayload = AnySupport.serialize(message, false, true);
} else {
const messageProto =
this.grpcMethod.resolvedResponseType.create(message);
replyPayload = AnySupport.serialize(messageProto, false, false);
}
let replyMetadata = null;
if (metadata && metadata.entries) {
replyMetadata = {
Expand Down
36 changes: 36 additions & 0 deletions sdk/test/action-handler-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ const anySupport = new AnySupport(root);

const In = root.lookupType('com.example.In');
const Out = root.lookupType('com.example.Out');
const Any = root.lookupType('google.protobuf.Any');
const ExampleServiceName = 'com.example.ExampleService';
const ExampleService = root.lookupService(ExampleServiceName);

const replies = require('../src/reply');
const stableJsonStringify = require('json-stable-stringify');

class MockUnaryCall {
constructor(request) {
Expand Down Expand Up @@ -90,6 +92,7 @@ function createAction(handler) {
service: ExampleService,
commandHandlers: {
DoSomething: handler,
PublishJsonToTopic: handler,
},
},
allComponents,
Expand All @@ -109,6 +112,18 @@ function callDoSomething(action, message) {
return call;
}

function callPublishJsonToTopic(action, message) {
const command = {
serviceName: ExampleServiceName,
name: 'PublishJsonToTopic',
payload: AnySupport.serialize(In.create(message)),
};
const call = new MockUnaryCall(command);
const callback = (error, value) => call.write(value);
action.handleUnary(call, callback);
return call;
}

function testActionHandler(value, handler) {
return callDoSomething(createAction(handler), { field: value });
}
Expand Down Expand Up @@ -562,4 +577,25 @@ describe('ActionHandler', () => {
'nested:promised:async:async:something',
);
});

it('should reply with Akkaserverless JSON for unary methods returning Any', () => {
let expectedReply = { arbitrary: 'object' };
return callPublishJsonToTopic(
createAction((message, context) => {
return replies.message(expectedReply);
}),
{ field: 'whatever' },
)
.value.then((response) => {
const payload = response.reply.payload;
payload.should.have.property(
'type_url',
'json.akkaserverless.com/object',
);
return JSON.parse(
AnySupport.deserializePrimitive(payload.value, 'string'),
);
})
.should.eventually.deep.equal(expectedReply);
});
});
2 changes: 2 additions & 0 deletions sdk/test/example.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ syntax = "proto3";

package com.example;

import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

message Example {
Expand All @@ -40,6 +41,7 @@ message Out {
service ExampleService {
rpc DoSomething(In) returns (Out);
rpc StreamSomething(In) returns (stream Out);
rpc PublishJsonToTopic(In) returns (google.protobuf.Any);
}

service ExampleServiceTwo {
Expand Down

0 comments on commit 958eb78

Please sign in to comment.