From d514078bf48d97bf093005024da143391726eec0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 4 Dec 2023 16:12:18 +0100 Subject: [PATCH] fix: Pass through FilteredEvents for eventOriginFilter and ack from cloud (#1079) Makes sure there are no weird gaps in the producing offset store --- .../grpc/internal/EventPusher.scala | 16 ++-- .../EventPusherConsumerServiceImpl.scala | 90 ++++++++++--------- 2 files changed, 52 insertions(+), 54 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala index bc88cd7f9..13a5c3252 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala @@ -36,6 +36,7 @@ import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic import akka.stream.stage.InHandler import akka.stream.stage.OutHandler +import akka.util.ConstantFun import org.slf4j.LoggerFactory import java.util @@ -82,14 +83,7 @@ private[akka] object EventPusher { .getOrElse(throw new IllegalArgumentException( s"Entity ${eps.entityType} is a replicated entity but `replicatedEventOriginFilter` is not set")) .createFilter(replicaInfo) - val eventOriginFilterFlow = - Flow[(EventEnvelope[Event], ProjectionContext)] - .filter { - case (envelope, _) => - // completely filter out replicated events that originated in the cloud - eventOriginFilterPredicate(envelope) - } - (filter, eventOriginFilterFlow) + (filter, eventOriginFilterPredicate) case None => ( @@ -97,16 +91,16 @@ private[akka] object EventPusher { Filter.empty(eps.settings.topicTagPrefix), startMessage.filter, mapEntityIdToPidHandledByThisStream = identity), - Flow[(EventEnvelope[Event], ProjectionContext)]) + ConstantFun.anyToTrue) } } Flow[(EventEnvelope[Event], ProjectionContext)] - .via(replicatedEventOriginFilter) .mapAsync(eps.settings.transformationParallelism) { case (envelope, projectionContext) => val filteredTransformed = - if (eps.producerFilter(envelope.asInstanceOf[EventEnvelope[Any]]) && + if (replicatedEventOriginFilter(envelope) && eps.producerFilter( + envelope.asInstanceOf[EventEnvelope[Any]]) && consumerFilter.matches(envelope)) { if (logger.isTraceEnabled()) logger.trace( diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 923a34e34..8f1be301e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -103,50 +103,54 @@ private[akka] object EventPusherConsumerServiceImpl { case ((eventProducerDestinations, destinationPerStreamId), replicationSetting) => implicit val timeout: Timeout = replicationSetting.entityEventReplicationTimeout val sendEvent = { (envelope: EventEnvelope[_], fillSequenceNumberGaps: Boolean) => - envelope.eventMetadata match { - case Some(replicatedEventMetadata: ReplicatedEventMetadata) => - // send event to entity in this replica - val replicationId = ReplicationId.fromString(envelope.persistenceId) - val destinationReplicaId = replicationId.withReplica(replicationSetting.selfReplicaId) - if (fillSequenceNumberGaps) + if (envelope.filtered) { + Future.successful(Done) + } else { + envelope.eventMetadata match { + case Some(replicatedEventMetadata: ReplicatedEventMetadata) => + // send event to entity in this replica + val replicationId = ReplicationId.fromString(envelope.persistenceId) + val destinationReplicaId = replicationId.withReplica(replicationSetting.selfReplicaId) + if (fillSequenceNumberGaps) + throw new IllegalArgumentException( + s"fillSequenceNumberGaps can not be true for RES (pid ${envelope.persistenceId}, seq nr ${envelope.sequenceNr} from ${replicationId}") + val entityRef = sharding + .entityRefFor(replicationSetting.entityTypeKey, destinationReplicaId.entityId) + .asInstanceOf[EntityRef[PublishedEvent]] + val ask = () => + entityRef.ask[Done]( + replyTo => + PublishedEventImpl( + replicationId.persistenceId, + replicatedEventMetadata.originSequenceNr, + envelope.event, + envelope.timestamp, + Some( + new ReplicatedPublishedEventMetaData( + replicatedEventMetadata.originReplica, + replicatedEventMetadata.version)), + Some(replyTo))) + + // try a few times before tearing stream down, forcing the client to restart/reconnect + val askResult = akka.pattern.retry( + ask, + replicationSetting.edgeReplicationDeliveryRetries, + replicationSetting.edgeReplicationDeliveryMinBackoff, + replicationSetting.edgeReplicationDeliveryMaxBackoff, + 0.2d)(system.executionContext, system.classicSystem.scheduler) + + askResult.failed.foreach( + error => + log.warn( + s"Failing replication stream from [${replicatedEventMetadata.originReplica.id}], event pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]", + error)) + askResult + + case unexpected => throw new IllegalArgumentException( - s"fillSequenceNumberGaps can not be true for RES (pid ${envelope.persistenceId}, seq nr ${envelope.sequenceNr} from ${replicationId}") - val entityRef = sharding - .entityRefFor(replicationSetting.entityTypeKey, destinationReplicaId.entityId) - .asInstanceOf[EntityRef[PublishedEvent]] - val ask = () => - entityRef.ask[Done]( - replyTo => - PublishedEventImpl( - replicationId.persistenceId, - replicatedEventMetadata.originSequenceNr, - envelope.event, - envelope.timestamp, - Some( - new ReplicatedPublishedEventMetaData( - replicatedEventMetadata.originReplica, - replicatedEventMetadata.version)), - Some(replyTo))) - - // try a few times before tearing stream down, forcing the client to restart/reconnect - val askResult = akka.pattern.retry( - ask, - replicationSetting.edgeReplicationDeliveryRetries, - replicationSetting.edgeReplicationDeliveryMinBackoff, - replicationSetting.edgeReplicationDeliveryMaxBackoff, - 0.2d)(system.executionContext, system.classicSystem.scheduler) - - askResult.failed.foreach( - error => - log.warn( - s"Failing replication stream from [${replicatedEventMetadata.originReplica.id}], event pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]", - error)) - askResult - - case unexpected => - throw new IllegalArgumentException( - s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" + - ", is the remote entity really a Replicated Event Sourced Entity?") + s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" + + ", is the remote entity really a Replicated Event Sourced Entity?") + } } }