Skip to content

Commit

Permalink
Register protobuf message descriptors for deserialization, #698 (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Oct 10, 2022
1 parent 473b184 commit 64858b3
Show file tree
Hide file tree
Showing 22 changed files with 574 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
GrpcReadJournal(
GrpcQuerySettings(
streamId = streamId,
protoClassMapping = Map.empty,
additionalRequestMetadata = Some(new MetadataBuilder().addText("x-secret", "top_secret").build())),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", grpcPort)
.withTls(false)),
.withTls(false),
protobufDescriptors = Nil),
// FIXME: error prone that it needs to be passed both to GrpcReadJournal and here?
// but on the consuming side we don't know about the producing side entity types
streamId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
lazy val entity = spawn(TestEntity(pid))

lazy val grpcReadJournal = GrpcReadJournal(
GrpcQuerySettings(streamId, Map.empty, None),
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")))
GrpcQuerySettings(streamId, None),
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
protobufDescriptors = Nil)
}

override protected def beforeAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
lazy val entity = spawn(TestEntity(pid))

lazy val grpcReadJournal = GrpcReadJournal(
GrpcQuerySettings(streamId, Map.empty, None),
GrpcQuerySettings(streamId, None),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
.withTls(false))
.withTls(false),
protobufDescriptors = Nil)
}

override protected def beforeAll(): Unit = {
Expand Down
5 changes: 0 additions & 5 deletions akka-projection-grpc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ akka.projection.grpc {
# exposed by the producing/publishing side
stream-id = ""

# Mapping between full Protobuf message names and Java/Scala class names that are used when deserializing
# Protobuf events.
proto-class-mapping {
}

# Pass these additional request headers as string values in each request to the producer
# can be used for example for authorization in combination with an interceptor in the producer.
# Example "x-auth-header": "secret"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

package akka.projection.grpc.consumer

import java.util.Optional

import scala.compat.java8.OptionConverters._

import akka.annotation.ApiMayChange
import akka.grpc.scaladsl.Metadata
import akka.grpc.scaladsl.MetadataBuilder
import com.typesafe.config.Config
import akka.util.ccompat.JavaConverters._

import java.util.Optional
import scala.compat.java8.OptionConverters._

@ApiMayChange
object GrpcQuerySettings {
Expand All @@ -21,14 +21,6 @@ object GrpcQuerySettings {
streamId != "",
"Configuration property [stream-id] must be an id exposed by the producing side but was undefined on the consuming side.")

val protoClassMapping: Map[String, String] = {
import scala.jdk.CollectionConverters._
config.getConfig("proto-class-mapping").root.unwrapped.asScala.toMap.map {
case (k, v) => k -> v.toString
}

}

val additionalHeaders: Option[Metadata] = {
import scala.jdk.CollectionConverters._
val map = config.getConfig("additional-request-headers").root.unwrapped.asScala.toMap.map {
Expand All @@ -45,47 +37,34 @@ object GrpcQuerySettings {
.build())
}

new GrpcQuerySettings(streamId, protoClassMapping, additionalHeaders)
new GrpcQuerySettings(streamId, additionalHeaders)
}

/**
* Scala API: Programmatic construction of GrpcQuerySettings
*
* @param streamId The stream id to consume
* @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used
* when deserializing Protobuf events.
* @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request
* on the remote side.
*/
def apply(
streamId: String,
protoClassMapping: Map[String, String],
additionalRequestMetadata: Option[Metadata]): GrpcQuerySettings = {
new GrpcQuerySettings(streamId, protoClassMapping, additionalRequestMetadata)
def apply(streamId: String, additionalRequestMetadata: Option[Metadata]): GrpcQuerySettings = {
new GrpcQuerySettings(streamId, additionalRequestMetadata)
}

/**
* Java API: Programmatic construction of GrpcQuerySettings
*
* @param streamId The stream id to consume
* @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used
* when deserializing Protobuf events.
* @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request
* on the remote side.
*/
def create(
streamId: String,
protoClassMapping: java.util.Map[String, String],
additionalRequestMetadata: Optional[akka.grpc.javadsl.Metadata]): GrpcQuerySettings = {
new GrpcQuerySettings(streamId, protoClassMapping.asScala.toMap, additionalRequestMetadata.asScala.map(_.asScala))
def create(streamId: String, additionalRequestMetadata: Optional[akka.grpc.javadsl.Metadata]): GrpcQuerySettings = {
new GrpcQuerySettings(streamId, additionalRequestMetadata.asScala.map(_.asScala))
}
}

@ApiMayChange
final class GrpcQuerySettings(
val streamId: String,
val protoClassMapping: Map[String, String],
val additionalRequestMetadata: Option[Metadata]) {
final class GrpcQuerySettings(val streamId: String, val additionalRequestMetadata: Option[Metadata]) {
require(
streamId != "",
"Configuration property [stream-id] must be an id exposed by the streaming side (but was empty).")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.projection.grpc.consumer

import akka.actor.ExtendedActorSystem
import akka.persistence.query.ReadJournalProvider
import akka.projection.grpc.internal.ProtoAnySerialization
import com.typesafe.config.Config

final class GrpcReadJournalProvider(system: ExtendedActorSystem, config: Config, cfgPath: String)
Expand All @@ -14,5 +15,6 @@ final class GrpcReadJournalProvider(system: ExtendedActorSystem, config: Config,
new scaladsl.GrpcReadJournal(system, config, cfgPath)

override val javadslReadJournal: javadsl.GrpcReadJournal =
new javadsl.GrpcReadJournal(scaladslReadJournal)
new javadsl.GrpcReadJournal(
new scaladsl.GrpcReadJournal(system, config, cfgPath, ProtoAnySerialization.Prefer.Java))
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,47 @@ import java.time.Instant
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.compat.java8.OptionConverters._

import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._

import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.grpc.GrpcClientSettings
import akka.japi.Pair
import akka.persistence.query.Offset
import akka.persistence.query.javadsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.javadsl.EventTimestampQuery
import akka.persistence.query.typed.javadsl.EventsBySliceQuery
import akka.persistence.query.typed.javadsl.LoadEventQuery
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.stream.javadsl.Source
import com.google.protobuf.Descriptors

@ApiMayChange
object GrpcReadJournal {
val Identifier: String = scaladsl.GrpcReadJournal.Identifier

/**
* Construct a gRPC read journal for the given stream-id and explicit `GrpcClientSettings` to control
* how to reach the Akka Projection gRPC producer service (host, port etc).
*/
def create(
system: ClassicActorSystemProvider,
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
protobufDescriptors: java.util.List[Descriptors.FileDescriptor]): GrpcReadJournal = {
import akka.util.ccompat.JavaConverters._
new GrpcReadJournal(scaladsl
.GrpcReadJournal(settings, clientSettings, protobufDescriptors.asScala.toList, ProtoAnySerialization.Prefer.Java)(
system))
}

}

@ApiMayChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ package akka.projection.grpc.consumer.scaladsl

import java.time.Instant
import java.util.concurrent.TimeUnit

import scala.collection.immutable
import scala.concurrent.Future

import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.grpc.GrpcClientSettings
import akka.grpc.scaladsl.SingleResponseRequestBuilder
import akka.grpc.scaladsl.BytesEntry
import akka.grpc.scaladsl.SingleResponseRequestBuilder
import akka.grpc.scaladsl.StreamResponseRequestBuilder
import akka.grpc.scaladsl.StringEntry
import akka.persistence.Persistence
Expand All @@ -31,6 +34,7 @@ import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.persistence.typed.PersistenceId
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.proto
import akka.projection.grpc.internal.proto.Event
Expand All @@ -44,6 +48,7 @@ import akka.projection.grpc.internal.proto.PersistenceIdSeqNr
import akka.projection.grpc.internal.proto.StreamIn
import akka.projection.grpc.internal.proto.StreamOut
import akka.stream.scaladsl.Source
import com.google.protobuf.Descriptors
import com.google.protobuf.timestamp.Timestamp
import com.typesafe.config.Config
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
Expand All @@ -61,18 +66,35 @@ object GrpcReadJournal {
* Construct a gRPC read journal for the given settings and explicit `GrpcClientSettings` to control
* how to reach the Akka Projection gRPC producer service (host, port etc).
*/
def apply(settings: GrpcQuerySettings, clientSettings: GrpcClientSettings)(
implicit system: ClassicActorSystemProvider) = {
def apply(settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor])( // FIXME should we support the scalaDescriptor?
implicit system: ClassicActorSystemProvider): GrpcReadJournal =
apply(settings, clientSettings, protobufDescriptors, ProtoAnySerialization.Prefer.Scala)

val clientSettingsWithOverrides =
// compose with potential user overrides to allow overriding our defaults
clientSettings.withChannelBuilderOverrides(
channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides))
/**
* INTERNAL API
*/
@InternalApi private[akka] def apply(
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings,
protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor],
protobufPrefer: ProtoAnySerialization.Prefer)(implicit system: ClassicActorSystemProvider): GrpcReadJournal = {

// FIXME This probably means that one GrpcReadJournal instance is created for each Projection instance,
// and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings?

val protoAnySerialization =
new ProtoAnySerialization(system.classicSystem.toTyped, protobufDescriptors, protobufPrefer)

new scaladsl.GrpcReadJournal(
system.classicSystem.asInstanceOf[ExtendedActorSystem],
settings,
clientSettingsWithOverrides)
withChannelBuilderOverrides(clientSettings),
protoAnySerialization)
}

private def withChannelBuilderOverrides(clientSettings: GrpcClientSettings): GrpcClientSettings = {
// compose with potential user overrides to allow overriding our defaults
clientSettings.withChannelBuilderOverrides(channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides))
}

private def channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder =
Expand All @@ -86,21 +108,33 @@ object GrpcReadJournal {
final class GrpcReadJournal private (
system: ExtendedActorSystem,
settings: GrpcQuerySettings,
clientSettings: GrpcClientSettings)
clientSettings: GrpcClientSettings,
protoAnySerialization: ProtoAnySerialization)
extends ReadJournal
with EventsBySliceQuery
with EventTimestampQuery
with LoadEventQuery {
import GrpcReadJournal.log

// when used as delegate in javadsl
private[akka] def this(
system: ExtendedActorSystem,
config: Config,
cfgPath: String,
protoAnyPrefer: ProtoAnySerialization.Prefer) =
this(
system,
GrpcQuerySettings(config),
withChannelBuilderOverrides(GrpcClientSettings.fromConfig(config.getConfig("client"))(system)),
// FIXME can/should we load descriptors from config?
new ProtoAnySerialization(system.toTyped, descriptors = Nil, protoAnyPrefer))

// entry point when created through Akka Persistence
def this(system: ExtendedActorSystem, config: Config, cfgPath: String) =
this(system, GrpcQuerySettings(config), GrpcClientSettings.fromConfig(config.getConfig("client"))(system))
this(system, config, cfgPath, ProtoAnySerialization.Prefer.Scala)

private implicit val typedSystem = system.toTyped
private val persistenceExt = Persistence(system)
private val protoAnySerialization =
new ProtoAnySerialization(system.toTyped, settings.protoClassMapping)

private val client = EventProducerServiceClient(clientSettings)
private val additionalRequestHeaders = settings.additionalRequestMetadata match {
Expand Down Expand Up @@ -238,7 +272,7 @@ final class GrpcReadJournal private (
require(streamId == settings.streamId, s"Stream id mismatch, was [$streamId], expected [${settings.streamId}]")
val eventOffset = timestampOffset(event.offset.get)
val evt =
event.payload.map(protoAnySerialization.decode(_).asInstanceOf[Evt])
event.payload.map(protoAnySerialization.deserialize(_).asInstanceOf[Evt])

new EventEnvelope(
eventOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ import scala.annotation.nowarn
s"No events by slices query defined for stream id [${s.streamId}]")
}

private val protoAnySerialization =
new ProtoAnySerialization(system, protoClassMapping = Map.empty)
private val protoAnySerialization = new ProtoAnySerialization(system)

private val streamIdToSourceMap: Map[String, EventProducer.EventProducerSource] =
sources.map(s => s.streamId -> s).toMap
Expand Down Expand Up @@ -207,7 +206,7 @@ import scala.annotation.nowarn

f(event).map {
_.map { transformedEvent =>
val protoEvent = protoAnySerialization.encode(transformedEvent)
val protoEvent = protoAnySerialization.serialize(transformedEvent)
Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), Some(protoEvent))
}
}
Expand Down
Loading

0 comments on commit 64858b3

Please sign in to comment.