From 8847dfa08711f88d66a0be4a2c61081933d66dc6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 5 Jun 2020 13:22:32 +0200 Subject: [PATCH] withRestartBackoff settings directly on Projection, #177 * moved ProjectionSettings to internal * cleanup (reduce DRY) of with methods in projection impl --- .../internal/CassandraProjectionImpl.scala | 43 ++--- .../javadsl/CassandraProjection.scala | 62 ++++++- .../scaladsl/CassandraProjection.scala | 62 ++++++- .../cassandra/CassandraProjectionSpec.scala | 7 +- .../scala/akka/projection/Projection.scala | 47 ++++- .../akka/projection/ProjectionSettings.scala | 175 ------------------ .../internal/ProjectionSettings.scala | 125 +++++++++++++ .../ProjectionBehaviorCompileTest.java | 24 +++ .../projection/ProjectionBehaviorSpec.scala | 12 +- .../KafkaSourceProviderImplSpec.scala | 14 +- .../projection/slick/SlickProjection.scala | 70 ++++++- .../slick/internal/SlickProjectionImpl.scala | 23 ++- .../slick/SlickProjectionSpec.scala | 7 +- .../testkit/javadsl/ProjectionTestKit.scala | 5 +- .../testkit/scaladsl/ProjectionTestKit.scala | 5 +- .../javadsl/ProjectionTestKitTest.java | 24 ++- .../scaladsl/ProjectionTestKitSpec.scala | 13 +- docs/src/main/paradox/projection-settings.md | 4 +- .../CassandraProjectionDocExample.java | 24 +-- .../java/jdocs/testkit/TestKitDocExample.java | 23 ++- .../CassandraProjectionDocExample.scala | 12 +- 21 files changed, 498 insertions(+), 283 deletions(-) delete mode 100644 akka-projection-core/src/main/scala/akka/projection/ProjectionSettings.scala create mode 100644 akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala diff --git a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/internal/CassandraProjectionImpl.scala b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/internal/CassandraProjectionImpl.scala index 2da53c181..4e4e467e5 100644 --- a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/internal/CassandraProjectionImpl.scala +++ b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/internal/CassandraProjectionImpl.scala @@ -11,7 +11,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration -import scala.jdk.DurationConverters._ import scala.util.control.NonFatal import akka.Done @@ -22,7 +21,6 @@ import akka.event.Logging import akka.projection.HandlerRecoveryStrategy import akka.projection.ProjectionId import akka.projection.ProjectionOffsetManagement -import akka.projection.ProjectionSettings import akka.projection.RunningProjection import akka.projection.RunningProjection.AbortProjectionException import akka.projection.StatusObserver @@ -35,6 +33,9 @@ import akka.projection.scaladsl.HandlerLifecycle import akka.projection.scaladsl.SourceProvider import akka.projection.OffsetVerification.VerificationFailure import akka.projection.OffsetVerification.VerificationSuccess +import akka.projection.internal.ProjectionSettings +import akka.projection.internal.RestartBackoffSettings +import akka.projection.internal.SettingsImpl import akka.stream.KillSwitches import akka.stream.SharedKillSwitch import akka.stream.scaladsl.Flow @@ -79,6 +80,7 @@ import akka.stream.scaladsl.Source override val projectionId: ProjectionId, sourceProvider: SourceProvider[Offset, Envelope], settingsOpt: Option[ProjectionSettings], + restartBackoffOpt: Option[RestartBackoffSettings], val offsetStrategy: CassandraProjectionImpl.OffsetStrategy, handlerStrategy: CassandraProjectionImpl.HandlerStrategy[Envelope], override val statusObserver: StatusObserver[Envelope]) @@ -91,12 +93,14 @@ import akka.stream.scaladsl.Source with javadsl.AtMostOnceCassandraProjection[Envelope] with scaladsl.AtMostOnceCassandraProjection[Envelope] with scaladsl.AtLeastOnceFlowCassandraProjection[Envelope] - with javadsl.AtLeastOnceFlowCassandraProjection[Envelope] { + with javadsl.AtLeastOnceFlowCassandraProjection[Envelope] + with SettingsImpl[CassandraProjectionImpl[Offset, Envelope]] { import CassandraProjectionImpl._ private def copy( settingsOpt: Option[ProjectionSettings] = this.settingsOpt, + restartBackoffOpt: Option[RestartBackoffSettings] = this.restartBackoffOpt, offsetStrategy: OffsetStrategy = this.offsetStrategy, handlerStrategy: CassandraProjectionImpl.HandlerStrategy[Envelope] = this.handlerStrategy, statusObserver: StatusObserver[Envelope] = this.statusObserver): CassandraProjectionImpl[Offset, Envelope] = @@ -104,6 +108,7 @@ import akka.stream.scaladsl.Source projectionId, sourceProvider, settingsOpt, + restartBackoffOpt, offsetStrategy, handlerStrategy, statusObserver) @@ -111,6 +116,10 @@ import akka.stream.scaladsl.Source override def withSettings(settings: ProjectionSettings): CassandraProjectionImpl[Offset, Envelope] = copy(settingsOpt = Option(settings)) + override def withRestartBackoffSettings( + restartBackoff: RestartBackoffSettings): CassandraProjectionImpl[Offset, Envelope] = + copy(restartBackoffOpt = Some(restartBackoff)) + /** * Settings for AtLeastOnceCassandraProjection */ @@ -120,15 +129,6 @@ import akka.stream.scaladsl.Source copy(offsetStrategy = atLeastOnceStrategy .copy(afterEnvelopes = Some(afterEnvelopes), orAfterDuration = Some(afterDuration))) - /** - * Java API - */ - override def withSaveOffset( - afterEnvelopes: Int, - afterDuration: java.time.Duration): CassandraProjectionImpl[Offset, Envelope] = - copy(offsetStrategy = atLeastOnceStrategy - .copy(afterEnvelopes = Some(afterEnvelopes), orAfterDuration = Some(afterDuration.toScala))) - /** * Settings for GroupedCassandraProjection */ @@ -139,16 +139,6 @@ import akka.stream.scaladsl.Source .asInstanceOf[GroupedHandlerStrategy[Envelope]] .copy(afterEnvelopes = Some(groupAfterEnvelopes), orAfterDuration = Some(groupAfterDuration))) - /** - * Java API - */ - override def withGroup( - groupAfterEnvelopes: Int, - groupAfterDuration: java.time.Duration): CassandraProjectionImpl[Offset, Envelope] = - copy(handlerStrategy = handlerStrategy - .asInstanceOf[GroupedHandlerStrategy[Envelope]] - .copy(afterEnvelopes = Some(groupAfterEnvelopes), orAfterDuration = Some(groupAfterDuration.toScala))) - override def withRecoveryStrategy( recoveryStrategy: HandlerRecoveryStrategy): CassandraProjectionImpl[Offset, Envelope] = copy(offsetStrategy = atLeastOnceStrategy.copy(recoveryStrategy = Some(recoveryStrategy))) @@ -186,8 +176,13 @@ import akka.stream.scaladsl.Source /* * Build the final ProjectionSettings to use, if currently set to None fallback to values in config file */ - private def settingsOrDefaults(implicit system: ActorSystem[_]): ProjectionSettings = - settingsOpt.getOrElse(ProjectionSettings(system)) + private def settingsOrDefaults(implicit system: ActorSystem[_]): ProjectionSettings = { + val settings = settingsOpt.getOrElse(ProjectionSettings(system)) + restartBackoffOpt match { + case None => settings + case Some(r) => settings.copy(restartBackoff = r) + } + } /* * INTERNAL API diff --git a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala index cc203cbe3..eb0f8ac82 100644 --- a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala +++ b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/javadsl/CassandraProjection.scala @@ -13,7 +13,6 @@ import akka.annotation.DoNotInherit import akka.projection.HandlerRecoveryStrategy import akka.projection.Projection import akka.projection.ProjectionId -import akka.projection.ProjectionSettings import akka.projection.StatusObserver import akka.projection.StrictRecoveryStrategy import akka.projection.cassandra.internal.CassandraProjectionImpl @@ -55,6 +54,7 @@ object CassandraProjection { projectionId, new SourceProviderAdapter(sourceProvider), settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtLeastOnce(), handlerStrategy = SingleHandlerStrategy(new HandlerAdapter(handler)), statusObserver = NoopStatusObserver) @@ -78,6 +78,7 @@ object CassandraProjection { projectionId, new SourceProviderAdapter(sourceProvider), settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = CassandraProjectionImpl .AtLeastOnce(afterEnvelopes = Some(1), orAfterDuration = Some(scala.concurrent.duration.Duration.Zero)), handlerStrategy = GroupedHandlerStrategy(new GroupedHandlerAdapter(handler)), @@ -112,6 +113,7 @@ object CassandraProjection { projectionId, new SourceProviderAdapter(sourceProvider), settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtLeastOnce(), handlerStrategy = FlowHandlerStrategy(handler.asScala), statusObserver = NoopStatusObserver) @@ -129,6 +131,7 @@ object CassandraProjection { projectionId, new SourceProviderAdapter(sourceProvider), settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtMostOnce(), handlerStrategy = SingleHandlerStrategy(new HandlerAdapter(handler)), statusObserver = NoopStatusObserver) @@ -136,7 +139,16 @@ object CassandraProjection { @DoNotInherit trait CassandraProjection[Envelope] extends Projection[Envelope] { - override def withSettings(settings: ProjectionSettings): CassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): CassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRestarts: Int): CassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): CassandraProjection[Envelope] @@ -149,7 +161,16 @@ object CassandraProjection { } @DoNotInherit trait AtLeastOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] { - override def withSettings(settings: ProjectionSettings): AtLeastOnceCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): AtLeastOnceCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRestarts: Int): AtLeastOnceCassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceCassandraProjection[Envelope] @@ -159,7 +180,16 @@ object CassandraProjection { } @DoNotInherit trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] { - override def withSettings(settings: ProjectionSettings): GroupedCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): GroupedCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRestarts: Int): GroupedCassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): GroupedCassandraProjection[Envelope] @@ -169,7 +199,16 @@ object CassandraProjection { } @DoNotInherit trait AtMostOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] { - override def withSettings(settings: ProjectionSettings): AtMostOnceCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): AtMostOnceCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRestarts: Int): AtMostOnceCassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): AtMostOnceCassandraProjection[Envelope] @@ -177,7 +216,18 @@ object CassandraProjection { } @DoNotInherit trait AtLeastOnceFlowCassandraProjection[Envelope] extends CassandraProjection[Envelope] { - override def withSettings(settings: ProjectionSettings): AtLeastOnceFlowCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): AtLeastOnceFlowCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRestarts: Int): AtLeastOnceFlowCassandraProjection[Envelope] + + override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceFlowCassandraProjection[Envelope] def withSaveOffset( afterEnvelopes: Int, diff --git a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/scaladsl/CassandraProjection.scala b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/scaladsl/CassandraProjection.scala index 63f61f666..70335b96e 100644 --- a/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/scaladsl/CassandraProjection.scala +++ b/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/scaladsl/CassandraProjection.scala @@ -16,7 +16,6 @@ import akka.annotation.DoNotInherit import akka.projection.HandlerRecoveryStrategy import akka.projection.Projection import akka.projection.ProjectionId -import akka.projection.ProjectionSettings import akka.projection.StatusObserver import akka.projection.StrictRecoveryStrategy import akka.projection.cassandra.internal.CassandraProjectionImpl @@ -61,6 +60,7 @@ object CassandraProjection { projectionId, sourceProvider, settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtLeastOnce(), handlerStrategy = SingleHandlerStrategy(handler), statusObserver = NoopStatusObserver) @@ -84,6 +84,7 @@ object CassandraProjection { projectionId, sourceProvider, settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtLeastOnce(afterEnvelopes = Some(1), orAfterDuration = Some(Duration.Zero)), handlerStrategy = GroupedHandlerStrategy(handler), statusObserver = NoopStatusObserver) @@ -117,6 +118,7 @@ object CassandraProjection { projectionId, sourceProvider, settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtLeastOnce(), handlerStrategy = FlowHandlerStrategy(handler), statusObserver = NoopStatusObserver) @@ -134,6 +136,7 @@ object CassandraProjection { projectionId, sourceProvider, settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtMostOnce(), handlerStrategy = SingleHandlerStrategy(handler), statusObserver = NoopStatusObserver) @@ -142,7 +145,16 @@ object CassandraProjection { @DoNotInherit trait CassandraProjection[Envelope] extends Projection[Envelope] { private[cassandra] def offsetStrategy: OffsetStrategy - override def withSettings(settings: ProjectionSettings): CassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): CassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): CassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): CassandraProjection[Envelope] @@ -157,7 +169,16 @@ object CassandraProjection { @DoNotInherit trait AtLeastOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] { private[cassandra] def atLeastOnceStrategy: AtLeastOnce = offsetStrategy.asInstanceOf[AtLeastOnce] - override def withSettings(settings: ProjectionSettings): AtLeastOnceCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): AtLeastOnceCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): AtLeastOnceCassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceCassandraProjection[Envelope] @@ -167,7 +188,16 @@ object CassandraProjection { } @DoNotInherit trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] { - override def withSettings(settings: ProjectionSettings): GroupedCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): GroupedCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): GroupedCassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): GroupedCassandraProjection[Envelope] @@ -179,7 +209,16 @@ object CassandraProjection { @DoNotInherit trait AtMostOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] { private[cassandra] def atMostOnceStrategy: AtMostOnce = offsetStrategy.asInstanceOf[AtMostOnce] - override def withSettings(settings: ProjectionSettings): AtMostOnceCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): AtMostOnceCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): AtMostOnceCassandraProjection[Envelope] override def withStatusObserver(observer: StatusObserver[Envelope]): AtMostOnceCassandraProjection[Envelope] @@ -187,7 +226,18 @@ object CassandraProjection { } @DoNotInherit trait AtLeastOnceFlowCassandraProjection[Envelope] extends CassandraProjection[Envelope] { - override def withSettings(settings: ProjectionSettings): AtLeastOnceFlowCassandraProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): AtLeastOnceFlowCassandraProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): AtLeastOnceFlowCassandraProjection[Envelope] + + override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceFlowCassandraProjection[Envelope] def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): AtLeastOnceFlowCassandraProjection[Envelope] } diff --git a/akka-projection-cassandra/src/test/scala/akka/projection/cassandra/CassandraProjectionSpec.scala b/akka-projection-cassandra/src/test/scala/akka/projection/cassandra/CassandraProjectionSpec.scala index 4078608cb..3f1b86894 100644 --- a/akka-projection-cassandra/src/test/scala/akka/projection/cassandra/CassandraProjectionSpec.scala +++ b/akka-projection-cassandra/src/test/scala/akka/projection/cassandra/CassandraProjectionSpec.scala @@ -31,7 +31,6 @@ import akka.projection.OffsetVerification.VerificationFailure import akka.projection.OffsetVerification.VerificationSuccess import akka.projection.ProjectionBehavior import akka.projection.ProjectionId -import akka.projection.ProjectionSettings import akka.projection.TestStatusObserver import akka.projection.cassandra.internal.CassandraOffsetStore import akka.projection.cassandra.scaladsl.CassandraProjection @@ -861,7 +860,7 @@ class CassandraProjectionSpec val projection = CassandraProjection .atLeastOnce[Long, Envelope](projectionId, sourceProvider(system, entityId), handler) - .withSettings(ProjectionSettings(system).withBackoff(1.second, 2.seconds, 0.0)) + .withRestartBackoff(1.second, 2.seconds, 0.0) .withSaveOffset(1, Duration.Zero) .withStatusObserver(statusObserver) @@ -911,9 +910,7 @@ class CassandraProjectionSpec val projection = CassandraProjection .atLeastOnce[Long, Envelope](projectionId, sourceProvider(system, entityId), handler) - .withSettings( - ProjectionSettings(system).withBackoff(1.second, 2.seconds, 0.0, maxRestarts = 0) - ) // no restarts + .withRestartBackoff(1.second, 2.seconds, 0.0, maxRestarts = 0) // no restarts .withSaveOffset(1, Duration.Zero) // not using ProjectionTestKit because want to test restarts diff --git a/akka-projection-core/src/main/scala/akka/projection/Projection.scala b/akka-projection-core/src/main/scala/akka/projection/Projection.scala index 642ba56f2..4da27dcba 100644 --- a/akka-projection-core/src/main/scala/akka/projection/Projection.scala +++ b/akka-projection-core/src/main/scala/akka/projection/Projection.scala @@ -6,6 +6,7 @@ package akka.projection import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration import scala.util.Failure import scala.util.Success import scala.util.Try @@ -16,6 +17,7 @@ import akka.NotUsed import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.annotation.InternalApi +import akka.projection.internal.ProjectionSettings import akka.projection.scaladsl.HandlerLifecycle import akka.stream.scaladsl.RestartSource import akka.stream.scaladsl.Source @@ -36,12 +38,43 @@ trait Projection[Envelope] { def projectionId: ProjectionId - def withSettings(settings: ProjectionSettings): Projection[Envelope] + def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): Projection[Envelope] + + def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): Projection[Envelope] + + /** + * Java API + */ + def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): Projection[Envelope] + + /** + * Java API + */ + def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRestarts: Int): Projection[Envelope] def statusObserver: StatusObserver[Envelope] def withStatusObserver(observer: StatusObserver[Envelope]): Projection[Envelope] + /** + * INTERNAL API + */ + @InternalApi private[akka] def withSettings(settings: ProjectionSettings): Projection[Envelope] + /** * INTERNAL API * @@ -74,13 +107,13 @@ private[projection] object RunningProjection { case object AbortProjectionException extends RuntimeException("Projection aborted.") with NoStackTrace def withBackoff(source: () => Source[Done, _], settings: ProjectionSettings): Source[Done, _] = { + val backoff = settings.restartBackoff RestartSource - .onFailuresWithBackoff(settings.minBackoff, settings.maxBackoff, settings.randomFactor, settings.maxRestarts) { - () => - source() - .recoverWithRetries(1, { - case AbortProjectionException => Source.empty // don't restart - }) + .onFailuresWithBackoff(backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor, backoff.maxRestarts) { () => + source() + .recoverWithRetries(1, { + case AbortProjectionException => Source.empty // don't restart + }) } } diff --git a/akka-projection-core/src/main/scala/akka/projection/ProjectionSettings.scala b/akka-projection-core/src/main/scala/akka/projection/ProjectionSettings.scala deleted file mode 100644 index 9e22b70a8..000000000 --- a/akka-projection-core/src/main/scala/akka/projection/ProjectionSettings.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (C) 2020 Lightbend Inc. - */ - -package akka.projection - -import java.time - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration._ - -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.util.JavaDurationConverters._ -import com.typesafe.config.Config - -trait ProjectionSettings { - - def minBackoff: FiniteDuration - def maxBackoff: FiniteDuration - def randomFactor: Double - def maxRestarts: Int - def saveOffsetAfterEnvelopes: Int - def saveOffsetAfterDuration: FiniteDuration - def groupAfterEnvelopes: Int - def groupAfterDuration: FiniteDuration - def recoveryStrategy: HandlerRecoveryStrategy - - def withBackoff(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): ProjectionSettings - - def withBackoff( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int): ProjectionSettings - - /** - * Java API - */ - def withBackoff( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double): ProjectionSettings - - /** - * Java API - */ - def withBackoff( - minBackoff: java.time.Duration, - maxBackoff: java.time.Duration, - randomFactor: Double, - maxRestarts: Int): ProjectionSettings -} - -object ProjectionSettings { - - /** - * Java API - */ - def create(system: ActorSystem[_]): ProjectionSettings = apply(system) - - def apply(system: ActorSystem[_]): ProjectionSettings = { - fromConfig(system.classicSystem.settings.config.getConfig("akka.projection")) - } - - def fromConfig(config: Config): ProjectionSettings = { - val restartBackoffConfig = config.getConfig("restart-backoff") - val atLeastOnceConfig = config.getConfig("at-least-once") - val groupedConfig = config.getConfig("grouped") - val recoveryStrategyConfig = config.getConfig("recovery-strategy") - new ProjectionSettingsImpl( - restartBackoffConfig.getDuration("min-backoff", MILLISECONDS).millis, - restartBackoffConfig.getDuration("max-backoff", MILLISECONDS).millis, - restartBackoffConfig.getDouble("random-factor"), - restartBackoffConfig.getInt("max-restarts"), - atLeastOnceConfig.getInt("save-offset-after-envelopes"), - atLeastOnceConfig.getDuration("save-offset-after-duration", MILLISECONDS).millis, - groupedConfig.getInt("group-after-envelopes"), - groupedConfig.getDuration("group-after-duration", MILLISECONDS).millis, - RecoveryStrategyConfig.fromConfig(recoveryStrategyConfig)) - } -} - -/** - * INTERNAL API - */ -@InternalApi -private[akka] class ProjectionSettingsImpl( - val minBackoff: FiniteDuration, - val maxBackoff: FiniteDuration, - val randomFactor: Double, - val maxRestarts: Int, - val saveOffsetAfterEnvelopes: Int, - val saveOffsetAfterDuration: FiniteDuration, - val groupAfterEnvelopes: Int, - val groupAfterDuration: FiniteDuration, - val recoveryStrategy: HandlerRecoveryStrategy) - extends ProjectionSettings { - - /** - * Scala API - */ - override def withBackoff( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double): ProjectionSettings = - withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) - - override def withBackoff( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - randomFactor: Double, - maxRestarts: Int): ProjectionSettings = - copy(minBackoff = minBackoff, maxBackoff = maxBackoff, randomFactor = randomFactor, maxRestarts = maxRestarts) - - /** - * Java API - */ - override def withBackoff( - minBackoff: time.Duration, - maxBackoff: time.Duration, - randomFactor: Double): ProjectionSettings = - withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts = -1) - - /** - * Java API - */ - override def withBackoff( - minBackoff: time.Duration, - maxBackoff: time.Duration, - randomFactor: Double, - maxRestarts: Int): ProjectionSettings = - copy( - minBackoff = minBackoff.asScala, - maxBackoff = maxBackoff.asScala, - randomFactor = randomFactor, - maxRestarts = maxRestarts) - - private[akka] def copy( - minBackoff: FiniteDuration = minBackoff, - maxBackoff: FiniteDuration = maxBackoff, - randomFactor: Double = randomFactor, - maxRestarts: Int = maxRestarts, - saveOffsetAfterEnvelopes: Int = saveOffsetAfterEnvelopes, - saveOffsetAfterDuration: FiniteDuration = saveOffsetAfterDuration, - recoveryStrategy: HandlerRecoveryStrategy = recoveryStrategy): ProjectionSettings = - new ProjectionSettingsImpl( - minBackoff, - maxBackoff, - randomFactor, - maxRestarts, - saveOffsetAfterEnvelopes, - saveOffsetAfterDuration, - groupAfterEnvelopes, - groupAfterDuration, - recoveryStrategy) -} - -private object RecoveryStrategyConfig { - def fromConfig(config: Config): HandlerRecoveryStrategy = { - val strategy = config.getString("strategy") - val retries = config.getInt("retries") - val retryDelay = config.getDuration("retry-delay", MILLISECONDS).millis - - strategy match { - case "fail" => HandlerRecoveryStrategy.fail - case "skip" => HandlerRecoveryStrategy.skip - case "retry-and-fail" => HandlerRecoveryStrategy.retryAndFail(retries, retryDelay) - case "retry-and-skip" => HandlerRecoveryStrategy.retryAndSkip(retries, retryDelay) - case s => - throw new IllegalArgumentException( - s"Strategy type [$s] is not supported. Supported options are [fail, skip, retry-and-fail, retry-and-skip]") - } - } -} diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala b/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala new file mode 100644 index 000000000..47ff04a3e --- /dev/null +++ b/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.projection.internal + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.projection.HandlerRecoveryStrategy +import akka.projection.Projection +import akka.util.JavaDurationConverters._ +import com.typesafe.config.Config + +/** + * INTERNAL API + */ +@InternalApi +final case class ProjectionSettings( + restartBackoff: RestartBackoffSettings, + saveOffsetAfterEnvelopes: Int, + saveOffsetAfterDuration: FiniteDuration, + groupAfterEnvelopes: Int, + groupAfterDuration: FiniteDuration, + recoveryStrategy: HandlerRecoveryStrategy) + +/** + * INTERNAL API + */ +@InternalApi +object ProjectionSettings { + + def apply(system: ActorSystem[_]): ProjectionSettings = { + fromConfig(system.classicSystem.settings.config.getConfig("akka.projection")) + } + + def fromConfig(config: Config): ProjectionSettings = { + val restartBackoffConfig = config.getConfig("restart-backoff") + val atLeastOnceConfig = config.getConfig("at-least-once") + val groupedConfig = config.getConfig("grouped") + val recoveryStrategyConfig = config.getConfig("recovery-strategy") + new ProjectionSettings( + RestartBackoffSettings( + minBackoff = restartBackoffConfig.getDuration("min-backoff", MILLISECONDS).millis, + maxBackoff = restartBackoffConfig.getDuration("max-backoff", MILLISECONDS).millis, + randomFactor = restartBackoffConfig.getDouble("random-factor"), + maxRestarts = restartBackoffConfig.getInt("max-restarts")), + atLeastOnceConfig.getInt("save-offset-after-envelopes"), + atLeastOnceConfig.getDuration("save-offset-after-duration", MILLISECONDS).millis, + groupedConfig.getInt("group-after-envelopes"), + groupedConfig.getDuration("group-after-duration", MILLISECONDS).millis, + RecoveryStrategyConfig.fromConfig(recoveryStrategyConfig)) + } +} + +private object RecoveryStrategyConfig { + def fromConfig(config: Config): HandlerRecoveryStrategy = { + val strategy = config.getString("strategy") + val retries = config.getInt("retries") + val retryDelay = config.getDuration("retry-delay", MILLISECONDS).millis + + strategy match { + case "fail" => HandlerRecoveryStrategy.fail + case "skip" => HandlerRecoveryStrategy.skip + case "retry-and-fail" => HandlerRecoveryStrategy.retryAndFail(retries, retryDelay) + case "retry-and-skip" => HandlerRecoveryStrategy.retryAndSkip(retries, retryDelay) + case s => + throw new IllegalArgumentException( + s"Strategy type [$s] is not supported. Supported options are [fail, skip, retry-and-fail, retry-and-skip]") + } + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class RestartBackoffSettings( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int) + +/** + * INTERNAL API: mixin to projection impl to not have to implement all overloaded variants in several places + */ +@InternalApi private[akka] trait SettingsImpl[ProjectionImpl <: Projection[_]] { self: ProjectionImpl => + def withRestartBackoffSettings(restartBackoff: RestartBackoffSettings): ProjectionImpl + + def withRestartBackoff(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): ProjectionImpl = + withRestartBackoffSettings(RestartBackoffSettings(minBackoff, maxBackoff, randomFactor, -1)) + + def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): ProjectionImpl = + withRestartBackoffSettings(RestartBackoffSettings(minBackoff, maxBackoff, randomFactor, maxRestarts)) + + def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): ProjectionImpl = + withRestartBackoffSettings(RestartBackoffSettings(minBackoff.asScala, maxBackoff.asScala, randomFactor, -1)) + + def withRestartBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + maxRestarts: Int): ProjectionImpl = + withRestartBackoffSettings( + RestartBackoffSettings(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts)) + + def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): ProjectionImpl + + def withSaveOffset(afterEnvelopes: Int, afterDuration: java.time.Duration): ProjectionImpl = + withSaveOffset(afterEnvelopes, afterDuration.asScala) + + def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: FiniteDuration): ProjectionImpl + + def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: java.time.Duration): ProjectionImpl = + withGroup(groupAfterEnvelopes, groupAfterDuration.asScala) + +} diff --git a/akka-projection-core/src/test/java/akka/projection/ProjectionBehaviorCompileTest.java b/akka-projection-core/src/test/java/akka/projection/ProjectionBehaviorCompileTest.java index 8a17ec9f7..17c51ea7e 100644 --- a/akka-projection-core/src/test/java/akka/projection/ProjectionBehaviorCompileTest.java +++ b/akka-projection-core/src/test/java/akka/projection/ProjectionBehaviorCompileTest.java @@ -4,12 +4,16 @@ package akka.projection; +import java.time.Duration; + import akka.Done; import akka.actor.testkit.typed.javadsl.ActorTestKit; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.projection.internal.NoopStatusObserver; +import akka.projection.internal.ProjectionSettings; import akka.stream.scaladsl.Source; +import scala.concurrent.duration.FiniteDuration; /** * Compile test: this class serves only for exercising the Java API. @@ -59,5 +63,25 @@ public Projection withStatusObserver(StatusObserver observer) { // no need for StatusObserver in tests return this; } + + @Override + public Projection withRestartBackoff(FiniteDuration minBackoff, FiniteDuration maxBackoff, double randomFactor) { + return this; + } + + @Override + public Projection withRestartBackoff(FiniteDuration minBackoff, FiniteDuration maxBackoff, double randomFactor, int maxRestarts) { + return this; + } + + @Override + public Projection withRestartBackoff(Duration minBackoff, Duration maxBackoff, double randomFactor) { + return this; + } + + @Override + public Projection withRestartBackoff(Duration minBackoff, Duration maxBackoff, double randomFactor, int maxRestarts) { + return this; + } } } diff --git a/akka-projection-core/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala b/akka-projection-core/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala index 79ad6b422..cbfc6b37f 100644 --- a/akka-projection-core/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala +++ b/akka-projection-core/src/test/scala/akka/projection/ProjectionBehaviorSpec.scala @@ -18,6 +18,9 @@ import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.projection.internal.NoopStatusObserver +import akka.projection.internal.ProjectionSettings +import akka.projection.internal.RestartBackoffSettings +import akka.projection.internal.SettingsImpl import akka.stream.KillSwitches import akka.stream.OverflowStrategy import akka.stream.SharedKillSwitch @@ -38,12 +41,13 @@ object ProjectionBehaviorSpec { * This TestProjection has a internal state that we can use to prove that on restart, * the actor is taking a new projection instance. */ - case class TestProjection( + private[akka] case class TestProjection( src: Source[Int, NotUsed], testProbe: TestProbe[ProbeMessage], failToStop: Boolean = false, override val projectionId: ProjectionId = ProjectionBehaviorSpec.TestProjectionId) - extends Projection[Int] { + extends Projection[Int] + with SettingsImpl[TestProjection] { private val offsetStore = new AtomicInteger @@ -61,6 +65,10 @@ object ProjectionBehaviorSpec { override def withStatusObserver(observer: StatusObserver[Int]): Projection[Int] = this // no need for StatusObserver in tests + override def withRestartBackoffSettings(restartBackoff: RestartBackoffSettings): TestProjection = this + override def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): TestProjection = this + override def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: FiniteDuration): TestProjection = this + /* * INTERNAL API * This internal class will hold the KillSwitch that is needed diff --git a/akka-projection-kafka/src/test/scala/akka/projection/kafka/internal/KafkaSourceProviderImplSpec.scala b/akka-projection-kafka/src/test/scala/akka/projection/kafka/internal/KafkaSourceProviderImplSpec.scala index c70d56383..91c01cc1a 100644 --- a/akka-projection-kafka/src/test/scala/akka/projection/kafka/internal/KafkaSourceProviderImplSpec.scala +++ b/akka-projection-kafka/src/test/scala/akka/projection/kafka/internal/KafkaSourceProviderImplSpec.scala @@ -18,10 +18,12 @@ import akka.projection.OffsetVerification.VerificationFailure import akka.projection.OffsetVerification.VerificationSuccess import akka.projection.Projection import akka.projection.ProjectionId -import akka.projection.ProjectionSettings import akka.projection.RunningProjection import akka.projection.StatusObserver import akka.projection.internal.NoopStatusObserver +import akka.projection.internal.ProjectionSettings +import akka.projection.internal.RestartBackoffSettings +import akka.projection.internal.SettingsImpl import akka.projection.kafka.GroupOffsets import akka.projection.kafka.GroupOffsets.TopicPartitionKey import akka.projection.kafka.internal.KafkaSourceProviderImpl.ReadOffsets @@ -111,11 +113,12 @@ class KafkaSourceProviderImplSpec extends ScalaTestWithActorTestKit with LogCapt // FIXME: Copied mostly from ProjectionTestKitSpec. // Maybe a `TestProjection` could be abstracted out and reused to reduce test boilerplate - case class TestProjection( + private[akka] case class TestProjection( sourceProvider: SourceProvider[GroupOffsets, ConsumerRecord[String, String]], topic: String, partitions: Int) - extends Projection[ConsumerRecord[Int, Int]] { + extends Projection[ConsumerRecord[Int, Int]] + with SettingsImpl[TestProjection] { val groupOffsets: GroupOffsets = GroupOffsets( (0 until partitions).map(i => TopicPartitionKey(new TopicPartition(topic, i)) -> 0L).toMap) @@ -135,7 +138,10 @@ class KafkaSourceProviderImplSpec extends ScalaTestWithActorTestKit with LogCapt private lazy val internalState = new InternalProjectionState() override def projectionId: ProjectionId = ProjectionId("name", "key") - override def withSettings(settings: ProjectionSettings): Projection[ConsumerRecord[Int, Int]] = this + override def withSettings(settings: ProjectionSettings): TestProjection = this + override def withRestartBackoffSettings(restartBackoff: RestartBackoffSettings): TestProjection = this + override def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): TestProjection = this + override def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: FiniteDuration): TestProjection = this override private[projection] def mappedSource()(implicit system: ActorSystem[_]) = internalState.mappedSource() diff --git a/akka-projection-slick/src/main/scala/akka/projection/slick/SlickProjection.scala b/akka-projection-slick/src/main/scala/akka/projection/slick/SlickProjection.scala index 170d1080c..feda85077 100644 --- a/akka-projection-slick/src/main/scala/akka/projection/slick/SlickProjection.scala +++ b/akka-projection-slick/src/main/scala/akka/projection/slick/SlickProjection.scala @@ -15,7 +15,7 @@ import akka.annotation.ApiMayChange import akka.projection.HandlerRecoveryStrategy import akka.projection.Projection import akka.projection.ProjectionId -import akka.projection.ProjectionSettings +import akka.projection.StatusObserver import akka.projection.internal.NoopStatusObserver import akka.projection.scaladsl.HandlerLifecycle import akka.projection.scaladsl.SourceProvider @@ -55,6 +55,7 @@ object SlickProjection { sourceProvider, databaseConfig, settingsOpt = None, + restartBackoffOpt = None, ExactlyOnce(), SingleHandlerStrategy(handler), NoopStatusObserver) @@ -81,6 +82,7 @@ object SlickProjection { sourceProvider, databaseConfig, settingsOpt = None, + restartBackoffOpt = None, AtLeastOnce(), SingleHandlerStrategy(handler), NoopStatusObserver) @@ -105,6 +107,7 @@ object SlickProjection { sourceProvider, databaseConfig, settingsOpt = None, + restartBackoffOpt = None, ExactlyOnce(), GroupedHandlerStrategy(handler), NoopStatusObserver) @@ -140,6 +143,7 @@ object SlickProjection { sourceProvider, databaseConfig, settingsOpt = None, + restartBackoffOpt = None, offsetStrategy = AtLeastOnce(), handlerStrategy = FlowHandlerStrategy(handler), NoopStatusObserver) @@ -149,7 +153,18 @@ object SlickProjection { trait SlickProjection[Envelope] extends Projection[Envelope] { private[slick] def offsetStrategy: OffsetStrategy - override def withSettings(settings: ProjectionSettings): SlickProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): SlickProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): SlickProjection[Envelope] + + override def withStatusObserver(observer: StatusObserver[Envelope]): SlickProjection[Envelope] /** * For testing purposes the offset table can be created programmatically. @@ -162,7 +177,18 @@ trait SlickProjection[Envelope] extends Projection[Envelope] { trait AtLeastOnceSlickProjection[Envelope] extends SlickProjection[Envelope] { private[slick] def atLeastOnceStrategy: AtLeastOnce = offsetStrategy.asInstanceOf[AtLeastOnce] - override def withSettings(settings: ProjectionSettings): AtLeastOnceSlickProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): AtLeastOnceSlickProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): AtLeastOnceSlickProjection[Envelope] + + override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceSlickProjection[Envelope] def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): AtLeastOnceSlickProjection[Envelope] @@ -170,8 +196,18 @@ trait AtLeastOnceSlickProjection[Envelope] extends SlickProjection[Envelope] { } trait GroupedSlickProjection[Envelope] extends SlickProjection[Envelope] { + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): GroupedSlickProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): GroupedSlickProjection[Envelope] - override def withSettings(settings: ProjectionSettings): GroupedSlickProjection[Envelope] + override def withStatusObserver(observer: StatusObserver[Envelope]): GroupedSlickProjection[Envelope] def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: FiniteDuration): GroupedSlickProjection[Envelope] @@ -179,7 +215,18 @@ trait GroupedSlickProjection[Envelope] extends SlickProjection[Envelope] { } trait AtLeastOnceFlowSlickProjection[Envelope] extends SlickProjection[Envelope] { - override def withSettings(settings: ProjectionSettings): AtLeastOnceFlowSlickProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): AtLeastOnceFlowSlickProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): AtLeastOnceFlowSlickProjection[Envelope] + + override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceFlowSlickProjection[Envelope] def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): AtLeastOnceFlowSlickProjection[Envelope] } @@ -187,7 +234,18 @@ trait AtLeastOnceFlowSlickProjection[Envelope] extends SlickProjection[Envelope] trait ExactlyOnceSlickProjection[Envelope] extends SlickProjection[Envelope] { private[slick] def exactlyOnceStrategy: ExactlyOnce = offsetStrategy.asInstanceOf[ExactlyOnce] - override def withSettings(settings: ProjectionSettings): ExactlyOnceSlickProjection[Envelope] + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): ExactlyOnceSlickProjection[Envelope] + + override def withRestartBackoff( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double, + maxRestarts: Int): ExactlyOnceSlickProjection[Envelope] + + override def withStatusObserver(observer: StatusObserver[Envelope]): ExactlyOnceSlickProjection[Envelope] def withRecoveryStrategy(recoveryStrategy: HandlerRecoveryStrategy): ExactlyOnceSlickProjection[Envelope] } diff --git a/akka-projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala b/akka-projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala index eece09a0c..f7aa2d95f 100644 --- a/akka-projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala +++ b/akka-projection-slick/src/main/scala/akka/projection/slick/internal/SlickProjectionImpl.scala @@ -21,7 +21,6 @@ import akka.event.Logging import akka.projection.HandlerRecoveryStrategy import akka.projection.ProjectionId import akka.projection.ProjectionOffsetManagement -import akka.projection.ProjectionSettings import akka.projection.RunningProjection import akka.projection.RunningProjection.AbortProjectionException import akka.projection.StatusObserver @@ -38,6 +37,9 @@ import akka.projection.MergeableKey import akka.projection.MergeableOffset import akka.projection.OffsetVerification.VerificationFailure import akka.projection.OffsetVerification.VerificationSuccess +import akka.projection.internal.ProjectionSettings +import akka.projection.internal.RestartBackoffSettings +import akka.projection.internal.SettingsImpl import akka.stream.KillSwitches import akka.stream.SharedKillSwitch import akka.stream.scaladsl.Flow @@ -84,6 +86,7 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile sourceProvider: SourceProvider[Offset, Envelope], databaseConfig: DatabaseConfig[P], settingsOpt: Option[ProjectionSettings], + restartBackoffOpt: Option[RestartBackoffSettings], val offsetStrategy: SlickProjectionImpl.OffsetStrategy, handlerStrategy: SlickProjectionImpl.HandlerStrategy[Envelope], override val statusObserver: StatusObserver[Envelope]) @@ -91,12 +94,14 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile with ExactlyOnceSlickProjection[Envelope] with AtLeastOnceSlickProjection[Envelope] with GroupedSlickProjection[Envelope] - with AtLeastOnceFlowSlickProjection[Envelope] { + with AtLeastOnceFlowSlickProjection[Envelope] + with SettingsImpl[SlickProjectionImpl[Offset, Envelope, P]] { import SlickProjectionImpl._ private def copy( settingsOpt: Option[ProjectionSettings] = this.settingsOpt, + restartBackoffOpt: Option[RestartBackoffSettings] = this.restartBackoffOpt, offsetStrategy: OffsetStrategy = this.offsetStrategy, handlerStrategy: SlickProjectionImpl.HandlerStrategy[Envelope] = this.handlerStrategy, statusObserver: StatusObserver[Envelope] = this.statusObserver): SlickProjectionImpl[Offset, Envelope, P] = @@ -105,6 +110,7 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile sourceProvider, databaseConfig, settingsOpt, + restartBackoffOpt, offsetStrategy, handlerStrategy, statusObserver) @@ -112,6 +118,10 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile override def withSettings(settings: ProjectionSettings): SlickProjectionImpl[Offset, Envelope, P] = copy(settingsOpt = Option(settings)) + override def withRestartBackoffSettings( + restartBackoff: RestartBackoffSettings): SlickProjectionImpl[Offset, Envelope, P] = + copy(restartBackoffOpt = Some(restartBackoff)) + /** * Settings for AtLeastOnceSlickProjection */ @@ -168,8 +178,13 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile /* * Build the final ProjectionSettings to use, if currently set to None fallback to values in config file */ - private def settingsOrDefaults(implicit system: ActorSystem[_]): ProjectionSettings = - settingsOpt.getOrElse(ProjectionSettings(system)) + private def settingsOrDefaults(implicit system: ActorSystem[_]): ProjectionSettings = { + val settings = settingsOpt.getOrElse(ProjectionSettings(system)) + restartBackoffOpt match { + case None => settings + case Some(r) => settings.copy(restartBackoff = r) + } + } /* * INTERNAL API diff --git a/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala b/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala index 0ea8b7a35..53a833a16 100644 --- a/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala +++ b/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala @@ -28,7 +28,6 @@ import akka.projection.OffsetVerification.VerificationFailure import akka.projection.OffsetVerification.VerificationSuccess import akka.projection.ProjectionBehavior import akka.projection.ProjectionId -import akka.projection.ProjectionSettings import akka.projection.TestStatusObserver import akka.projection.scaladsl.ProjectionManagement import akka.projection.scaladsl.SourceProvider @@ -1246,7 +1245,7 @@ class SlickProjectionSpec extends SlickSpec(SlickProjectionSpec.config) with Any sourceProvider = sourceProvider(system, entityId), databaseConfig = dbConfig, handler) - .withSettings(ProjectionSettings(system).withBackoff(1.second, 2.seconds, 0.0)) + .withRestartBackoff(1.second, 2.seconds, 0.0) .withSaveOffset(1, Duration.Zero) .withStatusObserver(statusObserver) @@ -1300,9 +1299,7 @@ class SlickProjectionSpec extends SlickSpec(SlickProjectionSpec.config) with Any sourceProvider = sourceProvider(system, entityId), databaseConfig = dbConfig, handler) - .withSettings( - ProjectionSettings(system).withBackoff(1.second, 2.seconds, 0.0, maxRestarts = 0) - ) // no restarts + .withRestartBackoff(1.second, 2.seconds, 0.0, maxRestarts = 0) // no restarts .withSaveOffset(1, Duration.Zero) // not using ProjectionTestKit because want to test restarts diff --git a/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/ProjectionTestKit.scala b/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/ProjectionTestKit.scala index b9b040ec6..0cdcf4e0b 100644 --- a/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/ProjectionTestKit.scala +++ b/akka-projection-testkit/src/main/scala/akka/projection/testkit/javadsl/ProjectionTestKit.scala @@ -18,7 +18,8 @@ import akka.annotation.ApiMayChange import akka.japi.function.Effect import akka.japi.function.Procedure import akka.projection.Projection -import akka.projection.ProjectionSettings +import akka.projection.internal.ProjectionSettings +import akka.projection.internal.RestartBackoffSettings import akka.stream.testkit.TestSubscriber import akka.stream.testkit.javadsl.TestSink import akka.util.JavaDurationConverters._ @@ -89,7 +90,7 @@ final class ProjectionTestKit private[akka] (testKit: ActorTestKit) { val probe = testKit.createTestProbe[Nothing]("internal-projection-testkit-probe") - val settingsForTest = ProjectionSettings(system).withBackoff(0.millis, 0.millis, 0.0, 0) + val settingsForTest = ProjectionSettings(system).copy(RestartBackoffSettings(0.millis, 0.millis, 0.0, 0)) val running = projection diff --git a/akka-projection-testkit/src/main/scala/akka/projection/testkit/scaladsl/ProjectionTestKit.scala b/akka-projection-testkit/src/main/scala/akka/projection/testkit/scaladsl/ProjectionTestKit.scala index 31604bb26..9e9af3331 100644 --- a/akka-projection-testkit/src/main/scala/akka/projection/testkit/scaladsl/ProjectionTestKit.scala +++ b/akka-projection-testkit/src/main/scala/akka/projection/testkit/scaladsl/ProjectionTestKit.scala @@ -14,7 +14,8 @@ import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.projection.Projection -import akka.projection.ProjectionSettings +import akka.projection.internal.ProjectionSettings +import akka.projection.internal.RestartBackoffSettings import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.TestSink @@ -84,7 +85,7 @@ final class ProjectionTestKit private[akka] (testKit: ActorTestKit) { val probe = testKit.createTestProbe[Nothing]("internal-projection-testkit-probe") - val settingsForTest = ProjectionSettings(system).withBackoff(0.millis, 0.millis, 0.0, 0) + val settingsForTest = ProjectionSettings(system).copy(RestartBackoffSettings(0.millis, 0.millis, 0.0, 0)) val running = projection .withSettings(settingsForTest) diff --git a/akka-projection-testkit/src/test/java/akka/projection/testkit/javadsl/ProjectionTestKitTest.java b/akka-projection-testkit/src/test/java/akka/projection/testkit/javadsl/ProjectionTestKitTest.java index daef80fa6..fe3d88169 100644 --- a/akka-projection-testkit/src/test/java/akka/projection/testkit/javadsl/ProjectionTestKitTest.java +++ b/akka-projection-testkit/src/test/java/akka/projection/testkit/javadsl/ProjectionTestKitTest.java @@ -11,7 +11,7 @@ import akka.japi.function.Function; import akka.projection.Projection; import akka.projection.ProjectionId; -import akka.projection.ProjectionSettings; +import akka.projection.internal.ProjectionSettings; import akka.projection.RunningProjection; import akka.projection.StatusObserver; import akka.projection.internal.NoopStatusObserver; @@ -27,8 +27,8 @@ import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; import scala.compat.java8.FutureConverters; -import scala.concurrent.ExecutionContext; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import java.time.Duration; import java.util.List; @@ -170,6 +170,26 @@ public ProjectionId projectionId() { return ProjectionId.of("test-projection", "00"); } + @Override + public Projection withRestartBackoff(FiniteDuration minBackoff, FiniteDuration maxBackoff, double randomFactor) { + return this; + } + + @Override + public Projection withRestartBackoff(FiniteDuration minBackoff, FiniteDuration maxBackoff, double randomFactor, int maxRestarts) { + return this; + } + + @Override + public Projection withRestartBackoff(Duration minBackoff, Duration maxBackoff, double randomFactor) { + return this; + } + + @Override + public Projection withRestartBackoff(Duration minBackoff, Duration maxBackoff, double randomFactor, int maxRestarts) { + return this; + } + @Override public akka.stream.scaladsl.Source mappedSource(ActorSystem system) { return new InternalProjectionState(strBuffer, predicate, system).mappedSource(); diff --git a/akka-projection-testkit/src/test/scala/akka/projection/testkit/scaladsl/ProjectionTestKitSpec.scala b/akka-projection-testkit/src/test/scala/akka/projection/testkit/scaladsl/ProjectionTestKitSpec.scala index cd6e78090..cbdd80a92 100644 --- a/akka-projection-testkit/src/test/scala/akka/projection/testkit/scaladsl/ProjectionTestKitSpec.scala +++ b/akka-projection-testkit/src/test/scala/akka/projection/testkit/scaladsl/ProjectionTestKitSpec.scala @@ -13,10 +13,12 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem import akka.projection.Projection import akka.projection.ProjectionId -import akka.projection.ProjectionSettings import akka.projection.RunningProjection import akka.projection.StatusObserver import akka.projection.internal.NoopStatusObserver +import akka.projection.internal.ProjectionSettings +import akka.projection.internal.RestartBackoffSettings +import akka.projection.internal.SettingsImpl import akka.stream.DelayOverflowStrategy import akka.stream.KillSwitches import akka.stream.SharedKillSwitch @@ -134,8 +136,9 @@ class ProjectionTestKitSpec extends ScalaTestWithActorTestKit with AnyWordSpecLi } - case class TestProjection(src: Source[Int, NotUsed], strBuffer: StringBuffer, predicate: Int => Boolean) - extends Projection[Int] { + private[akka] case class TestProjection(src: Source[Int, NotUsed], strBuffer: StringBuffer, predicate: Int => Boolean) + extends Projection[Int] + with SettingsImpl[TestProjection] { override def withSettings(settings: ProjectionSettings): Projection[Int] = this // no need for ProjectionSettings in tests @@ -145,6 +148,10 @@ class ProjectionTestKitSpec extends ScalaTestWithActorTestKit with AnyWordSpecLi override def withStatusObserver(observer: StatusObserver[Int]): Projection[Int] = this // no need for StatusObserver in tests + override def withRestartBackoffSettings(restartBackoff: RestartBackoffSettings): TestProjection = this + override def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): TestProjection = this + override def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: FiniteDuration): TestProjection = this + override def projectionId: ProjectionId = ProjectionId("test-projection", "00") override def run()(implicit system: ActorSystem[_]): RunningProjection = diff --git a/docs/src/main/paradox/projection-settings.md b/docs/src/main/paradox/projection-settings.md index 44d459554..a5754b614 100644 --- a/docs/src/main/paradox/projection-settings.md +++ b/docs/src/main/paradox/projection-settings.md @@ -5,10 +5,10 @@ A Projection is a background process that continuously consume event envelopes f By default, the backoff configuration defined in the reference configuration is used. Those values can be overriden in the `application.conf` file or programatically as shown below. Scala -: @@snip [CassandraProjectionDocExample.scala](/examples/src/test/scala/docs/cassandra/CassandraProjectionDocExample.scala) { #projection-settings-imports #projection-imports #projection-settings } +: @@snip [CassandraProjectionDocExample.scala](/examples/src/test/scala/docs/cassandra/CassandraProjectionDocExample.scala) { #projection-imports #projection-settings } Java -: @@snip [CassandraProjectionDocExample.java](/examples/src/test/java/jdocs/cassandra/CassandraProjectionDocExample.java) { #projection-settings-imports #projection-imports #projection-settings } +: @@snip [CassandraProjectionDocExample.java](/examples/src/test/java/jdocs/cassandra/CassandraProjectionDocExample.java) { #projection-imports #projection-settings } ## Configuration diff --git a/examples/src/test/java/jdocs/cassandra/CassandraProjectionDocExample.java b/examples/src/test/java/jdocs/cassandra/CassandraProjectionDocExample.java index b30b1f658..7ca43c180 100644 --- a/examples/src/test/java/jdocs/cassandra/CassandraProjectionDocExample.java +++ b/examples/src/test/java/jdocs/cassandra/CassandraProjectionDocExample.java @@ -48,15 +48,8 @@ //#handler-imports -//#projection-settings-imports -import akka.projection.ProjectionSettings; - -//#projection-settings-imports - //#get-offset import akka.projection.javadsl.ProjectionManagement; -import akka.persistence.query.Offset; -import akka.projection.ProjectionId; //#get-offset @@ -260,16 +253,13 @@ public static void illustrateProjectionSettings() { Projection> projection = CassandraProjection.atLeastOnce( - ProjectionId.of("shopping-carts", "carts-1"), - sourceProvider, - new ShoppingCartHandler() - ).withSettings( - ProjectionSettings.create(system) - .withBackoff( - Duration.ofSeconds(10), /*minBackoff*/ - Duration.ofSeconds(60), /*maxBackoff*/ - 0.5 /*randomFactor*/ - ) + ProjectionId.of("shopping-carts", "carts-1"), + sourceProvider, + new ShoppingCartHandler() + ).withRestartBackoff( + Duration.ofSeconds(10), /*minBackoff*/ + Duration.ofSeconds(60), /*maxBackoff*/ + 0.5 /*randomFactor*/ ) .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration); //#projection-settings diff --git a/examples/src/test/java/jdocs/testkit/TestKitDocExample.java b/examples/src/test/java/jdocs/testkit/TestKitDocExample.java index 01f9c563f..0b8b98d43 100644 --- a/examples/src/test/java/jdocs/testkit/TestKitDocExample.java +++ b/examples/src/test/java/jdocs/testkit/TestKitDocExample.java @@ -7,7 +7,7 @@ import akka.actor.typed.ActorSystem; import akka.projection.Projection; import akka.projection.ProjectionId; -import akka.projection.ProjectionSettings; +import akka.projection.internal.ProjectionSettings; import akka.projection.RunningProjection; import akka.projection.StatusObserver; import akka.stream.scaladsl.Source; @@ -20,6 +20,7 @@ import org.junit.ClassRule; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.projection.testkit.javadsl.ProjectionTestKit; +import scala.concurrent.duration.FiniteDuration; //#testkit-import @@ -78,6 +79,26 @@ public Projection withStatusObserver(StatusObserver observer) { return null; } + @Override + public Projection withRestartBackoff(FiniteDuration minBackoff, FiniteDuration maxBackoff, double randomFactor) { + return this; + } + + @Override + public Projection withRestartBackoff(FiniteDuration minBackoff, FiniteDuration maxBackoff, double randomFactor, int maxRestarts) { + return this; + } + + @Override + public Projection withRestartBackoff(Duration minBackoff, Duration maxBackoff, double randomFactor) { + return this; + } + + @Override + public Projection withRestartBackoff(Duration minBackoff, Duration maxBackoff, double randomFactor, int maxRestarts) { + return this; + } + @Override public Source mappedSource(ActorSystem system) { return null; diff --git a/examples/src/test/scala/docs/cassandra/CassandraProjectionDocExample.scala b/examples/src/test/scala/docs/cassandra/CassandraProjectionDocExample.scala index 75182697e..4f40d1ada 100644 --- a/examples/src/test/scala/docs/cassandra/CassandraProjectionDocExample.scala +++ b/examples/src/test/scala/docs/cassandra/CassandraProjectionDocExample.scala @@ -6,7 +6,6 @@ package docs.cassandra import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.Behaviors - import akka.stream.scaladsl.FlowWithContext //#daemon-imports import akka.cluster.sharding.typed.ShardedDaemonProcessSettings @@ -29,13 +28,8 @@ import akka.projection.cassandra.scaladsl.CassandraProjection //#projection-imports -//#projection-settings-imports -import scala.concurrent.duration._ - -import akka.projection.ProjectionSettings -//#projection-settings-imports - //#handler-imports +import scala.concurrent.duration._ import scala.concurrent.Future import akka.Done @@ -126,7 +120,6 @@ object CassandraProjectionDocExample { } object IllustrateAtLeastOnceFlow { - import akka.persistence.query.Offset //#atLeastOnceFlow val logger = LoggerFactory.getLogger(getClass) @@ -189,8 +182,7 @@ object CassandraProjectionDocExample { projectionId = ProjectionId("shopping-carts", "carts-1"), sourceProvider, handler = new ShoppingCartHandler) - .withSettings(ProjectionSettings(system) - .withBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.5)) + .withRestartBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.5) .withSaveOffset(100, 500.millis) //#projection-settings