-
Notifications
You must be signed in to change notification settings - Fork 141
/
Copy pathTakeOperator.php
49 lines (40 loc) · 1.31 KB
/
TakeOperator.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<?php
declare(strict_types = 1);
namespace Rx\Operator;
use Rx\DisposableInterface;
use Rx\ObservableInterface;
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
/**
 * Operator that forwards at most a fixed number of values from the source
 * observable to the downstream observer, then completes the observer and
 * disposes of the upstream subscription.
 */
final class TakeOperator implements OperatorInterface
{
    /** @var int Maximum number of values to forward; guaranteed >= 0 by the constructor. */
    private $count;

    /**
     * @param int $count Maximum number of values to forward downstream.
     *
     * @throws \InvalidArgumentException When $count is negative.
     */
    public function __construct(int $count)
    {
        if ($count < 0) {
            throw new \InvalidArgumentException('Count must be >= 0');
        }

        $this->count = $count;
    }

    /**
     * Subscribes to the source and relays its first `$count` values to the observer.
     *
     * After the last permitted value has been forwarded the observer is
     * completed exactly once and the upstream subscription is disposed.
     * Later values and later terminal notifications from the source are dropped.
     * Errors and completion arriving before the quota is reached are passed
     * through unchanged. With a count of 0 no value is ever forwarded and the
     * observer terminates only when the source does.
     *
     * @param ObservableInterface $observable Source to subscribe to.
     * @param ObserverInterface   $observer   Downstream observer receiving the values.
     *
     * @return DisposableInterface The upstream subscription (already disposed
     *                             if the quota was reached while subscribing).
     */
    public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
    {
        $remaining = $this->count;
        // True once this operator has itself completed the observer.
        $finished = false;
        // Assigned only after subscribe() returns; the closure sees it by reference.
        $disposable = null;

        $callbackObserver = new CallbackObserver(
            function ($nextValue) use ($observer, &$remaining, &$finished, &$disposable) {
                if ($remaining <= 0) {
                    return;
                }

                // Decrement before forwarding so a re-entrant emission triggered
                // by onNext() observes the updated quota.
                $remaining--;
                $observer->onNext($nextValue);

                // The $finished check stops a re-entrant call that already
                // completed the observer from completing it a second time.
                if ($remaining === 0 && !$finished) {
                    $finished = true;
                    $observer->onCompleted();

                    // Still null when the source emits synchronously inside
                    // subscribe(); that case is handled after subscribe() below.
                    if ($disposable instanceof DisposableInterface) {
                        $disposable->dispose();
                    }
                }
            },
            function ($error) use ($observer, &$finished) {
                // Never deliver an error after the observer was completed here.
                if (!$finished) {
                    $observer->onError($error);
                }
            },
            function () use ($observer, &$finished) {
                // Suppress the source's completion if we already completed.
                if (!$finished) {
                    $observer->onCompleted();
                }
            }
        );

        $disposable = $observable->subscribe($callbackObserver);

        // If the quota was reached during subscribe(), the closure could not
        // dispose the subscription because it was not assigned yet; do it now.
        if ($finished) {
            $disposable->dispose();
        }

        return $disposable;
    }
}