diff --git a/src/Operator/TakeOperator.php b/src/Operator/TakeOperator.php index f9585e96..a4b68fe7 100644 --- a/src/Operator/TakeOperator.php +++ b/src/Operator/TakeOperator.php @@ -27,12 +27,15 @@ public function __invoke(ObservableInterface $observable, ObserverInterface $obs $remaining = $this->count; $callbackObserver = new CallbackObserver( - function ($nextValue) use ($observer, &$remaining) { + function ($nextValue) use ($observer, &$remaining, &$disposable) { if ($remaining > 0) { $remaining--; $observer->onNext($nextValue); if ($remaining === 0) { $observer->onCompleted(); + if ($disposable instanceof DisposableInterface) { + $disposable->dispose(); + } } } }, @@ -40,6 +43,7 @@ function ($nextValue) use ($observer, &$remaining) { [$observer, 'onCompleted'] ); - return $observable->subscribe($callbackObserver); + $disposable = $observable->subscribe($callbackObserver); + return $disposable; } }