Skip to content

Commit

Permalink
optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 25, 2024
1 parent c30588c commit 47be630
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,17 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
},
init.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]"))

val transformMetadata: Flow[EventEnvelope[Any], EventEnvelope[Any], NotUsed] =
Flow[EventEnvelope[Any]].map { env =>
producerSource.replicatedEventMetadataTransformation(env) match {
case None => env
case Some(metadata) => env.withMetadata(metadata)
val transformMetadata: Flow[EventEnvelope[Any], EventEnvelope[Any], NotUsed] = {
if (producerSource.hasReplicatedEventMetadataTransformation)
Flow[EventEnvelope[Any]].map { env =>
producerSource.replicatedEventMetadataTransformation(env) match {
case None => env
case Some(metadata) => env.withMetadata(metadata)
}
}
}
else
Flow[EventEnvelope[Any]]
}

val events: Source[EventEnvelope[Any], NotUsed] =
(eventsBySlicesPerStreamId.get(init.streamId) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import akka.projection.grpc.replication.internal.EventOriginFilter
object EventProducer {

object EventProducerSource {
private val IdentityReplicatedEventMetadataTransformation: EventEnvelope[Any] => Option[ReplicatedEventMetadata] =
_ => None

def apply(
entityType: String,
streamId: String,
Expand All @@ -47,7 +50,7 @@ object EventProducer {
_ => true,
transformSnapshot = None,
replicatedEventOriginFilter = None,
replicatedEventMetadataTransformation = _ => None)
replicatedEventMetadataTransformation = IdentityReplicatedEventMetadataTransformation)

def apply[Event](
entityType: String,
Expand All @@ -63,7 +66,7 @@ object EventProducer {
producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean],
transformSnapshot = None,
replicatedEventOriginFilter = None,
replicatedEventMetadataTransformation = _ => None)
replicatedEventMetadataTransformation = IdentityReplicatedEventMetadataTransformation)

}

Expand Down Expand Up @@ -129,6 +132,9 @@ object EventProducer {
f: EventEnvelope[Any] => Option[ReplicatedEventMetadata]): EventProducerSource =
copy(replicatedEventMetadataTransformation = f)

def hasReplicatedEventMetadataTransformation: Boolean =
replicatedEventMetadataTransformation ne EventProducerSource.IdentityReplicatedEventMetadataTransformation

/**
* INTERNAL API
*/
Expand Down

0 comments on commit 47be630

Please sign in to comment.