Skip to content

Commit

Permalink
fix: Pass through FilteredEvents for eventOriginFilter and ack from c…
Browse files Browse the repository at this point in the history
…loud (#1079)

Makes sure there are no weird gaps in the producing offset store
  • Loading branch information
johanandren authored Dec 4, 2023
1 parent 751de86 commit d514078
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.util.ConstantFun
import org.slf4j.LoggerFactory

import java.util
Expand Down Expand Up @@ -82,31 +83,24 @@ private[akka] object EventPusher {
.getOrElse(throw new IllegalArgumentException(
s"Entity ${eps.entityType} is a replicated entity but `replicatedEventOriginFilter` is not set"))
.createFilter(replicaInfo)
val eventOriginFilterFlow =
Flow[(EventEnvelope[Event], ProjectionContext)]
.filter {
case (envelope, _) =>
// completely filter out replicated events that originated in the cloud
eventOriginFilterPredicate(envelope)
}
(filter, eventOriginFilterFlow)
(filter, eventOriginFilterPredicate)

case None =>
(
updateFilterFromProto(
Filter.empty(eps.settings.topicTagPrefix),
startMessage.filter,
mapEntityIdToPidHandledByThisStream = identity),
Flow[(EventEnvelope[Event], ProjectionContext)])
ConstantFun.anyToTrue)
}
}

Flow[(EventEnvelope[Event], ProjectionContext)]
.via(replicatedEventOriginFilter)
.mapAsync(eps.settings.transformationParallelism) {
case (envelope, projectionContext) =>
val filteredTransformed =
if (eps.producerFilter(envelope.asInstanceOf[EventEnvelope[Any]]) &&
if (replicatedEventOriginFilter(envelope) && eps.producerFilter(
envelope.asInstanceOf[EventEnvelope[Any]]) &&
consumerFilter.matches(envelope)) {
if (logger.isTraceEnabled())
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,50 +103,54 @@ private[akka] object EventPusherConsumerServiceImpl {
case ((eventProducerDestinations, destinationPerStreamId), replicationSetting) =>
implicit val timeout: Timeout = replicationSetting.entityEventReplicationTimeout
val sendEvent = { (envelope: EventEnvelope[_], fillSequenceNumberGaps: Boolean) =>
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
// send event to entity in this replica
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(replicationSetting.selfReplicaId)
if (fillSequenceNumberGaps)
if (envelope.filtered) {
Future.successful(Done)
} else {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
// send event to entity in this replica
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(replicationSetting.selfReplicaId)
if (fillSequenceNumberGaps)
throw new IllegalArgumentException(
s"fillSequenceNumberGaps can not be true for RES (pid ${envelope.persistenceId}, seq nr ${envelope.sequenceNr} from ${replicationId}")
val entityRef = sharding
.entityRefFor(replicationSetting.entityTypeKey, destinationReplicaId.entityId)
.asInstanceOf[EntityRef[PublishedEvent]]
val ask = () =>
entityRef.ask[Done](
replyTo =>
PublishedEventImpl(
replicationId.persistenceId,
replicatedEventMetadata.originSequenceNr,
envelope.event,
envelope.timestamp,
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
Some(replyTo)))

// try a few times before tearing stream down, forcing the client to restart/reconnect
val askResult = akka.pattern.retry(
ask,
replicationSetting.edgeReplicationDeliveryRetries,
replicationSetting.edgeReplicationDeliveryMinBackoff,
replicationSetting.edgeReplicationDeliveryMaxBackoff,
0.2d)(system.executionContext, system.classicSystem.scheduler)

askResult.failed.foreach(
error =>
log.warn(
s"Failing replication stream from [${replicatedEventMetadata.originReplica.id}], event pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]",
error))
askResult

case unexpected =>
throw new IllegalArgumentException(
s"fillSequenceNumberGaps can not be true for RES (pid ${envelope.persistenceId}, seq nr ${envelope.sequenceNr} from ${replicationId}")
val entityRef = sharding
.entityRefFor(replicationSetting.entityTypeKey, destinationReplicaId.entityId)
.asInstanceOf[EntityRef[PublishedEvent]]
val ask = () =>
entityRef.ask[Done](
replyTo =>
PublishedEventImpl(
replicationId.persistenceId,
replicatedEventMetadata.originSequenceNr,
envelope.event,
envelope.timestamp,
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
Some(replyTo)))

// try a few times before tearing stream down, forcing the client to restart/reconnect
val askResult = akka.pattern.retry(
ask,
replicationSetting.edgeReplicationDeliveryRetries,
replicationSetting.edgeReplicationDeliveryMinBackoff,
replicationSetting.edgeReplicationDeliveryMaxBackoff,
0.2d)(system.executionContext, system.classicSystem.scheduler)

askResult.failed.foreach(
error =>
log.warn(
s"Failing replication stream from [${replicatedEventMetadata.originReplica.id}], event pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]",
error))
askResult

case unexpected =>
throw new IllegalArgumentException(
s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
", is the remote entity really a Replicated Event Sourced Entity?")
s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
", is the remote entity really a Replicated Event Sourced Entity?")
}
}
}

Expand Down

0 comments on commit d514078

Please sign in to comment.