From 64858b30724d5f8483e1864741e31a576d4e0f9b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 10 Oct 2022 17:33:18 +0200 Subject: [PATCH] Register protobuf message descriptors for deserialization, #698 (#701) --- .../projection/grpc/IntegrationSpec.scala | 4 +- .../scaladsl/EventTimestampQuerySpec.scala | 5 +- .../scaladsl/LoadEventQuerySpec.scala | 5 +- .../src/main/resources/reference.conf | 5 - .../grpc/consumer/GrpcQuerySettings.scala | 41 +-- .../consumer/GrpcReadJournalProvider.scala | 4 +- .../consumer/javadsl/GrpcReadJournal.scala | 25 +- .../consumer/scaladsl/GrpcReadJournal.scala | 60 +++- .../internal/EventProducerServiceImpl.scala | 5 +- .../grpc/internal/ProtoAnySerialization.scala | 326 +++++++++++++++--- .../grpc/producer/javadsl/EventProducer.scala | 18 +- .../grpc/internal/shoppingcart_api.proto | 50 +++ .../grpc/consumer/GrpcQuerySettingsSpec.scala | 4 - .../internal/ProtoAnySerializationSpec.scala | 108 ++++-- .../internal/TestOffsetStoreAdapter.scala | 2 +- docs/src/main/paradox/grpc.md | 5 +- .../analytics/ShoppingCartEventConsumer.java | 14 +- .../src/main/resources/grpc.conf | 7 - .../build.sbt | 10 +- .../src/main/resources/grpc.conf | 7 - .../analytics/ShoppingCartEventConsumer.scala | 17 +- .../shopping-cart-service-scala/build.sbt | 10 +- 22 files changed, 574 insertions(+), 158 deletions(-) create mode 100644 akka-projection-grpc/src/test/protobuf/akka/projection/grpc/internal/shoppingcart_api.proto diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala index 4c7d9d646..3926cad04 100644 --- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala +++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala @@ -136,11 +136,11 @@ class IntegrationSpec(testContainerConf: TestContainerConf) GrpcReadJournal( GrpcQuerySettings( streamId = streamId, - protoClassMapping = Map.empty, additionalRequestMetadata = Some(new MetadataBuilder().addText("x-secret", "top_secret").build())), GrpcClientSettings .connectToServiceAt("127.0.0.1", grpcPort) - .withTls(false)), + .withTls(false), + protobufDescriptors = Nil), // FIXME: error prone that it needs to be passed both to GrpcReadJournal and here? // but on the consuming side we don't know about the producing side entity types streamId, diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala index a41fb34ac..f20604814 100644 --- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala +++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala @@ -59,8 +59,9 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf) lazy val entity = spawn(TestEntity(pid)) lazy val grpcReadJournal = GrpcReadJournal( - GrpcQuerySettings(streamId, Map.empty, None), - GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client"))) + GrpcQuerySettings(streamId, None), + GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")), + protobufDescriptors = Nil) } override protected def beforeAll(): Unit = { diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala index 7208f6449..04678bf01 100644 --- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala +++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala @@ -61,10 +61,11 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf) lazy val entity = spawn(TestEntity(pid)) lazy val grpcReadJournal = GrpcReadJournal( - GrpcQuerySettings(streamId, Map.empty, None), + GrpcQuerySettings(streamId, None), GrpcClientSettings .connectToServiceAt("127.0.0.1", testContainerConf.grpcPort) - .withTls(false)) + .withTls(false), + protobufDescriptors = Nil) } override protected def beforeAll(): Unit = { diff --git a/akka-projection-grpc/src/main/resources/reference.conf b/akka-projection-grpc/src/main/resources/reference.conf index f9c4d274c..a084844f2 100644 --- a/akka-projection-grpc/src/main/resources/reference.conf +++ b/akka-projection-grpc/src/main/resources/reference.conf @@ -15,11 +15,6 @@ akka.projection.grpc { # exposed by the producing/publishing side stream-id = "" - # Mapping between full Protobuf message names and Java/Scala class names that are used when deserializing - # Protobuf events. - proto-class-mapping { - } - # Pass these additional request headers as string values in each request to the producer # can be used for example for authorization in combination with an interceptor in the producer. # Example "x-auth-header": "secret" diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala index 1943b74af..92e6841d3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcQuerySettings.scala @@ -4,14 +4,14 @@ package akka.projection.grpc.consumer +import java.util.Optional + +import scala.compat.java8.OptionConverters._ + import akka.annotation.ApiMayChange import akka.grpc.scaladsl.Metadata import akka.grpc.scaladsl.MetadataBuilder import com.typesafe.config.Config -import akka.util.ccompat.JavaConverters._ - -import java.util.Optional -import scala.compat.java8.OptionConverters._ @ApiMayChange object GrpcQuerySettings { @@ -21,14 +21,6 @@ object GrpcQuerySettings { streamId != "", "Configuration property [stream-id] must be an id exposed by the producing side but was undefined on the consuming side.") - val protoClassMapping: Map[String, String] = { - import scala.jdk.CollectionConverters._ - config.getConfig("proto-class-mapping").root.unwrapped.asScala.toMap.map { - case (k, v) => k -> v.toString - } - - } - val additionalHeaders: Option[Metadata] = { import scala.jdk.CollectionConverters._ val map = config.getConfig("additional-request-headers").root.unwrapped.asScala.toMap.map { @@ -45,47 +37,34 @@ object GrpcQuerySettings { .build()) } - new GrpcQuerySettings(streamId, protoClassMapping, additionalHeaders) + new GrpcQuerySettings(streamId, additionalHeaders) } /** * Scala API: Programmatic construction of GrpcQuerySettings * * @param streamId The stream id to consume - * @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used - * when deserializing Protobuf events. * @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request * on the remote side. */ - def apply( - streamId: String, - protoClassMapping: Map[String, String], - additionalRequestMetadata: Option[Metadata]): GrpcQuerySettings = { - new GrpcQuerySettings(streamId, protoClassMapping, additionalRequestMetadata) + def apply(streamId: String, additionalRequestMetadata: Option[Metadata]): GrpcQuerySettings = { + new GrpcQuerySettings(streamId, additionalRequestMetadata) } /** * Java API: Programmatic construction of GrpcQuerySettings * * @param streamId The stream id to consume - * @param protoClassMapping Mapping between full Protobuf message names and Java class names that are used - * when deserializing Protobuf events. * @param additionalRequestMetadata Additional request metadata, for authentication/authorization of the request * on the remote side. */ - def create( - streamId: String, - protoClassMapping: java.util.Map[String, String], - additionalRequestMetadata: Optional[akka.grpc.javadsl.Metadata]): GrpcQuerySettings = { - new GrpcQuerySettings(streamId, protoClassMapping.asScala.toMap, additionalRequestMetadata.asScala.map(_.asScala)) + def create(streamId: String, additionalRequestMetadata: Optional[akka.grpc.javadsl.Metadata]): GrpcQuerySettings = { + new GrpcQuerySettings(streamId, additionalRequestMetadata.asScala.map(_.asScala)) } } @ApiMayChange -final class GrpcQuerySettings( - val streamId: String, - val protoClassMapping: Map[String, String], - val additionalRequestMetadata: Option[Metadata]) { +final class GrpcQuerySettings(val streamId: String, val additionalRequestMetadata: Option[Metadata]) { require( streamId != "", "Configuration property [stream-id] must be an id exposed by the streaming side (but was empty).") diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala index fb5472de1..a6e113352 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/GrpcReadJournalProvider.scala @@ -6,6 +6,7 @@ package akka.projection.grpc.consumer import akka.actor.ExtendedActorSystem import akka.persistence.query.ReadJournalProvider +import akka.projection.grpc.internal.ProtoAnySerialization import com.typesafe.config.Config final class GrpcReadJournalProvider(system: ExtendedActorSystem, config: Config, cfgPath: String) @@ -14,5 +15,6 @@ final class GrpcReadJournalProvider(system: ExtendedActorSystem, config: Config, new scaladsl.GrpcReadJournal(system, config, cfgPath) override val javadslReadJournal: javadsl.GrpcReadJournal = - new javadsl.GrpcReadJournal(scaladslReadJournal) + new javadsl.GrpcReadJournal( + new scaladsl.GrpcReadJournal(system, config, cfgPath, ProtoAnySerialization.Prefer.Java)) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala index 28c9f8e11..c80445584 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala @@ -8,11 +8,15 @@ import java.time.Instant import java.util import java.util.Optional import java.util.concurrent.CompletionStage -import scala.compat.java8.OptionConverters._ + import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ + import akka.NotUsed +import akka.actor.ClassicActorSystemProvider import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts +import akka.grpc.GrpcClientSettings import akka.japi.Pair import akka.persistence.query.Offset import akka.persistence.query.javadsl.ReadJournal @@ -20,12 +24,31 @@ import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.javadsl.EventTimestampQuery import akka.persistence.query.typed.javadsl.EventsBySliceQuery import akka.persistence.query.typed.javadsl.LoadEventQuery +import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl +import akka.projection.grpc.internal.ProtoAnySerialization import akka.stream.javadsl.Source +import com.google.protobuf.Descriptors @ApiMayChange object GrpcReadJournal { val Identifier: String = scaladsl.GrpcReadJournal.Identifier + + /** + * Construct a gRPC read journal for the given stream-id and explicit `GrpcClientSettings` to control + * how to reach the Akka Projection gRPC producer service (host, port etc). + */ + def create( + system: ClassicActorSystemProvider, + settings: GrpcQuerySettings, + clientSettings: GrpcClientSettings, + protobufDescriptors: java.util.List[Descriptors.FileDescriptor]): GrpcReadJournal = { + import akka.util.ccompat.JavaConverters._ + new GrpcReadJournal(scaladsl + .GrpcReadJournal(settings, clientSettings, protobufDescriptors.asScala.toList, ProtoAnySerialization.Prefer.Java)( + system)) + } + } @ApiMayChange diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 9c05da444..fb07e2a54 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -6,17 +6,20 @@ package akka.projection.grpc.consumer.scaladsl import java.time.Instant import java.util.concurrent.TimeUnit + import scala.collection.immutable import scala.concurrent.Future + import akka.NotUsed import akka.actor.ClassicActorSystemProvider import akka.actor.ExtendedActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.annotation.ApiMayChange +import akka.annotation.InternalApi import akka.grpc.GrpcClientSettings -import akka.grpc.scaladsl.SingleResponseRequestBuilder import akka.grpc.scaladsl.BytesEntry +import akka.grpc.scaladsl.SingleResponseRequestBuilder import akka.grpc.scaladsl.StreamResponseRequestBuilder import akka.grpc.scaladsl.StringEntry import akka.persistence.Persistence @@ -31,6 +34,7 @@ import akka.persistence.query.typed.scaladsl.LoadEventQuery import akka.persistence.typed.PersistenceId import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl +import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilderOverrides import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.internal.proto import akka.projection.grpc.internal.proto.Event @@ -44,6 +48,7 @@ import akka.projection.grpc.internal.proto.PersistenceIdSeqNr import akka.projection.grpc.internal.proto.StreamIn import akka.projection.grpc.internal.proto.StreamOut import akka.stream.scaladsl.Source +import com.google.protobuf.Descriptors import com.google.protobuf.timestamp.Timestamp import com.typesafe.config.Config import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder @@ -61,18 +66,35 @@ object GrpcReadJournal { * Construct a gRPC read journal for the given settings and explicit `GrpcClientSettings` to control * how to reach the Akka Projection gRPC producer service (host, port etc). */ - def apply(settings: GrpcQuerySettings, clientSettings: GrpcClientSettings)( - implicit system: ClassicActorSystemProvider) = { + def apply(settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor])( // FIXME should we support the scalaDescriptor? + implicit system: ClassicActorSystemProvider): GrpcReadJournal = + apply(settings, clientSettings, protobufDescriptors, ProtoAnySerialization.Prefer.Scala) - val clientSettingsWithOverrides = - // compose with potential user overrides to allow overriding our defaults - clientSettings.withChannelBuilderOverrides( - channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides)) + /** + * INTERNAL API + */ + @InternalApi private[akka] def apply( + settings: GrpcQuerySettings, + clientSettings: GrpcClientSettings, + protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor], + protobufPrefer: ProtoAnySerialization.Prefer)(implicit system: ClassicActorSystemProvider): GrpcReadJournal = { + + // FIXME This probably means that one GrpcReadJournal instance is created for each Projection instance, + // and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings? + + val protoAnySerialization = + new ProtoAnySerialization(system.classicSystem.toTyped, protobufDescriptors, protobufPrefer) new scaladsl.GrpcReadJournal( system.classicSystem.asInstanceOf[ExtendedActorSystem], settings, - clientSettingsWithOverrides) + withChannelBuilderOverrides(clientSettings), + protoAnySerialization) + } + + private def withChannelBuilderOverrides(clientSettings: GrpcClientSettings): GrpcClientSettings = { + // compose with potential user overrides to allow overriding our defaults + clientSettings.withChannelBuilderOverrides(channelBuilderOverrides.andThen(clientSettings.channelBuilderOverrides)) } private def channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = @@ -86,21 +108,33 @@ object GrpcReadJournal { final class GrpcReadJournal private ( system: ExtendedActorSystem, settings: GrpcQuerySettings, - clientSettings: GrpcClientSettings) + clientSettings: GrpcClientSettings, + protoAnySerialization: ProtoAnySerialization) extends ReadJournal with EventsBySliceQuery with EventTimestampQuery with LoadEventQuery { import GrpcReadJournal.log + // when used as delegate in javadsl + private[akka] def this( + system: ExtendedActorSystem, + config: Config, + cfgPath: String, + protoAnyPrefer: ProtoAnySerialization.Prefer) = + this( + system, + GrpcQuerySettings(config), + withChannelBuilderOverrides(GrpcClientSettings.fromConfig(config.getConfig("client"))(system)), + // FIXME can/should we load descriptors from config? + new ProtoAnySerialization(system.toTyped, descriptors = Nil, protoAnyPrefer)) + // entry point when created through Akka Persistence def this(system: ExtendedActorSystem, config: Config, cfgPath: String) = - this(system, GrpcQuerySettings(config), GrpcClientSettings.fromConfig(config.getConfig("client"))(system)) + this(system, config, cfgPath, ProtoAnySerialization.Prefer.Scala) private implicit val typedSystem = system.toTyped private val persistenceExt = Persistence(system) - private val protoAnySerialization = - new ProtoAnySerialization(system.toTyped, settings.protoClassMapping) private val client = EventProducerServiceClient(clientSettings) private val additionalRequestHeaders = settings.additionalRequestMetadata match { @@ -238,7 +272,7 @@ final class GrpcReadJournal private ( require(streamId == settings.streamId, s"Stream id mismatch, was [$streamId], expected [${settings.streamId}]") val eventOffset = timestampOffset(event.offset.get) val evt = - event.payload.map(protoAnySerialization.decode(_).asInstanceOf[Evt]) + event.payload.map(protoAnySerialization.deserialize(_).asInstanceOf[Evt]) new EventEnvelope( eventOffset, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index 0cd095881..0aacd573f 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -77,8 +77,7 @@ import scala.annotation.nowarn s"No events by slices query defined for stream id [${s.streamId}]") } - private val protoAnySerialization = - new ProtoAnySerialization(system, protoClassMapping = Map.empty) + private val protoAnySerialization = new ProtoAnySerialization(system) private val streamIdToSourceMap: Map[String, EventProducer.EventProducerSource] = sources.map(s => s.streamId -> s).toMap @@ -207,7 +206,7 @@ import scala.annotation.nowarn f(event).map { _.map { transformedEvent => - val protoEvent = protoAnySerialization.encode(transformedEvent) + val protoEvent = protoAnySerialization.serialize(transformedEvent) Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), Some(protoEvent)) } } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala index c838884bc..be2aeb22a 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtoAnySerialization.scala @@ -4,18 +4,29 @@ package akka.projection.grpc.internal -import java.lang.reflect.Method +import scala.collection.concurrent.TrieMap +import scala.collection.immutable +import scala.util.Try import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.serialization.SerializationExtension import akka.serialization.Serializers +import akka.util.ccompat.JavaConverters._ +import com.google.common.base.CaseFormat import com.google.protobuf.ByteString +import com.google.protobuf.Descriptors import com.google.protobuf.GeneratedMessageV3 import com.google.protobuf.Message +import com.google.protobuf.Parser import com.google.protobuf.any.{ Any => ScalaPbAny } +import com.google.protobuf.{ Any => JavaPbAny } import com.google.protobuf.{ Any => PbAny } +import org.slf4j.LoggerFactory +import scalapb.GeneratedMessage import scalapb.GeneratedMessageCompanion +import scalapb.options.Scalapb /** * INTERNAL API @@ -24,47 +35,106 @@ import scalapb.GeneratedMessageCompanion final val GoogleTypeUrlPrefix = "type.googleapis.com/" final val AkkaSerializationTypeUrlPrefix = "ser.akka.io/" final val AkkaTypeUrlManifestSeparator = ':' + private final val ProtoAnyTypeUrl = GoogleTypeUrlPrefix + "google.protobuf.Any" + + private val log = LoggerFactory.getLogger(classOf[ProtoAnySerialization]) + + final case class SerializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause) + + /** + * A resolved type + */ + private sealed trait ResolvedType[T] { + + /** + * Parse the given bytes into this type. + */ + def parseFrom(bytes: ByteString): T + + } + + private final class JavaPbResolvedType[T <: Message](parser: Parser[T]) extends ResolvedType[T] { + override def parseFrom(bytes: ByteString): T = parser.parseFrom(bytes) + } + + private final class ScalaPbResolvedType[T <: scalapb.GeneratedMessage]( + companion: scalapb.GeneratedMessageCompanion[_]) + extends ResolvedType[T] { + override def parseFrom(bytes: ByteString): T = companion.parseFrom(bytes.newCodedInput()).asInstanceOf[T] + } + + /** + * When locating protobufs, if both a Java and a ScalaPB generated class is found on the classpath, this says which + * should be preferred. + */ + sealed trait Prefer + final object Prefer { + case object Java extends Prefer + case object Scala extends Prefer + } + + private def flattenDescriptors( + descriptors: Seq[Descriptors.FileDescriptor]): Map[String, Descriptors.FileDescriptor] = + flattenDescriptors(Map.empty, descriptors) + + private def flattenDescriptors( + seenSoFar: Map[String, Descriptors.FileDescriptor], + descriptors: Seq[Descriptors.FileDescriptor]): Map[String, Descriptors.FileDescriptor] = + descriptors.foldLeft(seenSoFar) { + case (results, descriptor) => + val descriptorName = descriptor.getName + if (results.contains(descriptorName)) results + else { + val withDesc = results.updated(descriptorName, descriptor) + flattenDescriptors( + withDesc, + descriptor.getDependencies.asScala.toSeq ++ descriptor.getPublicDependencies.asScala) + } + } - private val ArrayOfByteArray = Array[Class[_]](classOf[Array[Byte]]) } /** * INTERNAL API */ -@InternalApi private[akka] class ProtoAnySerialization(system: ActorSystem[_], protoClassMapping: Map[String, String]) { +@InternalApi private[akka] class ProtoAnySerialization( + system: ActorSystem[_], + descriptors: immutable.Seq[Descriptors.FileDescriptor], + prefer: ProtoAnySerialization.Prefer) { import ProtoAnySerialization._ private val serialization = SerializationExtension(system.classicSystem) + val classLoader = system.dynamicAccess.classLoader // FIXME use DynamicAccess - private val scalaPbCompanions: Map[String, GeneratedMessageCompanion[scalapb.GeneratedMessage]] = { - protoClassMapping.flatMap { - case (protoName, className) => - system.dynamicAccess - .getObjectFor[GeneratedMessageCompanion[scalapb.GeneratedMessage]](className) - .toOption - .map(companion => protoName -> companion) - } - } + private val allDescriptors = flattenDescriptors(descriptors) - private val javaParsingMethods: Map[String, Method] = { - protoClassMapping.collect { - case (protoName, className) if !scalaPbCompanions.contains(protoName) => - val clazz = system.dynamicAccess.getClassFor[Message](className).get - val parsingMethod = - clazz.getDeclaredMethod("parseFrom", ArrayOfByteArray: _*) - protoName -> parsingMethod - } - } + private val allTypes = (for { + descriptor <- allDescriptors.values + messageType <- descriptor.getMessageTypes.asScala + } yield messageType.getFullName -> messageType).toMap - def encode(event: Any): ScalaPbAny = { + private val reflectionCache = TrieMap.empty[String, Try[ResolvedType[Any]]] + + /** + * If only used for encoding messages and not decoding messages. + */ + def this(system: ActorSystem[_]) = + this(system, descriptors = Nil, ProtoAnySerialization.Prefer.Scala) + + def serialize(event: Any): ScalaPbAny = { event match { + case scalaPbAny: ScalaPbAny if scalaPbAny.typeUrl.startsWith(GoogleTypeUrlPrefix) => + ScalaPbAny(ProtoAnyTypeUrl, scalaPbAny.toByteString) + case pbAny: PbAny if pbAny.getTypeUrl.startsWith(GoogleTypeUrlPrefix) => + ScalaPbAny(ProtoAnyTypeUrl, pbAny.toByteString) case scalaPbAny: ScalaPbAny => scalaPbAny case pbAny: PbAny => ScalaPbAny.fromJavaProto(pbAny) case msg: scalapb.GeneratedMessage => - ScalaPbAny(GoogleTypeUrlPrefix + msg.companion.scalaDescriptor.fullName, msg.toByteString) + encode(msg) case msg: GeneratedMessageV3 => - ScalaPbAny(GoogleTypeUrlPrefix + msg.getDescriptorForType.getFullName, msg.toByteString) + encode(msg) case other => + // fallback to Akka serialization val otherAnyRef = other.asInstanceOf[AnyRef] val bytes = serialization.serialize(otherAnyRef).get val serializer = serialization.findSerializerFor(otherAnyRef) @@ -79,22 +149,15 @@ import scalapb.GeneratedMessageCompanion } } - def decode(scalaPbAny: ScalaPbAny): Any = { + def deserialize(scalaPbAny: ScalaPbAny): Any = { val typeUrl = scalaPbAny.typeUrl - if (typeUrl.startsWith(GoogleTypeUrlPrefix)) { - val manifest = typeUrl.substring(GoogleTypeUrlPrefix.length) - scalaPbCompanions.get(manifest) match { - case Some(companion) => - companion.parseFrom(scalaPbAny.value.newCodedInput()) - case None => - javaParsingMethods.get(manifest) match { - case Some(parsingMethod) => - parsingMethod.invoke(null, scalaPbAny.value.toByteArray) - case None => - throw new IllegalArgumentException( - s"Need a configured protobuf message class to be able to deserialize [$manifest].") - } - } + if (typeUrl == ProtoAnyTypeUrl) { + if (prefer == Prefer.Scala) + ScalaPbAny.parseFrom(scalaPbAny.value.newCodedInput()) + else + PbAny.parseFrom(scalaPbAny.value) + } else if (typeUrl.startsWith(GoogleTypeUrlPrefix)) { + decodeMessage(scalaPbAny) } else if (typeUrl.startsWith(AkkaSerializationTypeUrlPrefix)) { val idAndManifest = typeUrl.substring(AkkaSerializationTypeUrlPrefix.length) @@ -106,9 +169,194 @@ import scalapb.GeneratedMessageCompanion idAndManifest.substring(0, i).toInt -> idAndManifest.substring(i + 1) serialization.deserialize(scalaPbAny.value.toByteArray, id, manifest).get + } else if (prefer == Prefer.Scala) { + // when custom typeUrl + scalaPbAny } else { + // when custom typeUrl ScalaPbAny.toJavaProto(scalaPbAny) } } + private def strippedFileName(fileName: String) = + fileName.split(Array('/', '\\')).last.stripSuffix(".proto") + + private def tryResolveJavaPbType(typeDescriptor: Descriptors.Descriptor): Option[JavaPbResolvedType[Message]] = { + val fileDescriptor = typeDescriptor.getFile + val options = fileDescriptor.getOptions + // Firstly, determine the java package + val packageName = + if (options.hasJavaPackage) options.getJavaPackage + "." + else if (fileDescriptor.getPackage.nonEmpty) + fileDescriptor.getPackage + "." + else "" + + val outerClassName = + if (options.hasJavaMultipleFiles && options.getJavaMultipleFiles) "" + else if (options.hasJavaOuterClassname) + options.getJavaOuterClassname + "$" + else if (fileDescriptor.getName.nonEmpty) { + val name = strippedFileName(fileDescriptor.getName) + if (name.contains('_') || name.contains('-') || !name(0).isUpper) { + // transform snake and kebab case into camel case + CaseFormat.LOWER_UNDERSCORE + .to(CaseFormat.UPPER_CAMEL, name.replace('-', '_')) + "$" + } else { + // keep name as is to keep already camel cased file name + strippedFileName(fileDescriptor.getName) + "$" + } + } else "" + + val className = packageName + outerClassName + typeDescriptor.getName + try { + log.debug("tryResolveJavaPbType attempting to load class {}", className) + + val clazz = system.dynamicAccess.getClassFor[Any](className).get + val parser = clazz + .getMethod("parser") + .invoke(null) + .asInstanceOf[Parser[com.google.protobuf.Message]] + Some(new JavaPbResolvedType(parser)) + + } catch { + case cnfe: ClassNotFoundException => + log.debug2("Failed to load class [{}] because: {}", className, cnfe.getMessage) + None + case nsme: NoSuchElementException => + // FIXME wrong exception? NoSuchMethodException is thrown from getMethod("parser") + throw SerializationException( + s"Found com.google.protobuf.Message class $className to deserialize protobuf ${typeDescriptor.getFullName} but it didn't have a static parser() method on it.", + nsme) + case _: NoSuchMethodException => + // throw SerializationException( + // s"Found com.google.protobuf.Message class $className to deserialize protobuf ${typeDescriptor.getFullName} but it didn't have a static parser() method on it.", + // nsme) + // FIXME there seems to be a case where the ScalaPB class can be loaded, but it ofc doesn't have the parser method + None + case iae @ (_: IllegalAccessException | _: IllegalArgumentException) => + throw SerializationException(s"Could not invoke $className.parser()", iae) + case cce: ClassCastException => + throw SerializationException(s"$className.parser() did not return a ${classOf[Parser[_]]}", cce) + } + } + + private def tryResolveScalaPbType(typeDescriptor: Descriptors.Descriptor): Option[ScalaPbResolvedType[Nothing]] = { + // todo - attempt to load the package.proto file for this package to get default options from there + val fileDescriptor = typeDescriptor.getFile + val options = fileDescriptor.getOptions + val scalaOptions: Scalapb.ScalaPbOptions = + if (options.hasExtension(Scalapb.options)) { + options.getExtension(Scalapb.options) + } else Scalapb.ScalaPbOptions.getDefaultInstance + + // Firstly, determine the java package + val packageName = + if (scalaOptions.hasPackageName) scalaOptions.getPackageName + "." + else if (options.hasJavaPackage) options.getJavaPackage + "." + else if (fileDescriptor.getPackage.nonEmpty) + fileDescriptor.getPackage + "." + else "" + + // flat package could be overridden on the command line, so attempt to load both possibilities if it's not + // explicitly setclassLoader.loadClass(className) + val possibleBaseNames = + if (scalaOptions.hasFlatPackage) { + if (scalaOptions.getFlatPackage) Seq("") + else Seq(fileDescriptor.getName.stripSuffix(".proto") + ".") + } else if (fileDescriptor.getName.nonEmpty) + Seq("", strippedFileName(fileDescriptor.getName) + ".") + else Seq("") + + possibleBaseNames.collectFirst(Function.unlift { baseName => + val className = packageName + baseName + typeDescriptor.getName + try { + log.debug("Attempting to load scalapb.GeneratedMessageCompanion object {}", className) + val companionObject = + system.dynamicAccess.getObjectFor[GeneratedMessageCompanion[GeneratedMessage]](className).get + Some(new ScalaPbResolvedType(companionObject)) + } catch { + case cnfe: ClassNotFoundException => + log.debug2("Failed to load class [{}] because: {}", className, cnfe.getMessage) + None + case cce: ClassCastException => + log.debug2("Failed to load class [{}] because: {}", className, cce.getMessage) + None + } + }) + } + + private def resolveTypeDescriptor(typeDescriptor: Descriptors.Descriptor): ResolvedType[Any] = + reflectionCache + .getOrElseUpdate( + typeDescriptor.getFullName, + Try { + val maybeResolvedType = + if (prefer == Prefer.Java) { + tryResolveJavaPbType(typeDescriptor).orElse(tryResolveScalaPbType(typeDescriptor)) + } else { + tryResolveScalaPbType(typeDescriptor).orElse(tryResolveJavaPbType(typeDescriptor)) + } + + maybeResolvedType match { + case Some(resolvedType) => + resolvedType.asInstanceOf[ResolvedType[Any]] + case None => + throw SerializationException("Could not determine serializer for type " + typeDescriptor.getFullName) + } + }) + .get + + private def resolveTypeUrl(typeName: String): Option[ResolvedType[_]] = + allTypes.get(typeName).map(resolveTypeDescriptor) + + def encode(value: Any): ScalaPbAny = + value match { + case javaPbAny: JavaPbAny => ScalaPbAny.fromJavaProto(javaPbAny) + case scalaPbAny: ScalaPbAny => scalaPbAny + + case javaProtoMessage: com.google.protobuf.Message => + ScalaPbAny( + GoogleTypeUrlPrefix + javaProtoMessage.getDescriptorForType.getFullName, + javaProtoMessage.toByteString) + + case scalaPbMessage: GeneratedMessage => + ScalaPbAny(GoogleTypeUrlPrefix + scalaPbMessage.companion.scalaDescriptor.fullName, scalaPbMessage.toByteString) + + case null => + throw SerializationException(s"Don't know how to serialize object of type null.") + + case other => + throw SerializationException( + s"Don't know how to serialize object of type ${other.getClass.getName}. " + + "Try passing a protobuf message type.") + } + + /** + * Decodes a Protobuf Any wrapped message into the concrete user message type. + */ + def decodeMessage(any: ScalaPbAny): Any = { + val typeUrl = any.typeUrl + // wrapped concrete protobuf message, parse into the right type + if (!typeUrl.startsWith(GoogleTypeUrlPrefix)) { + log.warn("Message type [{}] does not match type url prefix [{}]", typeUrl: Any, GoogleTypeUrlPrefix) + } + val typeName = typeUrl.split("/", 2) match { + case Array(_, typeName) => + typeName + case _ => + log.warn( + "Message type [{}] does not have a url prefix, it should have one that matchers the type url prefix [{}]", + typeUrl: Any, + GoogleTypeUrlPrefix) + typeUrl + } + + resolveTypeUrl(typeName) match { + case Some(parser) => + parser.parseFrom(any.value) + case None => + throw SerializationException("Unable to find descriptor for type: " + typeUrl) + } + } + } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala index 4385f5aaa..0c5ad76c5 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducer.scala @@ -4,27 +4,27 @@ package akka.projection.grpc.producer.javadsl -import akka.Done - +import java.util.Collections +import java.util.Optional import java.util.concurrent.CompletionStage + import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters.RichOptionalGeneric +import scala.concurrent.Future + +import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.grpc.internal.JavaMetadataImpl import akka.grpc.scaladsl.{ Metadata => ScalaMetadata } -import akka.projection.grpc.internal.EventProducerServiceImpl -import akka.japi.function.{ Function => JapiFunction } import akka.http.javadsl.model.HttpRequest import akka.http.javadsl.model.HttpResponse +import akka.japi.function.{ Function => JapiFunction } +import akka.projection.grpc.internal.EventProducerServiceImpl import akka.projection.grpc.internal.proto.EventProducerServicePowerApiHandler import akka.util.ccompat.JavaConverters._ -import java.util.Collections -import java.util.Optional -import scala.compat.java8.OptionConverters.RichOptionalGeneric -import scala.concurrent.Future - /** * The event producer implementation that can be included a gRPC route in an Akka HTTP server. */ diff --git a/akka-projection-grpc/src/test/protobuf/akka/projection/grpc/internal/shoppingcart_api.proto b/akka-projection-grpc/src/test/protobuf/akka/projection/grpc/internal/shoppingcart_api.proto new file mode 100644 index 000000000..fd640ffcb --- /dev/null +++ b/akka-projection-grpc/src/test/protobuf/akka/projection/grpc/internal/shoppingcart_api.proto @@ -0,0 +1,50 @@ +// This is the public API offered by the shopping cart entity. + +syntax = "proto3"; + +package com.example.shoppingcart; + +option java_multiple_files = true; + +import "google/protobuf/empty.proto"; + +message AddLineItem { + string cart_id = 1; + string product_id = 2; + string name = 3; + int32 quantity = 4; +} + +message AddLineItems { + string cart_id = 1; + repeated LineItem items = 2; +} + +message RemoveLineItem { + string cart_id = 1; + string product_id = 2; +} + +message GetShoppingCart { + string cart_id = 1; +} + +message LineItem { + string product_id = 1; + string name = 2; + int32 quantity = 3; +} + +message Cart { + repeated LineItem items = 1; +} + +service ShoppingCartService { + rpc AddItem(AddLineItem) returns (google.protobuf.Empty); + + rpc AddItems(AddLineItems) returns (google.protobuf.Empty); + + rpc RemoveItem(RemoveLineItem) returns (google.protobuf.Empty); + + rpc GetCart(GetShoppingCart) returns (Cart); +} diff --git a/akka-projection-grpc/src/test/scala/akka/projection/grpc/consumer/GrpcQuerySettingsSpec.scala b/akka-projection-grpc/src/test/scala/akka/projection/grpc/consumer/GrpcQuerySettingsSpec.scala index a853b4b12..01e14cabd 100644 --- a/akka-projection-grpc/src/test/scala/akka/projection/grpc/consumer/GrpcQuerySettingsSpec.scala +++ b/akka-projection-grpc/src/test/scala/akka/projection/grpc/consumer/GrpcQuerySettingsSpec.scala @@ -14,9 +14,6 @@ class GrpcQuerySettingsSpec extends AnyWordSpecLike with Matchers { "parse from config" in { val config = ConfigFactory.parseString(""" stream-id = "my-stream-id" - proto-class-mapping { - "proto.MyMessage" = "java.MyClass" - } additional-request-headers { "x-auth-header" = "secret" } @@ -24,7 +21,6 @@ class GrpcQuerySettingsSpec extends AnyWordSpecLike with Matchers { val settings = GrpcQuerySettings(config) settings.streamId shouldBe "my-stream-id" - settings.protoClassMapping shouldBe (Map("proto.MyMessage" -> "java.MyClass")) settings.additionalRequestMetadata.map(_.asList) shouldBe Some(List("x-auth-header" -> StringEntry("secret"))) } } diff --git a/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala b/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala index 71748b5ac..821bf9190 100644 --- a/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala +++ b/akka-projection-grpc/src/test/scala/akka/projection/grpc/internal/ProtoAnySerializationSpec.scala @@ -11,8 +11,11 @@ import akka.actor.Address import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.projection.grpc.internal.proto.TestEvent +import akka.projection.grpc.internal.proto.TestProto import akka.serialization.SerializationExtension import akka.serialization.Serializers +import com.example.shoppingcart.AddLineItem +import com.example.shoppingcart.ShoppingcartApiProto import com.google.protobuf.ByteString import org.scalatest.wordspec.AnyWordSpecLike import com.google.protobuf.any.{ Any => ScalaPbAny } @@ -20,16 +23,31 @@ import com.google.protobuf.{ Any => PbAny } class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { - private val protoAnySerialization = + private val serializationJava = new ProtoAnySerialization( system, - protoClassMapping = Map( - "google.protobuf.Timestamp" -> "com.google.protobuf.Timestamp", - "akka.projection.grpc.internal.TestEvent" -> "akka.projection.grpc.internal.proto.TestEvent")) + List( + TestProto.javaDescriptor, + ShoppingcartApiProto.javaDescriptor, + com.google.protobuf.TimestampProto.getDescriptor), + ProtoAnySerialization.Prefer.Java) + + private val serializationScala = + new ProtoAnySerialization( + system, + List( + TestProto.javaDescriptor, + ShoppingcartApiProto.javaDescriptor, + com.google.protobuf.timestamp.TimestampProto.javaDescriptor, + com.google.protobuf.any.AnyProto.javaDescriptor), + ProtoAnySerialization.Prefer.Scala) + private val akkaSerialization = SerializationExtension(system.classicSystem) + private val addLineItem = AddLineItem(name = "item", productId = "id", quantity = 10) + "ProtoAnySerialization" must { - "encode and decode Java proto message" in { + "serialize and deserialize Java proto message" in { val instant = Instant.now() val event = com.google.protobuf.Timestamp @@ -37,17 +55,18 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp .setSeconds(instant.getEpochSecond) .setNanos(17) .build() - val pbAny = protoAnySerialization.encode(event) + val pbAny = serializationJava.serialize(event) pbAny.typeUrl shouldBe "type.googleapis.com/google.protobuf.Timestamp" - val deserializedEvent = protoAnySerialization.decode(pbAny) + val deserializedEvent = serializationJava.deserialize(pbAny) + deserializedEvent.getClass shouldBe classOf[com.google.protobuf.Timestamp] deserializedEvent shouldBe event } "encode and decode ScalaPb proto message" in { val event = TestEvent("cart1", "item1", 17) - val pbAny = protoAnySerialization.encode(event) + val pbAny = serializationScala.serialize(event) pbAny.typeUrl shouldBe "type.googleapis.com/akka.projection.grpc.internal.TestEvent" - val deserializedEvent = protoAnySerialization.decode(pbAny) + val deserializedEvent = serializationScala.deserialize(pbAny) deserializedEvent shouldBe event } @@ -59,48 +78,97 @@ class ProtoAnySerializationSpec extends ScalaTestWithActorTestKit with AnyWordSp .setTypeUrl(typeUrl) .setValue(ByteString.copyFrom(value, StandardCharsets.UTF_8)) .build() - val pbAny = protoAnySerialization.encode(event) + val pbAny = serializationJava.serialize(event) pbAny.typeUrl shouldBe typeUrl val deserializedEvent = - protoAnySerialization.decode(pbAny).asInstanceOf[PbAny] + serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEvent.getTypeUrl shouldBe typeUrl deserializedEvent.getValue.toString(StandardCharsets.UTF_8) shouldBe value } - "pass through ScalaPb Any and decode it as Java proto Any" in { + "pass through ScalaPb Any and decode it as preferred Any Any" in { val value = "hello" val typeUrl = "type.my.io/custom" val event = ScalaPbAny(typeUrl, ByteString.copyFrom(value, StandardCharsets.UTF_8)) - val pbAny = protoAnySerialization.encode(event) + val pbAny = serializationScala.serialize(event) pbAny.typeUrl shouldBe typeUrl - val deserializedEvent = - protoAnySerialization.decode(pbAny).asInstanceOf[PbAny] + + val deserializedEventScala = + serializationScala.deserialize(pbAny).asInstanceOf[ScalaPbAny] + deserializedEventScala.typeUrl shouldBe typeUrl + deserializedEventScala.value.toString(StandardCharsets.UTF_8) shouldBe value + + val deserializedEventJava = + serializationJava.deserialize(pbAny).asInstanceOf[PbAny] + deserializedEventJava.getTypeUrl shouldBe typeUrl + deserializedEventJava.getValue.toString(StandardCharsets.UTF_8) shouldBe value + } + + "pass through Java proto Any with Google typeUrl" in { + val instant = Instant.now() + val value = + com.google.protobuf.Timestamp + .newBuilder() + .setSeconds(instant.getEpochSecond) + .setNanos(17) + .build() + val typeUrl = "type.googleapis.com/google.protobuf.Timestamp" + val event = PbAny + .newBuilder() + .setTypeUrl(typeUrl) + .setValue(value.toByteString) + .build() + val pbAny = serializationJava.serialize(event) + pbAny.typeUrl shouldBe "type.googleapis.com/google.protobuf.Any" // wrapped + val deserializedEvent = serializationJava.deserialize(pbAny).asInstanceOf[PbAny] deserializedEvent.getTypeUrl shouldBe typeUrl - deserializedEvent.getValue.toString(StandardCharsets.UTF_8) shouldBe value + com.google.protobuf.Timestamp.parseFrom(deserializedEvent.getValue) shouldBe value + } + + "pass through ScalaPb Any with Google typeUrl" in { + val value = TestEvent("cart1", "item1", 17) + val typeUrl = "type.googleapis.com/TestEvent" + val event = + ScalaPbAny(typeUrl, value.toByteString) + val pbAny = serializationScala.serialize(event) + pbAny.typeUrl shouldBe "type.googleapis.com/google.protobuf.Any" // wrapped + + val deserializedEvent = + serializationScala.deserialize(pbAny).asInstanceOf[ScalaPbAny] + deserializedEvent.typeUrl shouldBe typeUrl + TestEvent.parseFrom(deserializedEvent.value.toByteArray) shouldBe value } "encode and decode with Akka serialization with string manifest" in { val event = Address("akka", system.name, "localhost", 2552) - val pbAny = protoAnySerialization.encode(event) + val pbAny = serializationJava.serialize(event) val serializer = akkaSerialization.findSerializerFor(event) // no manifest for String serializer pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}:${Serializers .manifestFor(serializer, event)}" - val deserializedEvent = protoAnySerialization.decode(pbAny) + val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent shouldBe event } "encode and decode with Akka serialization without string manifest" in { val event = "e1" - val pbAny = protoAnySerialization.encode(event) + val pbAny = serializationJava.serialize(event) val serializer = akkaSerialization.findSerializerFor(event) // no manifest for String serializer pbAny.typeUrl shouldBe s"ser.akka.io/${serializer.identifier}" - val deserializedEvent = protoAnySerialization.decode(pbAny) + val deserializedEvent = serializationJava.deserialize(pbAny) deserializedEvent shouldBe event } + + "support se/deserializing java protobufs" in { + val any = serializationJava.encode(addLineItem) + any.typeUrl should ===("type.googleapis.com/" + AddLineItem.scalaDescriptor.fullName) + serializationJava.decodeMessage(any) should ===(addLineItem) + } + } + } diff --git a/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestOffsetStoreAdapter.scala b/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestOffsetStoreAdapter.scala index 6529c86c0..7e26eab0d 100644 --- a/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestOffsetStoreAdapter.scala +++ b/akka-projection-testkit/src/main/scala/akka/projection/testkit/internal/TestOffsetStoreAdapter.scala @@ -4,7 +4,7 @@ package akka.projection.testkit.internal -import scala.jdk.CollectionConverters._ +import akka.util.ccompat.JavaConverters._ import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import scala.concurrent.Future diff --git a/docs/src/main/paradox/grpc.md b/docs/src/main/paradox/grpc.md index 7265b7363..277a1474d 100644 --- a/docs/src/main/paradox/grpc.md +++ b/docs/src/main/paradox/grpc.md @@ -49,6 +49,9 @@ Scala Java : @@snip [ShoppingCartEventConsumer.java](/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java) { #initProjections } +The Protobuf descriptors are defined when the @apidoc[GrpcReadJournal] is created. The descriptors are used +when deserializing the received events. + The gRPC connection to the producer is defined in the [consumer configuration](#consumer-configuration). The @extref:[R2dbcProjection](akka-persistence-r2dbc:projection.html) has support for storing the offset in a relational database using R2DBC. @@ -124,8 +127,6 @@ The configuration for the `GrpcReadJournal` may look like this: The `client` section in the configuration defines where the producer is running. It is an @extref:[Akka gRPC configuration](akka-grpc:configuration.html#by-configuration) with several connection options. -TODO: Describe `proto-class-mapping`, but we might have a more convenient solution for that https://github.com/lightbend/akka-projection-grpc/issues/13 - ### Reference configuration The following can be overridden in your `application.conf` for the Projection specific settings: diff --git a/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java b/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java index 2886d9fbb..2350ca8dc 100644 --- a/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java +++ b/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java @@ -2,12 +2,14 @@ //#initProjections import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess; +import akka.grpc.GrpcClientSettings; import akka.japi.Pair; import akka.persistence.Persistence; import akka.persistence.query.typed.EventEnvelope; import akka.projection.ProjectionBehavior; import akka.projection.ProjectionId; import akka.projection.eventsourced.javadsl.EventSourcedProvider; +import akka.projection.grpc.consumer.GrpcQuerySettings; import akka.projection.grpc.consumer.javadsl.GrpcReadJournal; import akka.projection.javadsl.SourceProvider; import akka.projection.r2dbc.javadsl.R2dbcProjection; @@ -24,7 +26,9 @@ import shopping.cart.proto.ItemAdded; import shopping.cart.proto.ItemQuantityAdjusted; import shopping.cart.proto.ItemRemoved; +import shopping.cart.proto.ShoppingCartEvents; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -115,9 +119,17 @@ public static void init(ActorSystem system) { String projectionKey = streamId + "-" + sliceRange.first() + "-" + sliceRange.second(); ProjectionId projectionId = ProjectionId.of(projectionName, projectionKey); + GrpcReadJournal eventsBySlicesQuery = GrpcReadJournal.create( + system, + GrpcQuerySettings.create(streamId, Optional.empty()), + GrpcClientSettings.fromConfig( // FIXME this is rather inconvenient + system.settings().config() + .getConfig("akka.projection.grpc.consumer.client"), system), + List.of(ShoppingCartEvents.getDescriptor())); + SourceProvider> sourceProvider = EventSourcedProvider.eventsBySlices( system, - GrpcReadJournal.Identifier(), + eventsBySlicesQuery, streamId, sliceRange.first(), sliceRange.second()); diff --git a/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf b/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf index 0f62fcbdb..a68982648 100644 --- a/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf +++ b/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf @@ -7,11 +7,4 @@ akka.projection.grpc.consumer { use-tls = false } stream-id = "cart" - - proto-class-mapping { - "shoppingcart.ItemAdded" = "shopping.cart.proto.ItemAdded" - "shoppingcart.ItemQuantityAdjusted" = "shopping.cart.proto.ItemQuantityAdjusted" - "shoppingcart.ItemRemoved" = "shopping.cart.proto.ItemRemoved" - "shoppingcart.CheckedOut" = "shopping.cart.proto.CheckedOut" - } } diff --git a/samples/grpc/shopping-analytics-service-scala/build.sbt b/samples/grpc/shopping-analytics-service-scala/build.sbt index 73c61886a..4edc640b9 100644 --- a/samples/grpc/shopping-analytics-service-scala/build.sbt +++ b/samples/grpc/shopping-analytics-service-scala/build.sbt @@ -2,8 +2,7 @@ name := "shopping-analytics-service" organization := "com.lightbend.akka.samples" organizationHomepage := Some(url("https://akka.io")) -licenses := Seq( - ("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) +licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) scalaVersion := "2.13.5" @@ -22,7 +21,9 @@ Test / logBuffered := false run / fork := true // pass along config selection to forked jvm -run / javaOptions ++= sys.props.get("config.resource").fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) +run / javaOptions ++= sys.props + .get("config.resource") + .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c val AkkaVersion = "2.7.0-M3" @@ -30,7 +31,8 @@ val AkkaHttpVersion = "10.4.0-M1" val AkkaManagementVersion = "1.2.0-M1" val AkkaPersistenceR2dbcVersion = "1.0.0-M2" // FIXME final release version -val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.3.0-M2") +val AkkaProjectionVersion = + sys.props.getOrElse("akka-projection.version", "1.3.0-M2") enablePlugins(AkkaGrpcPlugin) diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf b/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf index 8f38dc3b5..a68982648 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf +++ b/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf @@ -7,11 +7,4 @@ akka.projection.grpc.consumer { use-tls = false } stream-id = "cart" - - proto-class-mapping { - "shoppingcart.ItemAdded" = "shoppingcart.ItemAdded" - "shoppingcart.ItemQuantityAdjusted" = "shoppingcart.ItemQuantityAdjusted" - "shoppingcart.ItemRemoved" = "shoppingcart.ItemRemoved" - "shoppingcart.CheckedOut" = "shoppingcart.CheckedOut" - } } diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala index eebea88a4..040598689 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala +++ b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala @@ -2,14 +2,17 @@ package shopping.analytics //#initProjections import scala.concurrent.Future + import akka.Done import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.grpc.GrpcClientSettings import akka.persistence.Persistence import akka.persistence.query.typed.EventEnvelope import akka.projection.ProjectionBehavior import akka.projection.ProjectionId import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal import akka.projection.r2dbc.scaladsl.R2dbcProjection import akka.projection.scaladsl.Handler @@ -18,6 +21,7 @@ import shoppingcart.CheckedOut import shoppingcart.ItemAdded import shoppingcart.ItemQuantityAdjusted import shoppingcart.ItemRemoved +import shoppingcart.ShoppingCartEventsProto object ShoppingCartEventConsumer { //#initProjections @@ -115,9 +119,20 @@ object ShoppingCartEventConsumer { val sliceRange = sliceRanges(idx) val projectionKey = s"$streamId-${sliceRange.start}-${sliceRange.end}" val projectionId = ProjectionId.of(projectionName, projectionKey) + + val eventsBySlicesQuery = GrpcReadJournal( + GrpcQuerySettings(streamId, None), + GrpcClientSettings.fromConfig( // FIXME this is rather inconvenient + system.settings.config + .getConfig("akka.projection.grpc.consumer.client")), + List( + ShoppingCartEventsProto.javaDescriptor + ) // FIXME should we support the scalaDescriptor? + ) + val sourceProvider = EventSourcedProvider.eventsBySlices[AnyRef]( system, - GrpcReadJournal.Identifier, + eventsBySlicesQuery, streamId, sliceRange.start, sliceRange.end) diff --git a/samples/grpc/shopping-cart-service-scala/build.sbt b/samples/grpc/shopping-cart-service-scala/build.sbt index 7b58571f6..330721ee0 100644 --- a/samples/grpc/shopping-cart-service-scala/build.sbt +++ b/samples/grpc/shopping-cart-service-scala/build.sbt @@ -2,7 +2,8 @@ name := "shopping-cart-service" organization := "com.lightbend.akka.samples" organizationHomepage := Some(url("https://akka.io")) -licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) +licenses := Seq( + ("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) scalaVersion := "2.13.5" @@ -21,7 +22,9 @@ Test / logBuffered := false run / fork := true // pass along config selection to forked jvm -run / javaOptions ++= sys.props.get("config.resource").fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) +run / javaOptions ++= sys.props + .get("config.resource") + .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c val AkkaVersion = "2.7.0-M3" @@ -29,7 +32,8 @@ val AkkaHttpVersion = "10.4.0-M1" val AkkaManagementVersion = "1.2.0-M1" val AkkaPersistenceR2dbcVersion = "1.0.0-M2" // FIXME final release version -val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.3.0-M2") +val AkkaProjectionVersion = + sys.props.getOrElse("akka-projection.version", "1.3.0-M2") enablePlugins(AkkaGrpcPlugin)