Skip to content

Commit

Permalink
fix: A few smaller follow ups for the edge replication (#1094)
Browse files Browse the repository at this point in the history
* Java types in Java DSL
* Aligned Java function types (throwing factories vs non-throwing where user calls)
* Wrong type of replication impl in Java API fixed
* Initial consumer filter for edge replication
* Pass initial consumer filters to producer for RES
* Include internal replication settings when converting back and forth to Java API EventProducerPushDestination
* JavaDSL test for edge replication
* Fixes and refactoring of SourceProviderAdapters 

---------

Co-authored-by: Patrik Nordwall <[email protected]>
  • Loading branch information
johanandren and patriknw authored Dec 13, 2023
1 parent d410abe commit 9985eb9
Show file tree
Hide file tree
Showing 21 changed files with 913 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.projection.internal.GroupedHandlerStrategy
import akka.projection.internal.HandlerAdapter
import akka.projection.internal.NoopStatusObserver
import akka.projection.internal.SingleHandlerStrategy
import akka.projection.internal.SourceProviderAdapter
import akka.projection.internal.JavaToScalaSourceProviderAdapter
import akka.projection.javadsl.AtLeastOnceFlowProjection
import akka.projection.javadsl.AtLeastOnceProjection
import akka.projection.javadsl.AtMostOnceProjection
Expand Down Expand Up @@ -52,7 +52,7 @@ object CassandraProjection {
handler: Supplier[Handler[Envelope]]): AtLeastOnceProjection[Offset, Envelope] =
new CassandraProjectionImpl(
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
Expand All @@ -76,7 +76,7 @@ object CassandraProjection {
handler: Supplier[Handler[java.util.List[Envelope]]]): GroupedProjection[Offset, Envelope] =
new CassandraProjectionImpl[Offset, Envelope](
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy =
Expand Down Expand Up @@ -112,7 +112,7 @@ object CassandraProjection {
: AtLeastOnceFlowProjection[Offset, Envelope] =
new CassandraProjectionImpl(
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
Expand All @@ -130,7 +130,7 @@ object CassandraProjection {
handler: Supplier[Handler[Envelope]]): AtMostOnceProjection[Offset, Envelope] =
new CassandraProjectionImpl(
projectionId,
new SourceProviderAdapter(sourceProvider),
new JavaToScalaSourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtMostOnce(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# internal/renamed
ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.ScalaBySlicesSourceProviderAdapter")
# (was internal stable but only because it was historically used by akka-persistence-r2dbc)
ProblemFilters.exclude[MissingClassProblem]("akka.projection.internal.SourceProviderAdapter")
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,68 @@
* Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.r2dbc.internal
package akka.projection.internal

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.concurrent.Future
import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.stream.scaladsl.Source

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._

import scala.compat.java8.OptionConverters._
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import scala.concurrent.Future

@InternalApi private[projection] object JavaToScalaBySliceSourceProviderAdapter {
def apply[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope]): scaladsl.SourceProvider[Offset, Envelope] =
delegate match {
case adapted: ScalaToJavaBySlicesSourceProviderAdapter[_, _] =>
// just unwrap rather than wrapping further
adapted.delegate
case delegate: BySlicesSourceProvider with CanTriggerReplay =>
new JavaToScalaBySliceSourceProviderAdapterWithCanTriggerReplay(delegate)
case _: BySlicesSourceProvider => new JavaToScalaBySliceSourceProviderAdapter(delegate)
case _ => new JavaToScalaSourceProviderAdapter(delegate)
}
}

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
@InternalApi private[projection] class BySliceSourceProviderAdapter[Offset, Envelope](
private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope] {

def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava
}
delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
}

def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)

def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
}

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
*/
@InternalApi private[projection] sealed class JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope] private[internal] (
val delegate: javadsl.SourceProvider[Offset, Envelope])
extends scaladsl.SourceProvider[Offset, Envelope]
with BySlicesSourceProvider
with EventTimestampQuery
Expand Down Expand Up @@ -75,4 +110,18 @@ import akka.projection.BySlicesSourceProvider
s"Expected SourceProvider [${delegate.getClass.getName}] to implement " +
s"EventTimestampQuery when LoadEventQuery is used."))
}

}

/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider that also implements
* CanTriggerReplay
*/
@InternalApi
private[projection] final class JavaToScalaBySliceSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope] private[internal] (
delegate: javadsl.SourceProvider[Offset, Envelope] with CanTriggerReplay)
extends JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope](delegate)
with CanTriggerReplay {
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.internal

import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.javadsl.EventTimestampQuery
import akka.persistence.query.typed.javadsl.LoadEventQuery
import akka.persistence.query.typed.scaladsl.{ EventTimestampQuery => ScalaEventTimestampQuery }
import akka.persistence.query.typed.scaladsl.{ LoadEventQuery => ScalaLoadEventQuery }
import akka.projection.BySlicesSourceProvider
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.stream.javadsl.{ Source => JSource }

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
*/
@InternalApi private[projection] object ScalaToJavaBySlicesSourceProviderAdapter {
def apply[Offset, Envelope](
delegate: scaladsl.SourceProvider[Offset, Envelope]
with BySlicesSourceProvider): javadsl.SourceProvider[Offset, Envelope] =
delegate match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] =>
// just unwrap rather than wrapping further
adapted.delegate
case delegate: CanTriggerReplay => new ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay(delegate)
case _ => new ScalaToJavaBySlicesSourceProviderAdapter(delegate)
}
}

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
*/
@InternalApi
private[projection] sealed class ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope] private[internal] (
val delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider)
extends javadsl.SourceProvider[Offset, Envelope]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
override def source(
offset: Supplier[CompletionStage[Optional[Offset]]]): CompletionStage[JSource[Envelope, NotUsed]] =
delegate
.source(() => offset.get().toScala.map(_.asScala)(ExecutionContexts.parasitic))
.map(_.asJava)(ExecutionContexts.parasitic)
.toJava

override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)

override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)

def minSlice: Int = delegate.minSlice

def maxSlice: Int = delegate.maxSlice

override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] =
delegate match {
case etq: ScalaEventTimestampQuery =>
etq.timestampOf(persistenceId, sequenceNr).map(_.asJava)(ExecutionContexts.parasitic).toJava
case _ =>
throw new IllegalStateException(
s"timestampOf was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.EventTimestampQuery")
}

override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] =
delegate match {
case etq: ScalaLoadEventQuery =>
etq.loadEnvelope[Event](persistenceId, sequenceNr).toJava
case _ =>
throw new IllegalStateException(
s"loadEnvelope was called but delegate of type [${delegate.getClass}] does not implement akka.persistence.query.typed.scaladsl.LoadEventQuery")
}

}

/**
* INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
*/
@InternalApi
private[projection] final class ScalaToJavaBySlicesSourceProviderAdapterWithCanTriggerReplay[Offset, Envelope] private[internal] (
delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider with CanTriggerReplay)
extends ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope](delegate)
with CanTriggerReplay {

override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
package akka.projection.scaladsl

import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -23,6 +21,9 @@ import akka.projection.ProjectionId
import akka.util.JavaDurationConverters._
import akka.util.Timeout

import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object ProjectionManagement extends ExtensionId[ProjectionManagement] {
def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system)

Expand Down Expand Up @@ -50,7 +51,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
private def topic(projectionName: String): ActorRef[Topic.Command[ProjectionManagementCommand]] = {
topics.computeIfAbsent(projectionName, _ => {
val name = topicName(projectionName)
system.systemActorOf(Topic[ProjectionManagementCommand](name), name)
system.systemActorOf(Topic[ProjectionManagementCommand](name), sanitizeActorName(name))
})
}

Expand Down Expand Up @@ -151,4 +152,7 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
}
retry(() => askSetPaused())
}

private def sanitizeActorName(text: String): String =
URLEncoder.encode(text, StandardCharsets.UTF_8.name())
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
otherReplicas,
10.seconds,
8,
R2dbcReplication())
R2dbcReplication()(replicaSystem))
.withEdgeReplication(true)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf)
otherReplicas,
10.seconds,
8,
R2dbcReplication())
R2dbcReplication()(replicaSystem))
}

selfReplicaId match {
Expand Down
Loading

0 comments on commit 9985eb9

Please sign in to comment.