Skip to content

Commit

Permalink
Flow as handler, #145
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Jun 5, 2020
1 parent af779f9 commit 04d2bf4
Show file tree
Hide file tree
Showing 13 changed files with 434 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import akka.projection.OffsetVerification.VerificationSuccess
import akka.stream.KillSwitches
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.FlowWithContext
import akka.stream.scaladsl.Source

/**
Expand Down Expand Up @@ -65,6 +66,10 @@ import akka.stream.scaladsl.Source
extends HandlerStrategy[Envelope] {
override def lifecycle: HandlerLifecycle = handler
}
final case class FlowHandlerStrategy[Envelope](flowCtx: FlowWithContext[Envelope, Envelope, Done, Envelope, _])
extends HandlerStrategy[Envelope] {
override val lifecycle: HandlerLifecycle = new HandlerLifecycle {}
}
}

/**
Expand All @@ -84,7 +89,9 @@ import akka.stream.scaladsl.Source
with javadsl.GroupedCassandraProjection[Envelope]
with scaladsl.GroupedCassandraProjection[Envelope]
with javadsl.AtMostOnceCassandraProjection[Envelope]
with scaladsl.AtMostOnceCassandraProjection[Envelope] {
with scaladsl.AtMostOnceCassandraProjection[Envelope]
with scaladsl.AtLeastOnceFlowCassandraProjection[Envelope]
with javadsl.AtLeastOnceFlowCassandraProjection[Envelope] {

import CassandraProjectionImpl._

Expand Down Expand Up @@ -222,7 +229,7 @@ import akka.stream.scaladsl.Source
}
.mapMaterializedValue(_ => NotUsed)

def handlerFlow(
def atLeastOnceHandlerFlow(
recoveryStrategy: HandlerRecoveryStrategy): Flow[(Offset, Envelope), (Offset, Envelope), NotUsed] =
handlerStrategy match {
case SingleHandlerStrategy(handler) =>
Expand Down Expand Up @@ -258,6 +265,13 @@ import akka.stream.scaladsl.Source
() => grouped.handler.process(envelopes))
.map(_ => lastOffset -> lastEnvelope)
}

case f: FlowHandlerStrategy[Envelope] =>
val flow: Flow[(Envelope, Envelope), (Done, Envelope), _] = f.flowCtx.asFlow
Flow[(Offset, Envelope)]
.map { case (_, env) => env -> env }
.via(flow)
.map { case (_, env) => sourceProvider.extractOffset(env) -> env }
}

def reportProgress[T](after: Future[T], env: Envelope): Future[T] = {
Expand All @@ -279,13 +293,13 @@ import akka.stream.scaladsl.Source

if (afterEnvelopes == 1)
// optimization of general AtLeastOnce case
source.via(handlerFlow(recoveryStrategy)).mapAsync(1) {
source.via(atLeastOnceHandlerFlow(recoveryStrategy)).mapAsync(1) {
case (offset, envelope) =>
reportProgress(offsetStore.saveOffset(projectionId, offset), envelope)
}
else
source
.via(handlerFlow(recoveryStrategy))
.via(atLeastOnceHandlerFlow(recoveryStrategy))
.groupedWithin(afterEnvelopes, orAfterDuration)
.collect { case grouped if grouped.nonEmpty => grouped.last }
.mapAsync(parallelism = 1) {
Expand All @@ -300,7 +314,7 @@ import akka.stream.scaladsl.Source
val handler = handlerStrategy match {
case SingleHandlerStrategy(handler) => handler
case _ =>
// not possible
// not possible, no API for this
throw new IllegalStateException("Unsupported combination of atMostOnce and grouped")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.projection.internal.NoopStatusObserver
import akka.projection.internal.SourceProviderAdapter
import akka.projection.javadsl.Handler
import akka.projection.javadsl.SourceProvider
import akka.stream.javadsl.FlowWithContext

/**
* Factories of [[Projection]] where the offset is stored in Cassandra. The envelope handler can
Expand All @@ -34,6 +35,7 @@ object CassandraProjection {
import CassandraProjectionImpl.AtMostOnce
import CassandraProjectionImpl.GroupedHandlerStrategy
import CassandraProjectionImpl.SingleHandlerStrategy
import CassandraProjectionImpl.FlowHandlerStrategy

/**
* Create a [[Projection]] with at-least-once processing semantics. It stores the offset in Cassandra
Expand Down Expand Up @@ -81,6 +83,39 @@ object CassandraProjection {
handlerStrategy = GroupedHandlerStrategy(new GroupedHandlerAdapter(handler)),
statusObserver = NoopStatusObserver)

/**
* Create a [[Projection]] with a [[FlowWithContext]] as the envelope handler. It has at-least-once processing
* semantics.
*
* The flow should emit a `Done` element for each completed envelope. The offset of the envelope is carried
* in the context of the `FlowWithContext` and is stored in Cassandra when corresponding `Done` is emitted.
* Since the offset is stored after processing the envelope it means that if the
* projection is restarted from previously stored offset then some envelopes may be processed more than once.
*
* If the flow filters out envelopes the corresponding offset will not be stored, and such envelope
* will be processed again if the projection is restarted and no later offset was stored.
*
* The flow should not duplicate emitted envelopes (`mapConcat`) with same offset, because then it can result in
* that the first offset is stored and when the projection is restarted that offset is considered completed even
* though more of the duplicated enveloped were never processed.
*
* The flow must not reorder elements, because the offsets may be stored in the wrong order and
* and when the projection is restarted all envelopes up to the latest stored offset are considered
* completed even though some of them may not have been processed. This is the reason the flow is
* restricted to `FlowWithContext` rather than ordinary `Flow`.
*/
def atLeastOnceFlow[Offset, Envelope](
projectionId: ProjectionId,
sourceProvider: SourceProvider[Offset, Envelope],
handler: FlowWithContext[Envelope, Envelope, Done, Envelope, _]): AtLeastOnceFlowCassandraProjection[Envelope] =
new CassandraProjectionImpl(
projectionId,
new SourceProviderAdapter(sourceProvider),
settingsOpt = None,
offsetStrategy = AtLeastOnce(),
handlerStrategy = FlowHandlerStrategy(handler.asScala),
statusObserver = NoopStatusObserver)

/**
* Create a [[Projection]] with at-most-once processing semantics. It stores the offset in Cassandra
* before the `handler` has processed the envelope. This means that if the projection is restarted
Expand Down Expand Up @@ -123,7 +158,7 @@ object CassandraProjection {
def withRecoveryStrategy(recoveryStrategy: HandlerRecoveryStrategy): AtLeastOnceCassandraProjection[Envelope]
}

trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
@DoNotInherit trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): GroupedCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): GroupedCassandraProjection[Envelope]
Expand All @@ -140,3 +175,11 @@ trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope]

def withRecoveryStrategy(recoveryStrategy: StrictRecoveryStrategy): AtMostOnceCassandraProjection[Envelope]
}

@DoNotInherit trait AtLeastOnceFlowCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): AtLeastOnceFlowCassandraProjection[Envelope]

def withSaveOffset(
afterEnvelopes: Int,
afterDuration: java.time.Duration): AtLeastOnceFlowCassandraProjection[Envelope]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.concurrent.duration.FiniteDuration
import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.projection.HandlerRecoveryStrategy
import akka.projection.Projection
import akka.projection.ProjectionId
Expand All @@ -25,6 +26,7 @@ import akka.projection.cassandra.internal.CassandraProjectionImpl.OffsetStrategy
import akka.projection.internal.NoopStatusObserver
import akka.projection.scaladsl.Handler
import akka.projection.scaladsl.SourceProvider
import akka.stream.scaladsl.FlowWithContext

/**
* Factories of [[Projection]] where the offset is stored in Cassandra. The envelope handler can
Expand All @@ -37,10 +39,9 @@ import akka.projection.scaladsl.SourceProvider
*/
@ApiMayChange
object CassandraProjection {
import CassandraProjectionImpl.AtLeastOnce
import CassandraProjectionImpl.AtMostOnce
import CassandraProjectionImpl.GroupedHandlerStrategy
import CassandraProjectionImpl.SingleHandlerStrategy
import CassandraProjectionImpl.FlowHandlerStrategy

/**
* Create a [[Projection]] with at-least-once processing semantics. It stores the offset in Cassandra
Expand Down Expand Up @@ -87,6 +88,39 @@ object CassandraProjection {
handlerStrategy = GroupedHandlerStrategy(handler),
statusObserver = NoopStatusObserver)

/**
* Create a [[Projection]] with a [[FlowWithContext]] as the envelope handler. It has at-least-once processing
* semantics.
*
* The flow should emit a `Done` element for each completed envelope. The offset of the envelope is carried
* in the context of the `FlowWithContext` and is stored in Cassandra when corresponding `Done` is emitted.
* Since the offset is stored after processing the envelope it means that if the
* projection is restarted from previously stored offset then some envelopes may be processed more than once.
*
* If the flow filters out envelopes the corresponding offset will not be stored, and such envelope
* will be processed again if the projection is restarted and no later offset was stored.
*
* The flow should not duplicate emitted envelopes (`mapConcat`) with same offset, because then it can result in
* that the first offset is stored and when the projection is restarted that offset is considered completed even
* though more of the duplicated enveloped were never processed.
*
* The flow must not reorder elements, because the offsets may be stored in the wrong order and
* and when the projection is restarted all envelopes up to the latest stored offset are considered
* completed even though some of them may not have been processed. This is the reason the flow is
* restricted to `FlowWithContext` rather than ordinary `Flow`.
*/
def atLeastOnceFlow[Offset, Envelope](
projectionId: ProjectionId,
sourceProvider: SourceProvider[Offset, Envelope],
handler: FlowWithContext[Envelope, Envelope, Done, Envelope, _]): AtLeastOnceFlowCassandraProjection[Envelope] =
new CassandraProjectionImpl(
projectionId,
sourceProvider,
settingsOpt = None,
offsetStrategy = AtLeastOnce(),
handlerStrategy = FlowHandlerStrategy(handler),
statusObserver = NoopStatusObserver)

/**
* Create a [[Projection]] with at-most-once processing semantics. It stores the offset in Cassandra
* before the `handler` has processed the envelope. This means that if the projection is restarted
Expand All @@ -105,7 +139,7 @@ object CassandraProjection {
statusObserver = NoopStatusObserver)
}

trait CassandraProjection[Envelope] extends Projection[Envelope] {
@DoNotInherit trait CassandraProjection[Envelope] extends Projection[Envelope] {
private[cassandra] def offsetStrategy: OffsetStrategy

override def withSettings(settings: ProjectionSettings): CassandraProjection[Envelope]
Expand All @@ -120,7 +154,7 @@ trait CassandraProjection[Envelope] extends Projection[Envelope] {
def createOffsetTableIfNotExists()(implicit system: ActorSystem[_]): Future[Done]
}

trait AtLeastOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
@DoNotInherit trait AtLeastOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
private[cassandra] def atLeastOnceStrategy: AtLeastOnce = offsetStrategy.asInstanceOf[AtLeastOnce]

override def withSettings(settings: ProjectionSettings): AtLeastOnceCassandraProjection[Envelope]
Expand All @@ -132,7 +166,7 @@ trait AtLeastOnceCassandraProjection[Envelope] extends CassandraProjection[Envel
def withRecoveryStrategy(recoveryStrategy: HandlerRecoveryStrategy): AtLeastOnceCassandraProjection[Envelope]
}

trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
@DoNotInherit trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): GroupedCassandraProjection[Envelope]

override def withStatusObserver(observer: StatusObserver[Envelope]): GroupedCassandraProjection[Envelope]
Expand All @@ -142,7 +176,7 @@ trait GroupedCassandraProjection[Envelope] extends CassandraProjection[Envelope]
def withRecoveryStrategy(recoveryStrategy: HandlerRecoveryStrategy): GroupedCassandraProjection[Envelope]
}

trait AtMostOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
@DoNotInherit trait AtMostOnceCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
private[cassandra] def atMostOnceStrategy: AtMostOnce = offsetStrategy.asInstanceOf[AtMostOnce]

override def withSettings(settings: ProjectionSettings): AtMostOnceCassandraProjection[Envelope]
Expand All @@ -151,3 +185,9 @@ trait AtMostOnceCassandraProjection[Envelope] extends CassandraProjection[Envelo

def withRecoveryStrategy(recoveryStrategy: StrictRecoveryStrategy): AtMostOnceCassandraProjection[Envelope]
}

@DoNotInherit trait AtLeastOnceFlowCassandraProjection[Envelope] extends CassandraProjection[Envelope] {
override def withSettings(settings: ProjectionSettings): AtLeastOnceFlowCassandraProjection[Envelope]

def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): AtLeastOnceFlowCassandraProjection[Envelope]
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import akka.projection.scaladsl.SourceProvider
import akka.projection.testkit.scaladsl.ProjectionTestKit
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry
import akka.stream.scaladsl.FlowWithContext
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Source
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
Expand Down Expand Up @@ -616,6 +618,47 @@ class CassandraProjectionSpec
}
}

"A Cassandra flow projection" must {

"persist projection and offset" in {
val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val flowHandler =
FlowWithContext[Envelope, Envelope]
.mapAsync(1) { env =>
repository.concatToText(env.id, env.message)
}

// throttle not in FlowWithContext yet. It's possible to do like this.
val _ =
Flow[Envelope]
.throttle(1, 50.millis)
.asFlowWithContext[Envelope, Envelope, Envelope]({
case (env, _) => env
}) { env => env }
.mapAsync(1) { env =>
repository.concatToText(env.id, env.message)
}

val projection =
CassandraProjection
.atLeastOnceFlow(projectionId, sourceProvider(system, entityId), flowHandler)
.withSaveOffset(1, 1.minute)

projectionTestKit.run(projection) {
withClue("check - all values were concatenated") {
val concatStr = repository.findById(entityId).futureValue.get
concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr"
}
}
withClue("check - all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue.get
offset shouldBe 6L
}
}
}

"A Cassandra at-most-once projection" must {

"persist projection and offset" in {
Expand Down
Loading

0 comments on commit 04d2bf4

Please sign in to comment.