From 02d08dbe0783690e2ad7b42255be6f66f764e0ab Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 24 Jan 2025 15:47:51 +0100 Subject: [PATCH 1/3] bump: Akka core 2.10.1, r2dbc 3.1.2 --- .../consumer/scaladsl/LoadEventQuerySpec.scala | 2 +- .../internal/EventProducerServiceSpec.scala | 2 +- .../producer/scaladsl/TransformationSpec.scala | 4 ++-- .../EventPusherConsumerServiceImpl.scala | 13 +++++++------ .../projection/grpc/internal/FilterStage.scala | 2 +- .../internal/ProtobufProtocolConversions.scala | 8 ++++---- .../internal/EventOriginFilter.scala | 6 +++--- .../replication/internal/ReplicationImpl.scala | 18 ++++++++++-------- .../R2dbcTimestampOffsetProjectionSpec.scala | 4 ++-- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 4 ++-- project/Dependencies.scala | 6 +++--- 11 files changed, 36 insertions(+), 33 deletions(-) diff --git a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala index ceac102a4..94ae630dd 100644 --- a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala +++ b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala @@ -122,7 +122,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf) .loadEnvelope[String](pid.id, sequenceNr = 1L) .futureValue env.filtered shouldBe true - env.eventMetadata shouldBe None + env.internalEventMetadata shouldBe None env.eventOption.isEmpty shouldBe true } diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala index 225dbe8e9..9bb45bcb9 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala @@ -257,7 +257,7 @@ class EventProducerServiceSpec EventProducerSource(entityType7, streamId7, transformation, settings) .withReplicatedEventMetadataTransformation( env => - if (env.eventMetadata.isDefined) None + if (env.metadata[ReplicatedEventMetadata].isDefined) None else { // migrated from non-replicated, fill in metadata Some( diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala index d094b75a5..01eb6c991 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala @@ -47,7 +47,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures { "transform low level with metadata" in { val transformer = Transformation.empty.registerAsyncEnvelopeMapper((env: EventEnvelope[String]) => - Future.successful(env.eventMetadata)) + Future.successful(env.metadata[String])) transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta")) } @@ -63,7 +63,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures { "fallback low level with metadata if no transformer exist for event" in { val transformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper((env: EventEnvelope[Any]) => - Future.successful(env.eventMetadata)) + Future.successful(env.metadata[String])) transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta")) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index c32f6b085..705967e70 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -78,8 +78,8 @@ private[akka] object EventPusherConsumerServiceImpl { log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId) Future.successful(Done) } else { - envelope.eventMetadata match { - case Some(replicatedEventMetadata: ReplicatedEventMetadata) => + envelope.metadata[ReplicatedEventMetadata] match { + case Some(replicatedEventMetadata) => // send event to entity in this replica val replicationId = ReplicationId.fromString(envelope.persistenceId) val destinationReplicaId = replicationId.withReplica(replicationSettings.selfReplicaId) @@ -104,7 +104,8 @@ private[akka] object EventPusherConsumerServiceImpl { Some( new ReplicatedPublishedEventMetaData( replicatedEventMetadata.originReplica, - replicatedEventMetadata.version)), + replicatedEventMetadata.version, + envelope.internalEventMetadata)), Some(replyTo))) } @@ -123,9 +124,9 @@ private[akka] object EventPusherConsumerServiceImpl { error)) askResult - case unexpected => + case None => throw new IllegalArgumentException( - s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" + + s"Missing replication metadata (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" + ", is the remote entity really a Replicated Event Sourced Entity?") } } @@ -141,7 +142,7 @@ private[akka] object EventPusherConsumerServiceImpl { event = envelope.eventOption.getOrElse(FilteredPayload), isSnapshotEvent = fromSnapshot(envelope), fillSequenceNumberGaps = fillSequenceNumberGaps, - metadata = envelope.eventMetadata, + metadata = envelope.internalEventMetadata, tags = envelope.tags, replyTo = replyTo))(d.settings.journalWriteTimeout, system.scheduler) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala index 49cf61c60..39d40e9e1 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala @@ -426,7 +426,7 @@ import scala.concurrent.ExecutionContext val pid = env.persistenceId // replicaId is used for validation of replay requests, to avoid replay for other replicas - if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata])) + if (replicaId.isEmpty && env.metadata[ReplicatedEventMetadata].isDefined) replicaId = Some(ReplicationId.fromString(pid).replicaId) if (producerFilter(env) && filter.matches(env)) { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 6e257a6af..7274c40e3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -113,7 +113,7 @@ private[akka] object ProtobufProtocolConversions { def toEvent(transformedEvent: Any): Event = { val protoEvent = protoAnySerialization.serialize(transformedEvent) - val metadata = env.eventMetadata.map(protoAnySerialization.serialize) + val metadata = env.internalEventMetadata.map(protoAnySerialization.serialize) Event( persistenceId = env.persistenceId, seqNr = env.sequenceNr, @@ -169,7 +169,7 @@ private[akka] object ProtobufProtocolConversions { event.seqNr, evt, eventOffset.timestamp.toEpochMilli, - eventMetadata = metadata, + _eventMetadata = metadata, PersistenceId.extractEntityType(event.persistenceId), event.slice, filtered = false, @@ -188,7 +188,7 @@ private[akka] object ProtobufProtocolConversions { event.seqNr, eventOption = Some(serializedEvent), eventOffset.timestamp.toEpochMilli, - eventMetadata = metadata, + _eventMetadata = metadata, PersistenceId.extractEntityType(event.persistenceId), event.slice, filtered = false, @@ -213,7 +213,7 @@ private[akka] object ProtobufProtocolConversions { filtered.seqNr, None, eventOffset.timestamp.toEpochMilli, - eventMetadata = None, + _eventMetadata = None, PersistenceId.extractEntityType(filtered.persistenceId), filtered.slice, filtered = true, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala index b45983b44..3a4bdc328 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala @@ -36,10 +36,10 @@ import akka.projection.grpc.internal.proto.ReplicaInfo if (envelope.eventOption.isEmpty) true else - envelope.eventMetadata match { - case Some(meta: ReplicatedEventMetadata) => + envelope.metadata[ReplicatedEventMetadata] match { + case Some(meta) => !exclude(meta.originReplica) - case _ => + case None => throw new IllegalArgumentException( s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})") } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index c85cd2155..c17db1c8f 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -103,7 +103,7 @@ private[akka] object ReplicationImpl { settings.producerFilter) .withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId)) .withReplicatedEventMetadataTransformation(env => - if (env.eventMetadata.isDefined) None + if (env.metadata[ReplicatedEventMetadata].isDefined) None else { // migrated from non-replicated, fill in metadata Some( @@ -218,8 +218,8 @@ private[akka] object ReplicationImpl { envelope.persistenceId) { case (envelope, _) => if (!envelope.filtered) { - envelope.eventMetadata match { - case Some(replicatedEventMetadata: ReplicatedEventMetadata) + envelope.metadata[ReplicatedEventMetadata] match { + case Some(replicatedEventMetadata) if replicatedEventMetadata.originReplica == settings.selfReplicaId => // skipping events originating from self replica (break cycle) if (log.isTraceEnabled) @@ -231,7 +231,7 @@ private[akka] object ReplicationImpl { envelope.sequenceNr) Future.successful(Done) - case Some(replicatedEventMetadata: ReplicatedEventMetadata) => + case Some(replicatedEventMetadata) => val replicationId = ReplicationId.fromString(envelope.persistenceId) val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId) val entityRef = @@ -251,9 +251,11 @@ private[akka] object ReplicationImpl { replicatedEventMetadata.originSequenceNr, envelope.event, envelope.timestamp, - Some(new ReplicatedPublishedEventMetaData( - replicatedEventMetadata.originReplica, - replicatedEventMetadata.version)), + Some( + new ReplicatedPublishedEventMetaData( + replicatedEventMetadata.originReplica, + replicatedEventMetadata.version, + envelope.internalEventMetadata)), Some(replyTo))) askResult.failed.foreach(error => log.warn( @@ -360,7 +362,7 @@ private[akka] object ReplicationImpl { settings.eventProducerSettings.withAkkaSerializationOnly()) .withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId)) .withReplicatedEventMetadataTransformation(env => - if (env.eventMetadata.isDefined) None + if (env.metadata[ReplicatedEventMetadata].isDefined) None else { // migrated from non-replicated, fill in metadata Some( diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala index 4812d8a37..8172de34b 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala @@ -395,7 +395,7 @@ class R2dbcTimestampOffsetProjectionSpec env.sequenceNr, eventOption = None, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, env.filtered, @@ -484,7 +484,7 @@ class R2dbcTimestampOffsetProjectionSpec env.sequenceNr, env.eventOption, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, filtered = true, diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index dbd7520a0..6065d3001 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -96,7 +96,7 @@ class R2dbcTimestampOffsetStoreSpec env.sequenceNr, eventOption = None, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, env.filtered, @@ -109,7 +109,7 @@ class R2dbcTimestampOffsetStoreSpec env.sequenceNr, env.eventOption, env.timestamp, - env.eventMetadata, + env.internalEventMetadata, env.entityType, env.slice, filtered = true, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1e5173c16..949293b85 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,14 +9,14 @@ object Dependencies { // sync with Java version in .github/workflows/release.yml#documentation lazy val JavaDocLinkVersion = 17 - val Scala213 = "2.13.15" + val Scala213 = "2.13.16" val Scala3 = "3.3.4" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 object Versions { - val Akka = sys.props.getOrElse("build.akka.version", "2.10.0") + val Akka = sys.props.getOrElse("build.akka.version", "2.10.0+53-8ec61a38-SNAPSHOT") val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } val Alpakka = "9.0.0" @@ -35,7 +35,7 @@ object Dependencies { val AkkaPersistenceCassandra = "1.3.0" val AkkaPersistenceJdbc = "5.5.0" - val AkkaPersistenceR2dbc = "1.3.1" + val AkkaPersistenceR2dbc = "1.3.1+4-d1a01e21-SNAPSHOT" val AkkaPersistenceR2dbcVersionInDocs = VersionNumber(AkkaPersistenceR2dbc).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } From 1f4432cf909495b9d11bbbedd27283b28bc82795 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 24 Jan 2025 15:58:14 +0100 Subject: [PATCH 2/3] in samples --- samples/grpc/iot-service-java/pom.xml | 4 ++-- samples/grpc/iot-service-scala/build.sbt | 6 +++--- samples/grpc/local-drone-control-java/pom.xml | 4 ++-- .../autoscaling/simulator/build.sbt | 2 +- samples/grpc/local-drone-control-scala/build.sbt | 6 +++--- .../grpc/restaurant-drone-deliveries-service-java/pom.xml | 4 ++-- .../restaurant-drone-deliveries-service-scala/build.sbt | 6 +++--- samples/grpc/shopping-analytics-service-java/pom.xml | 4 ++-- samples/grpc/shopping-analytics-service-scala/build.sbt | 6 +++--- samples/grpc/shopping-cart-service-java/pom.xml | 4 ++-- samples/grpc/shopping-cart-service-scala/build.sbt | 6 +++--- samples/replicated/shopping-cart-service-java/pom.xml | 4 ++-- samples/replicated/shopping-cart-service-scala/build.sbt | 6 +++--- 13 files changed, 31 insertions(+), 31 deletions(-) diff --git a/samples/grpc/iot-service-java/pom.xml b/samples/grpc/iot-service-java/pom.xml index 948bd3626..0a90f228e 100644 --- a/samples/grpc/iot-service-java/pom.xml +++ b/samples/grpc/iot-service-java/pom.xml @@ -17,9 +17,9 @@ UTF-8 - 2.10.0 + 2.10.1 1.6.7 - 1.3.1 + 1.3.2 1.6.0 2.2.0 2.5.0 diff --git a/samples/grpc/iot-service-scala/build.sbt b/samples/grpc/iot-service-scala/build.sbt index d53d08007..da0f2341d 100644 --- a/samples/grpc/iot-service-scala/build.sbt +++ b/samples/grpc/iot-service-scala/build.sbt @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0") resolvers += "Akka library repository".at("https://repo.akka.io/maven") -scalaVersion := "2.13.15" +scalaVersion := "2.13.16" Compile / scalacOptions ++= Seq( "-release:11", @@ -28,10 +28,10 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.10.0" +val AkkaVersion = "2.10.1" val AkkaHttpVersion = "10.7.0" val AkkaManagementVersion = "1.6.0" -val AkkaPersistenceR2dbcVersion = "1.3.1" +val AkkaPersistenceR2dbcVersion = "1.3.2" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.6.7") val AkkaDiagnosticsVersion = "2.2.0" diff --git a/samples/grpc/local-drone-control-java/pom.xml b/samples/grpc/local-drone-control-java/pom.xml index 54d8773a3..0cc412be2 100644 --- a/samples/grpc/local-drone-control-java/pom.xml +++ b/samples/grpc/local-drone-control-java/pom.xml @@ -17,9 +17,9 @@ UTF-8 - 2.10.0 + 2.10.1 1.6.7 - 1.3.1 + 1.3.2 1.6.0 2.2.0 10.7.0 diff --git a/samples/grpc/local-drone-control-scala/autoscaling/simulator/build.sbt b/samples/grpc/local-drone-control-scala/autoscaling/simulator/build.sbt index 695c358ce..acdc65d46 100644 --- a/samples/grpc/local-drone-control-scala/autoscaling/simulator/build.sbt +++ b/samples/grpc/local-drone-control-scala/autoscaling/simulator/build.sbt @@ -1,4 +1,4 @@ -scalaVersion := "2.13.15" +scalaVersion := "2.13.16" enablePlugins(GatlingPlugin) diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index bda4113d5..898ef9d57 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0") resolvers += "Akka library repository".at("https://repo.akka.io/maven") -scalaVersion := "2.13.15" +scalaVersion := "2.13.16" Compile / scalacOptions ++= Seq( "-release:11", @@ -30,10 +30,10 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.10.0" +val AkkaVersion = "2.10.1" val AkkaHttpVersion = "10.7.0" val AkkaManagementVersion = "1.6.0" -val AkkaPersistenceR2dbcVersion = "1.3.1" +val AkkaPersistenceR2dbcVersion = "1.3.2" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.6.7") val AkkaDiagnosticsVersion = "2.2.0" diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml index cbde0e5a5..52c51e22c 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml +++ b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml @@ -17,9 +17,9 @@ UTF-8 - 2.10.0 + 2.10.1 1.6.7 - 1.3.1 + 1.3.2 1.6.0 2.2.0 2.5.0 diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt index be3918a5f..228bf16d0 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0") resolvers += "Akka library repository".at("https://repo.akka.io/maven") -scalaVersion := "2.13.15" +scalaVersion := "2.13.16" Compile / scalacOptions ++= Seq( "-release:11", @@ -28,10 +28,10 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.10.0" +val AkkaVersion = "2.10.1" val AkkaHttpVersion = "10.7.0" val AkkaManagementVersion = "1.6.0" -val AkkaPersistenceR2dbcVersion = "1.3.1" +val AkkaPersistenceR2dbcVersion = "1.3.2" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.6.7") val AkkaDiagnosticsVersion = "2.2.0" diff --git a/samples/grpc/shopping-analytics-service-java/pom.xml b/samples/grpc/shopping-analytics-service-java/pom.xml index 96984d6f2..df7b820fe 100644 --- a/samples/grpc/shopping-analytics-service-java/pom.xml +++ b/samples/grpc/shopping-analytics-service-java/pom.xml @@ -17,9 +17,9 @@ UTF-8 - 2.10.0 + 2.10.1 1.6.7 - 1.3.1 + 1.3.2 1.6.0 2.2.0 2.5.0 diff --git a/samples/grpc/shopping-analytics-service-scala/build.sbt b/samples/grpc/shopping-analytics-service-scala/build.sbt index 6900056ab..88fa9fc4b 100644 --- a/samples/grpc/shopping-analytics-service-scala/build.sbt +++ b/samples/grpc/shopping-analytics-service-scala/build.sbt @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0") resolvers += "Akka library repository".at("https://repo.akka.io/maven") -scalaVersion := "2.13.15" +scalaVersion := "2.13.16" Compile / scalacOptions ++= Seq( "-target:11", @@ -28,10 +28,10 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.10.0" +val AkkaVersion = "2.10.1" val AkkaHttpVersion = "10.7.0" val AkkaManagementVersion = "1.6.0" -val AkkaPersistenceR2dbcVersion = "1.3.1" +val AkkaPersistenceR2dbcVersion = "1.3.2" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.6.7") val AkkaDiagnosticsVersion = "2.2.0" diff --git a/samples/grpc/shopping-cart-service-java/pom.xml b/samples/grpc/shopping-cart-service-java/pom.xml index 6f36cc2f4..f3dc04323 100644 --- a/samples/grpc/shopping-cart-service-java/pom.xml +++ b/samples/grpc/shopping-cart-service-java/pom.xml @@ -17,9 +17,9 @@ UTF-8 - 2.10.0 + 2.10.1 1.6.7 - 1.3.1 + 1.3.2 1.6.0 2.2.0 2.5.0 diff --git a/samples/grpc/shopping-cart-service-scala/build.sbt b/samples/grpc/shopping-cart-service-scala/build.sbt index 499e64874..33a6bd564 100644 --- a/samples/grpc/shopping-cart-service-scala/build.sbt +++ b/samples/grpc/shopping-cart-service-scala/build.sbt @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0") resolvers += "Akka library repository".at("https://repo.akka.io/maven") -scalaVersion := "2.13.15" +scalaVersion := "2.13.16" Compile / scalacOptions ++= Seq( "-target:11", @@ -28,10 +28,10 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.10.0" +val AkkaVersion = "2.10.1" val AkkaHttpVersion = "10.7.0" val AkkaManagementVersion = "1.6.0" -val AkkaPersistenceR2dbcVersion = "1.3.1" +val AkkaPersistenceR2dbcVersion = "1.3.2" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.6.7") val AkkaDiagnosticsVersion = "2.2.0" diff --git a/samples/replicated/shopping-cart-service-java/pom.xml b/samples/replicated/shopping-cart-service-java/pom.xml index 3f7ede187..987651c58 100644 --- a/samples/replicated/shopping-cart-service-java/pom.xml +++ b/samples/replicated/shopping-cart-service-java/pom.xml @@ -17,9 +17,9 @@ UTF-8 - 2.10.0 + 2.10.1 1.6.7 - 1.3.1 + 1.3.2 1.6.0 2.2.0 2.5.0 diff --git a/samples/replicated/shopping-cart-service-scala/build.sbt b/samples/replicated/shopping-cart-service-scala/build.sbt index bdc28841e..20726b6b2 100644 --- a/samples/replicated/shopping-cart-service-scala/build.sbt +++ b/samples/replicated/shopping-cart-service-scala/build.sbt @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0") resolvers += "Akka library repository".at("https://repo.akka.io/maven") -scalaVersion := "2.13.15" +scalaVersion := "2.13.16" Compile / scalacOptions ++= Seq( "-release", @@ -29,10 +29,10 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = sys.props.getOrElse("akka.version", "2.10.0") +val AkkaVersion = sys.props.getOrElse("akka.version", "2.10.1") val AkkaHttpVersion = "10.7.0" val AkkaManagementVersion = "1.6.0" -val AkkaPersistenceR2dbcVersion = "1.3.1" +val AkkaPersistenceR2dbcVersion = "1.3.2" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.6.7") val AkkaDiagnosticsVersion = "2.2.0" From d6d88d6d9253ccf6e65e462827f6350520626af2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 24 Jan 2025 16:28:37 +0100 Subject: [PATCH 3/3] include slice in log --- .../akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala index 20e2d3c34..11193d9e5 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala @@ -196,7 +196,7 @@ private[projection] class PostgresOffsetStoreDao( slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = { r2dbcExecutor.select("read timestamp offset")( conn => { - logger.trace("reading timestamp offset for [{}]", projectionId) + logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId) conn .createStatement(selectTimestampOffsetSql) .bind(0, slice)