Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gRPC client provider for cross service and external calls #173

Draft
wants to merge 4 commits into
base: grpc-endpoints
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.grpc;

import akka.annotation.DoNotInherit;
import akka.grpc.javadsl.AkkaGrpcClient;

/**
* Not for user extension, instances are available through injection in selected component types.
*/
@DoNotInherit
public interface GrpcClientProvider {
/**
* Returns a gRPC client to interact with the specified service.
*
* <p>If the serviceName is a service name without protocol or domain the client will be configured to connect to
* another service deployed with that name on the same Akka project. The runtime will take care of
* routing requests to the service and keeping the data safe by encrypting the connection between services and identifying
* the client as coming from this service.
*
* <p>If it is a full dns name prefixed with "http://" or "https://" it will connect to services available on the
* public internet.
*
* <p>The client lifecycle is managed by the runtime, service code should not call {@code close() on the client.
*
* // FIXME what about powerapi client interface for request headers?
* @param serviceClass A gRPC service client interface generated by Akka gRPC
*/
<T extends AkkaGrpcClient> T grpcClientFor(Class<T> serviceClass, String serviceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
public interface HttpClientProvider {

/**
* Returns a {@link HttpClient} configured to connect to another service deployed on the same
* project.
* Returns a {@link HttpClient} to interact with the specified HTTP service.
*
* <p>The service is identified only by the name it has been deployed. The runtime takes care of
* routing requests to the service and keeping the data safe by encrypting the connection.
* <p>If the serviceName is a service name without protocol or domain the client will be
* configured to connect to another service deployed with that name on the same Akka project. The
* runtime will take care of routing requests to the service and keeping the data safe by
* encrypting the connection between services and identifying the client as coming from this
* service.
*
* <p>If it is a full dns name prefixed with "http://" or "https://" it will connect to services
* available on the public internet.
*/
HttpClient httpClientFor(String serviceName);
}
9 changes: 9 additions & 0 deletions akka-javasdk/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ akka.javasdk {
pass-along-env-allow = []
}

grpc.client {
# Specify entries for the full service DNS names to apply
# customizations for interacting with external gRPC services
# Fixme specify what settings are possible right here
# "com.example" {
# use-tls = true
# }
}

telemetry {
tracing {
collector-endpoint = ""
Expand Down
41 changes: 26 additions & 15 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import akka.javasdk.client.ComponentClient
import akka.javasdk.consumer.Consumer
import akka.javasdk.eventsourcedentity.EventSourcedEntity
import akka.javasdk.eventsourcedentity.EventSourcedEntityContext
import akka.javasdk.grpc.GrpcClientProvider
import akka.javasdk.http.AbstractHttpEndpoint
import akka.javasdk.http.HttpClientProvider
import akka.javasdk.http.RequestContext
Expand All @@ -51,6 +52,7 @@ import akka.javasdk.impl.Validations.Validation
import akka.javasdk.impl.client.ComponentClientImpl
import akka.javasdk.impl.consumer.ConsumerImpl
import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl
import akka.javasdk.impl.grpc.GrpcClientProviderImpl
import akka.javasdk.impl.http.HttpClientProviderImpl
import akka.javasdk.impl.http.JwtClaimsImpl
import akka.javasdk.impl.keyvalueentity.KeyValueEntityImpl
Expand Down Expand Up @@ -290,6 +292,18 @@ private[javasdk] object Sdk {
dependencyProvider: Option[DependencyProvider],
httpClientProvider: HttpClientProvider,
serializer: JsonSerializer)

private val platformManagedDependency = Set[Class[_]](
classOf[ComponentClient],
classOf[TimerScheduler],
classOf[HttpClientProvider],
classOf[GrpcClientProvider],
classOf[Tracer],
classOf[Span],
classOf[Config],
classOf[WorkflowContext],
classOf[EventSourcedEntityContext],
classOf[KeyValueEntityContext])
}

/**
Expand All @@ -308,6 +322,8 @@ private final class Sdk(
disabledComponents: Set[Class[_]],
startedPromise: Promise[StartupContext],
serviceNameOverride: Option[String]) {
import Sdk._

private val logger = LoggerFactory.getLogger(getClass)
private val serializer = new JsonSerializer
private val ComponentLocator.LocatedClasses(componentClasses, maybeServiceClass) =
Expand All @@ -319,7 +335,7 @@ private final class Sdk(

private val sdkTracerFactory = () => tracerFactory(TraceInstrumentation.InstrumentationScopeName)

private val httpClientProvider = new HttpClientProviderImpl(
private lazy val httpClientProvider = new HttpClientProviderImpl(
system,
None,
remoteIdentification.map(ri => RawHeader(ri.headerName, ri.headerValue)),
Expand All @@ -328,12 +344,18 @@ private final class Sdk(
private lazy val userServiceConfig = {
// hiding these paths from the config provided to user
val sensitivePaths = List("akka", "kalix.meta", "kalix.proxy", "kalix.runtime", "system")
val sdkConfig = applicationConfig.getConfig("akka.javasdk")
val sdkConfig = applicationConfig.getObject("akka.javasdk")
sensitivePaths
.foldLeft(applicationConfig) { (conf, toHide) => conf.withoutPath(toHide) }
.withFallback(sdkConfig)
.withValue("akka.javasdk", sdkConfig)
}

private lazy val grpcClientProvider = new GrpcClientProviderImpl(
system,
sdkSettings,
userServiceConfig,
remoteIdentification.map(ri => GrpcClientProviderImpl.AuthHeaders(ri.headerName, ri.headerValue)))

// validate service classes before instantiating
private val validation = componentClasses.foldLeft(Valid: Validation) { case (validations, cls) =>
validations ++ Validations.validate(cls)
Expand Down Expand Up @@ -580,6 +602,7 @@ private final class Sdk(
// remember to update component type API doc and docs if changing the set of injectables
case p if p == classOf[ComponentClient] => componentClient(span)
case h if h == classOf[HttpClientProvider] => httpClientProvider(span)
case g if g == classOf[GrpcClientProvider] => grpcClientProvider // FIXME trace propagation
case t if t == classOf[TimerScheduler] => timerScheduler(span)
case m if m == classOf[Materializer] => sdkMaterializer
}
Expand Down Expand Up @@ -775,18 +798,6 @@ private final class Sdk(
}
}

private def platformManagedDependency(anyOther: Class[_]) = {
anyOther == classOf[ComponentClient] ||
anyOther == classOf[TimerScheduler] ||
anyOther == classOf[HttpClientProvider] ||
anyOther == classOf[Tracer] ||
anyOther == classOf[Span] ||
anyOther == classOf[Config] ||
anyOther == classOf[WorkflowContext] ||
anyOther == classOf[EventSourcedEntityContext] ||
anyOther == classOf[KeyValueEntityContext]
}

private def componentClient(openTelemetrySpan: Option[Span]): ComponentClient = {
ComponentClientImpl(runtimeComponentClients, serializer, openTelemetrySpan)(sdkExecutionContext)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.impl.grpc

import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.discovery.Discovery
import akka.grpc.GrpcClientSettings
import akka.grpc.javadsl.AkkaGrpcClient
import akka.javasdk.grpc.GrpcClientProvider
import akka.javasdk.impl.Settings
import akka.javasdk.impl.grpc.GrpcClientProviderImpl.AuthHeaders
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executor
import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.util.control.NonFatal

/**
* INTERNAL API
*/
@InternalApi
private[akka] object GrpcClientProviderImpl {
final case class AuthHeaders(headerName: String, headerValue: String)
private final case class ClientKey(clientClass: Class[_], serviceName: String)

private def isAkkaService(serviceName: String): Boolean = !(serviceName.contains('.') || serviceName.contains(':'))

@nowarn("msg=deprecated")
private def settingsWithCallCredentials(key: String, value: String)(
settings: GrpcClientSettings): GrpcClientSettings = {

import io.grpc.{ CallCredentials, Metadata }
val headers = new Metadata()
headers.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
settings.withCallCredentials(new CallCredentials {
override def applyRequestMetadata(
requestInfo: CallCredentials.RequestInfo,
appExecutor: Executor,
applier: CallCredentials.MetadataApplier): Unit = {
applier.apply(headers)
}
override def thisUsesUnstableApi(): Unit = ()
})

}

}

/**
* INTERNAL API
*/
@InternalApi
private[akka] final class GrpcClientProviderImpl(
system: ActorSystem[_],
settings: Settings,
userServiceConfig: Config,
remoteIdentificationHeader: Option[AuthHeaders])
extends GrpcClientProvider {
import GrpcClientProviderImpl._
import system.executionContext

private val log = LoggerFactory.getLogger(classOf[GrpcClientProvider])

private val clients = new ConcurrentHashMap[ClientKey, AkkaGrpcClient]()

CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseServiceStop, "stop-grpc-clients")(() =>
Future
.traverse(clients.values().asScala)(_.close().asScala)
.map(_ => Done))

override def grpcClientFor[T <: AkkaGrpcClient](serviceClass: Class[T], serviceName: String): T = {
val clientKey = ClientKey(serviceClass, serviceName)
clients.computeIfAbsent(clientKey, createNewClientFor _).asInstanceOf[T]
}

// FIXME for testkit
// /** This gets called by the testkit, and should impersonate the given principal. */
// def impersonatingGrpcClient[T <: AkkaGrpcClient](serviceClass: Class[T], service: String, port: Int, impersonate: String): T =
// getGrpcClient(serviceClass, service, port, Some("impersonate-kalix-service" -> impersonate))

private def createNewClientFor(clientKey: ClientKey): AkkaGrpcClient = {
val clientSettings = {
// FIXME the old impl would look in config first and always choose that if present
if (isAkkaService(clientKey.serviceName)) {
val akkaServiceClientSettings = if (settings.devModeSettings.isDefined) {
// local service discovery when running locally
// dev mode, other service name, use Akka discovery to find it
// the runtime has set up a mechanism that finds locally running
// services. Since in dev mode blocking is probably fine for now.
try {
val result = Await.result(Discovery(system).discovery.lookup(clientKey.serviceName, 5.seconds), 5.seconds)
val address = result.addresses.head
// port is always set
val port = address.port.get
log.debug(
"Creating dev mode gRPC client for Akka service [{}] found at [{}:{}]",
clientKey.serviceName,
address.address,
port)
GrpcClientSettings
.connectToServiceAt(address.host, port)(system)
// (No TLS locally)
.withTls(false)
} catch {
case NonFatal(ex) =>
throw new RuntimeException(
s"Failed to look up service [${clientKey.serviceName}] in dev-mode, make sure that it is also running " +
"with a separate port and service name correctly defined in its application.conf under 'akka.javasdk.dev-mode.service-name' " +
"if it differs from the maven project name",
ex)
}
} else {
log.debug("Creating gRPC client for Akka service [{}]", clientKey.serviceName)
GrpcClientSettings
.connectToServiceAt(clientKey.serviceName, 80)(system)
// (TLS is handled for us by Kalix infra)
.withTls(false)
}

remoteIdentificationHeader match {
case Some(auth) => settingsWithCallCredentials(auth.headerName, auth.headerValue)(akkaServiceClientSettings)
case None => akkaServiceClientSettings
}
} else {
// external/public gRPC service
log.debug("Creating gRPC client for external service [{}]", clientKey.serviceName)

// FIXME we should probably not allow any grpc client setting but a subset?
// external service, details defined in user config
GrpcClientSettings.fromConfig(
clientKey.serviceName,
userServiceConfig
.getConfig("akka.javasdk.grpc.client")
// this config overload requires there to be an entry for the name, but then falls back to defaults
.withFallback(ConfigFactory.parseString(s""""${clientKey.serviceName}" = {}""")))(system)
}
}

// Java API - static create
val create =
clientKey.clientClass.getMethod("create", classOf[GrpcClientSettings], classOf[ClassicActorSystemProvider])
val client = create.invoke(null, clientSettings, system).asInstanceOf[AkkaGrpcClient]

client.closed().asScala.foreach { _ =>
// user should not close client, but just to be sure we don't keep it around if they do
clients.remove(clientKey, client)
}

client
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ private[akka] final class HttpClientProviderImpl(
case NonFatal(ex) =>
throw new RuntimeException(
s"Failed to look up service [$name] in dev-mode, make sure that it is also running " +
"with a separate port and service name correctly defined in its application.conf under 'akka.javasdk.dev-mode.service-name'",
"with a separate port and service name correctly defined in its application.conf under 'akka.javasdk.dev-mode.service-name' " +
"if it differs from the maven project name.",
ex)
}
} else {
Expand Down
Loading
Loading