Skip to content

Commit

Permalink
Initial consumer filter for edge replication
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 8, 2023
1 parent e358cac commit 6e552a5
Showing 1 changed file with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ private[akka] object ReplicationImpl {
val sharding = ClusterSharding(system)
sharding.init(replicatedEntity.entity)

if (settings.initialConsumerFilter.nonEmpty) {
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter)
}

// sharded daemon process for consuming event stream from the other replicas
val shardingEntityRefFactory: String => EntityRef[Command] =
sharding.entityRefFor(replicatedEntity.entity.typeKey, _)
Expand Down

0 comments on commit 6e552a5

Please sign in to comment.