Skip to content

Commit

Permalink
Merge pull request #201 from akka/wip-177-settings-patriknw
Browse files Browse the repository at this point in the history
withRestartBackoff settings directly on Projection, #177
  • Loading branch information
octonato authored Jun 5, 2020
2 parents 04d2bf4 + 8847dfa commit 1eb9ee4
Show file tree
Hide file tree
Showing 21 changed files with 498 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._
import scala.util.control.NonFatal

import akka.Done
Expand All @@ -22,7 +21,6 @@ import akka.event.Logging
import akka.projection.HandlerRecoveryStrategy
import akka.projection.ProjectionId
import akka.projection.ProjectionOffsetManagement
import akka.projection.ProjectionSettings
import akka.projection.RunningProjection
import akka.projection.RunningProjection.AbortProjectionException
import akka.projection.StatusObserver
Expand All @@ -35,6 +33,9 @@ import akka.projection.scaladsl.HandlerLifecycle
import akka.projection.scaladsl.SourceProvider
import akka.projection.OffsetVerification.VerificationFailure
import akka.projection.OffsetVerification.VerificationSuccess
import akka.projection.internal.ProjectionSettings
import akka.projection.internal.RestartBackoffSettings
import akka.projection.internal.SettingsImpl
import akka.stream.KillSwitches
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.Flow
Expand Down Expand Up @@ -79,6 +80,7 @@ import akka.stream.scaladsl.Source
override val projectionId: ProjectionId,
sourceProvider: SourceProvider[Offset, Envelope],
settingsOpt: Option[ProjectionSettings],
restartBackoffOpt: Option[RestartBackoffSettings],
val offsetStrategy: CassandraProjectionImpl.OffsetStrategy,
handlerStrategy: CassandraProjectionImpl.HandlerStrategy[Envelope],
override val statusObserver: StatusObserver[Envelope])
Expand All @@ -91,26 +93,33 @@ import akka.stream.scaladsl.Source
with javadsl.AtMostOnceCassandraProjection[Envelope]
with scaladsl.AtMostOnceCassandraProjection[Envelope]
with scaladsl.AtLeastOnceFlowCassandraProjection[Envelope]
with javadsl.AtLeastOnceFlowCassandraProjection[Envelope] {
with javadsl.AtLeastOnceFlowCassandraProjection[Envelope]
with SettingsImpl[CassandraProjectionImpl[Offset, Envelope]] {

import CassandraProjectionImpl._

private def copy(
settingsOpt: Option[ProjectionSettings] = this.settingsOpt,
restartBackoffOpt: Option[RestartBackoffSettings] = this.restartBackoffOpt,
offsetStrategy: OffsetStrategy = this.offsetStrategy,
handlerStrategy: CassandraProjectionImpl.HandlerStrategy[Envelope] = this.handlerStrategy,
statusObserver: StatusObserver[Envelope] = this.statusObserver): CassandraProjectionImpl[Offset, Envelope] =
new CassandraProjectionImpl(
projectionId,
sourceProvider,
settingsOpt,
restartBackoffOpt,
offsetStrategy,
handlerStrategy,
statusObserver)

override def withSettings(settings: ProjectionSettings): CassandraProjectionImpl[Offset, Envelope] =
copy(settingsOpt = Option(settings))

override def withRestartBackoffSettings(
restartBackoff: RestartBackoffSettings): CassandraProjectionImpl[Offset, Envelope] =
copy(restartBackoffOpt = Some(restartBackoff))

/**
* Settings for AtLeastOnceCassandraProjection
*/
Expand All @@ -120,15 +129,6 @@ import akka.stream.scaladsl.Source
copy(offsetStrategy = atLeastOnceStrategy
.copy(afterEnvelopes = Some(afterEnvelopes), orAfterDuration = Some(afterDuration)))

/**
* Java API
*/
override def withSaveOffset(
afterEnvelopes: Int,
afterDuration: java.time.Duration): CassandraProjectionImpl[Offset, Envelope] =
copy(offsetStrategy = atLeastOnceStrategy
.copy(afterEnvelopes = Some(afterEnvelopes), orAfterDuration = Some(afterDuration.toScala)))

/**
* Settings for GroupedCassandraProjection
*/
Expand All @@ -139,16 +139,6 @@ import akka.stream.scaladsl.Source
.asInstanceOf[GroupedHandlerStrategy[Envelope]]
.copy(afterEnvelopes = Some(groupAfterEnvelopes), orAfterDuration = Some(groupAfterDuration)))

/**
* Java API
*/
override def withGroup(
groupAfterEnvelopes: Int,
groupAfterDuration: java.time.Duration): CassandraProjectionImpl[Offset, Envelope] =
copy(handlerStrategy = handlerStrategy
.asInstanceOf[GroupedHandlerStrategy[Envelope]]
.copy(afterEnvelopes = Some(groupAfterEnvelopes), orAfterDuration = Some(groupAfterDuration.toScala)))

override def withRecoveryStrategy(
recoveryStrategy: HandlerRecoveryStrategy): CassandraProjectionImpl[Offset, Envelope] =
copy(offsetStrategy = atLeastOnceStrategy.copy(recoveryStrategy = Some(recoveryStrategy)))
Expand Down Expand Up @@ -186,8 +176,13 @@ import akka.stream.scaladsl.Source
/*
* Build the final ProjectionSettings to use, if currently set to None fallback to values in config file
*/
private def settingsOrDefaults(implicit system: ActorSystem[_]): ProjectionSettings =
settingsOpt.getOrElse(ProjectionSettings(system))
private def settingsOrDefaults(implicit system: ActorSystem[_]): ProjectionSettings = {
val settings = settingsOpt.getOrElse(ProjectionSettings(system))
restartBackoffOpt match {
case None => settings
case Some(r) => settings.copy(restartBackoff = r)
}
}

/*
* INTERNAL API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import akka.annotation.DoNotInherit
import akka.projection.HandlerRecoveryStrategy
import akka.projection.Projection
import akka.projection.ProjectionId
import akka.projection.ProjectionSettings
import akka.projection.StatusObserver
import akka.projection.StrictRecoveryStrategy
import akka.projection.cassandra.internal.CassandraProjectionImpl
Expand Down Expand Up @@ -55,6 +54,7 @@ object CassandraProjection {
projectionId,
new SourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
handlerStrategy = SingleHandlerStrategy(new HandlerAdapter(handler)),
statusObserver = NoopStatusObserver)
Expand All @@ -78,6 +78,7 @@ object CassandraProjection {
projectionId,
new SourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = CassandraProjectionImpl
.AtLeastOnce(afterEnvelopes = Some(1), orAfterDuration = Some(scala.concurrent.duration.Duration.Zero)),
handlerStrategy = GroupedHandlerStrategy(new GroupedHandlerAdapter(handler)),
Expand Down Expand Up @@ -112,6 +113,7 @@ object CassandraProjection {
projectionId,
new SourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
handlerStrategy = FlowHandlerStrategy(handler.asScala),
statusObserver = NoopStatusObserver)
Expand All @@ -129,14 +131,24 @@ object CassandraProjection {
projectionId,
new SourceProviderAdapter(sourceProvider),
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtMostOnce(),
handlerStrategy = SingleHandlerStrategy(new HandlerAdapter(handler)),
statusObserver = NoopStatusObserver)
}

@DoNotInherit trait CassandraProjection[Envelope] extends Projection[Envelope] {

override def withSettings(settings: ProjectionSettings): CassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double): CassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int): CassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): CassandraProjection[Envelope]

Expand All @@ -149,7 +161,16 @@ object CassandraProjection {
}

@DoNotInherit trait AtLeastOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): AtLeastOnceCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double): AtLeastOnceCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int): AtLeastOnceCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceCassandraProjection[Envelope]

Expand All @@ -159,7 +180,16 @@ object CassandraProjection {
}

@DoNotInherit trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): GroupedCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double): GroupedCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int): GroupedCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): GroupedCassandraProjection[Envelope]

Expand All @@ -169,15 +199,35 @@ object CassandraProjection {
}

@DoNotInherit trait AtMostOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): AtMostOnceCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double): AtMostOnceCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int): AtMostOnceCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): AtMostOnceCassandraProjection[Envelope]

def withRecoveryStrategy(recoveryStrategy: StrictRecoveryStrategy): AtMostOnceCassandraProjection[Envelope]
}

@DoNotInherit trait AtLeastOnceFlowCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): AtLeastOnceFlowCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double): AtLeastOnceFlowCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int): AtLeastOnceFlowCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceFlowCassandraProjection[Envelope]

def withSaveOffset(
afterEnvelopes: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import akka.annotation.DoNotInherit
import akka.projection.HandlerRecoveryStrategy
import akka.projection.Projection
import akka.projection.ProjectionId
import akka.projection.ProjectionSettings
import akka.projection.StatusObserver
import akka.projection.StrictRecoveryStrategy
import akka.projection.cassandra.internal.CassandraProjectionImpl
Expand Down Expand Up @@ -61,6 +60,7 @@ object CassandraProjection {
projectionId,
sourceProvider,
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
handlerStrategy = SingleHandlerStrategy(handler),
statusObserver = NoopStatusObserver)
Expand All @@ -84,6 +84,7 @@ object CassandraProjection {
projectionId,
sourceProvider,
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(afterEnvelopes = Some(1), orAfterDuration = Some(Duration.Zero)),
handlerStrategy = GroupedHandlerStrategy(handler),
statusObserver = NoopStatusObserver)
Expand Down Expand Up @@ -117,6 +118,7 @@ object CassandraProjection {
projectionId,
sourceProvider,
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
handlerStrategy = FlowHandlerStrategy(handler),
statusObserver = NoopStatusObserver)
Expand All @@ -134,6 +136,7 @@ object CassandraProjection {
projectionId,
sourceProvider,
settingsOpt = None,
restartBackoffOpt = None,
offsetStrategy = AtMostOnce(),
handlerStrategy = SingleHandlerStrategy(handler),
statusObserver = NoopStatusObserver)
Expand All @@ -142,7 +145,16 @@ object CassandraProjection {
@DoNotInherit trait CassandraProjection[Envelope] extends Projection[Envelope] {
private[cassandra] def offsetStrategy: OffsetStrategy

override def withSettings(settings: ProjectionSettings): CassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): CassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int): CassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): CassandraProjection[Envelope]

Expand All @@ -157,7 +169,16 @@ object CassandraProjection {
@DoNotInherit trait AtLeastOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
private[cassandra] def atLeastOnceStrategy: AtLeastOnce = offsetStrategy.asInstanceOf[AtLeastOnce]

override def withSettings(settings: ProjectionSettings): AtLeastOnceCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): AtLeastOnceCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int): AtLeastOnceCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceCassandraProjection[Envelope]

Expand All @@ -167,7 +188,16 @@ object CassandraProjection {
}

@DoNotInherit trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): GroupedCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): GroupedCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int): GroupedCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): GroupedCassandraProjection[Envelope]

Expand All @@ -179,15 +209,35 @@ object CassandraProjection {
@DoNotInherit trait AtMostOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
private[cassandra] def atMostOnceStrategy: AtMostOnce = offsetStrategy.asInstanceOf[AtMostOnce]

override def withSettings(settings: ProjectionSettings): AtMostOnceCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): AtMostOnceCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int): AtMostOnceCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): AtMostOnceCassandraProjection[Envelope]

def withRecoveryStrategy(recoveryStrategy: StrictRecoveryStrategy): AtMostOnceCassandraProjection[Envelope]
}

@DoNotInherit trait AtLeastOnceFlowCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): AtLeastOnceFlowCassandraProjection[Envelope]
override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): AtLeastOnceFlowCassandraProjection[Envelope]

override def withRestartBackoff(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int): AtLeastOnceFlowCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): AtLeastOnceFlowCassandraProjection[Envelope]

def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): AtLeastOnceFlowCassandraProjection[Envelope]
}
Loading

0 comments on commit 1eb9ee4

Please sign in to comment.