Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions airframe-rx/src/main/scala/wvlet/airframe/rx/Rx.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ trait Rx[+A] extends RxOps[A] {
*/
def map[B](f: A => B): Rx[B] = MapOp(this, f)

/**
* Applies `f` to the input value and return the result. The function can signal completion by returning None. When
* None is returned, the stream will complete gracefully.
* @param f
* function that returns Some(result) to continue, or None to complete the stream
* @tparam B
* @return
*/
def mapWithCompletion[B](f: A => Option[B]): Rx[B] = MapWithCompletionOp(this, f)

/**
* Applies `f` to the input value that produces another Rx stream. This method is an alias of flatMap(f)
* @param f
Expand All @@ -175,6 +185,16 @@ trait Rx[+A] extends RxOps[A] {
*/
def flatMap[B](f: A => RxOps[B]): Rx[B] = FlatMapOp(this, f)

/**
* Applies `f` to the input value that produces another Rx stream. The function can signal completion by returning
* None. When None is returned, the stream will complete gracefully.
* @param f
* function that returns Some(Rx) to continue, or None to complete the stream
* @tparam B
* @return
*/
def flatMapWithCompletion[B](f: A => Option[RxOps[B]]): Rx[B] = FlatMapWithCompletionOp(this, f)

/**
* Applies the given filter and emit the value only when the filter condition matches
* @param f
Expand Down Expand Up @@ -880,9 +900,11 @@ object Rx extends LogSupport {
override def parents: Seq[RxOps[_]] = Seq(input)
}

case class MapOp[A, B](input: Rx[A], f: A => B) extends UnaryRx[A, B]
case class FlatMapOp[A, B](input: Rx[A], f: A => RxOps[B]) extends UnaryRx[A, B]
case class FilterOp[A](input: Rx[A], cond: A => Boolean) extends UnaryRx[A, A]
case class MapOp[A, B](input: Rx[A], f: A => B) extends UnaryRx[A, B]
case class MapWithCompletionOp[A, B](input: Rx[A], f: A => Option[B]) extends UnaryRx[A, B]
case class FlatMapOp[A, B](input: Rx[A], f: A => RxOps[B]) extends UnaryRx[A, B]
case class FlatMapWithCompletionOp[A, B](input: Rx[A], f: A => Option[RxOps[B]]) extends UnaryRx[A, B]
case class FilterOp[A](input: Rx[A], cond: A => Boolean) extends UnaryRx[A, A]
case class ZipOp[A, B](a: RxOps[A], b: RxOps[B]) extends Rx[(A, B)] {
override def parents: Seq[RxOps[_]] = Seq(a, b)
}
Expand Down
51 changes: 51 additions & 0 deletions airframe-rx/src/main/scala/wvlet/airframe/rx/RxRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ class RxRunner(
case other =>
effect(other)
}
case MapWithCompletionOp(in, f) =>
run(in) {
case OnNext(v) =>
Try(f.asInstanceOf[Any => Option[A]](v)) match {
case Success(Some(x)) =>
effect(OnNext(x))
case Success(None) =>
// The function requested completion
effect(OnCompletion)
RxResult.Stop
case Failure(e) =>
effect(OnError(e))
}
case other =>
effect(other)
}
case fm @ FlatMapOp(in, f) =>
// This var is a placeholder to remember the preceding Cancelable operator, which will be updated later
var c1 = Cancelable.empty
Expand Down Expand Up @@ -147,6 +163,41 @@ class RxRunner(
Cancelable { () =>
c1.cancel; c2.cancel
}
case fmwc @ FlatMapWithCompletionOp(in, f) =>
// This var is a placeholder to remember the preceding Cancelable operator, which will be updated later
var c1 = Cancelable.empty
val c2 = run(fmwc.input) {
case OnNext(x) =>
var toContinue: RxResult = RxResult.Continue
Try(fmwc.f.asInstanceOf[Function[Any, Option[RxOps[_]]]](x)) match {
case Success(Some(rxb)) =>
// This code is necessary to properly cancel the effect if this operator is evaluated before
c1.cancel
c1 = run(rxb) {
case n @ OnNext(x) =>
toContinue = effect(n)
toContinue
case OnCompletion =>
// skip the end of the nested flatMap body stream
RxResult.Continue
case ev @ OnError(e) =>
toContinue = effect(ev)
toContinue
}
toContinue
case Success(None) =>
// The function requested completion
effect(OnCompletion)
RxResult.Stop
case Failure(e) =>
effect(OnError(e))
}
case other =>
effect(other)
}
Cancelable { () =>
c1.cancel; c2.cancel
}
case FilterOp(in, cond) =>
run(in) { ev =>
ev match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package wvlet.airframe.rx

import wvlet.airspec.AirSpec

class RxSafeCancellationTest extends AirSpec {

test("mapWithCompletion should allow safe completion signaling") {
val results = scala.collection.mutable.ListBuffer[Int]()
val rx = Rx.sequence(1, 2, 3, 4, 5)

val c = rx
.mapWithCompletion { x =>
results += x
if (x == 3) {
// Request completion by returning None
None
} else {
Some(x * 2)
}
}.run()

// Should process 1, 2, 3 but not 4, 5
results.toList shouldBe List(1, 2, 3)
}

test("mapWithCompletion should work with early completion") {
val results = scala.collection.mutable.ListBuffer[Int]()

val c = Rx
.sequence(1, 2, 3, 4, 5).mapWithCompletion { x =>
if (x == 2) {
None // Complete immediately when seeing 2
} else {
results += x
Some(x * 10)
}
}.run()

// Should only process 1, then complete when seeing 2
results.toList shouldBe List(1)
}

test("mapWithCompletion should handle empty sequence") {
val results = scala.collection.mutable.ListBuffer[Int]()

val c = Rx
.empty[Int].mapWithCompletion { x =>
results += x
Some(x * 2)
}.run()

// Should have no results for empty sequence
results.toList shouldBe List()
}

test("mapWithCompletion should propagate errors") {
val ex = new RuntimeException("test error")

val thrown = intercept[RuntimeException] {
Rx.sequence(1, 2, 3).mapWithCompletion { x =>
if (x == 2) {
throw ex
}
Some(x * 2)
}.run()
}

thrown shouldBe ex
}

test("flatMapWithCompletion should allow safe completion signaling") {
val results = scala.collection.mutable.ListBuffer[Int]()
val rx = Rx.sequence(1, 2, 3, 4, 5)

val c = rx
.flatMapWithCompletion { x =>
results += x
if (x == 3) {
// Request completion by returning None
None
} else {
Some(Rx.single(x * 10))
}
}.run()

// Should process 1, 2, 3 but not 4, 5
results.toList shouldBe List(1, 2, 3)
}

test("flatMapWithCompletion should work with early completion") {
val results = scala.collection.mutable.ListBuffer[Int]()

val c = Rx
.sequence(1, 2, 3, 4, 5).flatMapWithCompletion { x =>
if (x == 2) {
None // Complete immediately when seeing 2
} else {
results += x
Some(Rx.single(x * 100))
}
}.run()

// Should only process 1, then complete when seeing 2
results.toList shouldBe List(1)
}

}