Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bump: Akka core 2.10.1, r2dbc 3.1.2 #1305

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
.loadEnvelope[String](pid.id, sequenceNr = 1L)
.futureValue
env.filtered shouldBe true
env.eventMetadata shouldBe None
env.internalEventMetadata shouldBe None
env.eventOption.isEmpty shouldBe true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class EventProducerServiceSpec
EventProducerSource(entityType7, streamId7, transformation, settings)
.withReplicatedEventMetadataTransformation(
env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {
"transform low level with metadata" in {
val transformer =
Transformation.empty.registerAsyncEnvelopeMapper((env: EventEnvelope[String]) =>
Future.successful(env.eventMetadata))
Future.successful(env.metadata[String]))
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
}

Expand All @@ -63,7 +63,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {

"fallback low level with metadata if no transformer exist for event" in {
val transformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper((env: EventEnvelope[Any]) =>
Future.successful(env.eventMetadata))
Future.successful(env.metadata[String]))
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private[akka] object EventPusherConsumerServiceImpl {
log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId)
Future.successful(Done)
} else {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
envelope.metadata[ReplicatedEventMetadata] match {
case Some(replicatedEventMetadata) =>
// send event to entity in this replica
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(replicationSettings.selfReplicaId)
Expand All @@ -104,7 +104,8 @@ private[akka] object EventPusherConsumerServiceImpl {
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
replicatedEventMetadata.version,
envelope.internalEventMetadata)),
Some(replyTo)))
}

Expand All @@ -123,9 +124,9 @@ private[akka] object EventPusherConsumerServiceImpl {
error))
askResult

case unexpected =>
case None =>
throw new IllegalArgumentException(
s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
s"Missing replication metadata (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
", is the remote entity really a Replicated Event Sourced Entity?")
}
}
Expand All @@ -141,7 +142,7 @@ private[akka] object EventPusherConsumerServiceImpl {
event = envelope.eventOption.getOrElse(FilteredPayload),
isSnapshotEvent = fromSnapshot(envelope),
fillSequenceNumberGaps = fillSequenceNumberGaps,
metadata = envelope.eventMetadata,
metadata = envelope.internalEventMetadata,
tags = envelope.tags,
replyTo = replyTo))(d.settings.journalWriteTimeout, system.scheduler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ import scala.concurrent.ExecutionContext
val pid = env.persistenceId

// replicaId is used for validation of replay requests, to avoid replay for other replicas
if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata]))
if (replicaId.isEmpty && env.metadata[ReplicatedEventMetadata].isDefined)
replicaId = Some(ReplicationId.fromString(pid).replicaId)

if (producerFilter(env) && filter.matches(env)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private[akka] object ProtobufProtocolConversions {

def toEvent(transformedEvent: Any): Event = {
val protoEvent = protoAnySerialization.serialize(transformedEvent)
val metadata = env.eventMetadata.map(protoAnySerialization.serialize)
val metadata = env.internalEventMetadata.map(protoAnySerialization.serialize)
Event(
persistenceId = env.persistenceId,
seqNr = env.sequenceNr,
Expand Down Expand Up @@ -169,7 +169,7 @@ private[akka] object ProtobufProtocolConversions {
event.seqNr,
evt,
eventOffset.timestamp.toEpochMilli,
eventMetadata = metadata,
_eventMetadata = metadata,
PersistenceId.extractEntityType(event.persistenceId),
event.slice,
filtered = false,
Expand All @@ -188,7 +188,7 @@ private[akka] object ProtobufProtocolConversions {
event.seqNr,
eventOption = Some(serializedEvent),
eventOffset.timestamp.toEpochMilli,
eventMetadata = metadata,
_eventMetadata = metadata,
PersistenceId.extractEntityType(event.persistenceId),
event.slice,
filtered = false,
Expand All @@ -213,7 +213,7 @@ private[akka] object ProtobufProtocolConversions {
filtered.seqNr,
None,
eventOffset.timestamp.toEpochMilli,
eventMetadata = None,
_eventMetadata = None,
PersistenceId.extractEntityType(filtered.persistenceId),
filtered.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import akka.projection.grpc.internal.proto.ReplicaInfo
if (envelope.eventOption.isEmpty)
true
else
envelope.eventMetadata match {
case Some(meta: ReplicatedEventMetadata) =>
envelope.metadata[ReplicatedEventMetadata] match {
case Some(meta) =>
!exclude(meta.originReplica)
case _ =>
case None =>
throw new IllegalArgumentException(
s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private[akka] object ReplicationImpl {
settings.producerFilter)
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
.withReplicatedEventMetadataTransformation(env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down Expand Up @@ -218,8 +218,8 @@ private[akka] object ReplicationImpl {
envelope.persistenceId) {
case (envelope, _) =>
if (!envelope.filtered) {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata)
envelope.metadata[ReplicatedEventMetadata] match {
case Some(replicatedEventMetadata)
if replicatedEventMetadata.originReplica == settings.selfReplicaId =>
// skipping events originating from self replica (break cycle)
if (log.isTraceEnabled)
Expand All @@ -231,7 +231,7 @@ private[akka] object ReplicationImpl {
envelope.sequenceNr)
Future.successful(Done)

case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
case Some(replicatedEventMetadata) =>
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId)
val entityRef =
Expand All @@ -251,9 +251,11 @@ private[akka] object ReplicationImpl {
replicatedEventMetadata.originSequenceNr,
envelope.event,
envelope.timestamp,
Some(new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version,
envelope.internalEventMetadata)),
Some(replyTo)))
askResult.failed.foreach(error =>
log.warn(
Expand Down Expand Up @@ -360,7 +362,7 @@ private[akka] object ReplicationImpl {
settings.eventProducerSettings.withAkkaSerializationOnly())
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
.withReplicatedEventMetadataTransformation(env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ class R2dbcTimestampOffsetProjectionSpec
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand Down Expand Up @@ -484,7 +484,7 @@ class R2dbcTimestampOffsetProjectionSpec
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class R2dbcTimestampOffsetStoreSpec
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand All @@ -109,7 +109,7 @@ class R2dbcTimestampOffsetStoreSpec
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private[projection] class PostgresOffsetStoreDao(
slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = {
r2dbcExecutor.select("read timestamp offset")(
conn => {
logger.trace("reading timestamp offset for [{}]", projectionId)
logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId)
conn
.createStatement(selectTimestampOffsetSql)
.bind(0, slice)
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ object Dependencies {
// sync with Java version in .github/workflows/release.yml#documentation
lazy val JavaDocLinkVersion = 17

val Scala213 = "2.13.15"
val Scala213 = "2.13.16"
val Scala3 = "3.3.4"

val Scala2Versions = Seq(Scala213)
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3

object Versions {
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0")
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0+53-8ec61a38-SNAPSHOT")
val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }

val Alpakka = "9.0.0"
Expand All @@ -35,7 +35,7 @@ object Dependencies {
val AkkaPersistenceCassandra = "1.3.0"
val AkkaPersistenceJdbc = "5.5.0"

val AkkaPersistenceR2dbc = "1.3.1"
val AkkaPersistenceR2dbc = "1.3.1+4-d1a01e21-SNAPSHOT"
val AkkaPersistenceR2dbcVersionInDocs = VersionNumber(AkkaPersistenceR2dbc).numbers match {
case Seq(major, minor, _*) => s"$major.$minor"
}
Expand Down
4 changes: 2 additions & 2 deletions samples/grpc/iot-service-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.10.0</akka.version>
<akka.version>2.10.1</akka.version>
<akka-projection.version>1.6.7</akka-projection.version>
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
<akka-management.version>1.6.0</akka-management.version>
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
<akka-grpc.version>2.5.0</akka-grpc.version>
Expand Down
6 changes: 3 additions & 3 deletions samples/grpc/iot-service-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")

resolvers += "Akka library repository".at("https://repo.akka.io/maven")

scalaVersion := "2.13.15"
scalaVersion := "2.13.16"

Compile / scalacOptions ++= Seq(
"-release:11",
Expand All @@ -28,10 +28,10 @@ run / javaOptions ++= sys.props
.fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res"))
Global / cancelable := false // ctrl-c

val AkkaVersion = "2.10.0"
val AkkaVersion = "2.10.1"
val AkkaHttpVersion = "10.7.0"
val AkkaManagementVersion = "1.6.0"
val AkkaPersistenceR2dbcVersion = "1.3.1"
val AkkaPersistenceR2dbcVersion = "1.3.2"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.6.7")
val AkkaDiagnosticsVersion = "2.2.0"
Expand Down
4 changes: 2 additions & 2 deletions samples/grpc/local-drone-control-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.10.0</akka.version>
<akka.version>2.10.1</akka.version>
<akka-projection.version>1.6.7</akka-projection.version>
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
<akka-management.version>1.6.0</akka-management.version>
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
<akka-http.version>10.7.0</akka-http.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
scalaVersion := "2.13.15"
scalaVersion := "2.13.16"

enablePlugins(GatlingPlugin)

Expand Down
6 changes: 3 additions & 3 deletions samples/grpc/local-drone-control-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")

resolvers += "Akka library repository".at("https://repo.akka.io/maven")

scalaVersion := "2.13.15"
scalaVersion := "2.13.16"

Compile / scalacOptions ++= Seq(
"-release:11",
Expand All @@ -30,10 +30,10 @@ run / javaOptions ++= sys.props
.fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res"))
Global / cancelable := false // ctrl-c

val AkkaVersion = "2.10.0"
val AkkaVersion = "2.10.1"
val AkkaHttpVersion = "10.7.0"
val AkkaManagementVersion = "1.6.0"
val AkkaPersistenceR2dbcVersion = "1.3.1"
val AkkaPersistenceR2dbcVersion = "1.3.2"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.6.7")
val AkkaDiagnosticsVersion = "2.2.0"
Expand Down
4 changes: 2 additions & 2 deletions samples/grpc/restaurant-drone-deliveries-service-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.10.0</akka.version>
<akka.version>2.10.1</akka.version>
<akka-projection.version>1.6.7</akka-projection.version>
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
<akka-management.version>1.6.0</akka-management.version>
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
<akka-grpc.version>2.5.0</akka-grpc.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")

resolvers += "Akka library repository".at("https://repo.akka.io/maven")

scalaVersion := "2.13.15"
scalaVersion := "2.13.16"

Compile / scalacOptions ++= Seq(
"-release:11",
Expand All @@ -28,10 +28,10 @@ run / javaOptions ++= sys.props
.fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res"))
Global / cancelable := false // ctrl-c

val AkkaVersion = "2.10.0"
val AkkaVersion = "2.10.1"
val AkkaHttpVersion = "10.7.0"
val AkkaManagementVersion = "1.6.0"
val AkkaPersistenceR2dbcVersion = "1.3.1"
val AkkaPersistenceR2dbcVersion = "1.3.2"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.6.7")
val AkkaDiagnosticsVersion = "2.2.0"
Expand Down
4 changes: 2 additions & 2 deletions samples/grpc/shopping-analytics-service-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.10.0</akka.version>
<akka.version>2.10.1</akka.version>
<akka-projection.version>1.6.7</akka-projection.version>
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
<akka-management.version>1.6.0</akka-management.version>
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
<akka-grpc.version>2.5.0</akka-grpc.version>
Expand Down
Loading
Loading