Skip to content

Commit

Permalink
feat: gRPC endpoint support
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Jan 16, 2025
1 parent c599636 commit db89db6
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package com.example;

import akka.javasdk.annotations.GrpcEndpoint;

@GrpcEndpoint
public class UserGrpcServiceImpl {
// would extend generated service stub and implement grpc methods but that's not important for this test

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import org.scalatest.wordspec.AnyWordSpec
class EndpointsDescriptorSpec extends AnyWordSpec with Matchers {

"akka-javasdk-components.conf" should {
"contain http endpoints components" in {
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")

"contain http endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.http-endpoint")
endpointComponents.size() shouldBe 2
endpointComponents should contain("com.example.HelloController")
endpointComponents should contain("com.example.UserRegistryController")
}

"contain grpc endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.grpc-endpoint")
endpointComponents.size() shouldBe 1
endpointComponents should contain("com.example.UserGrpcServiceImpl")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@SupportedAnnotationTypes(
{
"akka.javasdk.annotations.http.HttpEndpoint",
"akka.javasdk.annotations.GrpcEndpoint",
// all components will have this
"akka.javasdk.annotations.ComponentId",
// central config/lifecycle class
Expand All @@ -53,6 +54,7 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {

// key of each component type under that parent path, containing a string list of concrete component classes
private static final String HTTP_ENDPOINT_KEY = "http-endpoint";
private static final String GRPC_ENDPOINT_KEY = "grpc-endpoint";
private static final String EVENT_SOURCED_ENTITY_KEY = "event-sourced-entity";
private static final String VALUE_ENTITY_KEY = "key-value-entity";
private static final String TIMED_ACTION_KEY = "timed-action";
Expand All @@ -61,7 +63,9 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {
private static final String WORKFLOW_KEY = "workflow";
private static final String SERVICE_SETUP_KEY = "service-setup";

private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY, SERVICE_SETUP_KEY);
private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, GRPC_ENDPOINT_KEY,
EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY,
SERVICE_SETUP_KEY);


private final boolean debugEnabled;
Expand Down Expand Up @@ -124,6 +128,7 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
private String componentTypeFor(Element annotatedClass, TypeElement annotation) {
return switch (annotation.getQualifiedName().toString()) {
case "akka.javasdk.annotations.http.HttpEndpoint" -> HTTP_ENDPOINT_KEY;
case "akka.javasdk.annotations.GrpcEndpoint" -> GRPC_ENDPOINT_KEY;
case "akka.javasdk.annotations.Setup" -> SERVICE_SETUP_KEY;
case "akka.javasdk.annotations.ComponentId" -> componentType(annotatedClass);
default -> throw new IllegalArgumentException("Unknown annotation type: " + annotation.getQualifiedName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.grpc;

import akka.javasdk.annotations.GrpcEndpoint;
import akkajavasdk.protocol.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@GrpcEndpoint
public class TestGrpcServiceImpl implements TestGrpcService {
@Override
public CompletionStage<TestGrpcServiceOuterClass.Out> simple(TestGrpcServiceOuterClass.In in) {
return CompletableFuture.completedFuture(
TestGrpcServiceOuterClass.Out.newBuilder().setData(in.getData()).build()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>

// gRPC interface for a gRPC endpoint component

syntax = "proto3";

package akkajavasdk;

option java_package = "akkajavasdk.protocol";

message In {
string data = 1;
}

message Out {
string data = 1;
}

service TestGrpcService {
rpc Simple(In) returns (Out) {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.annotations;

import java.lang.annotation.*;

/**
* Mark a class to be made available as a gRPC endpoint. The annotated class should extend a gRPC service interface
* generated using Akka gRPC, be public and have a public constructor.
* <p>
* Annotated classes can accept the following types to the constructor:
* <ul>
* <li>{@link akka.javasdk.client.ComponentClient}</li>
* <li>{@link akka.javasdk.http.HttpClientProvider}</li>
* <li>{@link akka.javasdk.timer.TimerScheduler}</li>
* <li>{@link akka.stream.Materializer}</li>
* <li>{@link com.typesafe.config.Config}</li>
* <li>Custom types provided by a {@link akka.javasdk.DependencyProvider} from the service setup</li>
* </ul>
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GrpcEndpoint {
}
53 changes: 51 additions & 2 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import java.lang.reflect.Method
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand All @@ -21,11 +20,13 @@ import scala.jdk.OptionConverters.RichOption
import scala.jdk.OptionConverters.RichOptional
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.http.javadsl.model.HttpHeader
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.headers.RawHeader
import akka.javasdk.BuildInfo
import akka.javasdk.DependencyProvider
Expand Down Expand Up @@ -421,6 +422,47 @@ private final class Sdk(
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}

final class GrpcServiceDescriptor(
fileDescriptor: com.google.protobuf.Descriptors.FileDescriptor,
partialHandlerFactory: () => PartialFunction[HttpRequest, Future[HttpResponse]])

@nowarn("msg=never used")
private val grpcEndpointDescriptors = componentClasses
.filter(Reflect.isGrpcEndpoint)
.filterNot(isDisabled)
.map { grpcEndpointClass =>
// FIXME validate that it is a grpc service
val serviceDefinitionClass: Class[_] = grpcEndpointClass.getSuperclass

// Pick up generated companion object for file descriptor (for reflection) and creating router
val companion = system.dynamicAccess.getObjectFor[akka.grpc.ServiceDescription](grpcEndpointClass.getName).get

// FIXME creating router is concrete method, not any interface, would be better to have an interface and not do reflectively
// FIXME service lifecycle - one instance vs per request for HTTP endpoint, should we align?
val handlerFactory: Any =
system.dynamicAccess.createInstanceFor[Any](grpcEndpointClass.getName + "HandlerFactory", Nil).get

// (Service impl, String prefix, ClassicActorSystemProvider) => Function<akka.http.javadsl.model.HttpRequest, CompletionStage<akka.http.javadsl.model.HttpResponse>>
val factoryMethod: Method = handlerFactory.getClass.getMethod(
"partial",
serviceDefinitionClass,
classOf[String],
classOf[ClassicActorSystemProvider])

val serviceInstance: AnyRef = grpcEndpointFactory(grpcEndpointClass)

// FIXME we need a component descriptor type
new GrpcServiceDescriptor(
companion.descriptor,
// FIXME call factory here, or hand it to runtime
// FIXME instance per call (for isolation, but also for tracing/request context) instead of instance per grpc service
// FIXME turn java partial Function into PartialFunction (is that even possible?)
() =>
factoryMethod
.invoke(handlerFactory, serviceInstance, system)
.asInstanceOf[PartialFunction[HttpRequest, Future[HttpResponse]]])
}

private var eventSourcedEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var keyValueEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var workflowDescriptors = Vector.empty[WorkflowDescriptor]
Expand Down Expand Up @@ -696,6 +738,13 @@ private final class Sdk(
instance
}

private def grpcEndpointFactory[E](grpcEndpointClass: Class[E]): () => E = () => {
wiredInstance(grpcEndpointClass) {
// FIXME missing span from request
sideEffectingComponentInjects(None)
}
}

private def wiredInstance[T](clz: Class[T])(partial: PartialFunction[Class[_], Any]): T = {
// only one constructor allowed
require(clz.getDeclaredConstructors.length == 1, s"Class [${clz.getSimpleName}] must have only one constructor.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.javasdk.impl.reflection

import akka.annotation.InternalApi
import akka.javasdk.annotations.GrpcEndpoint
import akka.javasdk.annotations.http.HttpEndpoint
import akka.javasdk.client.ComponentClient
import akka.javasdk.consumer.Consumer
Expand All @@ -15,14 +16,14 @@ import akka.javasdk.timedaction.TimedAction
import akka.javasdk.view.TableUpdater
import akka.javasdk.view.View
import akka.javasdk.workflow.Workflow

import java.lang.annotation.Annotation
import java.lang.reflect.AnnotatedElement
import java.lang.reflect.Method
import java.lang.reflect.Modifier
import java.lang.reflect.ParameterizedType
import java.util
import java.util.Optional

import scala.annotation.tailrec
import scala.reflect.ClassTag

Expand Down Expand Up @@ -60,6 +61,9 @@ private[impl] object Reflect {
def isRestEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[HttpEndpoint]) != null

def isGrpcEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[GrpcEndpoint]) != null

def isEntity(cls: Class[_]): Boolean =
classOf[EventSourcedEntity[_, _]].isAssignableFrom(cls) ||
classOf[KeyValueEntity[_]].isAssignableFrom(cls)
Expand Down
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lazy val akkaJavaSdkTestKit =

lazy val akkaJavaSdkTests =
Project(id = "akka-javasdk-tests", base = file("akka-javasdk-tests"))
.enablePlugins(AkkaGrpcPlugin)
.dependsOn(akkaJavaSdk, akkaJavaSdkTestKit)
.settings(
name := "akka-javasdk-testkit",
Expand All @@ -67,7 +68,12 @@ lazy val akkaJavaSdkTests =
Test / javacOptions ++= Seq("-parameters"),
// only tests here
publish / skip := true,
doc / sources := Seq.empty)
doc / sources := Seq.empty,
// generating test service
Test / akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java),
Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client, AkkaGrpc.Server),
Test / PB.protoSources ++= (Compile / PB.protoSources).value
)
.settings(inConfig(Test)(JupiterPlugin.scopedSettings))
.settings(Dependencies.tests)

Expand Down

0 comments on commit db89db6

Please sign in to comment.