diff --git a/rxjava-contrib/rxjava-async-util/build.gradle b/rxjava-contrib/rxjava-async-util/build.gradle
deleted file mode 100644
index e7d19ce23f..0000000000
--- a/rxjava-contrib/rxjava-async-util/build.gradle
+++ /dev/null
@@ -1,21 +0,0 @@
-apply plugin: 'osgi'
-
-sourceCompatibility = JavaVersion.VERSION_1_6
-targetCompatibility = JavaVersion.VERSION_1_6
-
-dependencies {
- compile project(':rxjava')
- testCompile project(":rxjava").sourceSets.test.output
- testCompile 'junit:junit-dep:4.10'
- testCompile 'org.mockito:mockito-core:1.8.5'
-}
-
-jar {
- manifest {
- name = 'rxjava-async-util'
- instruction 'Bundle-Vendor', 'Netflix'
- instruction 'Bundle-DocURL', 'https://github.com/ReactiveX/RxJava'
- instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
- instruction 'Fragment-Host', 'com.netflix.rxjava.core'
- }
-}
diff --git a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java
deleted file mode 100644
index 8aa13f6119..0000000000
--- a/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java
+++ /dev/null
@@ -1,1619 +0,0 @@
-/**
- * Copyright 2014 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package rx.util.async;
-
-import rx.*;
-import rx.Scheduler.Worker;
-import rx.functions.*;
-import rx.schedulers.Schedulers;
-import rx.subjects.AsyncSubject;
-import rx.subjects.PublishSubject;
-import rx.subjects.Subject;
-import rx.subscriptions.SerialSubscription;
-import rx.util.async.operators.*;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-
-/**
- * Utility methods to convert functions and actions into asynchronous operations through the Observable/Observer
- * pattern.
- */
-public final class Async {
-
- private Async() {
- throw new IllegalStateException("No instances!");
- }
-
- /**
- * Invokes the specified function asynchronously and returns an Observable that emits the result.
- *
- * Note: The function is called immediately and once, not whenever an observer subscribes to the resulting
- * Observable. Multiple subscriptions to this Observable observe the same return value.
- *
- *
- *
- * @param the result value type
- * @param func function to run asynchronously
- * @return an Observable that emits the function's result value, or notifies observers of an exception
- * @see RxJava Wiki: start()
- * @see MSDN: Observable.Start
- */
- public static Observable start(Func0 func) {
- return Async.toAsync(func).call();
- }
-
- /**
- * Invokes the specified function asynchronously on the specified Scheduler and returns an Observable that
- * emits the result.
- *
- * Note: The function is called immediately and once, not whenever an observer subscribes to the resulting
- * Observable. Multiple subscriptions to this Observable observe the same return value.
- *
- *
- *
- * @param the result value type
- * @param func function to run asynchronously
- * @param scheduler Scheduler to run the function on
- * @return an Observable that emits the function's result value, or notifies observers of an exception
- * @see RxJava Wiki: start()
- * @see MSDN: Observable.Start
- */
- public static Observable start(Func0 func, Scheduler scheduler) {
- return Async.toAsync(func, scheduler).call();
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func0> toAsync(Action0 action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the result value type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func0> toAsync(Func0 extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param first parameter type of the action
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func1> toAsync(Action1 super T1> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param first parameter type of the action
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func1> toAsync(Func1 super T1, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func2> toAsync(Action2 super T1, ? super T2> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func2> toAsync(Func2 super T1, ? super T2, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func3> toAsync(Action3 super T1, ? super T2, ? super T3> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func3> toAsync(Func3 super T1, ? super T2, ? super T3, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func4> toAsync(Action4 super T1, ? super T2, ? super T3, ? super T4> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func4> toAsync(Func4 super T1, ? super T2, ? super T3, ? super T4, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func5> toAsync(Action5 super T1, ? super T2, ? super T3, ? super T4, ? super T5> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func5> toAsync(Func5 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func6> toAsync(Action6 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func6> toAsync(Func6 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param the seventh parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func7> toAsync(Action7 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param the seventh parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func7> toAsync(Func7 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param the seventh parameter type
- * @param the eighth parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func8> toAsync(Action8 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param the seventh parameter type
- * @param the eighth parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func8> toAsync(Func8 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param the seventh parameter type
- * @param the eighth parameter type
- * @param the ninth parameter type
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func9> toAsync(Action9 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9> action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param the sixth parameter type
- * @param the seventh parameter type
- * @param the eighth parameter type
- * @param the ninth parameter type
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func9> toAsync(Func9 super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param action the action to convert
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- */
- public static FuncN> toAsync(ActionN action) {
- return toAsync(action, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the result type
- * @param func the function to convert
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- */
- public static FuncN> toAsync(FuncN extends R> func) {
- return toAsync(func, Schedulers.computation());
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param action the action to convert
- * @param scheduler the Scheduler used to execute the {@code action}
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func0> toAsync(final Action0 action, final Scheduler scheduler) {
- return toAsync(Actions.toFunc(action), scheduler);
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the result type
- * @param func the function to convert
- * @param scheduler the Scheduler used to call the {@code func}
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func0> toAsync(final Func0 extends R> func, final Scheduler scheduler) {
- return new Func0>() {
- @Override
- public Observable call() {
- final AsyncSubject subject = AsyncSubject.create();
- final Worker inner = scheduler.createWorker();
- inner.schedule(new Action0() {
- @Override
- public void call() {
- R result;
- try {
- result = func.call();
- } catch (Throwable t) {
- subject.onError(t);
- return;
- } finally {
- inner.unsubscribe();
- }
- subject.onNext(result);
- subject.onCompleted();
- }
- });
- return subject;
- }
- };
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param action the Action to convert
- * @param scheduler the Scheduler used to execute the {@code action}
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func1> toAsync(final Action1 super T1> action, final Scheduler scheduler) {
- return toAsync(Actions.toFunc(action), scheduler);
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the result type
- * @param func the function to convert
- * @param scheduler the Scheduler used to call the {@code func}
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func1> toAsync(final Func1 super T1, ? extends R> func, final Scheduler scheduler) {
- return new Func1>() {
- @Override
- public Observable call(final T1 t1) {
- final AsyncSubject subject = AsyncSubject.create();
- final Worker inner = scheduler.createWorker();
- inner.schedule(new Action0() {
- @Override
- public void call() {
- R result;
- try {
- result = func.call(t1);
- } catch (Throwable t) {
- subject.onError(t);
- return;
- } finally {
- inner.unsubscribe();
- }
- subject.onNext(result);
- subject.onCompleted();
- }
- });
- return subject;
- }
- };
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param action the action to convert
- * @param scheduler the Scheduler used to execute the {@code action}
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func2> toAsync(final Action2 super T1, ? super T2> action, final Scheduler scheduler) {
- return toAsync(Actions.toFunc(action), scheduler);
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the result type
- * @param func the function to convert
- * @param scheduler the Scheduler used to call the {@code func}
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func2> toAsync(final Func2 super T1, ? super T2, ? extends R> func, final Scheduler scheduler) {
- return new Func2>() {
- @Override
- public Observable call(final T1 t1, final T2 t2) {
- final AsyncSubject subject = AsyncSubject.create();
- final Worker inner = scheduler.createWorker();
- inner.schedule(new Action0() {
- @Override
- public void call() {
- R result;
- try {
- result = func.call(t1, t2);
- } catch (Throwable t) {
- subject.onError(t);
- return;
- } finally {
- inner.unsubscribe();
- }
- subject.onNext(result);
- subject.onCompleted();
- }
- });
- return subject;
- }
- };
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param action the action to convert
- * @param scheduler the Scheduler used to execute the {@code action}
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func3> toAsync(final Action3 super T1, ? super T2, ? super T3> action, final Scheduler scheduler) {
- return toAsync(Actions.toFunc(action), scheduler);
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the result type
- * @param func the function to convert
- * @param scheduler the Scheduler used to call the {@code func}
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func3> toAsync(final Func3 super T1, ? super T2, ? super T3, ? extends R> func, final Scheduler scheduler) {
- return new Func3>() {
- @Override
- public Observable call(final T1 t1, final T2 t2, final T3 t3) {
- final AsyncSubject subject = AsyncSubject.create();
- final Worker inner = scheduler.createWorker();
- inner.schedule(new Action0() {
- @Override
- public void call() {
- R result;
- try {
- result = func.call(t1, t2, t3);
- } catch (Throwable t) {
- subject.onError(t);
- return;
- } finally {
- inner.unsubscribe();
- }
- subject.onNext(result);
- subject.onCompleted();
- }
- });
- return subject;
- }
- };
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param action the action to convert
- * @param scheduler the Scheduler used to execute the {@code action}
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func4> toAsync(final Action4 super T1, ? super T2, ? super T3, ? super T4> action, final Scheduler scheduler) {
- return toAsync(Actions.toFunc(action), scheduler);
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the result type
- * @param func the function to convert
- * @param scheduler the Scheduler used to call the {@code func}
- * @return a function that returns an Observable that executes the {@code func} and emits its returned value
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func4> toAsync(final Func4 super T1, ? super T2, ? super T3, ? super T4, ? extends R> func, final Scheduler scheduler) {
- return new Func4>() {
- @Override
- public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) {
- final AsyncSubject subject = AsyncSubject.create();
- final Worker inner = scheduler.createWorker();
- inner.schedule(new Action0() {
- @Override
- public void call() {
- R result;
- try {
- result = func.call(t1, t2, t3, t4);
- } catch (Throwable t) {
- subject.onError(t);
- return;
- } finally {
- inner.unsubscribe();
- }
- subject.onNext(result);
- subject.onCompleted();
- }
- });
- return subject;
- }
- };
- }
-
- /**
- * Convert a synchronous action call into an asynchronous function call through an Observable.
- *
- *
- *
- * @param the first parameter type
- * @param the second parameter type
- * @param the third parameter type
- * @param the fourth parameter type
- * @param the fifth parameter type
- * @param action the action to convert
- * @param scheduler the Scheduler used to execute the {@code action}
- * @return a function that returns an Observable that executes the {@code action} and emits {@code null}
- * @see RxJava Wiki: toAsync()
- * @see MSDN: Observable.ToAsync
- */
- public static Func5> toAsync(final Action5 super T1, ? super T2, ? super T3, ? super T4, ? super T5> action, final Scheduler scheduler) {
- return toAsync(Actions.toFunc(action), scheduler);
- }
-
- /**
- * Convert a synchronous function call into an asynchronous function call through an Observable.
- *
- *