diff --git a/pmd.xml b/pmd.xml
index e6fbccdd95..728e1e4d1d 100644
--- a/pmd.xml
+++ b/pmd.xml
@@ -71,7 +71,6 @@
-
@@ -110,7 +109,6 @@
-
@@ -134,9 +132,6 @@
-
-
-
@@ -160,7 +155,6 @@
-
@@ -183,8 +177,6 @@
-
-
diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index 89c268f06e..9e91e992f7 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -31,6 +31,12 @@
* The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?
*/
public abstract class Completable implements CompletableConsumable {
+ /** Single instance of a complete Completable. */
+ static final Completable COMPLETE = new CompletableEmpty();
+
+ /** Single instance of a never Completable. */
+ static final Completable NEVER = new CompletableNever();
+
/**
* Convenience interface and callback used by the lift operator that given a child CompletableSubscriber,
* return a parent CompletableSubscriber that does any kind of lifecycle-related transformations.
@@ -47,12 +53,6 @@ public interface CompletableTransformer extends Function implements Publisher {
+ private static final Object OBJECT = new Object();
+ /** The default buffer size. */
+ static final int BUFFER_SIZE;
+ static {
+ BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
+ }
+
/**
* Interface to map/wrap a downstream subscriber to an upstream subscriber.
*
@@ -53,12 +60,6 @@ public interface Transformer extends Function, Publisher ext
}
- /** The default buffer size. */
- static final int BUFFER_SIZE;
- static {
- BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
- }
-
/** A never observable instance as there is no need to instantiate this more than once. */
static final Flowable NEVER = create(new Publisher() {
@Override
@@ -459,8 +460,7 @@ public static Flowable fromFuture(Future extends T> future) {
public static Flowable fromFuture(Future extends T> future, long timeout, TimeUnit unit) {
Objects.requireNonNull(future, "future is null");
Objects.requireNonNull(unit, "unit is null");
- Flowable o = new FlowableFromFuture(future, timeout, unit);
- return o;
+ return new FlowableFromFuture(future, timeout, unit);
}
@BackpressureSupport(BackpressureKind.FULL)
@@ -1613,8 +1613,6 @@ public Publisher apply(Long v) {
});
}
- private static final Object OBJECT = new Object();
-
@SuppressWarnings({ "rawtypes", "unchecked" })
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1622,7 +1620,7 @@ public final Flowable delaySubscription(final Supplier extends Publishe
Objects.requireNonNull(delaySupplier, "delaySupplier is null");
return fromCallable(new Callable() {
@Override
- public Object call() throws Exception {
+ public Object call() {
return delaySupplier.get();
}
})
@@ -2936,7 +2934,7 @@ public final void subscribe(Subscriber super T> s) {
}
subscribeActual(s);
- } catch (NullPointerException e) {
+ } catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
// TODO throw if fatal?
diff --git a/src/main/java/io/reactivex/Notification.java b/src/main/java/io/reactivex/Notification.java
index 2a2a1dd871..d21c53418e 100644
--- a/src/main/java/io/reactivex/Notification.java
+++ b/src/main/java/io/reactivex/Notification.java
@@ -19,12 +19,12 @@
* Utility class to help construct notification objects.
*/
public final class Notification {
+ static final Try> COMPLETE = Try.ofValue(Optional.empty());
+
private Notification() {
throw new IllegalStateException();
}
- static final Try> COMPLETE = Try.ofValue(Optional.empty());
-
@SuppressWarnings({ "rawtypes", "unchecked" })
public static Try> complete() {
return (Try)COMPLETE;
diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java
index 4ae2a24f52..85f6b29e60 100644
--- a/src/main/java/io/reactivex/Observable.java
+++ b/src/main/java/io/reactivex/Observable.java
@@ -38,7 +38,8 @@
* @param
*/
public abstract class Observable implements ObservableConsumable {
-
+ static final Object OBJECT = new Object();
+
public interface NbpOperator extends Function, Observer super Upstream>> {
}
@@ -64,11 +65,9 @@ public void subscribe(Observer super Object> s) {
}
});
- static final Object OBJECT = new Object();
-
public static Observable amb(Iterable extends ObservableConsumable extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
- return (new ObservableAmb(null, sources));
+ return new ObservableAmb(null, sources);
}
@SuppressWarnings("unchecked")
@@ -82,7 +81,7 @@ public static Observable amb(ObservableConsumable extends T>... sources
if (len == 1) {
return (Observable)sources[0]; // FIXME wrap()
}
- return (new ObservableAmb(sources, null));
+ return new ObservableAmb(sources, null);
}
/**
@@ -116,7 +115,7 @@ public static Observable combineLatest(Iterable extends ObservableCo
// the queue holds a pair of values so we need to double the capacity
int s = bufferSize << 1;
- return (new ObservableCombineLatest(null, sources, combiner, s, delayError));
+ return new ObservableCombineLatest(null, sources, combiner, s, delayError);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -138,7 +137,7 @@ public static Observable combineLatest(ObservableConsumable extends
}
// the queue holds a pair of values so we need to double the capacity
int s = bufferSize << 1;
- return (new ObservableCombineLatest(sources, null, combiner, s, delayError));
+ return new ObservableCombineLatest(sources, null, combiner, s, delayError);
}
@SuppressWarnings("unchecked")
@@ -368,7 +367,7 @@ public static Observable wrap(ObservableConsumable onSubscribe) {
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable defer(Supplier extends ObservableConsumable extends T>> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
- return (new ObservableDefer(supplier));
+ return new ObservableDefer(supplier);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -380,7 +379,7 @@ public static Observable empty() {
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable error(Supplier extends Throwable> errorSupplier) {
Objects.requireNonNull(errorSupplier, "errorSupplier is null");
- return (new ObservableError(errorSupplier));
+ return new ObservableError(errorSupplier);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -403,13 +402,13 @@ public static Observable fromArray(T... values) {
if (values.length == 1) {
return just(values[0]);
}
- return (new ObservableFromArray(values));
+ return new ObservableFromArray(values);
}
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable fromCallable(Callable extends T> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
- return (new ObservableFromCallable(supplier));
+ return new ObservableFromCallable(supplier);
}
/*
@@ -420,15 +419,14 @@ public static Observable fromCallable(Callable extends T> supplier) {
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable fromFuture(Future extends T> future) {
Objects.requireNonNull(future, "future is null");
- return (new ObservableFromFuture(future, 0L, null));
+ return new ObservableFromFuture(future, 0L, null);
}
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable fromFuture(Future extends T> future, long timeout, TimeUnit unit) {
Objects.requireNonNull(future, "future is null");
Objects.requireNonNull(unit, "unit is null");
- Observable o = create(new ObservableFromFuture(future, timeout, unit));
- return o;
+ return new ObservableFromFuture(future, timeout, unit);
}
@SchedulerSupport(SchedulerSupport.CUSTOM)
@@ -447,7 +445,7 @@ public static Observable fromFuture(Future extends T> future, Scheduler
public static Observable fromIterable(Iterable extends T> source) {
Objects.requireNonNull(source, "source is null");
- return (new ObservableFromIterable(source));
+ return new ObservableFromIterable(source);
}
public static Observable fromPublisher(final Publisher extends T> publisher) {
@@ -533,7 +531,7 @@ public static Observable generate(Supplier initialState, BiFunction
Objects.requireNonNull(initialState, "initialState is null");
Objects.requireNonNull(generator, "generator is null");
Objects.requireNonNull(disposeState, "diposeState is null");
- return (new ObservableGenerate(initialState, generator, disposeState));
+ return new ObservableGenerate(initialState, generator, disposeState);
}
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@@ -552,7 +550,7 @@ public static Observable interval(long initialDelay, long period, TimeUnit
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return (new ObservableInterval(initialDelay, period, unit, scheduler));
+ return new ObservableInterval(initialDelay, period, unit, scheduler);
}
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@@ -587,7 +585,7 @@ public static Observable intervalRange(long start, long count, long initia
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return (new ObservableIntervalRange(start, end, initialDelay, period, unit, scheduler));
+ return new ObservableIntervalRange(start, end, initialDelay, period, unit, scheduler);
}
public static Observable just(T value) {
@@ -907,7 +905,7 @@ public static Observable sequenceEqual(ObservableConsumable exten
Objects.requireNonNull(p2, "p2 is null");
Objects.requireNonNull(isEqual, "isEqual is null");
validateBufferSize(bufferSize);
- return (new ObservableSequenceEqual(p1, p2, isEqual, bufferSize));
+ return new ObservableSequenceEqual(p1, p2, isEqual, bufferSize);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -941,7 +939,7 @@ public static Observable timer(long delay, TimeUnit unit, Scheduler schedu
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return (new ObservableTimer(delay, unit, scheduler));
+ return new ObservableTimer(delay, unit, scheduler);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -954,7 +952,7 @@ public static Observable using(Supplier extends D> resourceSupplier,
Objects.requireNonNull(resourceSupplier, "resourceSupplier is null");
Objects.requireNonNull(sourceSupplier, "sourceSupplier is null");
Objects.requireNonNull(disposer, "disposer is null");
- return (new ObservableUsing(resourceSupplier, sourceSupplier, disposer, eager));
+ return new ObservableUsing(resourceSupplier, sourceSupplier, disposer, eager);
}
private static void validateBufferSize(int bufferSize) {
@@ -967,7 +965,7 @@ private static void validateBufferSize(int bufferSize) {
public static Observable zip(Iterable extends ObservableConsumable extends T>> sources, Function super Object[], ? extends R> zipper) {
Objects.requireNonNull(zipper, "zipper is null");
Objects.requireNonNull(sources, "sources is null");
- return (new ObservableZip(null, sources, zipper, bufferSize(), false));
+ return new ObservableZip(null, sources, zipper, bufferSize(), false);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -1082,7 +1080,7 @@ public static Observable zipArray(Function super Object[], ? extends
}
Objects.requireNonNull(zipper, "zipper is null");
validateBufferSize(bufferSize);
- return (new ObservableZip(sources, null, zipper, bufferSize, delayError));
+ return new ObservableZip(sources, null, zipper, bufferSize, delayError);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1092,14 +1090,14 @@ public static Observable zipIterable(Function super Object[], ? exte
Objects.requireNonNull(zipper, "zipper is null");
Objects.requireNonNull(sources, "sources is null");
validateBufferSize(bufferSize);
- return (new ObservableZip(null, sources, zipper, bufferSize, delayError));
+ return new ObservableZip(null, sources, zipper, bufferSize, delayError);
}
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable all(Predicate super T> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
- return (new ObservableAll(this, predicate));
+ return new ObservableAll(this, predicate);
}
@SuppressWarnings("unchecked")
@@ -1493,7 +1491,7 @@ public final Observable delay(Supplier extends ObservableConsumable<
@Experimental
public final Observable delaySubscription(ObservableConsumable other) {
Objects.requireNonNull(other, "other is null");
- return (new ObservableDelaySubscriptionOther(this, other));
+ return new ObservableDelaySubscriptionOther(this, other);
}
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@@ -1521,7 +1519,7 @@ public final Observable delaySubscription(final Supplier extends Observ
Objects.requireNonNull(delaySupplier, "delaySupplier is null");
return fromCallable(new Callable>() {
@Override
- public ObservableConsumable call() throws Exception {
+ public ObservableConsumable call() {
return delaySupplier.get();
}
})
@@ -1778,7 +1776,7 @@ public final Observable flatMap(Function super T, ? extends ObservableC
validateBufferSize(bufferSize);
if (this instanceof ObservableJust) {
ObservableJust scalar = (ObservableJust) this;
- return (scalar.scalarFlatMap(mapper));
+ return scalar.scalarFlatMap(mapper);
}
return lift(new NbpOperatorFlatMap(mapper, delayErrors, maxConcurrency, bufferSize));
}
@@ -2024,7 +2022,7 @@ public final Observable last(T defaultValue) {
public final Observable lift(NbpOperator extends R, ? super T> onLift) {
Objects.requireNonNull(onLift, "onLift is null");
- return (new ObservableLift(this, onLift));
+ return new ObservableLift(this, onLift);
}
public final Observable map(Function super T, ? extends R> mapper) {
@@ -2175,13 +2173,13 @@ public final Observable repeat(long times) {
if (times == 0) {
return empty();
}
- return (new ObservableRepeat(this, times));
+ return new ObservableRepeat(this, times);
}
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable repeatUntil(BooleanSupplier stop) {
Objects.requireNonNull(stop, "stop is null");
- return (new ObservableRepeatUntil(this, stop));
+ return new ObservableRepeatUntil(this, stop);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -2201,7 +2199,7 @@ public Object apply(Try> v) {
}
;
- return (new ObservableRedo(this, f));
+ return new ObservableRedo(this, f);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -2354,7 +2352,7 @@ public final Observable retry() {
public final Observable retry(BiPredicate super Integer, ? super Throwable> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
- return (new ObservableRetryBiPredicate(this, predicate));
+ return new ObservableRetryBiPredicate(this, predicate);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -2370,7 +2368,7 @@ public final Observable retry(long times, Predicate super Throwable> predic
}
Objects.requireNonNull(predicate, "predicate is null");
- return (new ObservableRetryPredicate(this, times, predicate));
+ return new ObservableRetryPredicate(this, times, predicate);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -2415,7 +2413,7 @@ public Throwable apply(Try> t) {
}
;
- return (new ObservableRedo(this, f));
+ return new ObservableRedo(this, f);
}
// TODO decide if safe subscription or unsafe should be the default
@@ -2649,7 +2647,7 @@ public final void subscribe(Observer super T> observer) {
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable subscribeOn(Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
- return (new ObservableSubscribeOn(this, scheduler));
+ return new ObservableSubscribeOn(this, scheduler);
}
@SchedulerSupport(SchedulerSupport.NONE)
@@ -3381,7 +3379,7 @@ public final Observable withLatestFrom(ObservableConsumable extends
public final Observable zipWith(Iterable other, BiFunction super T, ? super U, ? extends R> zipper) {
Objects.requireNonNull(other, "other is null");
Objects.requireNonNull(zipper, "zipper is null");
- return (new ObservableZipIterable(this, other, zipper));
+ return new ObservableZipIterable(this, other, zipper);
}
@SchedulerSupport(SchedulerSupport.NONE)
diff --git a/src/main/java/io/reactivex/Optional.java b/src/main/java/io/reactivex/Optional.java
index b8d0565bf0..e366639554 100644
--- a/src/main/java/io/reactivex/Optional.java
+++ b/src/main/java/io/reactivex/Optional.java
@@ -21,13 +21,13 @@
* @param the value type
*/
public final class Optional {
+ static final Optional EMPTY = new Optional(null);
+
final T value;
protected Optional(T value) {
this.value = value;
}
- static final Optional EMPTY = new Optional(null);
-
@SuppressWarnings("unchecked")
public static Optional empty() {
return (Optional)EMPTY;
@@ -48,7 +48,7 @@ public T get() {
@Override
public int hashCode() {
- final int prime = 31;
+ int prime = 31;
int result = 1;
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java
index b3ec304b07..8aaf298042 100644
--- a/src/main/java/io/reactivex/Single.java
+++ b/src/main/java/io/reactivex/Single.java
@@ -33,6 +33,8 @@
* @param the value type
*/
public abstract class Single implements SingleConsumable {
+
+ static final Single NEVER = new SingleNever();
public interface SingleOperator extends Function, SingleSubscriber super Upstream>> {
@@ -392,8 +394,6 @@ public static Flowable merge(
return merge(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9));
}
- static final Single NEVER = new SingleNever();
-
@SuppressWarnings("unchecked")
public static Single never() {
return (Single)NEVER;
@@ -409,7 +409,7 @@ public static Single timer(final long delay, final TimeUnit unit, final Sc
return new SingleTimer(delay, unit, scheduler);
}
- public static Single equals(final SingleConsumable extends T> first, final SingleConsumable extends T> second) {
+ public static Single equals(final SingleConsumable extends T> first, final SingleConsumable extends T> second) { // NOPMD
Objects.requireNonNull(first, "first is null");
Objects.requireNonNull(second, "second is null");
return new SingleEquals(first, second);
diff --git a/src/main/java/io/reactivex/exceptions/CompositeException.java b/src/main/java/io/reactivex/exceptions/CompositeException.java
index a10c428133..3214140df8 100644
--- a/src/main/java/io/reactivex/exceptions/CompositeException.java
+++ b/src/main/java/io/reactivex/exceptions/CompositeException.java
@@ -38,6 +38,7 @@ public final class CompositeException extends RuntimeException {
private final List exceptions;
private final String message;
+ private Throwable cause;
public CompositeException() {
this.exceptions = new ArrayList();
@@ -57,9 +58,9 @@ public CompositeException(Throwable... exceptions) {
}
- public CompositeException(String messagePrefix, Collection extends Throwable> errors) {
+ public CompositeException(Collection extends Throwable> errors) {
Set deDupedExceptions = new LinkedHashSet();
- List _exceptions = new ArrayList();
+ List localExceptions = new ArrayList();
if (errors != null) {
for (Throwable ex : errors) {
if (ex instanceof CompositeException) {
@@ -75,15 +76,11 @@ public CompositeException(String messagePrefix, Collection extends Throwable>
deDupedExceptions.add(new NullPointerException());
}
- _exceptions.addAll(deDupedExceptions);
- this.exceptions = Collections.unmodifiableList(_exceptions);
+ localExceptions.addAll(deDupedExceptions);
+ this.exceptions = Collections.unmodifiableList(localExceptions);
this.message = exceptions.size() + " exceptions occurred. ";
}
- public CompositeException(Collection extends Throwable> errors) {
- this(null, errors);
- }
-
/**
* Retrieves the list of exceptions that make up the {@code CompositeException}
*
@@ -109,16 +106,14 @@ public void suppress(Throwable e) {
}
- private Throwable cause = null;
-
@Override
- public synchronized Throwable getCause() {
+ public synchronized Throwable getCause() { // NOPMD
if (cause == null) {
// we lazily generate this causal chain if this is called
- CompositeExceptionCausalChain _cause = new CompositeExceptionCausalChain();
+ CompositeExceptionCausalChain localCause = new CompositeExceptionCausalChain();
Set seenCauses = new HashSet();
- Throwable chain = _cause;
+ Throwable chain = localCause;
for (Throwable e : exceptions) {
if (seenCauses.contains(e)) {
// already seen this outer Throwable so skip
@@ -147,7 +142,7 @@ public synchronized Throwable getCause() {
}
chain = chain.getCause();
}
- cause = _cause;
+ cause = localCause;
}
return cause;
}
@@ -185,14 +180,14 @@ public void printStackTrace(PrintWriter s) {
* stream to print to
*/
private void printStackTrace(PrintStreamOrWriter s) {
- StringBuilder bldr = new StringBuilder();
- bldr.append(this).append("\n");
+ StringBuilder bldr = new StringBuilder(128);
+ bldr.append(this).append('\n');
for (StackTraceElement myStackElement : getStackTrace()) {
- bldr.append("\tat ").append(myStackElement).append("\n");
+ bldr.append("\tat ").append(myStackElement).append('\n');
}
int i = 1;
for (Throwable ex : exceptions) {
- bldr.append(" ComposedException ").append(i).append(" :").append("\n");
+ bldr.append(" ComposedException ").append(i).append(" :\n");
appendStackTrace(bldr, ex, "\t");
i++;
}
@@ -202,9 +197,9 @@ private void printStackTrace(PrintStreamOrWriter s) {
}
private void appendStackTrace(StringBuilder bldr, Throwable ex, String prefix) {
- bldr.append(prefix).append(ex).append("\n");
+ bldr.append(prefix).append(ex).append('\n');
for (StackTraceElement stackElement : ex.getStackTrace()) {
- bldr.append("\t\tat ").append(stackElement).append("\n");
+ bldr.append("\t\tat ").append(stackElement).append('\n');
}
if (ex.getCause() != null) {
bldr.append("\tCaused by: ");
@@ -212,7 +207,7 @@ private void appendStackTrace(StringBuilder bldr, Throwable ex, String prefix) {
}
}
- private abstract static class PrintStreamOrWriter {
+ abstract static class PrintStreamOrWriter {
/** Returns the object to be locked when using this StreamOrWriter */
abstract Object lock();
@@ -223,7 +218,7 @@ private abstract static class PrintStreamOrWriter {
/**
* Same abstraction and implementation as in JDK to allow PrintStream and PrintWriter to share implementation
*/
- private static class WrappedPrintStream extends PrintStreamOrWriter {
+ static final class WrappedPrintStream extends PrintStreamOrWriter {
private final PrintStream printStream;
WrappedPrintStream(PrintStream printStream) {
@@ -241,7 +236,7 @@ void println(Object o) {
}
}
- private static class WrappedPrintWriter extends PrintStreamOrWriter {
+ static final class WrappedPrintWriter extends PrintStreamOrWriter {
private final PrintWriter printWriter;
WrappedPrintWriter(PrintWriter printWriter) {
@@ -259,9 +254,9 @@ void println(Object o) {
}
}
- /* package-private */final static class CompositeExceptionCausalChain extends RuntimeException {
+ final static class CompositeExceptionCausalChain extends RuntimeException {
private static final long serialVersionUID = 3875212506787802066L;
- /* package-private */static String MESSAGE = "Chain of Causes for CompositeException In Order Received =>";
+ /* package-private */static final String MESSAGE = "Chain of Causes for CompositeException In Order Received =>";
@Override
public String getMessage() {
diff --git a/src/main/java/io/reactivex/flowables/BlockingFlowable.java b/src/main/java/io/reactivex/flowables/BlockingFlowable.java
index 0870f390e2..5fad82fff5 100644
--- a/src/main/java/io/reactivex/flowables/BlockingFlowable.java
+++ b/src/main/java/io/reactivex/flowables/BlockingFlowable.java
@@ -62,7 +62,7 @@ public void forEach(Consumer super T> action) {
}
}
- static final BlockingIterator iterate(Publisher extends T> p) {
+ static BlockingIterator iterate(Publisher extends T> p) {
final BlockingQueue queue = new LinkedBlockingQueue();
LambdaSubscriber ls = new LambdaSubscriber(
@@ -431,7 +431,7 @@ private void awaitForComplete(CountDownLatch latch, Disposable subscription) {
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
Thread.currentThread().interrupt();
// using Runtime so it is not checked
- throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
+ throw new IllegalStateException("Interrupted while waiting for subscription to complete.", e);
}
}
@@ -465,11 +465,7 @@ public void accept(Subscription s) {
awaitForComplete(cdl, ls);
Throwable e = error[0];
if (e != null) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException)e;
- } else {
- throw new RuntimeException(e);
- }
+ Exceptions.propagate(e);
}
}
diff --git a/src/main/java/io/reactivex/functions/BooleanSupplier.java b/src/main/java/io/reactivex/functions/BooleanSupplier.java
index dfad42f3bb..14748c5a98 100644
--- a/src/main/java/io/reactivex/functions/BooleanSupplier.java
+++ b/src/main/java/io/reactivex/functions/BooleanSupplier.java
@@ -14,5 +14,5 @@
package io.reactivex.functions;
public interface BooleanSupplier {
- boolean getAsBoolean();
+ boolean getAsBoolean(); // NOPMD
}
diff --git a/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java b/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java
index 99557278db..8464098bc3 100644
--- a/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java
+++ b/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java
@@ -42,7 +42,7 @@ public boolean setResource(int index, Disposable resource) {
for (;;) {
Disposable o = get(index);
if (o == DisposableHelper.DISPOSED) {
- resource.dispose();;
+ resource.dispose();
return false;
}
if (compareAndSet(index, o, resource)) {
diff --git a/src/main/java/io/reactivex/internal/functions/Objects.java b/src/main/java/io/reactivex/internal/functions/Objects.java
index 5f44695420..fefb497c6e 100644
--- a/src/main/java/io/reactivex/internal/functions/Objects.java
+++ b/src/main/java/io/reactivex/internal/functions/Objects.java
@@ -42,7 +42,7 @@ public static final T requireNonNull(T object, String message) {
* @param o2 the second object
* @return the comparison result
*/
- public static boolean equals(Object o1, Object o2) {
+ public static boolean equals(Object o1, Object o2) { // NOPMD
return o1 == o2 || (o1 != null && o1.equals(o2));
}
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java
index 3e6de49d73..0712acc2ff 100644
--- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java
@@ -33,6 +33,22 @@ protected void subscribeActual(final CompletableSubscriber s) {
final CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);
+ Iterator extends CompletableConsumable> it;
+
+ try {
+ it = sources.iterator();
+ } catch (Throwable e) {
+ s.onError(e);
+ return;
+ }
+
+ if (it == null) {
+ s.onError(new NullPointerException("The iterator returned is null"));
+ return;
+ }
+
+ boolean empty = true;
+
final AtomicBoolean once = new AtomicBoolean();
CompletableSubscriber inner = new CompletableSubscriber() {
@@ -60,23 +76,7 @@ public void onSubscribe(Disposable d) {
}
};
-
- Iterator extends CompletableConsumable> it;
-
- try {
- it = sources.iterator();
- } catch (Throwable e) {
- s.onError(e);
- return;
- }
-
- if (it == null) {
- s.onError(new NullPointerException("The iterator returned is null"));
- return;
- }
-
- boolean empty = true;
-
+
for (;;) {
if (once.get() || set.isDisposed()) {
return;
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java
index 46fbbac7f0..b5647be14d 100644
--- a/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java
@@ -36,7 +36,7 @@ protected void subscribeActual(CompletableSubscriber s) {
CompletableSubscriber sw = onLift.apply(s);
source.subscribe(sw);
- } catch (NullPointerException ex) {
+ } catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java
index 39e9ec8236..b2dca0c61b 100644
--- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java
@@ -30,9 +30,6 @@ public CompletableMergeDelayErrorIterable(Iterable extends CompletableConsumab
@Override
public void subscribeActual(final CompletableSubscriber s) {
final CompositeDisposable set = new CompositeDisposable();
- final AtomicInteger wip = new AtomicInteger(1);
-
- final Queue queue = new MpscLinkedQueue();
s.onSubscribe(set);
@@ -49,7 +46,11 @@ public void subscribeActual(final CompletableSubscriber s) {
s.onError(new NullPointerException("The source iterator returned is null"));
return;
}
+
+ final AtomicInteger wip = new AtomicInteger(1);
+ final Queue queue = new MpscLinkedQueue();
+
for (;;) {
if (set.isDisposed()) {
return;
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java
index aebef6c349..57eac4b692 100644
--- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java
@@ -30,8 +30,6 @@ public CompletableMergeIterable(Iterable extends CompletableConsumable> source
@Override
public void subscribeActual(final CompletableSubscriber s) {
final CompositeDisposable set = new CompositeDisposable();
- final AtomicInteger wip = new AtomicInteger(1);
- final AtomicBoolean once = new AtomicBoolean();
s.onSubscribe(set);
@@ -48,7 +46,10 @@ public void subscribeActual(final CompletableSubscriber s) {
s.onError(new NullPointerException("The source iterator returned is null"));
return;
}
-
+
+ final AtomicInteger wip = new AtomicInteger(1);
+ final AtomicBoolean once = new AtomicBoolean();
+
for (;;) {
if (set.isDisposed()) {
return;
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java
index 36611d8b7f..2fa2327b6e 100644
--- a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java
@@ -43,7 +43,7 @@ public CompletableUsing(Supplier resourceSupplier,
@Override
protected void subscribeActual(final CompletableSubscriber s) {
- final R resource;
+ final R resource; // NOPMD
try {
resource = resourceSupplier.get();
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java
index 76c8289100..6d1835aadb 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java
@@ -58,6 +58,9 @@ static final class LatestObserverIterator extends DisposableSubscriber>> value = new AtomicReference>>();
+ // iterator's notification
+ Try> iNotif;
+
@Override
public void onNext(Try> args) {
boolean wasntAvailable = value.getAndSet(args) == null;
@@ -76,9 +79,6 @@ public void onComplete() {
// not expected
}
- // iterator's notification
- Try> iNotif;
-
@Override
public boolean hasNext() {
if (iNotif != null && iNotif.hasError()) {
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java
index d5f4f2df28..febbcbb7e4 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java
@@ -57,7 +57,7 @@ public Iterator iterator() {
};
}
- private static final class MostRecentObserver extends DefaultObserver {
+ static final class MostRecentObserver extends DefaultObserver {
volatile Object value;
private MostRecentObserver(T value) {
@@ -89,7 +89,7 @@ public Iterator getIterable() {
/**
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
*/
- private Object buf = null;
+ private Object buf;
@Override
public boolean hasNext() {
@@ -101,10 +101,12 @@ public boolean hasNext() {
public T next() {
try {
// if hasNext wasn't called before calling next.
- if (buf == null)
+ if (buf == null) {
buf = value;
- if (NotificationLite.isComplete(buf))
+ }
+ if (NotificationLite.isComplete(buf)) {
throw new NoSuchElementException();
+ }
if (NotificationLite.isError(buf)) {
throw Exceptions.propagate(NotificationLite.getError(buf));
}
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java
index dea313c66b..9b537263a6 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java
@@ -52,15 +52,15 @@ public Iterator iterator() {
}
// test needs to access the observer.waiting flag non-blockingly.
- /* private */static final class NextIterator implements Iterator {
+ static final class NextIterator implements Iterator {
private final NextObserver observer;
private final Publisher extends T> items;
private T next;
private boolean hasNext = true;
private boolean isNextConsumed = true;
- private Throwable error = null;
- private boolean started = false;
+ private Throwable error;
+ private boolean started;
private NextIterator(Publisher extends T> items, NextObserver observer) {
this.items = items;
@@ -75,11 +75,11 @@ public boolean hasNext() {
}
// Since an iterator should not be used in different thread,
// so we do not need any synchronization.
- if (hasNext == false) {
+ if (!hasNext) {
// the iterator has reached the end.
return false;
}
- if (isNextConsumed == false) {
+ if (!isNextConsumed) {
// next has not been used yet.
return true;
}
@@ -117,7 +117,7 @@ private boolean moveToNext() {
observer.dispose();
Thread.currentThread().interrupt();
error = e;
- throw Exceptions.propagate(error);
+ throw Exceptions.propagate(e);
}
}
@@ -142,7 +142,7 @@ public void remove() {
}
}
- private static class NextObserver extends DisposableSubscriber>> {
+ static final class NextObserver extends DisposableSubscriber>> {
private final BlockingQueue>> buf = new ArrayBlockingQueue>>(1);
final AtomicInteger waiting = new AtomicInteger();
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java
index 32b5495cd1..bae79ac8df 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java
@@ -181,8 +181,6 @@ void disposeOther() {
void next() {
- Disposable o = other.get();
-
U next;
try {
@@ -219,6 +217,8 @@ void next() {
BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this);
+ Disposable o = other.get();
+
if (!other.compareAndSet(o, bs)) {
return;
}
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java
index 29eb4b4e2b..2e95152186 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java
@@ -288,7 +288,7 @@ public void onSubscribe(Subscription s) {
}
this.s = s;
- final U b;
+ final U b; // NOPMD
try {
b = bufferSupplier.get();
@@ -383,7 +383,7 @@ public void run() {
if (cancelled) {
return;
}
- final U b;
+ final U b; // NOPMD
try {
b = bufferSupplier.get();
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java
index 64490f999e..b0fa730362 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java
@@ -30,9 +30,9 @@
*/
public final class FlowableCache extends Flowable {
/** The cache and replay state. */
- private CacheState state;
+ final CacheState