Skip to content

Commit

Permalink
bump: Akka core 2.10.1, r2dbc 3.1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Jan 24, 2025
1 parent ec0d6a7 commit c6f7884
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
.loadEnvelope[String](pid.id, sequenceNr = 1L)
.futureValue
env.filtered shouldBe true
env.eventMetadata shouldBe None
env.internalEventMetadata shouldBe None
env.eventOption.isEmpty shouldBe true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class EventProducerServiceSpec
EventProducerSource(entityType7, streamId7, transformation, settings)
.withReplicatedEventMetadataTransformation(
env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {
"transform low level with metadata" in {
val transformer =
Transformation.empty.registerAsyncEnvelopeMapper((env: EventEnvelope[String]) =>
Future.successful(env.eventMetadata))
Future.successful(env.metadata[String]))
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
}

Expand All @@ -63,7 +63,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {

"fallback low level with metadata if no transformer exist for event" in {
val transformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper((env: EventEnvelope[Any]) =>
Future.successful(env.eventMetadata))
Future.successful(env.metadata[String]))
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private[akka] object EventPusherConsumerServiceImpl {
log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId)
Future.successful(Done)
} else {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
envelope.metadata[ReplicatedEventMetadata] match {
case Some(replicatedEventMetadata) =>
// send event to entity in this replica
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(replicationSettings.selfReplicaId)
Expand All @@ -104,7 +104,8 @@ private[akka] object EventPusherConsumerServiceImpl {
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
replicatedEventMetadata.version,
envelope.internalEventMetadata)),
Some(replyTo)))
}

Expand All @@ -123,9 +124,9 @@ private[akka] object EventPusherConsumerServiceImpl {
error))
askResult

case unexpected =>
case None =>
throw new IllegalArgumentException(
s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
s"Missing replication metadata (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
", is the remote entity really a Replicated Event Sourced Entity?")
}
}
Expand All @@ -141,7 +142,7 @@ private[akka] object EventPusherConsumerServiceImpl {
event = envelope.eventOption.getOrElse(FilteredPayload),
isSnapshotEvent = fromSnapshot(envelope),
fillSequenceNumberGaps = fillSequenceNumberGaps,
metadata = envelope.eventMetadata,
metadata = envelope.internalEventMetadata,
tags = envelope.tags,
replyTo = replyTo))(d.settings.journalWriteTimeout, system.scheduler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ import scala.concurrent.ExecutionContext
val pid = env.persistenceId

// replicaId is used for validation of replay requests, to avoid replay for other replicas
if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata]))
if (replicaId.isEmpty && env.metadata[ReplicatedEventMetadata].isDefined)
replicaId = Some(ReplicationId.fromString(pid).replicaId)

if (producerFilter(env) && filter.matches(env)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private[akka] object ProtobufProtocolConversions {

def toEvent(transformedEvent: Any): Event = {
val protoEvent = protoAnySerialization.serialize(transformedEvent)
val metadata = env.eventMetadata.map(protoAnySerialization.serialize)
val metadata = env.internalEventMetadata.map(protoAnySerialization.serialize)
Event(
persistenceId = env.persistenceId,
seqNr = env.sequenceNr,
Expand Down Expand Up @@ -169,7 +169,7 @@ private[akka] object ProtobufProtocolConversions {
event.seqNr,
evt,
eventOffset.timestamp.toEpochMilli,
eventMetadata = metadata,
_eventMetadata = metadata,
PersistenceId.extractEntityType(event.persistenceId),
event.slice,
filtered = false,
Expand All @@ -188,7 +188,7 @@ private[akka] object ProtobufProtocolConversions {
event.seqNr,
eventOption = Some(serializedEvent),
eventOffset.timestamp.toEpochMilli,
eventMetadata = metadata,
_eventMetadata = metadata,
PersistenceId.extractEntityType(event.persistenceId),
event.slice,
filtered = false,
Expand All @@ -213,7 +213,7 @@ private[akka] object ProtobufProtocolConversions {
filtered.seqNr,
None,
eventOffset.timestamp.toEpochMilli,
eventMetadata = None,
_eventMetadata = None,
PersistenceId.extractEntityType(filtered.persistenceId),
filtered.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import akka.projection.grpc.internal.proto.ReplicaInfo
if (envelope.eventOption.isEmpty)
true
else
envelope.eventMetadata match {
case Some(meta: ReplicatedEventMetadata) =>
envelope.metadata[ReplicatedEventMetadata] match {
case Some(meta) =>
!exclude(meta.originReplica)
case _ =>
case None =>
throw new IllegalArgumentException(
s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private[akka] object ReplicationImpl {
settings.producerFilter)
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
.withReplicatedEventMetadataTransformation(env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down Expand Up @@ -218,8 +218,8 @@ private[akka] object ReplicationImpl {
envelope.persistenceId) {
case (envelope, _) =>
if (!envelope.filtered) {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata)
envelope.metadata[ReplicatedEventMetadata] match {
case Some(replicatedEventMetadata)
if replicatedEventMetadata.originReplica == settings.selfReplicaId =>
// skipping events originating from self replica (break cycle)
if (log.isTraceEnabled)
Expand All @@ -231,7 +231,7 @@ private[akka] object ReplicationImpl {
envelope.sequenceNr)
Future.successful(Done)

case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
case Some(replicatedEventMetadata) =>
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId)
val entityRef =
Expand All @@ -251,9 +251,11 @@ private[akka] object ReplicationImpl {
replicatedEventMetadata.originSequenceNr,
envelope.event,
envelope.timestamp,
Some(new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version,
envelope.internalEventMetadata)),
Some(replyTo)))
askResult.failed.foreach(error =>
log.warn(
Expand Down Expand Up @@ -360,7 +362,7 @@ private[akka] object ReplicationImpl {
settings.eventProducerSettings.withAkkaSerializationOnly())
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
.withReplicatedEventMetadataTransformation(env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ class R2dbcTimestampOffsetProjectionSpec
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand Down Expand Up @@ -484,7 +484,7 @@ class R2dbcTimestampOffsetProjectionSpec
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class R2dbcTimestampOffsetStoreSpec
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand All @@ -109,7 +109,7 @@ class R2dbcTimestampOffsetStoreSpec
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ object Dependencies {
// sync with Java version in .github/workflows/release.yml#documentation
lazy val JavaDocLinkVersion = 17

val Scala213 = "2.13.15"
val Scala213 = "2.13.16"
val Scala3 = "3.3.4"

val Scala2Versions = Seq(Scala213)
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3

object Versions {
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0")
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0+53-8ec61a38-SNAPSHOT")
val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }

val Alpakka = "9.0.0"
Expand All @@ -35,7 +35,7 @@ object Dependencies {
val AkkaPersistenceCassandra = "1.3.0"
val AkkaPersistenceJdbc = "5.5.0"

val AkkaPersistenceR2dbc = "1.3.1"
val AkkaPersistenceR2dbc = "1.3.1+4-d1a01e21-SNAPSHOT"
val AkkaPersistenceR2dbcVersionInDocs = VersionNumber(AkkaPersistenceR2dbc).numbers match {
case Seq(major, minor, _*) => s"$major.$minor"
}
Expand Down

0 comments on commit c6f7884

Please sign in to comment.