From 36fe868cf33b35b375a00f5bf0de7be242df62b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 24 Jan 2025 15:42:11 +0100 Subject: [PATCH 1/4] feat: gRPC client provider for cross service and external calls --- .../akka/javasdk/grpc/GrpcClientProvider.java | 32 ++++ .../akka/javasdk/http/HttpClientProvider.java | 12 +- .../scala/akka/javasdk/impl/SdkRunner.scala | 45 +++-- .../impl/grpc/GrpcClientProviderImpl.scala | 158 ++++++++++++++++++ .../impl/http/HttpClientProviderImpl.scala | 3 +- .../pages/component-and-service-calls.adoc | 56 ++++++- .../pages/setup-and-dependency-injection.adoc | 4 +- .../CallExternalGrpcEndpointImpl.java | 28 ++++ .../DelegatingGrpcEndpointImpl.java | 30 ++++ .../DelegatingServiceEndpoint.java | 1 - .../com/example/call_grpc_endpoint.proto | 19 +++ 11 files changed, 361 insertions(+), 27 deletions(-) create mode 100644 akka-javasdk/src/main/java/akka/javasdk/grpc/GrpcClientProvider.java create mode 100644 akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala create mode 100644 samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java create mode 100644 samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java create mode 100644 samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto diff --git a/akka-javasdk/src/main/java/akka/javasdk/grpc/GrpcClientProvider.java b/akka-javasdk/src/main/java/akka/javasdk/grpc/GrpcClientProvider.java new file mode 100644 index 000000000..b09f8ec19 --- /dev/null +++ b/akka-javasdk/src/main/java/akka/javasdk/grpc/GrpcClientProvider.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.grpc; + +import akka.annotation.DoNotInherit; +import akka.grpc.javadsl.AkkaGrpcClient; + +/** + * Not for user extension, instances are available through injection in selected component types. + */ +@DoNotInherit +public interface GrpcClientProvider { + /** + * Returns a gRPC client to interact with the specified service. + * + *

If the serviceName is a service name without protocol or domain the client will be configured to connect to + * another service deployed with that name on the same Akka project. The runtime will take care of + * routing requests to the service and keeping the data safe by encrypting the connection between services and identifying + * the client as coming from this service. + * + *

If it is a full dns name prefixed with "http://" or "https://" it will connect to services available on the + * public internet. + * + *

The client lifecycle is managed by the runtime, service code should not call {@code close() on the client. + * + * // FIXME what about powerapi client interface for request headers? + * @param serviceClass A gRPC service client interface generated by Akka gRPC + */ + T grpcClientFor(Class serviceClass, String serviceName); +} diff --git a/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java b/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java index 653fb81bd..53a4e23a2 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java +++ b/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java @@ -13,11 +13,15 @@ public interface HttpClientProvider { /** - * Returns a {@link HttpClient} configured to connect to another service deployed on the same - * project. + * Returns a {@link HttpClient} to interact with the specified HTTP service. * - *

The service is identified only by the name it has been deployed. The runtime takes care of - * routing requests to the service and keeping the data safe by encrypting the connection. + *

If the serviceName is a service name without protocol or domain the client will be configured to connect to + * another service deployed with that name on the same Akka project. The runtime will take care of + * routing requests to the service and keeping the data safe by encrypting the connection between services and identifying + * the client as coming from this service. + * + *

If it is a full dns name prefixed with "http://" or "https://" it will connect to services available on the + * public internet. */ HttpClient httpClientFor(String serviceName); } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 16b5bb692..968b6c6e1 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -39,6 +39,7 @@ import akka.javasdk.client.ComponentClient import akka.javasdk.consumer.Consumer import akka.javasdk.eventsourcedentity.EventSourcedEntity import akka.javasdk.eventsourcedentity.EventSourcedEntityContext +import akka.javasdk.grpc.GrpcClientProvider import akka.javasdk.http.AbstractHttpEndpoint import akka.javasdk.http.HttpClientProvider import akka.javasdk.http.RequestContext @@ -51,6 +52,7 @@ import akka.javasdk.impl.Validations.Validation import akka.javasdk.impl.client.ComponentClientImpl import akka.javasdk.impl.consumer.ConsumerImpl import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl +import akka.javasdk.impl.grpc.GrpcClientProviderImpl import akka.javasdk.impl.http.HttpClientProviderImpl import akka.javasdk.impl.http.JwtClaimsImpl import akka.javasdk.impl.keyvalueentity.KeyValueEntityImpl @@ -290,6 +292,18 @@ private[javasdk] object Sdk { dependencyProvider: Option[DependencyProvider], httpClientProvider: HttpClientProvider, serializer: JsonSerializer) + + private val platformManagedDependency = Set[Class[_]]( + classOf[ComponentClient], + classOf[TimerScheduler], + classOf[HttpClientProvider], + classOf[GrpcClientProvider], + classOf[Tracer], + classOf[Span], + classOf[Config], + classOf[WorkflowContext], + classOf[EventSourcedEntityContext], + classOf[KeyValueEntityContext]) } /** @@ -308,6 +322,8 @@ private final class Sdk( disabledComponents: Set[Class[_]], startedPromise: Promise[StartupContext], serviceNameOverride: Option[String]) { + import Sdk._ + private val logger = LoggerFactory.getLogger(getClass) private val serializer = new JsonSerializer private val ComponentLocator.LocatedClasses(componentClasses, maybeServiceClass) = @@ -319,12 +335,18 @@ private final class Sdk( private val sdkTracerFactory = () => tracerFactory(TraceInstrumentation.InstrumentationScopeName) - private val httpClientProvider = new HttpClientProviderImpl( + private lazy val httpClientProvider = new HttpClientProviderImpl( system, None, remoteIdentification.map(ri => RawHeader(ri.headerName, ri.headerValue)), sdkSettings) + private lazy val grpcClientProvider = new GrpcClientProviderImpl( + system, + sdkSettings, + applicationConfig, + remoteIdentification.map(ri => GrpcClientProviderImpl.AuthHeaders(ri.headerName, ri.headerValue))) + private lazy val userServiceConfig = { // hiding these paths from the config provided to user val sensitivePaths = List("akka", "kalix.meta", "kalix.proxy", "kalix.runtime", "system") @@ -578,10 +600,11 @@ private final class Sdk( // Note: config is also always available through the combination with user DI way down below private def sideEffectingComponentInjects(span: Option[Span]): PartialFunction[Class[_], Any] = { // remember to update component type API doc and docs if changing the set of injectables - case p if p == classOf[ComponentClient] => componentClient(span) - case h if h == classOf[HttpClientProvider] => httpClientProvider(span) - case t if t == classOf[TimerScheduler] => timerScheduler(span) - case m if m == classOf[Materializer] => sdkMaterializer + case p if p == classOf[ComponentClient] => componentClient(span) + case h if h == classOf[HttpClientProvider] => httpClientProvider(span) + case g if g == classOf[GrpcClientProviderImpl] => grpcClientProvider // FIXME trace propagation + case t if t == classOf[TimerScheduler] => timerScheduler(span) + case m if m == classOf[Materializer] => sdkMaterializer } val spiComponents: SpiComponents = { @@ -775,18 +798,6 @@ private final class Sdk( } } - private def platformManagedDependency(anyOther: Class[_]) = { - anyOther == classOf[ComponentClient] || - anyOther == classOf[TimerScheduler] || - anyOther == classOf[HttpClientProvider] || - anyOther == classOf[Tracer] || - anyOther == classOf[Span] || - anyOther == classOf[Config] || - anyOther == classOf[WorkflowContext] || - anyOther == classOf[EventSourcedEntityContext] || - anyOther == classOf[KeyValueEntityContext] - } - private def componentClient(openTelemetrySpan: Option[Span]): ComponentClient = { ComponentClientImpl(runtimeComponentClients, serializer, openTelemetrySpan)(sdkExecutionContext) } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala new file mode 100644 index 000000000..0ac01f408 --- /dev/null +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl.grpc + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.actor.CoordinatedShutdown +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.discovery.Discovery +import akka.grpc.GrpcClientSettings +import akka.grpc.javadsl.AkkaGrpcClient +import akka.javasdk.grpc.GrpcClientProvider +import akka.javasdk.impl.Settings +import akka.javasdk.impl.grpc.GrpcClientProviderImpl.AuthHeaders +import com.typesafe.config.Config +import org.slf4j.LoggerFactory + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executor +import scala.annotation.nowarn +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object GrpcClientProviderImpl { + final case class AuthHeaders(headerName: String, headerValue: String) + private final case class ClientKey(clientClass: Class[_], serviceName: String) + + private def isAkkaService(serviceName: String): Boolean = !(serviceName.contains('.') || serviceName.contains(':')) + + @nowarn("msg=deprecated") + private def settingsWithCallCredentials(key: String, value: String)( + settings: GrpcClientSettings): GrpcClientSettings = { + + import io.grpc.{ CallCredentials, Metadata } + val headers = new Metadata() + headers.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value) + settings.withCallCredentials(new CallCredentials { + override def applyRequestMetadata( + requestInfo: CallCredentials.RequestInfo, + appExecutor: Executor, + applier: CallCredentials.MetadataApplier): Unit = { + applier.apply(headers) + } + override def thisUsesUnstableApi(): Unit = () + }) + + } + +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class GrpcClientProviderImpl( + system: ActorSystem[_], + settings: Settings, + applicationConfig: Config, + remoteIdentificationHeader: Option[AuthHeaders]) + extends GrpcClientProvider { + import GrpcClientProviderImpl._ + import system.executionContext + + private val log = LoggerFactory.getLogger(classOf[GrpcClientProvider]) + + private val clients = new ConcurrentHashMap[ClientKey, AkkaGrpcClient]() + + CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseServiceStop, "stop-grpc-clients")(() => + Future + .traverse(clients.values().asScala)(_.close().asScala) + .map(_ => Done)) + + override def grpcClientFor[T <: AkkaGrpcClient](serviceClass: Class[T], serviceName: String): T = { + val clientKey = ClientKey(serviceClass, serviceName) + clients.computeIfAbsent(clientKey, createNewClientFor _).asInstanceOf[T] + } + + // FIXME for testkit + // /** This gets called by the testkit, and should impersonate the given principal. */ + // def impersonatingGrpcClient[T <: AkkaGrpcClient](serviceClass: Class[T], service: String, port: Int, impersonate: String): T = + // getGrpcClient(serviceClass, service, port, Some("impersonate-kalix-service" -> impersonate)) + + private def createNewClientFor(clientKey: ClientKey): AkkaGrpcClient = { + val clientSettings = + if (isAkkaService(clientKey.serviceName)) { + val akkaServiceClientSettings = if (settings.devModeSettings.isDefined) { + // local service discovery when running locally + // dev mode, other service name, use Akka discovery to find it + // the runtime has set up a mechanism that finds locally running + // services. Since in dev mode blocking is probably fine for now. + try { + val result = Await.result(Discovery(system).discovery.lookup(clientKey.serviceName, 5.seconds), 5.seconds) + val address = result.addresses.head + // port is always set + val port = address.port.get + log.debug( + "Creating dev mode gRPC client for Akka service [{}] found at [{}:{}]", + clientKey.serviceName, + address.address, + port) + GrpcClientSettings + .connectToServiceAt(address.host, port)(system) + // (No TLS locally) + .withTls(false) + } catch { + case NonFatal(ex) => + throw new RuntimeException( + s"Failed to look up service [${clientKey.serviceName}] in dev-mode, make sure that it is also running " + + "with a separate port and service name correctly defined in its application.conf under 'akka.javasdk.dev-mode.service-name' " + + "if it differs from the maven project name", + ex) + } + } else { + log.debug("Creating gRPC client for Akka service [{}]", clientKey.serviceName) + GrpcClientSettings + .connectToServiceAt(clientKey.serviceName, 80)(system) + // (TLS is handled for us by Kalix infra) + .withTls(false) + } + + remoteIdentificationHeader match { + case Some(auth) => settingsWithCallCredentials(auth.headerName, auth.headerValue)(akkaServiceClientSettings) + case None => akkaServiceClientSettings + } + } else { + // external/public gRPC service + log.debug("Creating gRPC client for external service [{}]", clientKey.serviceName) + + // FIXME we should probably not allow any grpc client setting but a subset? + // external service, details defined in user config + GrpcClientSettings.fromConfig(clientKey.serviceName, applicationConfig)(system) + } + + // Java API - static create + val create = + clientKey.clientClass.getMethod("create", classOf[GrpcClientSettings], classOf[ClassicActorSystemProvider]) + val client = create.invoke(null, clientSettings, system).asInstanceOf[AkkaGrpcClient] + + client.closed().asScala.foreach { _ => + // user should not close client, but just to be sure we don't keep it around if they do + clients.remove(clientKey, client) + } + + client + } + +} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientProviderImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientProviderImpl.scala index 1798cdefb..747c4fd63 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientProviderImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/http/HttpClientProviderImpl.scala @@ -71,7 +71,8 @@ private[akka] final class HttpClientProviderImpl( case NonFatal(ex) => throw new RuntimeException( s"Failed to look up service [$name] in dev-mode, make sure that it is also running " + - "with a separate port and service name correctly defined in its application.conf under 'akka.javasdk.dev-mode.service-name'", + "with a separate port and service name correctly defined in its application.conf under 'akka.javasdk.dev-mode.service-name' " + + "if it differs from the maven project name.", ex) } } else { diff --git a/docs/src/modules/java/pages/component-and-service-calls.adoc b/docs/src/modules/java/pages/component-and-service-calls.adoc index 3fc199bd7..14b914594 100644 --- a/docs/src/modules/java/pages/component-and-service-calls.adoc +++ b/docs/src/modules/java/pages/component-and-service-calls.adoc @@ -31,7 +31,11 @@ The component client is available for injection only in Service Setup, Endpoints == Akka services -Calling other Akka services in the same project is done by invoking them using an HTTP client. The service is identified by the name it has been deployed. Akka takes care of routing requests to the service and keeping the data safe by encrypting the connection and handling authentication for you. +Calling other Akka services in the same project is done by invoking them using an HTTP or a GRPC client depending on what type +of endpoints the service provides. + +=== Over HTTP +The service is identified by the name it has been deployed. Akka takes care of routing requests to the service and keeping the data safe by encrypting the connection and handling authentication for you. In this sample we will make an action that does a call to the xref:key-value-entities.adoc[Key Value Entity Counter] service, deployed with the service name `counter`. @@ -50,9 +54,9 @@ include::java:example$doc-snippets/src/main/java/com/example/callanotherservice/ <4> Invoking the call will return a `CompletionStage>` with details about the result as well as the deserialized response body. <5> Handle the response, which may be successful, or an error -The HTTP client provider client is only available for injection in the following types of components: HTTP Endpoints, Workflows, Consumers and Timed Actions. +The HTTP client provider is only available for injection in the following types of components: HTTP Endpoints, gRPC endpoints, Workflows, Consumers and Timed Actions. -== External services +=== External HTTP services Calling HTTP services deployed on *different* Akka projects or any other external HTTP server is also done with the `HttpClientProvider`. Instead of a service name, the protocol and full server name is used when calling `httpClientFor`. For example `https://example.com` or `http://example.com`. @@ -67,3 +71,49 @@ include::java:example$doc-snippets/src/main/java/com/example/callanotherservice/ <4> Specify a class to parse the response body into <5> Once the call completes, handle the response. <6> Return an adapted result object which will be turned into a JSON response. + +=== Over gRPC +The service is identified by the name it has been deployed. Akka takes care of routing requests to the service and keeping the data safe by encrypting the connection and handling authentication for you. + +ifdef::todo[TODO: a more realistic sample here, possibly aligned with the HTTP sample above] +ifdef::todo[TODO: cover streaming calls with an example?] + +In this sample we will make an action that does a call to the xref:grpc-endpoints.adoc[gRPC endpoints example] service, deployed with the service name `doc-snippets`. + +The SDK provides `akka.javasdk.grpc.GrpcClientProvider` which provides gRPC client instances for calling other services. + +To consume a gRPC service, the service protobuf descriptor must be added in the `src/proto` directory of the project, this +triggers generation of a client interface and Java classes for all the message types used as requests and responses for +methods in the service. + +In our delegating service implementation: + +[source,java,indent=0] +.{sample-base-url}/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpoint.java[DelegatingGrpcEndpoint.java] +---- +include::java:example$doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpoint.java[tag=delegating-endpoint] +---- +<1> Accept a `GrpcClientProvider` parameter for the constructor +<2> Use the generated gRPC client interface for the service `ExampleGrpcEndpointClient.class` and the service name `doc-snippets` to look up a client. +<3> Use the client to call the other service and return a `CompletionStage` + +Since the called service and the `DelegatingGrpcEndpoint` share request and response protocol, no further transformation +of the request or response is needed here. + +The gRPC client provider is only available for injection in the following types of components: HTTP Endpoints, gRPC endpoints, Workflows, Consumers and Timed Actions. + +=== External gRPC services + +Calling gRPC services deployed on *different* Akka projects or any other external gRPC server is also done with the `GrpcClientProvider`. Instead of a service name, the protocol and full server name is used when calling `grpcClient`. For example `https://example.com` or `http://example.com`. + +[source,java,indent=0] +.{sample-base-url}/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java[CallExternalGrpcEndpointImpl.java] +---- +include::java:example$doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java[tag=call-external-endpoint] +---- +<1> Accept a `GrpcClientProvider` parameter for the constructor +<2> Use the generated gRPC client interface for the service `ExampleGrpcEndpointClient.class` and the service name `doc-snippets` to look up a client. `ExampleGrpcEndpointClient.class`. +<3> Use the client to call the other service and return a `CompletionStage` + +Since the called service and the `DelegatingGrpcEndpoint` share request and response protocol, no further transformation +of the request or response is needed here. diff --git a/docs/src/modules/java/pages/setup-and-dependency-injection.adoc b/docs/src/modules/java/pages/setup-and-dependency-injection.adoc index ac1cc1cef..cbd174059 100644 --- a/docs/src/modules/java/pages/setup-and-dependency-injection.adoc +++ b/docs/src/modules/java/pages/setup-and-dependency-injection.adoc @@ -43,7 +43,7 @@ The Akka SDK provides injection of types related to capabilities the SDK provide Injection is done as constructor parameters for the component implementation class. -The following types can be injected in Service Setup, Endpoints, Consumers, Timed Actions, and Workflows: +The following types can be injected in Service Setup, HTTP Endpoints, gRPC Endpoints, Consumers, Timed Actions, and Workflows: [options="header" cols="1,1"] |=== @@ -54,6 +54,8 @@ The following types can be injected in Service Setup, Endpoints, Consumers, Time | for interaction between components, see xref:component-and-service-calls.adoc[] | `akka.javasdk.http.HttpClientProvider` | for creating clients to make calls between Akka services and also to other HTTP servers, see xref:component-and-service-calls.adoc[] +| `akka.javasdk.grpc.GrpcClientProvider` +| for creating clients to make calls between Akka services and also to other gRPC servers, see xref:component-and-service-calls.adoc[] | `akka.javasdk.timer.TimerScheduler` | for scheduling timed actions, see xref:timed-actions.adoc[] | `akka.stream.Materializer` diff --git a/samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java b/samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java new file mode 100644 index 000000000..32660ab3f --- /dev/null +++ b/samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java @@ -0,0 +1,28 @@ +package com.example.callanotherservice; + +import akka.javasdk.annotations.Acl; +import akka.javasdk.annotations.GrpcEndpoint; +import akka.javasdk.grpc.GrpcClientProvider; +import com.example.grpc.CallExternalGrpcEndpoint; +import com.example.grpc.ExampleGrpcEndpointClient; +import com.example.grpc.HelloReply; +import com.example.grpc.HelloRequest; + +import java.util.concurrent.CompletionStage; + +// tag::call-external-endpoint[] +@GrpcEndpoint +@Acl(allow = @Acl.Matcher(principal = Acl.Principal.ALL)) +public class CallExternalGrpcEndpointImpl implements CallExternalGrpcEndpoint { + private final ExampleGrpcEndpointClient external; + + public CallExternalGrpcEndpointImpl(GrpcClientProvider clientProvider) { // <1> + external = clientProvider.grpcClientFor(ExampleGrpcEndpointClient.class, "hellogrpc.example.com"); // <2> + } + + @Override + public CompletionStage callExternalService(HelloRequest in) { + return external.sayHello(in); // <3> + } +} +// end::call-external-endpoint[] diff --git a/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java b/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java new file mode 100644 index 000000000..047a882f4 --- /dev/null +++ b/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java @@ -0,0 +1,30 @@ +package com.example.callanotherservice; + +import akka.javasdk.annotations.Acl; +import akka.javasdk.annotations.GrpcEndpoint; +import akka.javasdk.grpc.GrpcClientProvider; +import com.example.grpc.DelegatingGrpcEndpoint; +import com.example.grpc.ExampleGrpcEndpointClient; +import com.example.grpc.HelloReply; +import com.example.grpc.HelloRequest; + +import java.util.concurrent.CompletionStage; + +// tag::delegating-endpoint[] +@GrpcEndpoint +@Acl(allow = @Acl.Matcher(principal = Acl.Principal.ALL)) +public class DelegatingGrpcEndpointImpl implements DelegatingGrpcEndpoint { + + private final ExampleGrpcEndpointClient akkaService; + + public DelegatingGrpcEndpointImpl(GrpcClientProvider clientProvider) { // <1> + akkaService = clientProvider.grpcClientFor(ExampleGrpcEndpointClient.class, "doc-snippets"); // <2> + } + + @Override + public CompletionStage callAkkaService(HelloRequest in) { + return akkaService.sayHello(in); // <3> + } + +} +// end::delegating-endpoint[] diff --git a/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingServiceEndpoint.java b/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingServiceEndpoint.java index 9662cfe45..016e83523 100644 --- a/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingServiceEndpoint.java +++ b/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingServiceEndpoint.java @@ -5,7 +5,6 @@ import akka.javasdk.annotations.http.Post; import akka.javasdk.http.HttpClient; import akka.javasdk.http.HttpClientProvider; -import akka.javasdk.http.StrictResponse; import java.util.concurrent.CompletionStage; diff --git a/samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto b/samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto new file mode 100644 index 000000000..92865e602 --- /dev/null +++ b/samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +import "com/example/example_grpc_endpoint.proto"; + +option java_multiple_files = true; +option java_package = "com.example.grpc"; +option java_outer_classname = "DelegationExampleGrpc"; + +package com.example; + +// Note: re-using the ExampleGrpcEndpoint protocol +service DelegatingGrpcEndpoint { + rpc CallAkkaService (HelloRequest) returns (HelloReply) {} +} + +service CallExternalGrpcEndpoint { + rpc CallExternalService(HelloRequest) returns (HelloReply) {} +} + From 5088754086a8d945e477597fa48e7bd971a1d27b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 24 Jan 2025 15:44:20 +0100 Subject: [PATCH 2/4] fromatting --- .../java/akka/javasdk/http/HttpClientProvider.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java b/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java index 53a4e23a2..d757a99b4 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java +++ b/akka-javasdk/src/main/java/akka/javasdk/http/HttpClientProvider.java @@ -15,13 +15,14 @@ public interface HttpClientProvider { /** * Returns a {@link HttpClient} to interact with the specified HTTP service. * - *

If the serviceName is a service name without protocol or domain the client will be configured to connect to - * another service deployed with that name on the same Akka project. The runtime will take care of - * routing requests to the service and keeping the data safe by encrypting the connection between services and identifying - * the client as coming from this service. + *

If the serviceName is a service name without protocol or domain the client will be + * configured to connect to another service deployed with that name on the same Akka project. The + * runtime will take care of routing requests to the service and keeping the data safe by + * encrypting the connection between services and identifying the client as coming from this + * service. * - *

If it is a full dns name prefixed with "http://" or "https://" it will connect to services available on the - * public internet. + *

If it is a full dns name prefixed with "http://" or "https://" it will connect to services + * available on the public internet. */ HttpClient httpClientFor(String serviceName); } From c16feb63e48f779e0427ce805edffc2a98079e27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 24 Jan 2025 15:51:17 +0100 Subject: [PATCH 3/4] wrong DI interface, wrong proto package --- .../src/main/scala/akka/javasdk/impl/SdkRunner.scala | 10 +++++----- .../CallExternalGrpcEndpointImpl.java | 8 ++++---- .../callanotherservice/DelegatingGrpcEndpointImpl.java | 8 ++++---- .../main/proto/com/example/call_grpc_endpoint.proto | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 968b6c6e1..2a797d932 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -600,11 +600,11 @@ private final class Sdk( // Note: config is also always available through the combination with user DI way down below private def sideEffectingComponentInjects(span: Option[Span]): PartialFunction[Class[_], Any] = { // remember to update component type API doc and docs if changing the set of injectables - case p if p == classOf[ComponentClient] => componentClient(span) - case h if h == classOf[HttpClientProvider] => httpClientProvider(span) - case g if g == classOf[GrpcClientProviderImpl] => grpcClientProvider // FIXME trace propagation - case t if t == classOf[TimerScheduler] => timerScheduler(span) - case m if m == classOf[Materializer] => sdkMaterializer + case p if p == classOf[ComponentClient] => componentClient(span) + case h if h == classOf[HttpClientProvider] => httpClientProvider(span) + case g if g == classOf[GrpcClientProvider] => grpcClientProvider // FIXME trace propagation + case t if t == classOf[TimerScheduler] => timerScheduler(span) + case m if m == classOf[Materializer] => sdkMaterializer } val spiComponents: SpiComponents = { diff --git a/samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java b/samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java index 32660ab3f..767b323ec 100644 --- a/samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java +++ b/samples/doc-snippets/src/main/java/com/example/callanotherservice/CallExternalGrpcEndpointImpl.java @@ -3,10 +3,10 @@ import akka.javasdk.annotations.Acl; import akka.javasdk.annotations.GrpcEndpoint; import akka.javasdk.grpc.GrpcClientProvider; -import com.example.grpc.CallExternalGrpcEndpoint; -import com.example.grpc.ExampleGrpcEndpointClient; -import com.example.grpc.HelloReply; -import com.example.grpc.HelloRequest; +import com.example.proto.CallExternalGrpcEndpoint; +import com.example.proto.ExampleGrpcEndpointClient; +import com.example.proto.HelloReply; +import com.example.proto.HelloRequest; import java.util.concurrent.CompletionStage; diff --git a/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java b/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java index 047a882f4..44c869340 100644 --- a/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java +++ b/samples/doc-snippets/src/main/java/com/example/callanotherservice/DelegatingGrpcEndpointImpl.java @@ -3,10 +3,10 @@ import akka.javasdk.annotations.Acl; import akka.javasdk.annotations.GrpcEndpoint; import akka.javasdk.grpc.GrpcClientProvider; -import com.example.grpc.DelegatingGrpcEndpoint; -import com.example.grpc.ExampleGrpcEndpointClient; -import com.example.grpc.HelloReply; -import com.example.grpc.HelloRequest; +import com.example.proto.DelegatingGrpcEndpoint; +import com.example.proto.ExampleGrpcEndpointClient; +import com.example.proto.HelloReply; +import com.example.proto.HelloRequest; import java.util.concurrent.CompletionStage; diff --git a/samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto b/samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto index 92865e602..9e4c3959e 100644 --- a/samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto +++ b/samples/doc-snippets/src/main/proto/com/example/call_grpc_endpoint.proto @@ -3,7 +3,7 @@ syntax = "proto3"; import "com/example/example_grpc_endpoint.proto"; option java_multiple_files = true; -option java_package = "com.example.grpc"; +option java_package = "com.example.proto"; option java_outer_classname = "DelegationExampleGrpc"; package com.example; From ef2bf2b8134ef561cddbd9b6008fed99d8887a89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 24 Jan 2025 17:09:16 +0100 Subject: [PATCH 4/4] Trying to get override config for external services working, no success --- akka-javasdk/src/main/resources/reference.conf | 9 +++++++++ .../main/scala/akka/javasdk/impl/SdkRunner.scala | 16 ++++++++-------- .../impl/grpc/GrpcClientProviderImpl.scala | 14 +++++++++++--- .../src/main/resources/application.conf | 7 +++++++ 4 files changed, 35 insertions(+), 11 deletions(-) diff --git a/akka-javasdk/src/main/resources/reference.conf b/akka-javasdk/src/main/resources/reference.conf index 67f30f081..b32302510 100644 --- a/akka-javasdk/src/main/resources/reference.conf +++ b/akka-javasdk/src/main/resources/reference.conf @@ -86,6 +86,15 @@ akka.javasdk { pass-along-env-allow = [] } + grpc.client { + # Specify entries for the full service DNS names to apply + # customizations for interacting with external gRPC services + # Fixme specify what settings are possible right here + # "com.example" { + # use-tls = true + # } + } + telemetry { tracing { collector-endpoint = "" diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 2a797d932..4bb4ea285 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -341,21 +341,21 @@ private final class Sdk( remoteIdentification.map(ri => RawHeader(ri.headerName, ri.headerValue)), sdkSettings) - private lazy val grpcClientProvider = new GrpcClientProviderImpl( - system, - sdkSettings, - applicationConfig, - remoteIdentification.map(ri => GrpcClientProviderImpl.AuthHeaders(ri.headerName, ri.headerValue))) - private lazy val userServiceConfig = { // hiding these paths from the config provided to user val sensitivePaths = List("akka", "kalix.meta", "kalix.proxy", "kalix.runtime", "system") - val sdkConfig = applicationConfig.getConfig("akka.javasdk") + val sdkConfig = applicationConfig.getObject("akka.javasdk") sensitivePaths .foldLeft(applicationConfig) { (conf, toHide) => conf.withoutPath(toHide) } - .withFallback(sdkConfig) + .withValue("akka.javasdk", sdkConfig) } + private lazy val grpcClientProvider = new GrpcClientProviderImpl( + system, + sdkSettings, + userServiceConfig, + remoteIdentification.map(ri => GrpcClientProviderImpl.AuthHeaders(ri.headerName, ri.headerValue))) + // validate service classes before instantiating private val validation = componentClasses.foldLeft(Valid: Validation) { case (validations, cls) => validations ++ Validations.validate(cls) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala index 0ac01f408..36a311170 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/grpc/GrpcClientProviderImpl.scala @@ -16,6 +16,7 @@ import akka.javasdk.grpc.GrpcClientProvider import akka.javasdk.impl.Settings import akka.javasdk.impl.grpc.GrpcClientProviderImpl.AuthHeaders import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory import org.slf4j.LoggerFactory import java.util.concurrent.ConcurrentHashMap @@ -66,7 +67,7 @@ private[akka] object GrpcClientProviderImpl { private[akka] final class GrpcClientProviderImpl( system: ActorSystem[_], settings: Settings, - applicationConfig: Config, + userServiceConfig: Config, remoteIdentificationHeader: Option[AuthHeaders]) extends GrpcClientProvider { import GrpcClientProviderImpl._ @@ -92,7 +93,8 @@ private[akka] final class GrpcClientProviderImpl( // getGrpcClient(serviceClass, service, port, Some("impersonate-kalix-service" -> impersonate)) private def createNewClientFor(clientKey: ClientKey): AkkaGrpcClient = { - val clientSettings = + val clientSettings = { + // FIXME the old impl would look in config first and always choose that if present if (isAkkaService(clientKey.serviceName)) { val akkaServiceClientSettings = if (settings.devModeSettings.isDefined) { // local service discovery when running locally @@ -139,8 +141,14 @@ private[akka] final class GrpcClientProviderImpl( // FIXME we should probably not allow any grpc client setting but a subset? // external service, details defined in user config - GrpcClientSettings.fromConfig(clientKey.serviceName, applicationConfig)(system) + GrpcClientSettings.fromConfig( + clientKey.serviceName, + userServiceConfig + .getConfig("akka.javasdk.grpc.client") + // this config overload requires there to be an entry for the name, but then falls back to defaults + .withFallback(ConfigFactory.parseString(s""""${clientKey.serviceName}" = {}""")))(system) } + } // Java API - static create val create = diff --git a/samples/doc-snippets/src/main/resources/application.conf b/samples/doc-snippets/src/main/resources/application.conf index 0138d0d17..a1990a69e 100644 --- a/samples/doc-snippets/src/main/resources/application.conf +++ b/samples/doc-snippets/src/main/resources/application.conf @@ -1,4 +1,11 @@ my-app { some-feature-flag = true environment = "test" +} + +akka.javasdk.grpc.client."hellogrpc.example.com" { + # configure external call, to call back to self + host = "localhost" + port = 9000 + use-tls = false } \ No newline at end of file