diff --git a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala index ceac102a4..94ae630dd 100644 --- a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala +++ b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala @@ -122,7 +122,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf) .loadEnvelope[String](pid.id, sequenceNr = 1L) .futureValue env.filtered shouldBe true - env.eventMetadata shouldBe None + env.internalEventMetadata shouldBe None env.eventOption.isEmpty shouldBe true } diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala index 225dbe8e9..9bb45bcb9 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala @@ -257,7 +257,7 @@ class EventProducerServiceSpec EventProducerSource(entityType7, streamId7, transformation, settings) .withReplicatedEventMetadataTransformation( env => - if (env.eventMetadata.isDefined) None + if (env.metadata[ReplicatedEventMetadata].isDefined) None else { // migrated from non-replicated, fill in metadata Some( diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala index d094b75a5..01eb6c991 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala @@ -47,7 +47,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures { "transform low level with metadata" in { val transformer = Transformation.empty.registerAsyncEnvelopeMapper((env: EventEnvelope[String]) => - Future.successful(env.eventMetadata)) + Future.successful(env.metadata[String])) transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta")) } @@ -63,7 +63,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures { "fallback low level with metadata if no transformer exist for event" in { val transformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper((env: EventEnvelope[Any]) => - Future.successful(env.eventMetadata)) + Future.successful(env.metadata[String])) transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta")) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index c32f6b085..705967e70 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -78,8 +78,8 @@ private[akka] object EventPusherConsumerServiceImpl { log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId) Future.successful(Done) } else { - envelope.eventMetadata match { - case Some(replicatedEventMetadata: ReplicatedEventMetadata) => + envelope.metadata[ReplicatedEventMetadata] match { + case Some(replicatedEventMetadata) => // send event to entity in this replica val replicationId = ReplicationId.fromString(envelope.persistenceId) val destinationReplicaId = replicationId.withReplica(replicationSettings.selfReplicaId) @@ -104,7 +104,8 @@ private[akka] object EventPusherConsumerServiceImpl { Some( new ReplicatedPublishedEventMetaData( replicatedEventMetadata.originReplica, - replicatedEventMetadata.version)), + replicatedEventMetadata.version, + envelope.internalEventMetadata)), Some(replyTo))) } @@ -123,9 +124,9 @@ private[akka] object EventPusherConsumerServiceImpl { error)) askResult - case unexpected => + case None => throw new IllegalArgumentException( - s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" + + s"Missing replication metadata (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" + ", is the remote entity really a Replicated Event Sourced Entity?") } } @@ -141,7 +142,7 @@ private[akka] object EventPusherConsumerServiceImpl { event = envelope.eventOption.getOrElse(FilteredPayload), isSnapshotEvent = fromSnapshot(envelope), fillSequenceNumberGaps = fillSequenceNumberGaps, - metadata = envelope.eventMetadata, + metadata = envelope.internalEventMetadata, tags = envelope.tags, replyTo = replyTo))(d.settings.journalWriteTimeout, system.scheduler) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala index 49cf61c60..39d40e9e1 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala @@ -426,7 +426,7 @@ import scala.concurrent.ExecutionContext val pid = env.persistenceId // replicaId is used for validation of replay requests, to avoid replay for other replicas - if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata])) + if (replicaId.isEmpty && env.metadata[ReplicatedEventMetadata].isDefined) replicaId = Some(ReplicationId.fromString(pid).replicaId) if (producerFilter(env) && filter.matches(env)) { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 6e257a6af..7274c40e3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -113,7 +113,7 @@ private[akka] object ProtobufProtocolConversions { def toEvent(transformedEvent: Any): Event = { val protoEvent = protoAnySerialization.serialize(transformedEvent) - val metadata = env.eventMetadata.map(protoAnySerialization.serialize) + val metadata = env.internalEventMetadata.map(protoAnySerialization.serialize) Event( persistenceId = env.persistenceId, seqNr = env.sequenceNr, @@ -169,7 +169,7 @@ private[akka] object ProtobufProtocolConversions { event.seqNr, evt, eventOffset.timestamp.toEpochMilli, - eventMetadata = metadata, + _eventMetadata = metadata, PersistenceId.extractEntityType(event.persistenceId), event.slice, filtered = false, @@ -188,7 +188,7 @@ private[akka] object ProtobufProtocolConversions { event.seqNr, eventOption = Some(serializedEvent), eventOffset.timestamp.toEpochMilli, - eventMetadata = metadata, + _eventMetadata = metadata, PersistenceId.extractEntityType(event.persistenceId), event.slice, filtered = false, @@ -213,7 +213,7 @@ private[akka] object ProtobufProtocolConversions { filtered.seqNr, None, eventOffset.timestamp.toEpochMilli, - eventMetadata = None, + _eventMetadata = None, PersistenceId.extractEntityType(filtered.persistenceId), filtered.slice, filtered = true, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala index b45983b44..3a4bdc328 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala @@ -36,10 +36,10 @@ import akka.projection.grpc.internal.proto.ReplicaInfo if (envelope.eventOption.isEmpty) true else - envelope.eventMetadata match { - case Some(meta: ReplicatedEventMetadata) => + envelope.metadata[ReplicatedEventMetadata] match { + case Some(meta) => !exclude(meta.originReplica) - case _ => + case None => throw new IllegalArgumentException( s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})") } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index c85cd2155..c17db1c8f 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -103,7 +103,7 @@ private[akka] object ReplicationImpl { settings.producerFilter) .withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId)) .withReplicatedEventMetadataTransformation(env => - if (env.eventMetadata.isDefined) None + if (env.metadata[ReplicatedEventMetadata].isDefined) None else { // migrated from non-replicated, fill in metadata Some( @@ -218,8 +218,8 @@ private[akka] object ReplicationImpl { envelope.persistenceId) { case (envelope, _) => if (!envelope.filtered) { - envelope.eventMetadata match { - case Some(replicatedEventMetadata: ReplicatedEventMetadata) + envelope.metadata[ReplicatedEventMetadata] match { + case Some(replicatedEventMetadata) if replicatedEventMetadata.originReplica == settings.selfReplicaId => // skipping events originating from self replica (break cycle) if (log.isTraceEnabled) @@ -231,7 +231,7 @@ private[akka] object ReplicationImpl { envelope.sequenceNr) Future.successful(Done) - case Some(replicatedEventMetadata: ReplicatedEventMetadata) => + case Some(replicatedEventMetadata) => val replicationId = ReplicationId.fromString(envelope.persistenceId) val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId) val entityRef = @@ -251,9 +251,11 @@ private[akka] object ReplicationImpl { replicatedEventMetadata.originSequenceNr, envelope.event, envelope.timestamp, - Some(new ReplicatedPublishedEventMetaData( - replicatedEventMetadata.originReplica, - replicatedEventMetadata.version)), + Some( + new ReplicatedPublishedEventMetaData( + replicatedEventMetadata.originReplica, + replicatedEventMetadata.version, + envelope.internalEventMetadata)), Some(replyTo))) askResult.failed.foreach(error => log.warn( @@ -360,7 +362,7 @@ private[akka] object ReplicationImpl { settings.eventProducerSettings.withAkkaSerializationOnly()) .withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId)) .withReplicatedEventMetadataTransformation(env => - if (env.eventMetadata.isDefined) None + if (env.metadata[ReplicatedEventMetadata].isDefined) None else { // migrated from non-replicated, fill in metadata Some( diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala index 4812d8a37..8172de34b 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala @@ -395,7 +395,7 @@ class R2dbcTimestampOffsetProjectionSpec env.sequenceNr, eventOption = None, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, env.filtered, @@ -484,7 +484,7 @@ class R2dbcTimestampOffsetProjectionSpec env.sequenceNr, env.eventOption, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, filtered = true, diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index dbd7520a0..6065d3001 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -96,7 +96,7 @@ class R2dbcTimestampOffsetStoreSpec env.sequenceNr, eventOption = None, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, env.filtered, @@ -109,7 +109,7 @@ class R2dbcTimestampOffsetStoreSpec env.sequenceNr, env.eventOption, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, filtered = true, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1e5173c16..949293b85 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,14 +9,14 @@ object Dependencies { // sync with Java version in .github/workflows/release.yml#documentation lazy val JavaDocLinkVersion = 17 - val Scala213 = "2.13.15" + val Scala213 = "2.13.16" val Scala3 = "3.3.4" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 object Versions { - val Akka = sys.props.getOrElse("build.akka.version", "2.10.0") + val Akka = sys.props.getOrElse("build.akka.version", "2.10.0+53-8ec61a38-SNAPSHOT") val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } val Alpakka = "9.0.0" @@ -35,7 +35,7 @@ object Dependencies { val AkkaPersistenceCassandra = "1.3.0" val AkkaPersistenceJdbc = "5.5.0" - val AkkaPersistenceR2dbc = "1.3.1" + val AkkaPersistenceR2dbc = "1.3.1+4-d1a01e21-SNAPSHOT" val AkkaPersistenceR2dbcVersionInDocs = VersionNumber(AkkaPersistenceR2dbc).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }