From 47be6304ccce0ae9e615dac1b17f0418612cbcc7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 Oct 2024 11:36:21 +0200 Subject: [PATCH] optimization --- .../grpc/internal/EventProducerServiceImpl.scala | 16 ++++++++++------ .../grpc/producer/scaladsl/EventProducer.scala | 10 ++++++++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index 882549410..72bd4178d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -149,13 +149,17 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation }, init.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) - val transformMetadata: Flow[EventEnvelope[Any], EventEnvelope[Any], NotUsed] = - Flow[EventEnvelope[Any]].map { env => - producerSource.replicatedEventMetadataTransformation(env) match { - case None => env - case Some(metadata) => env.withMetadata(metadata) + val transformMetadata: Flow[EventEnvelope[Any], EventEnvelope[Any], NotUsed] = { + if (producerSource.hasReplicatedEventMetadataTransformation) + Flow[EventEnvelope[Any]].map { env => + producerSource.replicatedEventMetadataTransformation(env) match { + case None => env + case Some(metadata) => env.withMetadata(metadata) + } } - } + else + Flow[EventEnvelope[Any]] + } val events: Source[EventEnvelope[Any], NotUsed] = (eventsBySlicesPerStreamId.get(init.streamId) match { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala index f62d039a4..2004e61ff 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala @@ -34,6 +34,9 @@ import akka.projection.grpc.replication.internal.EventOriginFilter object EventProducer { object EventProducerSource { + private val IdentityReplicatedEventMetadataTransformation: EventEnvelope[Any] => Option[ReplicatedEventMetadata] = + _ => None + def apply( entityType: String, streamId: String, @@ -47,7 +50,7 @@ object EventProducer { _ => true, transformSnapshot = None, replicatedEventOriginFilter = None, - replicatedEventMetadataTransformation = _ => None) + replicatedEventMetadataTransformation = IdentityReplicatedEventMetadataTransformation) def apply[Event]( entityType: String, @@ -63,7 +66,7 @@ object EventProducer { producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean], transformSnapshot = None, replicatedEventOriginFilter = None, - replicatedEventMetadataTransformation = _ => None) + replicatedEventMetadataTransformation = IdentityReplicatedEventMetadataTransformation) } @@ -129,6 +132,9 @@ object EventProducer { f: EventEnvelope[Any] => Option[ReplicatedEventMetadata]): EventProducerSource = copy(replicatedEventMetadataTransformation = f) + def hasReplicatedEventMetadataTransformation: Boolean = + replicatedEventMetadataTransformation ne EventProducerSource.IdentityReplicatedEventMetadataTransformation + /** * INTERNAL API */