Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gRPC endpoint support #161

Merged
merged 7 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package com.example;

import akka.javasdk.annotations.GrpcEndpoint;

@GrpcEndpoint
public class UserGrpcServiceImpl {
// would extend generated service stub and implement grpc methods but that's not important for this test

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import org.scalatest.wordspec.AnyWordSpec
class EndpointsDescriptorSpec extends AnyWordSpec with Matchers {

"akka-javasdk-components.conf" should {
"contain http endpoints components" in {
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")

"contain http endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.http-endpoint")
endpointComponents.size() shouldBe 2
endpointComponents should contain("com.example.HelloController")
endpointComponents should contain("com.example.UserRegistryController")
}

"contain grpc endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.grpc-endpoint")
endpointComponents.size() shouldBe 1
endpointComponents should contain("com.example.UserGrpcServiceImpl")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@SupportedAnnotationTypes(
{
"akka.javasdk.annotations.http.HttpEndpoint",
"akka.javasdk.annotations.GrpcEndpoint",
// all components will have this
"akka.javasdk.annotations.ComponentId",
// central config/lifecycle class
Expand All @@ -53,6 +54,7 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {

// key of each component type under that parent path, containing a string list of concrete component classes
private static final String HTTP_ENDPOINT_KEY = "http-endpoint";
private static final String GRPC_ENDPOINT_KEY = "grpc-endpoint";
private static final String EVENT_SOURCED_ENTITY_KEY = "event-sourced-entity";
private static final String VALUE_ENTITY_KEY = "key-value-entity";
private static final String TIMED_ACTION_KEY = "timed-action";
Expand All @@ -61,7 +63,9 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {
private static final String WORKFLOW_KEY = "workflow";
private static final String SERVICE_SETUP_KEY = "service-setup";

private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY, SERVICE_SETUP_KEY);
private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, GRPC_ENDPOINT_KEY,
EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY,
SERVICE_SETUP_KEY);


private final boolean debugEnabled;
Expand Down Expand Up @@ -124,6 +128,7 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
private String componentTypeFor(Element annotatedClass, TypeElement annotation) {
return switch (annotation.getQualifiedName().toString()) {
case "akka.javasdk.annotations.http.HttpEndpoint" -> HTTP_ENDPOINT_KEY;
case "akka.javasdk.annotations.GrpcEndpoint" -> GRPC_ENDPOINT_KEY;
case "akka.javasdk.annotations.Setup" -> SERVICE_SETUP_KEY;
case "akka.javasdk.annotations.ComponentId" -> componentType(annotatedClass);
default -> throw new IllegalArgumentException("Unknown annotation type: " + annotation.getQualifiedName());
Expand Down
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0</kalix-runtime.version>
<kalix-runtime.version>1.3.1-6cd7992</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.grpc;

import akka.javasdk.annotations.GrpcEndpoint;
import akkajavasdk.protocol.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@GrpcEndpoint
public class TestGrpcServiceImpl implements TestGrpcService {
@Override
public CompletionStage<TestGrpcServiceOuterClass.Out> simple(TestGrpcServiceOuterClass.In in) {
return CompletableFuture.completedFuture(
TestGrpcServiceOuterClass.Out.newBuilder().setData(in.getData()).build()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>

// gRPC interface for a gRPC endpoint component

syntax = "proto3";

package akkajavasdk;

option java_package = "akkajavasdk.protocol";

message In {
string data = 1;
}

message Out {
string data = 1;
}

service TestGrpcService {
rpc Simple(In) returns (Out) {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.annotations;

import java.lang.annotation.*;

/**
* Mark a class to be made available as a gRPC endpoint. The annotated class should extend a gRPC service interface
* generated using Akka gRPC, be public and have a public constructor.
* <p>
* Annotated classes can accept the following types to the constructor:
* <ul>
* <li>{@link akka.javasdk.client.ComponentClient}</li>
* <li>{@link akka.javasdk.http.HttpClientProvider}</li>
* <li>{@link akka.javasdk.timer.TimerScheduler}</li>
* <li>{@link akka.stream.Materializer}</li>
* <li>{@link com.typesafe.config.Config}</li>
* <li>Custom types provided by a {@link akka.javasdk.DependencyProvider} from the service setup</li>
* </ul>
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GrpcEndpoint {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.impl

import akka.actor.typed.ActorSystem
import akka.grpc.ServiceDescription
import akka.grpc.scaladsl.InstancePerRequestFactory
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.runtime.sdk.spi.GrpcEndpointDescriptor
import akka.runtime.sdk.spi.GrpcEndpointRequestConstructionContext

import scala.concurrent.Future

object GrpcEndpointDescriptorFactory {

def apply[T](grpcEndpointClass: Class[T], factory: () => T)(implicit
system: ActorSystem[_]): GrpcEndpointDescriptor[T] = {
// FIXME now way right now to know that it is a gRPC service interface
val serviceDefinitionClass: Class[_] = {
val interfaces = grpcEndpointClass.getInterfaces
if (interfaces.length != 1) {
throw new IllegalArgumentException(
s"Class [${grpcEndpointClass.getName}] must implement exactly one interface, the gRPC service generated by Akka gRPC.")
}
interfaces(0)
}

// FIXME a derivative should be injectable into user code as well
val instanceFactory = { (_: GrpcEndpointRequestConstructionContext) =>
factory()
}

val handlerFactory =
system.dynamicAccess
.createInstanceFor[InstancePerRequestFactory[T]](serviceDefinitionClass.getName + "ScalaHandlerFactory", Nil)
.get

// Pick up generated companion object for file descriptor (for reflection) and creating router
// static akka.grpc.ServiceDescription description in generated service interface
val description = serviceDefinitionClass.getField("description").get(null).asInstanceOf[ServiceDescription]
if (description eq null)
throw new RuntimeException(
s"Could not access static description from gRPC service interface [${serviceDefinitionClass.getName}]")

val routeFactory: (HttpRequest => T) => PartialFunction[HttpRequest, Future[HttpResponse]] = { serviceFactory =>
handlerFactory.partialInstancePerRequest(
serviceFactory,
description.name,
// FIXME default error handler, is it fine to leave like this, should runtime define?
PartialFunction.empty,
system)
}

new GrpcEndpointDescriptor[T](
grpcEndpointClass.getName,
description.name,
description.descriptor,
instanceFactory,
routeFactory)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will certainly need to revisit this magic section later...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the "integrate two generated code blocks with logic in the runtime" part is a bit tricky...


}
22 changes: 19 additions & 3 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import java.lang.reflect.Method
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand All @@ -21,7 +20,6 @@ import scala.jdk.OptionConverters.RichOption
import scala.jdk.OptionConverters.RichOptional
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand All @@ -34,6 +32,7 @@ import akka.javasdk.Principals
import akka.javasdk.ServiceSetup
import akka.javasdk.Tracing
import akka.javasdk.annotations.ComponentId
import akka.javasdk.annotations.GrpcEndpoint
import akka.javasdk.annotations.Setup
import akka.javasdk.annotations.http.HttpEndpoint
import akka.javasdk.client.ComponentClient
Expand Down Expand Up @@ -201,6 +200,7 @@ private object ComponentType {
val KeyValueEntity = "key-value-entity"
val Workflow = "workflow"
val HttpEndpoint = "http-endpoint"
val GrpcEndpoint = "grpc-endpoint"
val Consumer = "consumer"
val TimedAction = "timed-action"
val View = "view"
Expand All @@ -225,6 +225,7 @@ private object ComponentLocator {
val kalixComponentTypeAndBaseClasses: Map[String, Class[_]] =
Map(
ComponentType.HttpEndpoint -> classOf[AnyRef],
ComponentType.GrpcEndpoint -> classOf[AnyRef],
ComponentType.TimedAction -> classOf[TimedAction],
ComponentType.Consumer -> classOf[Consumer],
ComponentType.EventSourcedEntity -> classOf[EventSourcedEntity[_, _]],
Expand Down Expand Up @@ -349,7 +350,7 @@ private final class Sdk(
true
} else {
//additional check to skip logging for endpoints
if (!clz.hasAnnotation[HttpEndpoint]) {
if (!clz.hasAnnotation[HttpEndpoint] && !clz.hasAnnotation[GrpcEndpoint]) {
//this could happen when we remove the @ComponentId annotation from the class,
//the file descriptor generated by annotation processor might still have this class entry,
//for instance when working with IDE and incremental compilation (without clean)
Expand Down Expand Up @@ -415,6 +416,13 @@ private final class Sdk(
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}

private val grpcEndpointDescriptors = componentClasses
.filter(Reflect.isGrpcEndpoint)
.map { grpcEndpointClass =>
val anyRefClass = grpcEndpointClass.asInstanceOf[Class[AnyRef]]
GrpcEndpointDescriptorFactory(anyRefClass, grpcEndpointFactory(anyRefClass))(system)
}

private var eventSourcedEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var keyValueEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var workflowDescriptors = Vector.empty[WorkflowDescriptor]
Expand Down Expand Up @@ -596,6 +604,7 @@ private final class Sdk(
(eventSourcedEntityDescriptors ++
keyValueEntityDescriptors ++
httpEndpointDescriptors ++
grpcEndpointDescriptors ++
timedActionDescriptors ++
consumerDescriptors ++
viewDescriptors ++
Expand Down Expand Up @@ -709,6 +718,13 @@ private final class Sdk(
instance
}

private def grpcEndpointFactory[E](grpcEndpointClass: Class[E]): () => E = () => {
wiredInstance(grpcEndpointClass) {
// FIXME missing span from request
sideEffectingComponentInjects(None)
}
}

private def wiredInstance[T](clz: Class[T])(partial: PartialFunction[Class[_], Any]): T = {
// only one constructor allowed
require(clz.getDeclaredConstructors.length == 1, s"Class [${clz.getSimpleName}] must have only one constructor.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.javasdk.impl.reflection

import akka.annotation.InternalApi
import akka.javasdk.annotations.GrpcEndpoint
import akka.javasdk.annotations.http.HttpEndpoint
import akka.javasdk.client.ComponentClient
import akka.javasdk.consumer.Consumer
Expand All @@ -15,14 +16,14 @@ import akka.javasdk.timedaction.TimedAction
import akka.javasdk.view.TableUpdater
import akka.javasdk.view.View
import akka.javasdk.workflow.Workflow

import java.lang.annotation.Annotation
import java.lang.reflect.AnnotatedElement
import java.lang.reflect.Method
import java.lang.reflect.Modifier
import java.lang.reflect.ParameterizedType
import java.util
import java.util.Optional

import scala.annotation.tailrec
import scala.reflect.ClassTag

Expand Down Expand Up @@ -60,6 +61,9 @@ private[impl] object Reflect {
def isRestEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[HttpEndpoint]) != null

def isGrpcEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[GrpcEndpoint]) != null

def isEntity(cls: Class[_]): Boolean =
classOf[EventSourcedEntity[_, _]].isAssignableFrom(cls) ||
classOf[KeyValueEntity[_]].isAssignableFrom(cls)
Expand Down
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lazy val akkaJavaSdkTestKit =

lazy val akkaJavaSdkTests =
Project(id = "akka-javasdk-tests", base = file("akka-javasdk-tests"))
.enablePlugins(AkkaGrpcPlugin)
.dependsOn(akkaJavaSdk, akkaJavaSdkTestKit)
.settings(
name := "akka-javasdk-testkit",
Expand All @@ -67,7 +68,11 @@ lazy val akkaJavaSdkTests =
Test / javacOptions ++= Seq("-parameters"),
// only tests here
publish / skip := true,
doc / sources := Seq.empty)
doc / sources := Seq.empty,
// generating test service
Test / akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java),
Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client, AkkaGrpc.Server),
Test / PB.protoSources ++= (Compile / PB.protoSources).value)
.settings(inConfig(Test)(JupiterPlugin.scopedSettings))
.settings(Dependencies.tests)

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.1-6cd7992")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down
3 changes: 2 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
resolvers += "Akka repository".at("https://repo.akka.io/maven")

addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.0.1")
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.4.4")
// Note: akka-grpc must be carefully kept in sync with the version used in the runtime
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.5.1")
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.7.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.7.0")
Expand Down
Loading
Loading