Skip to content

Commit 50152f5

Browse files
authored
Merge pull request #223 from mbonneau/takeWhile-inclusive
Implement inclusive takeWhile
2 parents e910b35 + 7d326cd commit 50152f5

File tree

3 files changed

+54
-4
lines changed

3 files changed

+54
-4
lines changed

src/Observable.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -675,10 +675,10 @@ public function takeUntil(ObservableInterface $other): Observable
675675
* @operator
676676
* @reactivex takeWhile
677677
*/
678-
public function takeWhile(callable $predicate): Observable
678+
public function takeWhile(callable $predicate, bool $inclusive = false): Observable
679679
{
680-
return $this->lift(function () use ($predicate) {
681-
return new TakeWhileOperator($predicate);
680+
return $this->lift(function () use ($predicate, $inclusive) {
681+
return new TakeWhileOperator($predicate, $inclusive);
682682
});
683683
}
684684

src/Operator/TakeWhileOperator.php

+6-1
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
final class TakeWhileOperator implements OperatorInterface
1313
{
1414
private $predicate;
15+
private $inclusive;
1516

16-
public function __construct(callable $predicate)
17+
public function __construct(callable $predicate, bool $inclusive = false)
1718
{
1819
$this->predicate = $predicate;
20+
$this->inclusive = $inclusive;
1921
}
2022

2123
public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
@@ -25,6 +27,9 @@ public function __invoke(ObservableInterface $observable, ObserverInterface $obs
2527
if (($this->predicate)($value)) {
2628
$observer->onNext($value);
2729
} else {
30+
if ($this->inclusive) {
31+
$observer->onNext($value);
32+
}
2833
$observer->onCompleted();
2934
}
3035
} catch (\Throwable $e) {

test/Rx/Functional/Operator/TakeWhileTest.php

+45
Original file line numberDiff line numberDiff line change
@@ -440,4 +440,49 @@ private function isPrime($num)
440440

441441
return true;
442442
}
443+
444+
/**
445+
* @test
446+
*/
447+
public function takeWhile_inclusive()
448+
{
449+
$error = new \Exception();
450+
451+
$xs = $this->createHotObservable([
452+
onNext(90, -1),
453+
onNext(110, -1),
454+
onNext(210, 2),
455+
onNext(260, 5),
456+
onError(270, $error),
457+
onCompleted(280),
458+
onNext(290, 13),
459+
onNext(320, 3),
460+
onNext(350, 7),
461+
onNext(390, 4),
462+
onNext(410, 17),
463+
onNext(450, 8),
464+
onNext(500, 23)
465+
]);
466+
467+
$invoked = 0;
468+
469+
$results = $this->scheduler->startWithCreate(function () use ($xs, &$invoked) {
470+
return $xs->takeWhile(function ($x) use (&$invoked) {
471+
$invoked++;
472+
return $x != 5;
473+
}, true);
474+
});
475+
476+
$this->assertMessages([
477+
onNext(210, 2),
478+
onNext(260, 5),
479+
onCompleted(260)
480+
], $results->getMessages());
481+
482+
$this->assertSubscriptions([
483+
subscribe(200, 260)
484+
], $xs->getSubscriptions());
485+
486+
$this->assertEquals(2, $invoked);
487+
}
443488
}

0 commit comments

Comments
 (0)