From 6e508e5f619e6931d333a1386568834c31362f42 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Sun, 26 Jun 2016 11:11:02 +0200 Subject: [PATCH] 2.x: cleanup of PMD suggestions (#4129) --- pmd.xml | 8 -- src/main/java/io/reactivex/Completable.java | 16 +-- src/main/java/io/reactivex/Flowable.java | 22 ++- src/main/java/io/reactivex/Notification.java | 4 +- src/main/java/io/reactivex/Observable.java | 72 +++++----- src/main/java/io/reactivex/Optional.java | 6 +- src/main/java/io/reactivex/Single.java | 6 +- .../exceptions/CompositeException.java | 45 +++--- .../reactivex/flowables/BlockingFlowable.java | 10 +- .../reactivex/functions/BooleanSupplier.java | 2 +- .../disposables/ArrayCompositeDisposable.java | 2 +- .../reactivex/internal/functions/Objects.java | 2 +- .../completable/CompletableAmbIterable.java | 34 ++--- .../completable/CompletableLift.java | 2 +- .../CompletableMergeDelayErrorIterable.java | 7 +- .../completable/CompletableMergeIterable.java | 7 +- .../completable/CompletableUsing.java | 2 +- .../flowable/BlockingFlowableLatest.java | 6 +- .../flowable/BlockingFlowableMostRecent.java | 10 +- .../flowable/BlockingFlowableNext.java | 14 +- .../FlowableBufferBoundarySupplier.java | 4 +- .../flowable/FlowableBufferTimed.java | 4 +- .../operators/flowable/FlowableCache.java | 4 +- .../flowable/FlowableCombineLatest.java | 10 +- .../operators/flowable/FlowableFromArray.java | 8 +- .../operators/flowable/FlowableGenerate.java | 2 +- .../operators/flowable/FlowableGroupBy.java | 28 ++-- .../operators/flowable/FlowableLift.java | 2 +- .../operators/flowable/FlowableObserveOn.java | 6 +- .../FlowableOnBackpressureBuffer.java | 10 +- .../operators/flowable/FlowablePublish.java | 8 +- .../operators/flowable/FlowableRange.java | 2 +- .../operators/flowable/FlowableRefCount.java | 19 ++- .../operators/flowable/FlowableReplay.java | 21 ++- .../flowable/FlowableSequenceEqual.java | 11 +- .../flowable/FlowableSkipLastTimed.java | 10 +- .../operators/flowable/FlowableSwitchMap.java | 19 ++- .../operators/flowable/FlowableTakeLast.java | 6 +- .../flowable/FlowableTakeLastTimed.java | 2 +- .../operators/flowable/FlowableTimeout.java | 4 +- .../flowable/FlowableWindowTimed.java | 4 +- .../operators/flowable/FlowableZip.java | 6 +- .../observable/BlockingObservableLatest.java | 6 +- .../BlockingObservableMostRecent.java | 8 +- .../observable/BlockingObservableNext.java | 12 +- .../observable/NbpOperatorDistinct.java | 50 +++---- .../observable/NbpOperatorGroupBy.java | 4 +- .../observable/NbpOperatorPublish.java | 8 +- .../observable/NbpOperatorReplay.java | 15 +- .../observable/NbpOperatorSingle.java | 3 +- .../observable/NbpOperatorSwitchMap.java | 10 +- .../observable/NbpOperatorTimeout.java | 4 +- .../observable/NbpOperatorToList.java | 4 +- .../observable/NbpOperatorWindowTimed.java | 4 +- .../ObservableBufferBoundarySupplier.java | 6 +- .../observable/ObservableBufferTimed.java | 4 +- .../operators/observable/ObservableCache.java | 2 +- .../observable/ObservableFromArray.java | 2 +- .../observable/ObservableGenerate.java | 6 +- .../operators/observable/ObservableLift.java | 2 +- .../observable/ObservableRefCount.java | 133 +++++++++--------- .../observable/ObservableSequenceEqual.java | 13 +- .../operators/single/SingleAmbIterable.java | 10 +- .../operators/single/SingleAwait.java | 2 +- .../internal/operators/single/SingleLift.java | 2 +- .../operators/single/SingleUsing.java | 2 +- .../operators/single/SingleWrapper.java | 2 +- .../internal/queue/BaseArrayQueue.java | 3 +- .../internal/queue/BaseLinkedQueue.java | 2 +- .../internal/queue/MpscLinkedQueue.java | 10 +- .../internal/queue/SpscArrayQueue.java | 2 +- .../internal/queue/SpscExactArrayQueue.java | 2 +- .../internal/queue/SpscLinkedArrayQueue.java | 36 ++--- .../schedulers/ComputationScheduler.java | 22 +-- .../internal/schedulers/IoScheduler.java | 20 ++- .../internal/schedulers/SingleScheduler.java | 3 +- .../schedulers/TrampolineScheduler.java | 2 +- .../subscriptions/SubscriptionHelper.java | 10 +- .../reactivex/internal/util/Exceptions.java | 2 +- .../internal/util/LinkedArrayList.java | 4 +- .../internal/util/NotificationLite.java | 6 +- .../reactivex/internal/util/OpenHashSet.java | 8 +- .../internal/util/QueueDrainHelper.java | 3 +- .../observables/BlockingObservable.java | 10 +- .../io/reactivex/observers/TestObserver.java | 2 +- .../io/reactivex/plugins/RxJavaPlugins.java | 6 +- .../reactivex/processors/AsyncProcessor.java | 17 ++- .../processors/BehaviorProcessor.java | 3 +- .../reactivex/processors/FlowProcessor.java | 6 +- .../processors/PublishProcessor.java | 18 +-- .../reactivex/processors/ReplayProcessor.java | 16 +-- .../processors/UnicastProcessor.java | 17 ++- .../io/reactivex/schedulers/Schedulers.java | 19 ++- .../reactivex/schedulers/TestScheduler.java | 9 +- .../java/io/reactivex/schedulers/Timed.java | 8 ++ .../io/reactivex/subjects/AsyncSubject.java | 4 +- .../reactivex/subjects/BehaviorSubject.java | 2 +- .../io/reactivex/subjects/PublishSubject.java | 2 +- .../io/reactivex/subjects/ReplaySubject.java | 5 +- .../java/io/reactivex/subjects/Subject.java | 5 +- .../io/reactivex/subjects/UnicastSubject.java | 4 +- .../reactivex/subscribers/SafeSubscriber.java | 2 +- .../reactivex/subscribers/TestSubscriber.java | 2 +- .../reactivex/flowable/BackpressureTests.java | 2 +- .../flowable/FlowableBufferTest.java | 2 +- 105 files changed, 518 insertions(+), 559 deletions(-) diff --git a/pmd.xml b/pmd.xml index e6fbccdd95..728e1e4d1d 100644 --- a/pmd.xml +++ b/pmd.xml @@ -71,7 +71,6 @@ - @@ -110,7 +109,6 @@ - @@ -134,9 +132,6 @@ - - - @@ -160,7 +155,6 @@ - @@ -183,8 +177,6 @@ - - diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 89c268f06e..9e91e992f7 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -31,6 +31,12 @@ * The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)? */ public abstract class Completable implements CompletableConsumable { + /** Single instance of a complete Completable. */ + static final Completable COMPLETE = new CompletableEmpty(); + + /** Single instance of a never Completable. */ + static final Completable NEVER = new CompletableNever(); + /** * Convenience interface and callback used by the lift operator that given a child CompletableSubscriber, * return a parent CompletableSubscriber that does any kind of lifecycle-related transformations. @@ -47,12 +53,6 @@ public interface CompletableTransformer extends Function implements Publisher { + private static final Object OBJECT = new Object(); + /** The default buffer size. */ + static final int BUFFER_SIZE; + static { + BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128)); + } + /** * Interface to map/wrap a downstream subscriber to an upstream subscriber. * @@ -53,12 +60,6 @@ public interface Transformer extends Function, Publisher NEVER = create(new Publisher() { @Override @@ -459,8 +460,7 @@ public static Flowable fromFuture(Future future) { public static Flowable fromFuture(Future future, long timeout, TimeUnit unit) { Objects.requireNonNull(future, "future is null"); Objects.requireNonNull(unit, "unit is null"); - Flowable o = new FlowableFromFuture(future, timeout, unit); - return o; + return new FlowableFromFuture(future, timeout, unit); } @BackpressureSupport(BackpressureKind.FULL) @@ -1613,8 +1613,6 @@ public Publisher apply(Long v) { }); } - private static final Object OBJECT = new Object(); - @SuppressWarnings({ "rawtypes", "unchecked" }) @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @@ -1622,7 +1620,7 @@ public final Flowable delaySubscription(final Supplier() { @Override - public Object call() throws Exception { + public Object call() { return delaySupplier.get(); } }) @@ -2936,7 +2934,7 @@ public final void subscribe(Subscriber s) { } subscribeActual(s); - } catch (NullPointerException e) { + } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { // TODO throw if fatal? diff --git a/src/main/java/io/reactivex/Notification.java b/src/main/java/io/reactivex/Notification.java index 2a2a1dd871..d21c53418e 100644 --- a/src/main/java/io/reactivex/Notification.java +++ b/src/main/java/io/reactivex/Notification.java @@ -19,12 +19,12 @@ * Utility class to help construct notification objects. */ public final class Notification { + static final Try> COMPLETE = Try.ofValue(Optional.empty()); + private Notification() { throw new IllegalStateException(); } - static final Try> COMPLETE = Try.ofValue(Optional.empty()); - @SuppressWarnings({ "rawtypes", "unchecked" }) public static Try> complete() { return (Try)COMPLETE; diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 4ae2a24f52..85f6b29e60 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -38,7 +38,8 @@ * @param */ public abstract class Observable implements ObservableConsumable { - + static final Object OBJECT = new Object(); + public interface NbpOperator extends Function, Observer> { } @@ -64,11 +65,9 @@ public void subscribe(Observer s) { } }); - static final Object OBJECT = new Object(); - public static Observable amb(Iterable> sources) { Objects.requireNonNull(sources, "sources is null"); - return (new ObservableAmb(null, sources)); + return new ObservableAmb(null, sources); } @SuppressWarnings("unchecked") @@ -82,7 +81,7 @@ public static Observable amb(ObservableConsumable... sources if (len == 1) { return (Observable)sources[0]; // FIXME wrap() } - return (new ObservableAmb(sources, null)); + return new ObservableAmb(sources, null); } /** @@ -116,7 +115,7 @@ public static Observable combineLatest(Iterable(null, sources, combiner, s, delayError)); + return new ObservableCombineLatest(null, sources, combiner, s, delayError); } @SchedulerSupport(SchedulerSupport.NONE) @@ -138,7 +137,7 @@ public static Observable combineLatest(ObservableConsumable(sources, null, combiner, s, delayError)); + return new ObservableCombineLatest(sources, null, combiner, s, delayError); } @SuppressWarnings("unchecked") @@ -368,7 +367,7 @@ public static Observable wrap(ObservableConsumable onSubscribe) { @SchedulerSupport(SchedulerSupport.NONE) public static Observable defer(Supplier> supplier) { Objects.requireNonNull(supplier, "supplier is null"); - return (new ObservableDefer(supplier)); + return new ObservableDefer(supplier); } @SchedulerSupport(SchedulerSupport.NONE) @@ -380,7 +379,7 @@ public static Observable empty() { @SchedulerSupport(SchedulerSupport.NONE) public static Observable error(Supplier errorSupplier) { Objects.requireNonNull(errorSupplier, "errorSupplier is null"); - return (new ObservableError(errorSupplier)); + return new ObservableError(errorSupplier); } @SchedulerSupport(SchedulerSupport.NONE) @@ -403,13 +402,13 @@ public static Observable fromArray(T... values) { if (values.length == 1) { return just(values[0]); } - return (new ObservableFromArray(values)); + return new ObservableFromArray(values); } @SchedulerSupport(SchedulerSupport.NONE) public static Observable fromCallable(Callable supplier) { Objects.requireNonNull(supplier, "supplier is null"); - return (new ObservableFromCallable(supplier)); + return new ObservableFromCallable(supplier); } /* @@ -420,15 +419,14 @@ public static Observable fromCallable(Callable supplier) { @SchedulerSupport(SchedulerSupport.NONE) public static Observable fromFuture(Future future) { Objects.requireNonNull(future, "future is null"); - return (new ObservableFromFuture(future, 0L, null)); + return new ObservableFromFuture(future, 0L, null); } @SchedulerSupport(SchedulerSupport.NONE) public static Observable fromFuture(Future future, long timeout, TimeUnit unit) { Objects.requireNonNull(future, "future is null"); Objects.requireNonNull(unit, "unit is null"); - Observable o = create(new ObservableFromFuture(future, timeout, unit)); - return o; + return new ObservableFromFuture(future, timeout, unit); } @SchedulerSupport(SchedulerSupport.CUSTOM) @@ -447,7 +445,7 @@ public static Observable fromFuture(Future future, Scheduler public static Observable fromIterable(Iterable source) { Objects.requireNonNull(source, "source is null"); - return (new ObservableFromIterable(source)); + return new ObservableFromIterable(source); } public static Observable fromPublisher(final Publisher publisher) { @@ -533,7 +531,7 @@ public static Observable generate(Supplier initialState, BiFunction Objects.requireNonNull(initialState, "initialState is null"); Objects.requireNonNull(generator, "generator is null"); Objects.requireNonNull(disposeState, "diposeState is null"); - return (new ObservableGenerate(initialState, generator, disposeState)); + return new ObservableGenerate(initialState, generator, disposeState); } @SchedulerSupport(SchedulerSupport.COMPUTATION) @@ -552,7 +550,7 @@ public static Observable interval(long initialDelay, long period, TimeUnit Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return (new ObservableInterval(initialDelay, period, unit, scheduler)); + return new ObservableInterval(initialDelay, period, unit, scheduler); } @SchedulerSupport(SchedulerSupport.COMPUTATION) @@ -587,7 +585,7 @@ public static Observable intervalRange(long start, long count, long initia Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return (new ObservableIntervalRange(start, end, initialDelay, period, unit, scheduler)); + return new ObservableIntervalRange(start, end, initialDelay, period, unit, scheduler); } public static Observable just(T value) { @@ -907,7 +905,7 @@ public static Observable sequenceEqual(ObservableConsumable(p1, p2, isEqual, bufferSize)); + return new ObservableSequenceEqual(p1, p2, isEqual, bufferSize); } @SchedulerSupport(SchedulerSupport.NONE) @@ -941,7 +939,7 @@ public static Observable timer(long delay, TimeUnit unit, Scheduler schedu Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return (new ObservableTimer(delay, unit, scheduler)); + return new ObservableTimer(delay, unit, scheduler); } @SchedulerSupport(SchedulerSupport.NONE) @@ -954,7 +952,7 @@ public static Observable using(Supplier resourceSupplier, Objects.requireNonNull(resourceSupplier, "resourceSupplier is null"); Objects.requireNonNull(sourceSupplier, "sourceSupplier is null"); Objects.requireNonNull(disposer, "disposer is null"); - return (new ObservableUsing(resourceSupplier, sourceSupplier, disposer, eager)); + return new ObservableUsing(resourceSupplier, sourceSupplier, disposer, eager); } private static void validateBufferSize(int bufferSize) { @@ -967,7 +965,7 @@ private static void validateBufferSize(int bufferSize) { public static Observable zip(Iterable> sources, Function zipper) { Objects.requireNonNull(zipper, "zipper is null"); Objects.requireNonNull(sources, "sources is null"); - return (new ObservableZip(null, sources, zipper, bufferSize(), false)); + return new ObservableZip(null, sources, zipper, bufferSize(), false); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -1082,7 +1080,7 @@ public static Observable zipArray(Function(sources, null, zipper, bufferSize, delayError)); + return new ObservableZip(sources, null, zipper, bufferSize, delayError); } @SchedulerSupport(SchedulerSupport.NONE) @@ -1092,14 +1090,14 @@ public static Observable zipIterable(Function(null, sources, zipper, bufferSize, delayError)); + return new ObservableZip(null, sources, zipper, bufferSize, delayError); } @SchedulerSupport(SchedulerSupport.NONE) public final Observable all(Predicate predicate) { Objects.requireNonNull(predicate, "predicate is null"); - return (new ObservableAll(this, predicate)); + return new ObservableAll(this, predicate); } @SuppressWarnings("unchecked") @@ -1493,7 +1491,7 @@ public final Observable delay(Supplier Observable delaySubscription(ObservableConsumable other) { Objects.requireNonNull(other, "other is null"); - return (new ObservableDelaySubscriptionOther(this, other)); + return new ObservableDelaySubscriptionOther(this, other); } @SchedulerSupport(SchedulerSupport.COMPUTATION) @@ -1521,7 +1519,7 @@ public final Observable delaySubscription(final Supplier>() { @Override - public ObservableConsumable call() throws Exception { + public ObservableConsumable call() { return delaySupplier.get(); } }) @@ -1778,7 +1776,7 @@ public final Observable flatMap(Function scalar = (ObservableJust) this; - return (scalar.scalarFlatMap(mapper)); + return scalar.scalarFlatMap(mapper); } return lift(new NbpOperatorFlatMap(mapper, delayErrors, maxConcurrency, bufferSize)); } @@ -2024,7 +2022,7 @@ public final Observable last(T defaultValue) { public final Observable lift(NbpOperator onLift) { Objects.requireNonNull(onLift, "onLift is null"); - return (new ObservableLift(this, onLift)); + return new ObservableLift(this, onLift); } public final Observable map(Function mapper) { @@ -2175,13 +2173,13 @@ public final Observable repeat(long times) { if (times == 0) { return empty(); } - return (new ObservableRepeat(this, times)); + return new ObservableRepeat(this, times); } @SchedulerSupport(SchedulerSupport.NONE) public final Observable repeatUntil(BooleanSupplier stop) { Objects.requireNonNull(stop, "stop is null"); - return (new ObservableRepeatUntil(this, stop)); + return new ObservableRepeatUntil(this, stop); } @SchedulerSupport(SchedulerSupport.NONE) @@ -2201,7 +2199,7 @@ public Object apply(Try> v) { } ; - return (new ObservableRedo(this, f)); + return new ObservableRedo(this, f); } @SchedulerSupport(SchedulerSupport.NONE) @@ -2354,7 +2352,7 @@ public final Observable retry() { public final Observable retry(BiPredicate predicate) { Objects.requireNonNull(predicate, "predicate is null"); - return (new ObservableRetryBiPredicate(this, predicate)); + return new ObservableRetryBiPredicate(this, predicate); } @SchedulerSupport(SchedulerSupport.NONE) @@ -2370,7 +2368,7 @@ public final Observable retry(long times, Predicate predic } Objects.requireNonNull(predicate, "predicate is null"); - return (new ObservableRetryPredicate(this, times, predicate)); + return new ObservableRetryPredicate(this, times, predicate); } @SchedulerSupport(SchedulerSupport.NONE) @@ -2415,7 +2413,7 @@ public Throwable apply(Try> t) { } ; - return (new ObservableRedo(this, f)); + return new ObservableRedo(this, f); } // TODO decide if safe subscription or unsafe should be the default @@ -2649,7 +2647,7 @@ public final void subscribe(Observer observer) { @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable subscribeOn(Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null"); - return (new ObservableSubscribeOn(this, scheduler)); + return new ObservableSubscribeOn(this, scheduler); } @SchedulerSupport(SchedulerSupport.NONE) @@ -3381,7 +3379,7 @@ public final Observable withLatestFrom(ObservableConsumable Observable zipWith(Iterable other, BiFunction zipper) { Objects.requireNonNull(other, "other is null"); Objects.requireNonNull(zipper, "zipper is null"); - return (new ObservableZipIterable(this, other, zipper)); + return new ObservableZipIterable(this, other, zipper); } @SchedulerSupport(SchedulerSupport.NONE) diff --git a/src/main/java/io/reactivex/Optional.java b/src/main/java/io/reactivex/Optional.java index b8d0565bf0..e366639554 100644 --- a/src/main/java/io/reactivex/Optional.java +++ b/src/main/java/io/reactivex/Optional.java @@ -21,13 +21,13 @@ * @param the value type */ public final class Optional { + static final Optional EMPTY = new Optional(null); + final T value; protected Optional(T value) { this.value = value; } - static final Optional EMPTY = new Optional(null); - @SuppressWarnings("unchecked") public static Optional empty() { return (Optional)EMPTY; @@ -48,7 +48,7 @@ public T get() { @Override public int hashCode() { - final int prime = 31; + int prime = 31; int result = 1; result = prime * result + ((value == null) ? 0 : value.hashCode()); return result; diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index b3ec304b07..8aaf298042 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -33,6 +33,8 @@ * @param the value type */ public abstract class Single implements SingleConsumable { + + static final Single NEVER = new SingleNever(); public interface SingleOperator extends Function, SingleSubscriber> { @@ -392,8 +394,6 @@ public static Flowable merge( return merge(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9)); } - static final Single NEVER = new SingleNever(); - @SuppressWarnings("unchecked") public static Single never() { return (Single)NEVER; @@ -409,7 +409,7 @@ public static Single timer(final long delay, final TimeUnit unit, final Sc return new SingleTimer(delay, unit, scheduler); } - public static Single equals(final SingleConsumable first, final SingleConsumable second) { + public static Single equals(final SingleConsumable first, final SingleConsumable second) { // NOPMD Objects.requireNonNull(first, "first is null"); Objects.requireNonNull(second, "second is null"); return new SingleEquals(first, second); diff --git a/src/main/java/io/reactivex/exceptions/CompositeException.java b/src/main/java/io/reactivex/exceptions/CompositeException.java index a10c428133..3214140df8 100644 --- a/src/main/java/io/reactivex/exceptions/CompositeException.java +++ b/src/main/java/io/reactivex/exceptions/CompositeException.java @@ -38,6 +38,7 @@ public final class CompositeException extends RuntimeException { private final List exceptions; private final String message; + private Throwable cause; public CompositeException() { this.exceptions = new ArrayList(); @@ -57,9 +58,9 @@ public CompositeException(Throwable... exceptions) { } - public CompositeException(String messagePrefix, Collection errors) { + public CompositeException(Collection errors) { Set deDupedExceptions = new LinkedHashSet(); - List _exceptions = new ArrayList(); + List localExceptions = new ArrayList(); if (errors != null) { for (Throwable ex : errors) { if (ex instanceof CompositeException) { @@ -75,15 +76,11 @@ public CompositeException(String messagePrefix, Collection deDupedExceptions.add(new NullPointerException()); } - _exceptions.addAll(deDupedExceptions); - this.exceptions = Collections.unmodifiableList(_exceptions); + localExceptions.addAll(deDupedExceptions); + this.exceptions = Collections.unmodifiableList(localExceptions); this.message = exceptions.size() + " exceptions occurred. "; } - public CompositeException(Collection errors) { - this(null, errors); - } - /** * Retrieves the list of exceptions that make up the {@code CompositeException} * @@ -109,16 +106,14 @@ public void suppress(Throwable e) { } - private Throwable cause = null; - @Override - public synchronized Throwable getCause() { + public synchronized Throwable getCause() { // NOPMD if (cause == null) { // we lazily generate this causal chain if this is called - CompositeExceptionCausalChain _cause = new CompositeExceptionCausalChain(); + CompositeExceptionCausalChain localCause = new CompositeExceptionCausalChain(); Set seenCauses = new HashSet(); - Throwable chain = _cause; + Throwable chain = localCause; for (Throwable e : exceptions) { if (seenCauses.contains(e)) { // already seen this outer Throwable so skip @@ -147,7 +142,7 @@ public synchronized Throwable getCause() { } chain = chain.getCause(); } - cause = _cause; + cause = localCause; } return cause; } @@ -185,14 +180,14 @@ public void printStackTrace(PrintWriter s) { * stream to print to */ private void printStackTrace(PrintStreamOrWriter s) { - StringBuilder bldr = new StringBuilder(); - bldr.append(this).append("\n"); + StringBuilder bldr = new StringBuilder(128); + bldr.append(this).append('\n'); for (StackTraceElement myStackElement : getStackTrace()) { - bldr.append("\tat ").append(myStackElement).append("\n"); + bldr.append("\tat ").append(myStackElement).append('\n'); } int i = 1; for (Throwable ex : exceptions) { - bldr.append(" ComposedException ").append(i).append(" :").append("\n"); + bldr.append(" ComposedException ").append(i).append(" :\n"); appendStackTrace(bldr, ex, "\t"); i++; } @@ -202,9 +197,9 @@ private void printStackTrace(PrintStreamOrWriter s) { } private void appendStackTrace(StringBuilder bldr, Throwable ex, String prefix) { - bldr.append(prefix).append(ex).append("\n"); + bldr.append(prefix).append(ex).append('\n'); for (StackTraceElement stackElement : ex.getStackTrace()) { - bldr.append("\t\tat ").append(stackElement).append("\n"); + bldr.append("\t\tat ").append(stackElement).append('\n'); } if (ex.getCause() != null) { bldr.append("\tCaused by: "); @@ -212,7 +207,7 @@ private void appendStackTrace(StringBuilder bldr, Throwable ex, String prefix) { } } - private abstract static class PrintStreamOrWriter { + abstract static class PrintStreamOrWriter { /** Returns the object to be locked when using this StreamOrWriter */ abstract Object lock(); @@ -223,7 +218,7 @@ private abstract static class PrintStreamOrWriter { /** * Same abstraction and implementation as in JDK to allow PrintStream and PrintWriter to share implementation */ - private static class WrappedPrintStream extends PrintStreamOrWriter { + static final class WrappedPrintStream extends PrintStreamOrWriter { private final PrintStream printStream; WrappedPrintStream(PrintStream printStream) { @@ -241,7 +236,7 @@ void println(Object o) { } } - private static class WrappedPrintWriter extends PrintStreamOrWriter { + static final class WrappedPrintWriter extends PrintStreamOrWriter { private final PrintWriter printWriter; WrappedPrintWriter(PrintWriter printWriter) { @@ -259,9 +254,9 @@ void println(Object o) { } } - /* package-private */final static class CompositeExceptionCausalChain extends RuntimeException { + final static class CompositeExceptionCausalChain extends RuntimeException { private static final long serialVersionUID = 3875212506787802066L; - /* package-private */static String MESSAGE = "Chain of Causes for CompositeException In Order Received =>"; + /* package-private */static final String MESSAGE = "Chain of Causes for CompositeException In Order Received =>"; @Override public String getMessage() { diff --git a/src/main/java/io/reactivex/flowables/BlockingFlowable.java b/src/main/java/io/reactivex/flowables/BlockingFlowable.java index 0870f390e2..5fad82fff5 100644 --- a/src/main/java/io/reactivex/flowables/BlockingFlowable.java +++ b/src/main/java/io/reactivex/flowables/BlockingFlowable.java @@ -62,7 +62,7 @@ public void forEach(Consumer action) { } } - static final BlockingIterator iterate(Publisher p) { + static BlockingIterator iterate(Publisher p) { final BlockingQueue queue = new LinkedBlockingQueue(); LambdaSubscriber ls = new LambdaSubscriber( @@ -431,7 +431,7 @@ private void awaitForComplete(CountDownLatch latch, Disposable subscription) { // for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780 Thread.currentThread().interrupt(); // using Runtime so it is not checked - throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); + throw new IllegalStateException("Interrupted while waiting for subscription to complete.", e); } } @@ -465,11 +465,7 @@ public void accept(Subscription s) { awaitForComplete(cdl, ls); Throwable e = error[0]; if (e != null) { - if (e instanceof RuntimeException) { - throw (RuntimeException)e; - } else { - throw new RuntimeException(e); - } + Exceptions.propagate(e); } } diff --git a/src/main/java/io/reactivex/functions/BooleanSupplier.java b/src/main/java/io/reactivex/functions/BooleanSupplier.java index dfad42f3bb..14748c5a98 100644 --- a/src/main/java/io/reactivex/functions/BooleanSupplier.java +++ b/src/main/java/io/reactivex/functions/BooleanSupplier.java @@ -14,5 +14,5 @@ package io.reactivex.functions; public interface BooleanSupplier { - boolean getAsBoolean(); + boolean getAsBoolean(); // NOPMD } diff --git a/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java b/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java index 99557278db..8464098bc3 100644 --- a/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java +++ b/src/main/java/io/reactivex/internal/disposables/ArrayCompositeDisposable.java @@ -42,7 +42,7 @@ public boolean setResource(int index, Disposable resource) { for (;;) { Disposable o = get(index); if (o == DisposableHelper.DISPOSED) { - resource.dispose();; + resource.dispose(); return false; } if (compareAndSet(index, o, resource)) { diff --git a/src/main/java/io/reactivex/internal/functions/Objects.java b/src/main/java/io/reactivex/internal/functions/Objects.java index 5f44695420..fefb497c6e 100644 --- a/src/main/java/io/reactivex/internal/functions/Objects.java +++ b/src/main/java/io/reactivex/internal/functions/Objects.java @@ -42,7 +42,7 @@ public static final T requireNonNull(T object, String message) { * @param o2 the second object * @return the comparison result */ - public static boolean equals(Object o1, Object o2) { + public static boolean equals(Object o1, Object o2) { // NOPMD return o1 == o2 || (o1 != null && o1.equals(o2)); } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java index 3e6de49d73..0712acc2ff 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java @@ -33,6 +33,22 @@ protected void subscribeActual(final CompletableSubscriber s) { final CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); + Iterator it; + + try { + it = sources.iterator(); + } catch (Throwable e) { + s.onError(e); + return; + } + + if (it == null) { + s.onError(new NullPointerException("The iterator returned is null")); + return; + } + + boolean empty = true; + final AtomicBoolean once = new AtomicBoolean(); CompletableSubscriber inner = new CompletableSubscriber() { @@ -60,23 +76,7 @@ public void onSubscribe(Disposable d) { } }; - - Iterator it; - - try { - it = sources.iterator(); - } catch (Throwable e) { - s.onError(e); - return; - } - - if (it == null) { - s.onError(new NullPointerException("The iterator returned is null")); - return; - } - - boolean empty = true; - + for (;;) { if (once.get() || set.isDisposed()) { return; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java index 46fbbac7f0..b5647be14d 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableLift.java @@ -36,7 +36,7 @@ protected void subscribeActual(CompletableSubscriber s) { CompletableSubscriber sw = onLift.apply(s); source.subscribe(sw); - } catch (NullPointerException ex) { + } catch (NullPointerException ex) { // NOPMD throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java index 39e9ec8236..b2dca0c61b 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java @@ -30,9 +30,6 @@ public CompletableMergeDelayErrorIterable(Iterable queue = new MpscLinkedQueue(); s.onSubscribe(set); @@ -49,7 +46,11 @@ public void subscribeActual(final CompletableSubscriber s) { s.onError(new NullPointerException("The source iterator returned is null")); return; } + + final AtomicInteger wip = new AtomicInteger(1); + final Queue queue = new MpscLinkedQueue(); + for (;;) { if (set.isDisposed()) { return; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java index aebef6c349..57eac4b692 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java @@ -30,8 +30,6 @@ public CompletableMergeIterable(Iterable source @Override public void subscribeActual(final CompletableSubscriber s) { final CompositeDisposable set = new CompositeDisposable(); - final AtomicInteger wip = new AtomicInteger(1); - final AtomicBoolean once = new AtomicBoolean(); s.onSubscribe(set); @@ -48,7 +46,10 @@ public void subscribeActual(final CompletableSubscriber s) { s.onError(new NullPointerException("The source iterator returned is null")); return; } - + + final AtomicInteger wip = new AtomicInteger(1); + final AtomicBoolean once = new AtomicBoolean(); + for (;;) { if (set.isDisposed()) { return; diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java index 36611d8b7f..2fa2327b6e 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java @@ -43,7 +43,7 @@ public CompletableUsing(Supplier resourceSupplier, @Override protected void subscribeActual(final CompletableSubscriber s) { - final R resource; + final R resource; // NOPMD try { resource = resourceSupplier.get(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java index 76c8289100..6d1835aadb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java @@ -58,6 +58,9 @@ static final class LatestObserverIterator extends DisposableSubscriber>> value = new AtomicReference>>(); + // iterator's notification + Try> iNotif; + @Override public void onNext(Try> args) { boolean wasntAvailable = value.getAndSet(args) == null; @@ -76,9 +79,6 @@ public void onComplete() { // not expected } - // iterator's notification - Try> iNotif; - @Override public boolean hasNext() { if (iNotif != null && iNotif.hasError()) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java index d5f4f2df28..febbcbb7e4 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java @@ -57,7 +57,7 @@ public Iterator iterator() { }; } - private static final class MostRecentObserver extends DefaultObserver { + static final class MostRecentObserver extends DefaultObserver { volatile Object value; private MostRecentObserver(T value) { @@ -89,7 +89,7 @@ public Iterator getIterable() { /** * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). */ - private Object buf = null; + private Object buf; @Override public boolean hasNext() { @@ -101,10 +101,12 @@ public boolean hasNext() { public T next() { try { // if hasNext wasn't called before calling next. - if (buf == null) + if (buf == null) { buf = value; - if (NotificationLite.isComplete(buf)) + } + if (NotificationLite.isComplete(buf)) { throw new NoSuchElementException(); + } if (NotificationLite.isError(buf)) { throw Exceptions.propagate(NotificationLite.getError(buf)); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java index dea313c66b..9b537263a6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java @@ -52,15 +52,15 @@ public Iterator iterator() { } // test needs to access the observer.waiting flag non-blockingly. - /* private */static final class NextIterator implements Iterator { + static final class NextIterator implements Iterator { private final NextObserver observer; private final Publisher items; private T next; private boolean hasNext = true; private boolean isNextConsumed = true; - private Throwable error = null; - private boolean started = false; + private Throwable error; + private boolean started; private NextIterator(Publisher items, NextObserver observer) { this.items = items; @@ -75,11 +75,11 @@ public boolean hasNext() { } // Since an iterator should not be used in different thread, // so we do not need any synchronization. - if (hasNext == false) { + if (!hasNext) { // the iterator has reached the end. return false; } - if (isNextConsumed == false) { + if (!isNextConsumed) { // next has not been used yet. return true; } @@ -117,7 +117,7 @@ private boolean moveToNext() { observer.dispose(); Thread.currentThread().interrupt(); error = e; - throw Exceptions.propagate(error); + throw Exceptions.propagate(e); } } @@ -142,7 +142,7 @@ public void remove() { } } - private static class NextObserver extends DisposableSubscriber>> { + static final class NextObserver extends DisposableSubscriber>> { private final BlockingQueue>> buf = new ArrayBlockingQueue>>(1); final AtomicInteger waiting = new AtomicInteger(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java index 32b5495cd1..bae79ac8df 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java @@ -181,8 +181,6 @@ void disposeOther() { void next() { - Disposable o = other.get(); - U next; try { @@ -219,6 +217,8 @@ void next() { BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); + Disposable o = other.get(); + if (!other.compareAndSet(o, bs)) { return; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index 29eb4b4e2b..2e95152186 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -288,7 +288,7 @@ public void onSubscribe(Subscription s) { } this.s = s; - final U b; + final U b; // NOPMD try { b = bufferSupplier.get(); @@ -383,7 +383,7 @@ public void run() { if (cancelled) { return; } - final U b; + final U b; // NOPMD try { b = bufferSupplier.get(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java index 64490f999e..b0fa730362 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java @@ -30,9 +30,9 @@ */ public final class FlowableCache extends Flowable { /** The cache and replay state. */ - private CacheState state; + final CacheState state; - private final AtomicBoolean once; + final AtomicBoolean once; /** * Creates a cached Observable with a default capacity hint of 16. * @param the value type diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index 1d96f83dc8..c2812c4410 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -221,10 +221,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0L) { + while (e != r) { boolean d = done; @SuppressWarnings("unchecked") @@ -270,13 +269,12 @@ void drain() { cs.request(1); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + requested.addAndGet(-e); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java index dccd80ddc1..7ee308ad63 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java @@ -28,7 +28,7 @@ public FlowableFromArray(T[] array) { this.array = array; } public T[] array() { - return array; + return array; // NOPMD } @Override public void subscribeActual(Subscriber s) { @@ -85,10 +85,10 @@ public void request(long n) { s.onComplete(); return; } - long e = 0; if (cancelled) { return; } + long e = 0; while (r != 0 && i < len) { T t = a[i]; if (t == null) { @@ -165,15 +165,15 @@ public void request(long n) { s.onComplete(); return; } - long e = 0; if (cancelled) { return; } + long e = 0; while (r != 0 && i < len) { - boolean b = s.tryOnNext(a[i]); if (cancelled) { return; } + boolean b = s.tryOnNext(a[i]); // NOPMD if (++i == len) { s.onComplete(); return; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java index 60e30aad7e..55da85e8ec 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java @@ -95,7 +95,7 @@ public void request(long n) { return; } - boolean unbounded = n == Long.MAX_VALUE; + boolean unbounded = n == Long.MAX_VALUE; // NOPMD while (n != 0L) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java index 73dfb879fc..049a7cb4bb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java @@ -219,10 +219,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0) { + while (e != r) { boolean d = done; GroupedFlowable t = q.poll(); @@ -239,15 +238,14 @@ void drain() { a.onNext(t); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + requested.addAndGet(-e); } - s.request(-e); + s.request(e); } missed = addAndGet(-missed); @@ -295,13 +293,13 @@ boolean checkTerminated(boolean d, boolean empty, static final class GroupedUnicast extends GroupedFlowable { + final State state; + public static GroupedUnicast createWith(K key, int bufferSize, GroupBySubscriber parent, boolean delayError) { State state = new State(bufferSize, parent, key, delayError); return new GroupedUnicast(key, state); } - final State state; - protected GroupedUnicast(K key, State state) { super(key); this.state = state; @@ -418,10 +416,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0; - while (r != 0L) { + while (e != r) { boolean d = done; T v = q.poll(); boolean empty = v == null; @@ -436,15 +433,14 @@ void drain() { a.onNext(v); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + requested.addAndGet(-e); } - parent.s.request(-e); + parent.s.request(e); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java index a649171750..33d52d5b5b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableLift.java @@ -73,7 +73,7 @@ public void subscribeActual(Subscriber s) { } source.subscribe(st); - } catch (NullPointerException e) { + } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { // TODO throw if fatal? diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java index 364e78fa44..e6e953eff5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java @@ -190,9 +190,8 @@ public void run() { long r = requested.get(); long e = 0L; - boolean unbounded = r == Long.MAX_VALUE; - while (r != 0L) { + while (e != r) { boolean d = done; T v = q.poll(); boolean empty = v == null; @@ -207,7 +206,6 @@ public void run() { a.onNext(v); - r--; e++; } @@ -215,7 +213,7 @@ public void run() { return; } if (e != 0L) { - if (!unbounded) { + if (r != Long.MAX_VALUE) { requested.addAndGet(-e); } s.request(e); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java index 819efebe46..a55b2e1886 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java @@ -152,11 +152,10 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0L) { + while (e != r) { boolean d = done; T v = q.poll(); boolean empty = v == null; @@ -171,13 +170,12 @@ void drain() { a.onNext(v); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + requested.addAndGet(-e); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java index e65577ca9a..078ef70d97 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java @@ -42,6 +42,8 @@ public final class FlowablePublish extends ConnectableFlowable { /** The size of the prefetch buffer. */ final int bufferSize; + final Publisher onSubscribe; + /** * Creates a OperatorPublish instance to publish values of the given source observable. * @param the source value type @@ -112,7 +114,7 @@ public void subscribe(Subscriber child) { // setting the producer will trigger the first request to be considered by // the subscriber-to-source. child.onSubscribe(inner); - break; + break; // NOPMD } } }; @@ -140,8 +142,6 @@ public void accept(Disposable r) { }); } - final Publisher onSubscribe; - private FlowablePublish(Publisher onSubscribe, Publisher source, final AtomicReference> current, int bufferSize) { this.onSubscribe = onSubscribe; @@ -178,7 +178,7 @@ public void connect(Consumer connection) { // if connect() was called concurrently, only one of them should actually // connect to the source doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); - break; + break; // NOPMD } /* * Notify the callback that we have a (new) connection which it can unsubscribe diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java index d308a8a668..283957f100 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java @@ -85,7 +85,7 @@ void slowpath(long r) { for (;;) { long fs = end - idx + 1; - final boolean complete = fs <= r; + final boolean complete = fs <= r; // NOPMD fs = Math.min(fs, r) + idx; final Subscriber o = this.actual; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 1ea9eb4159..ea773221f4 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -32,6 +32,14 @@ * the value type */ public final class FlowableRefCount extends Flowable { + final ConnectableFlowable source; + volatile CompositeDisposable baseSubscription = new CompositeDisposable(); + final AtomicInteger subscriptionCount = new AtomicInteger(0); + + /** + * Use this lock for every subscription and disconnect action. + */ + final ReentrantLock lock = new ReentrantLock(); final class ConnectionSubscriber implements Subscriber, Subscription { final Subscriber subscriber; @@ -40,7 +48,7 @@ final class ConnectionSubscriber implements Subscriber, Subscription { Subscription s; - private ConnectionSubscriber(Subscriber subscriber, + ConnectionSubscriber(Subscriber subscriber, CompositeDisposable currentBase, Disposable resource) { this.subscriber = subscriber; this.currentBase = currentBase; @@ -99,15 +107,6 @@ void cleanup() { } } - final ConnectableFlowable source; - volatile CompositeDisposable baseSubscription = new CompositeDisposable(); - final AtomicInteger subscriptionCount = new AtomicInteger(0); - - /** - * Use this lock for every subscription and disconnect action. - */ - final ReentrantLock lock = new ReentrantLock(); - /** * Constructor. * diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 9a2bd3e00b..306d2fa017 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -37,6 +37,8 @@ public final class FlowableReplay extends ConnectableFlowable { /** A factory that creates the appropriate buffer for the ReplaySubscriber. */ final Supplier> bufferFactory; + final Publisher onSubscribe; + @SuppressWarnings("rawtypes") static final Supplier DEFAULT_UNBOUNDED_FACTORY = new Supplier() { @Override @@ -61,7 +63,6 @@ public static Flowable multicastSelector( @Override public void subscribe(Subscriber child) { ConnectableFlowable co; - Publisher observable; try { co = connectableFactory.get(); } catch (Throwable e) { @@ -73,6 +74,7 @@ public void subscribe(Subscriber child) { return; } + Publisher observable; try { observable = selector.apply(co); } catch (Throwable e) { @@ -207,7 +209,7 @@ public void subscribe(Subscriber child) { // if there isn't one if (r == null) { // create a new subscriber to source - ReplaySubscriber u = new ReplaySubscriber(curr, bufferFactory.get()); + ReplaySubscriber u = new ReplaySubscriber(bufferFactory.get()); // let's try setting it as the current subscriber-to-source if (!curr.compareAndSet(r, u)) { // didn't work, maybe someone else did it or the current subscriber @@ -231,15 +233,13 @@ public void subscribe(Subscriber child) { // setting the producer will trigger the first request to be considered by // the subscriber-to-source. child.onSubscribe(inner); - break; + break; // NOPMD } } }; return new FlowableReplay(onSubscribe, source, curr, bufferFactory); } - final Publisher onSubscribe; - private FlowableReplay(Publisher onSubscribe, Flowable source, final AtomicReference> current, final Supplier> bufferFactory) { @@ -265,7 +265,7 @@ public void connect(Consumer connection) { // if there is none yet or the current has unsubscribed if (ps == null || ps.isDisposed()) { // create a new subscriber-to-source - ReplaySubscriber u = new ReplaySubscriber(current, bufferFactory.get()); + ReplaySubscriber u = new ReplaySubscriber(bufferFactory.get()); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived @@ -277,7 +277,7 @@ public void connect(Consumer connection) { // if connect() was called concurrently, only one of them should actually // connect to the source doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); - break; + break; // NOPMD } /* * Notify the callback that we have a (new) connection which it can unsubscribe @@ -332,8 +332,7 @@ static final class ReplaySubscriber implements Subscriber, Disposable { /** The upstream producer. */ volatile Subscription subscription; - public ReplaySubscriber(AtomicReference> current, - ReplayBuffer buffer) { + public ReplaySubscriber(ReplayBuffer buffer) { this.buffer = buffer; this.producers = new AtomicReference(EMPTY); @@ -803,7 +802,7 @@ public void replay(InnerSubscription output) { int destIndex = destIndexObject != null ? destIndexObject.intValue() : 0; long r = output.get(); - long r0 = r; + long r0 = r; // NOPMD long e = 0L; while (r != 0L && destIndex < sourceIndex) { @@ -959,7 +958,7 @@ public final void replay(InnerSubscription output) { } long r = output.get(); - boolean unbounded = r == Long.MAX_VALUE; + boolean unbounded = r == Long.MAX_VALUE; // NOPMD long e = 0L; Node node = output.index(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java index 2c009502b4..05038709eb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java @@ -139,8 +139,6 @@ void drain() { } boolean d1 = s1.done; - T v1 = q1.peek(); - boolean e1 = v1 == null; if (d1) { Throwable e = s1.error; @@ -153,8 +151,6 @@ void drain() { } boolean d2 = s2.done; - T v2 = q2.peek(); - boolean e2 = v2 == null; if (d2) { Throwable e = s2.error; @@ -165,7 +161,12 @@ void drain() { return; } } - + + T v1 = q1.peek(); + boolean e1 = v1 == null; + T v2 = q2.peek(); + boolean e2 = v2 == null; + if (d1 && d2 && e1 && e2) { actual.onNext(true); actual.onComplete(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java index af00f884ea..8b698410f5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java @@ -147,10 +147,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0) { + while (e != r) { boolean d = done; Long ts = (Long)q.peek(); @@ -189,13 +188,12 @@ void drain() { a.onNext(v); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + requested.addAndGet(-e); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java index b2a4aaa78d..7fdcb1b9dc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java @@ -224,18 +224,18 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; boolean retry = false; - while (r != 0L) { + while (e != r) { + if (cancelled) { + return; + } + boolean d = inner.done; R v = q.poll(); boolean empty = v == null; - if (cancelled) { - return; - } if (inner != active.get()) { retry = true; break; @@ -261,16 +261,15 @@ void drain() { a.onNext(v); - r--; - e--; + e++; } if (e != 0L) { if (!cancelled) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + requested.addAndGet(-e); } - inner.get().request(-e); + inner.get().request(e); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLast.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLast.java index 92a64e2c02..fd2e9b4cb4 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLast.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLast.java @@ -110,10 +110,9 @@ void drain() { return; } if (done) { - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0L) { + while (e != r) { if (cancelled) { return; } @@ -123,10 +122,9 @@ void drain() { return; } a.onNext(v); - r--; e++; } - if (!unbounded && e != 0L) { + if (e != 0L && r != Long.MAX_VALUE) { r = requested.addAndGet(-e); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java index cca09e00f8..26a4f7b8a6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java @@ -175,7 +175,7 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; + boolean unbounded = r == Long.MAX_VALUE; // NOPMD long e = 0L; for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java index 3043e97c93..68cddeb153 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java @@ -95,7 +95,7 @@ public void onSubscribe(Subscription s) { if (firstTimeoutSelector != null) { try { p = firstTimeoutSelector.get(); - } catch (Exception ex) { + } catch (Throwable ex) { cancel(); EmptySubscription.error(ex, a); return; @@ -272,7 +272,7 @@ public void onSubscribe(Subscription s) { try { p = firstTimeoutSelector.get(); - } catch (Exception ex) { + } catch (Throwable ex) { dispose(); EmptySubscription.error(ex, a); return; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index 4a5393c031..15d9607131 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -230,7 +230,7 @@ void drainLoop() { for (;;) { for (;;) { - boolean term = terminated; + boolean term = terminated; // NOPMD boolean d = done; @@ -802,7 +802,7 @@ void drainLoop() { boolean empty = v == null; boolean sw = v instanceof SubjectWork; - if (d && (v == null || v instanceof SubjectWork)) { + if (d && (empty || sw)) { q.clear(); dispose(); Throwable e = error; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java index d9393a5686..4c82c68d4e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java @@ -155,10 +155,9 @@ public void drain() { for (;;) { long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0; - while (r != 0) { + while (e != r) { int i = 0; int emptyCount = 0; for (ZipSubscriber z : zs) { @@ -204,12 +203,11 @@ public void drain() { a.onNext(v); - r--; e++; } if (e != 0) { - if (!unbounded) { + if (r != Long.MAX_VALUE) { requested.addAndGet(-e); } for (ZipSubscriber z : zs) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java index 52930632dd..b67d61aa26 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java @@ -57,6 +57,9 @@ public Iterator iterator() { /** Observer of source, iterator for output. */ static final class LatestObserverIterator extends DisposableObserver>> implements Iterator { + // iterator's notification + Try> iNotif; + final Semaphore notify = new Semaphore(0); // observer's notification final AtomicReference>> value = new AtomicReference>>(); @@ -79,9 +82,6 @@ public void onComplete() { // not expected } - // iterator's notification - Try> iNotif; - @Override public boolean hasNext() { if (iNotif != null && iNotif.hasError()) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java index 93796f8eca..462f982a4f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java @@ -89,7 +89,7 @@ public Iterator getIterable() { /** * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). */ - private Object buf = null; + private Object buf; @Override public boolean hasNext() { @@ -101,10 +101,12 @@ public boolean hasNext() { public T next() { try { // if hasNext wasn't called before calling next. - if (buf == null) + if (buf == null) { buf = value; - if (NotificationLite.isComplete(buf)) + } + if (NotificationLite.isComplete(buf)) { throw new NoSuchElementException(); + } if (NotificationLite.isError(buf)) { throw Exceptions.propagate(NotificationLite.getError(buf)); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java index 9861b537b2..79fdb90a1b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java @@ -51,15 +51,15 @@ public Iterator iterator() { } // test needs to access the observer.waiting flag non-blockingly. - /* private */static final class NextIterator implements Iterator { + static final class NextIterator implements Iterator { private final NextObserver observer; private final Observable items; private T next; private boolean hasNext = true; private boolean isNextConsumed = true; - private Throwable error = null; - private boolean started = false; + private Throwable error; + private boolean started; private NextIterator(Observable items, NextObserver observer) { this.items = items; @@ -74,11 +74,11 @@ public boolean hasNext() { } // Since an iterator should not be used in different thread, // so we do not need any synchronization. - if (hasNext == false) { + if (!hasNext) { // the iterator has reached the end. return false; } - if (isNextConsumed == false) { + if (!isNextConsumed) { // next has not been used yet. return true; } @@ -117,7 +117,7 @@ private boolean moveToNext() { observer.dispose(); Thread.currentThread().interrupt(); error = e; - throw Exceptions.propagate(error); + throw Exceptions.propagate(e); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorDistinct.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorDistinct.java index 7cfb48cc29..6cb31e7f17 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorDistinct.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorDistinct.java @@ -27,7 +27,31 @@ public final class NbpOperatorDistinct implements NbpOperator { final Function keySelector; final Supplier> predicateSupplier; - + static final NbpOperatorDistinct UNTIL_CHANGED; + + static { + Supplier> p = new Supplier>() { + @Override + public Predicate get() { + final Object[] last = { null }; + + return new Predicate() { + @Override + public boolean test(Object t) { + if (t == null) { + last[0] = null; + return true; + } + Object o = last[0]; + last[0] = t; + return !Objects.equals(o, t); + } + }; + } + }; + UNTIL_CHANGED = new NbpOperatorDistinct(Functions.identity(), p); + } + public NbpOperatorDistinct(Function keySelector, Supplier> predicateSupplier) { this.predicateSupplier = predicateSupplier; this.keySelector = keySelector; @@ -55,30 +79,6 @@ public boolean test(K t) { return new NbpOperatorDistinct(keySelector, p); } - static final NbpOperatorDistinct UNTIL_CHANGED; - static { - Supplier> p = new Supplier>() { - @Override - public Predicate get() { - final Object[] last = { null }; - - return new Predicate() { - @Override - public boolean test(Object t) { - if (t == null) { - last[0] = null; - return true; - } - Object o = last[0]; - last[0] = t; - return !Objects.equals(o, t); - } - }; - } - }; - UNTIL_CHANGED = new NbpOperatorDistinct(Functions.identity(), p); - } - @SuppressWarnings("unchecked") public static NbpOperatorDistinct untilChanged() { return (NbpOperatorDistinct)UNTIL_CHANGED; diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorGroupBy.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorGroupBy.java index 0903287f16..e92e9b1126 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorGroupBy.java @@ -176,13 +176,13 @@ public void cancel(K key) { static final class GroupedUnicast extends GroupedObservable { + final State state; + public static GroupedUnicast createWith(K key, int bufferSize, GroupBySubscriber parent, boolean delayError) { State state = new State(bufferSize, parent, key, delayError); return new GroupedUnicast(key, state); } - final State state; - protected GroupedUnicast(K key, State state) { super(key); this.state = state; diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorPublish.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorPublish.java index 728e716baa..33c1a5f23a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorPublish.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorPublish.java @@ -38,6 +38,8 @@ public final class NbpOperatorPublish extends ConnectableObservable { /** The size of the prefetch buffer. */ final int bufferSize; + final ObservableConsumable onSubscribe; + /** * Creates a OperatorPublish instance to publish values of the given source observable. * @param the source value type @@ -108,7 +110,7 @@ public void subscribe(Observer child) { // setting the producer will trigger the first request to be considered by // the subscriber-to-source. child.onSubscribe(inner); - break; + break; // NOPMD } } }; @@ -136,8 +138,6 @@ public void accept(Disposable r) { }); } - final ObservableConsumable onSubscribe; - private NbpOperatorPublish(ObservableConsumable onSubscribe, ObservableConsumable source, final AtomicReference> current, int bufferSize) { this.onSubscribe = onSubscribe; @@ -174,7 +174,7 @@ public void connect(Consumer connection) { // if connect() was called concurrently, only one of them should actually // connect to the source doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); - break; + break; // NOPMD } /* * Notify the callback that we have a (new) connection which it can unsubscribe diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorReplay.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorReplay.java index ad17f63ab7..75e1b3ddb5 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorReplay.java @@ -35,6 +35,8 @@ public final class NbpOperatorReplay extends ConnectableObservable { /** A factory that creates the appropriate buffer for the ReplaySubscriber. */ final Supplier> bufferFactory; + final ObservableConsumable onSubscribe; + @SuppressWarnings("rawtypes") static final Supplier DEFAULT_UNBOUNDED_FACTORY = new Supplier() { @Override @@ -191,7 +193,7 @@ public void subscribe(Observer child) { // if there isn't one if (r == null) { // create a new subscriber to source - ReplaySubscriber u = new ReplaySubscriber(curr, bufferFactory.get()); + ReplaySubscriber u = new ReplaySubscriber(bufferFactory.get()); // let's try setting it as the current subscriber-to-source if (!curr.compareAndSet(r, u)) { // didn't work, maybe someone else did it or the current subscriber @@ -217,15 +219,13 @@ public void subscribe(Observer child) { // replay the contents of the buffer r.buffer.replay(inner); - break; + break; // NOPMD } } }; return new NbpOperatorReplay(onSubscribe, source, curr, bufferFactory); } - final ObservableConsumable onSubscribe; - private NbpOperatorReplay(ObservableConsumable onSubscribe, Observable source, final AtomicReference> current, final Supplier> bufferFactory) { @@ -251,7 +251,7 @@ public void connect(Consumer connection) { // if there is none yet or the current has unsubscribed if (ps == null || ps.isDisposed()) { // create a new subscriber-to-source - ReplaySubscriber u = new ReplaySubscriber(current, bufferFactory.get()); + ReplaySubscriber u = new ReplaySubscriber(bufferFactory.get()); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived @@ -263,7 +263,7 @@ public void connect(Consumer connection) { // if connect() was called concurrently, only one of them should actually // connect to the source doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); - break; + break; // NOPMD } /* * Notify the callback that we have a (new) connection which it can unsubscribe @@ -314,8 +314,7 @@ static final class ReplaySubscriber implements Observer, Disposable { /** The upstream producer. */ volatile Disposable subscription; - public ReplaySubscriber(AtomicReference> current, - ReplayBuffer buffer) { + public ReplaySubscriber(ReplayBuffer buffer) { this.buffer = buffer; this.producers = new AtomicReference(EMPTY); diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSingle.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSingle.java index 2724c3d903..c4be0727e7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSingle.java @@ -23,13 +23,14 @@ public final class NbpOperatorSingle implements NbpOperator { static final NbpOperatorSingle NO_DEFAULT = new NbpOperatorSingle(null); + + final T defaultValue; @SuppressWarnings("unchecked") public static NbpOperatorSingle instanceNoDefault() { return (NbpOperatorSingle)NO_DEFAULT; } - final T defaultValue; public NbpOperatorSingle(T defaultValue) { this.defaultValue = defaultValue; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSwitchMap.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSwitchMap.java index 4772f5179b..c239892e06 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorSwitchMap.java @@ -215,10 +215,6 @@ void drain() { boolean retry = false; for (;;) { - boolean d = inner.done; - R v = q.poll(); - boolean empty = v == null; - if (cancelled) { return; } @@ -226,7 +222,11 @@ void drain() { retry = true; break; } - + + boolean d = inner.done; + R v = q.poll(); + boolean empty = v == null; + if (d) { Throwable err = inner.error; if (err != null) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorTimeout.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorTimeout.java index 8e661a73a1..25f0ec6d32 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorTimeout.java @@ -80,7 +80,7 @@ public void onSubscribe(Disposable s) { if (firstTimeoutSelector != null) { try { p = firstTimeoutSelector.get(); - } catch (Exception ex) { + } catch (Throwable ex) { dispose(); EmptyDisposable.error(ex, a); return; @@ -256,7 +256,7 @@ public void onSubscribe(Disposable s) { try { p = firstTimeoutSelector.get(); - } catch (Exception ex) { + } catch (Throwable ex) { dispose(); EmptyDisposable.error(ex, a); return; diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorToList.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorToList.java index 7b2705e091..73f513005d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorToList.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorToList.java @@ -24,6 +24,8 @@ public final class NbpOperatorToList> implements NbpOperator { + final Supplier collectionSupplier; + @SuppressWarnings({"rawtypes", "unchecked"}) static final NbpOperatorToList DEFAULT = new NbpOperatorToList(new Supplier() { @Override @@ -37,8 +39,6 @@ public static NbpOperatorToList> defaultInstance() { return DEFAULT; } - final Supplier collectionSupplier; - public NbpOperatorToList(Supplier collectionSupplier) { this.collectionSupplier = collectionSupplier; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorWindowTimed.java index 6854b2394b..5e1168bd05 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/NbpOperatorWindowTimed.java @@ -207,7 +207,7 @@ void drainLoop() { for (;;) { for (;;) { - boolean term = terminated; + boolean term = terminated; // NOPMD boolean d = done; @@ -683,7 +683,7 @@ void drainLoop() { boolean empty = v == null; boolean sw = v instanceof SubjectWork; - if (d && (v == null || v instanceof SubjectWork)) { + if (d && (empty || sw)) { q.clear(); disposeWorker(); Throwable e = error; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java index cf482654b8..47ee6cba14 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java @@ -175,8 +175,6 @@ void disposeOther() { void next() { - Disposable o = other.get(); - U next; try { @@ -192,7 +190,7 @@ void next() { actual.onError(new NullPointerException("The buffer supplied is null")); return; } - + ObservableConsumable boundary; try { @@ -212,6 +210,8 @@ void next() { } BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); + + Disposable o = other.get(); if (!other.compareAndSet(o, bs)) { return; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java index 903e66b9dc..ecdd58434e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java @@ -270,7 +270,7 @@ public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; - final U b; + final U b; // NOPMD try { b = bufferSupplier.get(); @@ -367,7 +367,7 @@ public void run() { if (cancelled) { return; } - final U b; + final U b; // NOPMD try { b = bufferSupplier.get(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java index 14e71b24c6..1e5a44efda 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java @@ -27,7 +27,7 @@ */ public final class ObservableCache extends Observable { /** The cache and replay state. */ - private CacheState state; + final CacheState state; final AtomicBoolean once; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java index 58acabafb5..6689830f24 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java @@ -22,7 +22,7 @@ public ObservableFromArray(T[] array) { this.array = array; } public T[] array() { - return array; + return array; // NOPMD } @Override public void subscribeActual(Observer s) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java index 38e20f0e21..8716ac23fc 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java @@ -72,13 +72,13 @@ public GeneratorDisposable(Observer actual, public void run() { S s = state; - final BiFunction, S> f = generator; - if (cancelled) { dispose(s); return; } - + + final BiFunction, S> f = generator; + for (;;) { if (cancelled) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableLift.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableLift.java index 44e6992905..1d4eb79153 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableLift.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableLift.java @@ -71,7 +71,7 @@ public void subscribeActual(Observer s) { } source.subscribe(st); - } catch (NullPointerException e) { + } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { // TODO throw if fatal? diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index f5c5ee2ec7..64ea2d835d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -31,72 +31,6 @@ */ public final class ObservableRefCount extends Observable { - final class ConnectionSubscriber implements Observer, Disposable { - final Observer subscriber; - final CompositeDisposable currentBase; - final Disposable resource; - - Disposable s; - - private ConnectionSubscriber(Observer subscriber, - CompositeDisposable currentBase, Disposable resource) { - this.subscriber = subscriber; - this.currentBase = currentBase; - this.resource = resource; - } - - @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - subscriber.onSubscribe(this); - } - } - - @Override - public void onError(Throwable e) { - cleanup(); - subscriber.onError(e); - } - - @Override - public void onNext(T t) { - subscriber.onNext(t); - } - - @Override - public void onComplete() { - cleanup(); - subscriber.onComplete(); - } - - @Override - public void dispose() { - s.dispose(); - resource.dispose(); - } - - @Override - public boolean isDisposed() { - return s.isDisposed(); - } - - void cleanup() { - // on error or completion we need to unsubscribe the base subscription - // and set the subscriptionCount to 0 - lock.lock(); - try { - if (baseSubscription == currentBase) { - baseSubscription.dispose(); - baseSubscription = new CompositeDisposable(); - subscriptionCount.set(0); - } - } finally { - lock.unlock(); - } - } - } - final ConnectableObservable source; volatile CompositeDisposable baseSubscription = new CompositeDisposable(); @@ -199,4 +133,71 @@ public void run() { } }); } + + final class ConnectionSubscriber implements Observer, Disposable { + final Observer subscriber; + final CompositeDisposable currentBase; + final Disposable resource; + + Disposable s; + + ConnectionSubscriber(Observer subscriber, + CompositeDisposable currentBase, Disposable resource) { + this.subscriber = subscriber; + this.currentBase = currentBase; + this.resource = resource; + } + + @Override + public void onSubscribe(Disposable s) { + if (DisposableHelper.validate(this.s, s)) { + this.s = s; + subscriber.onSubscribe(this); + } + } + + @Override + public void onError(Throwable e) { + cleanup(); + subscriber.onError(e); + } + + @Override + public void onNext(T t) { + subscriber.onNext(t); + } + + @Override + public void onComplete() { + cleanup(); + subscriber.onComplete(); + } + + @Override + public void dispose() { + s.dispose(); + resource.dispose(); + } + + @Override + public boolean isDisposed() { + return s.isDisposed(); + } + + void cleanup() { + // on error or completion we need to unsubscribe the base subscription + // and set the subscriptionCount to 0 + lock.lock(); + try { + if (baseSubscription == currentBase) { + baseSubscription.dispose(); + baseSubscription = new CompositeDisposable(); + subscriptionCount.set(0); + } + } finally { + lock.unlock(); + } + } + } + } \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java index 927680b6dd..db8f533664 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java @@ -127,8 +127,6 @@ void drain() { } boolean d1 = s1.done; - T v1 = q1.peek(); - boolean e1 = v1 == null; if (d1) { Throwable e = s1.error; @@ -141,9 +139,6 @@ void drain() { } boolean d2 = s2.done; - T v2 = q2.peek(); - boolean e2 = v2 == null; - if (d2) { Throwable e = s2.error; if (e != null) { @@ -153,7 +148,13 @@ void drain() { return; } } - + + T v1 = q1.peek(); + boolean e1 = v1 == null; + + T v2 = q2.peek(); + boolean e2 = v2 == null; + if (d1 && d2 && e1 && e2) { actual.onNext(true); actual.onComplete(); diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java b/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java index 6310365141..08fca452e2 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java @@ -30,11 +30,9 @@ public SingleAmbIterable(Iterable> sourc @Override protected void subscribeActual(final SingleSubscriber s) { - final AtomicBoolean once = new AtomicBoolean(); final CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); - int c = 0; Iterator> iterator; try { @@ -48,6 +46,10 @@ protected void subscribeActual(final SingleSubscriber s) { s.onError(new NullPointerException("The iterator returned is null")); return; } + + final AtomicBoolean once = new AtomicBoolean(); + int c = 0; + for (;;) { if (once.get()) { return; @@ -70,12 +72,12 @@ protected void subscribeActual(final SingleSubscriber s) { break; } - SingleConsumable s1; - if (once.get()) { return; } + SingleConsumable s1; + try { s1 = iterator.next(); } catch (Throwable e) { diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAwait.java b/src/main/java/io/reactivex/internal/operators/single/SingleAwait.java index fed8cb7c28..40a0dd8d78 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleAwait.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAwait.java @@ -49,7 +49,7 @@ public void onSuccess(T value) { try { cdl.await(); } catch (InterruptedException ex) { - throw new RuntimeException(ex); + throw new IllegalStateException(ex); } } Throwable e = errorRef.get(); diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleLift.java b/src/main/java/io/reactivex/internal/operators/single/SingleLift.java index 47eaa0686f..c4b1ceb7de 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleLift.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleLift.java @@ -37,7 +37,7 @@ protected void subscribeActual(SingleSubscriber s) { } // TODO plugin wrapper source.subscribe(sr); - } catch (NullPointerException ex) { + } catch (NullPointerException ex) { // NOPMD throw ex; } catch (Throwable ex) { RxJavaPlugins.onError(ex); diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java b/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java index 1ec032c82e..6a6a2d835b 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java @@ -41,7 +41,7 @@ public SingleUsing(Supplier resourceSupplier, @Override protected void subscribeActual(final SingleSubscriber s) { - final U resource; + final U resource; // NOPMD try { resource = resourceSupplier.get(); diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java b/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java index d723178449..c5fdbef3fd 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleWrapper.java @@ -23,7 +23,7 @@ public SingleWrapper(SingleConsumable onSubscribe) { } @Override - protected void subscribeActual(io.reactivex.SingleSubscriber subscriber) { + protected void subscribeActual(SingleSubscriber subscriber) { onSubscribe.subscribe(subscriber); } } diff --git a/src/main/java/io/reactivex/internal/queue/BaseArrayQueue.java b/src/main/java/io/reactivex/internal/queue/BaseArrayQueue.java index c681037c17..a776b758fa 100644 --- a/src/main/java/io/reactivex/internal/queue/BaseArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/BaseArrayQueue.java @@ -38,8 +38,7 @@ public Iterator iterator() { @Override public void clear() { // we have to test isEmpty because of the weaker poll() guarantee - while (poll() != null || !isEmpty()) - ; + while (poll() != null || !isEmpty()) ; // NOPMD } protected final int calcElementOffset(long index, int mask) { return (int)index & mask; diff --git a/src/main/java/io/reactivex/internal/queue/BaseLinkedQueue.java b/src/main/java/io/reactivex/internal/queue/BaseLinkedQueue.java index 5c14019956..c500f7db57 100644 --- a/src/main/java/io/reactivex/internal/queue/BaseLinkedQueue.java +++ b/src/main/java/io/reactivex/internal/queue/BaseLinkedQueue.java @@ -71,7 +71,7 @@ public final int size() { // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. while (chaserNode != producerNode && size < Integer.MAX_VALUE) { LinkedQueueNode next; - while((next = chaserNode.lvNext()) == null); + while((next = chaserNode.lvNext()) == null); // NOPMD chaserNode = next; size++; } diff --git a/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java index 439c9ea9fd..6e821cfcb5 100644 --- a/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java @@ -46,7 +46,7 @@ public MpscLinkedQueue() { * @see java.util.Queue#offer(java.lang.Object) */ @Override - public final boolean offer(final T nextValue) { + public boolean offer(final T nextValue) { final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed @@ -71,7 +71,7 @@ public final boolean offer(final T nextValue) { * @see java.util.Queue#poll() */ @Override - public final T poll() { + public T poll() { LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright LinkedQueueNode nextNode = currConsumerNode.lvNext(); if (nextNode != null) { @@ -82,7 +82,7 @@ public final T poll() { } else if (currConsumerNode != lvProducerNode()) { // spin, we are no longer wait free - while((nextNode = currConsumerNode.lvNext()) == null); + while((nextNode = currConsumerNode.lvNext()) == null); // NOPMD // got the next node... // we have to null out the value because we are going to hang on to the node @@ -94,7 +94,7 @@ else if (currConsumerNode != lvProducerNode()) { } @Override - public final T peek() { + public T peek() { LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright LinkedQueueNode nextNode = currConsumerNode.lvNext(); if (nextNode != null) { @@ -102,7 +102,7 @@ public final T peek() { } else if (currConsumerNode != lvProducerNode()) { // spin, we are no longer wait free - while ((nextNode = currConsumerNode.lvNext()) == null); + while ((nextNode = currConsumerNode.lvNext()) == null); // NOPMD // got the next node... return nextNode.lpValue(); } diff --git a/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java index 3c00d3f253..629d504c9f 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java @@ -39,7 +39,7 @@ public final class SpscArrayQueue extends BaseArrayQueue { private static final long serialVersionUID = -1296597691183856449L; private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); final AtomicLong producerIndex; - protected long producerLookAhead; + long producerLookAhead; final AtomicLong consumerIndex; final int lookAheadStep; public SpscArrayQueue(int capacity) { diff --git a/src/main/java/io/reactivex/internal/queue/SpscExactArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscExactArrayQueue.java index 60a721c0ee..ffcb2d7f13 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscExactArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscExactArrayQueue.java @@ -79,7 +79,7 @@ public T peek() { } @Override public void clear() { - while (poll() != null || !isEmpty()); + while (poll() != null || !isEmpty()); // NOPMD } @Override public boolean isEmpty() { diff --git a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java index 2e7ea6593f..0d7e55ffa3 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java @@ -30,15 +30,15 @@ */ public final class SpscLinkedArrayQueue implements Queue { static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); - protected final AtomicLong producerIndex = new AtomicLong(); + final AtomicLong producerIndex = new AtomicLong(); - protected int producerLookAheadStep; - protected long producerLookAhead; - protected int producerMask; - protected AtomicReferenceArray producerBuffer; - protected int consumerMask; - protected AtomicReferenceArray consumerBuffer; - protected final AtomicLong consumerIndex = new AtomicLong(); + int producerLookAheadStep; + long producerLookAhead; + int producerMask; + AtomicReferenceArray producerBuffer; + int consumerMask; + AtomicReferenceArray consumerBuffer; + final AtomicLong consumerIndex = new AtomicLong(); private static final Object HAS_NEXT = new Object(); @@ -61,7 +61,7 @@ public SpscLinkedArrayQueue(final int bufferSize) { * This implementation is correct for single producer thread use only. */ @Override - public final boolean offer(final T e) { + public boolean offer(final T e) { // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray buffer = producerBuffer; final long index = lpProducerIndex(); @@ -118,7 +118,7 @@ private AtomicReferenceArray lvNext(AtomicReferenceArray curr) { */ @SuppressWarnings("unchecked") @Override - public final T poll() { + public T poll() { // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray buffer = consumerBuffer; final long index = lpConsumerIndex(); @@ -158,7 +158,7 @@ private T newBufferPoll(AtomicReferenceArray nextBuffer, final long inde */ @SuppressWarnings("unchecked") @Override - public final T peek() { + public T peek() { final AtomicReferenceArray buffer = consumerBuffer; final long index = lpConsumerIndex(); final int mask = consumerMask; @@ -173,7 +173,7 @@ public final T peek() { @Override public void clear() { - while (poll() != null || !isEmpty()); + while (poll() != null || !isEmpty()); // NOPMD } @SuppressWarnings("unchecked") @@ -184,7 +184,7 @@ private T newBufferPeek(AtomicReferenceArray nextBuffer, final long inde } @Override - public final int size() { + public int size() { /* * It is possible for a thread to be interrupted or reschedule between the read of the producer and * consumer indices, therefore protection is required to ensure size is within valid range. In the @@ -235,22 +235,22 @@ private void soConsumerIndex(long v) { consumerIndex.lazySet(v); } - private static final int calcWrappedOffset(long index, int mask) { + private static int calcWrappedOffset(long index, int mask) { return calcDirectOffset((int)index & mask); } - private static final int calcDirectOffset(int index) { + private static int calcDirectOffset(int index) { return index; } - private static final void soElement(AtomicReferenceArray buffer, int offset, Object e) { + private static void soElement(AtomicReferenceArray buffer, int offset, Object e) { buffer.lazySet(offset, e); } - private static final Object lvElement(AtomicReferenceArray buffer, int offset) { + private static Object lvElement(AtomicReferenceArray buffer, int offset) { return buffer.get(offset); } @Override - public final Iterator iterator() { + public Iterator iterator() { throw new UnsupportedOperationException(); } diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index 4948ee5c98..d3e968bdb9 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -27,6 +27,8 @@ * to requested Scheduler.Workers in a round-robin fashion. */ public final class ComputationScheduler extends Scheduler { + /** This will indicate no pool is active. */ + static final FixedSchedulerPool NONE = new FixedSchedulerPool(0); /** Manages a fixed number of workers. */ private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-"; private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); @@ -37,6 +39,11 @@ public final class ComputationScheduler extends Scheduler { static final String KEY_MAX_THREADS = "rx2.computation-threads"; /** The maximum number of computation scheduler threads. */ static final int MAX_THREADS; + + static final PoolWorker SHUTDOWN_WORKER; + + final AtomicReference pool; + static { int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0); int ncpu = Runtime.getRuntime().availableProcessors(); @@ -47,14 +54,11 @@ public final class ComputationScheduler extends Scheduler { max = maxThreads; } MAX_THREADS = max; - } - - static final PoolWorker SHUTDOWN_WORKER; - static { + SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown-")); SHUTDOWN_WORKER.dispose(); } - + static final class FixedSchedulerPool { final int cores; @@ -85,10 +89,6 @@ public void shutdown() { } } } - /** This will indicate no pool is active. */ - static final FixedSchedulerPool NONE = new FixedSchedulerPool(0); - - final AtomicReference pool; /** * Create a scheduler with pool size equal to the available processor @@ -139,7 +139,7 @@ public void shutdown() { } - private static class EventLoopWorker extends Scheduler.Worker { + static final class EventLoopWorker extends Scheduler.Worker { private final ListCompositeDisposable serial; private final CompositeDisposable timed; private final ListCompositeDisposable both; @@ -187,7 +187,7 @@ public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { } } - private static final class PoolWorker extends NewThreadWorker { + static final class PoolWorker extends NewThreadWorker { PoolWorker(ThreadFactory threadFactory) { super(threadFactory); } diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index e750aca650..36b57552e5 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -40,12 +40,18 @@ public final class IoScheduler extends Scheduler implements SchedulerLifecycle { private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS; static final ThreadWorker SHUTDOWN_THREADWORKER; + final AtomicReference pool; + + static final CachedWorkerPool NONE; static { + NONE = new CachedWorkerPool(0, null); + NONE.shutdown(); + SHUTDOWN_THREADWORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown-")); SHUTDOWN_THREADWORKER.dispose(); } - private static final class CachedWorkerPool { + static final class CachedWorkerPool { private final long keepAliveTime; private final ConcurrentLinkedQueue expiringWorkerQueue; private final CompositeDisposable allWorkers; @@ -138,14 +144,6 @@ void shutdown() { } } - final AtomicReference pool; - - static final CachedWorkerPool NONE; - static { - NONE = new CachedWorkerPool(0, null); - NONE.shutdown(); - } - public IoScheduler() { this.pool = new AtomicReference(NONE); start(); @@ -181,7 +179,7 @@ public int size() { return pool.get().allWorkers.size(); } - private static final class EventLoopWorker extends Scheduler.Worker { + static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; @@ -226,7 +224,7 @@ public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { } } - private static final class ThreadWorker extends NewThreadWorker { + static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index ead400baf2..caa3fb71fa 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -35,8 +35,7 @@ public SingleScheduler() { } static ScheduledExecutorService createExecutor() { - ScheduledExecutorService exec = SchedulerPoolFactory.create(new RxThreadFactory("RxSingleScheduler-")); - return exec; + return SchedulerPoolFactory.create(new RxThreadFactory("RxSingleScheduler-")); } @Override diff --git a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java index 43e6fc8660..ec81cc0e49 100644 --- a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java @@ -139,7 +139,7 @@ static final class TimedRunnable implements Comparable { volatile boolean disposed; - private TimedRunnable(Runnable run, Long execTime, int count) { + TimedRunnable(Runnable run, Long execTime, int count) { this.run = run; this.execTime = execTime; this.count = count; diff --git a/src/main/java/io/reactivex/internal/subscriptions/SubscriptionHelper.java b/src/main/java/io/reactivex/internal/subscriptions/SubscriptionHelper.java index 3590a5f609..286e5931df 100644 --- a/src/main/java/io/reactivex/internal/subscriptions/SubscriptionHelper.java +++ b/src/main/java/io/reactivex/internal/subscriptions/SubscriptionHelper.java @@ -25,6 +25,11 @@ */ public enum SubscriptionHelper { ; + /** + * Represents a cancelled Subscription. + *

Don't leak this instance! + */ + public static final Subscription CANCELLED = Cancelled.INSTANCE; /** * Verifies that current is null, next is not null, otherwise signals errors @@ -66,11 +71,6 @@ public static boolean validateRequest(long n) { return true; } - /** - * Represents a cancelled Subscription. - */ - public static final Subscription CANCELLED = Cancelled.INSTANCE; - /** * Check if the given subscription is the common cancelled subscription. * @param d the subscription to check diff --git a/src/main/java/io/reactivex/internal/util/Exceptions.java b/src/main/java/io/reactivex/internal/util/Exceptions.java index e1b5545d40..456205ea78 100644 --- a/src/main/java/io/reactivex/internal/util/Exceptions.java +++ b/src/main/java/io/reactivex/internal/util/Exceptions.java @@ -38,7 +38,7 @@ public static RuntimeException propagate(Throwable t) { } else if (t instanceof Error) { throw (Error) t; } else { - throw new RuntimeException(t); + throw new RuntimeException(t); // NOPMD } } diff --git a/src/main/java/io/reactivex/internal/util/LinkedArrayList.java b/src/main/java/io/reactivex/internal/util/LinkedArrayList.java index 9295f1eeab..5c617ad0e9 100644 --- a/src/main/java/io/reactivex/internal/util/LinkedArrayList.java +++ b/src/main/java/io/reactivex/internal/util/LinkedArrayList.java @@ -77,14 +77,14 @@ public void add(Object o) { * @return the head object array */ public Object[] head() { - return head; + return head; // NOPMD } /** * Returns the tail buffer segment or null if the list is empty. * @return the tail object array */ public Object[] tail() { - return tail; + return tail; // NOPMD } /** * Returns the total size of the list. diff --git a/src/main/java/io/reactivex/internal/util/NotificationLite.java b/src/main/java/io/reactivex/internal/util/NotificationLite.java index fd9f6c2715..1d5d7439f8 100644 --- a/src/main/java/io/reactivex/internal/util/NotificationLite.java +++ b/src/main/java/io/reactivex/internal/util/NotificationLite.java @@ -40,7 +40,7 @@ public String toString() { /** * Wraps a Throwable. */ - private static final class ErrorNotification implements Serializable { + static final class ErrorNotification implements Serializable { /** */ private static final long serialVersionUID = -8759979445933046293L; final Throwable e; @@ -57,7 +57,7 @@ public String toString() { /** * Wraps a Subscription. */ - private static final class SubscriptionNotification implements Serializable { + static final class SubscriptionNotification implements Serializable { /** */ private static final long serialVersionUID = -1322257508628817540L; final Subscription s; @@ -74,7 +74,7 @@ public String toString() { /** * Wraps a Disposable. */ - private static final class DisposableNotification implements Serializable { + static final class DisposableNotification implements Serializable { /** */ private static final long serialVersionUID = -7482590109178395495L; final Disposable d; diff --git a/src/main/java/io/reactivex/internal/util/OpenHashSet.java b/src/main/java/io/reactivex/internal/util/OpenHashSet.java index 9bc4877762..a5740cc1c6 100644 --- a/src/main/java/io/reactivex/internal/util/OpenHashSet.java +++ b/src/main/java/io/reactivex/internal/util/OpenHashSet.java @@ -30,6 +30,8 @@ * @param the element type */ public final class OpenHashSet { + private static final int INT_PHI = 0x9E3779B9; + final float loadFactor; int mask; int size; @@ -161,7 +163,7 @@ void rehash() { for (int j = size; j-- != 0; ) { - while (a[--i] == null); + while (a[--i] == null); // NOPMD int pos = mix(a[i].hashCode()) & m; if (b[pos] != null) { for (;;) { @@ -179,8 +181,6 @@ void rehash() { this.keys = b; } - private static final int INT_PHI = 0x9E3779B9; - static int mix(int x) { final int h = x * INT_PHI; return h ^ (h >>> 16); @@ -227,7 +227,7 @@ public boolean isEmpty() { } public Object[] keys() { - return keys; + return keys; // NOPMD } public int size() { diff --git a/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java b/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java index 09029fd734..8cb187d71a 100644 --- a/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java +++ b/src/main/java/io/reactivex/internal/util/QueueDrainHelper.java @@ -165,7 +165,6 @@ public static void drainLoop(Queue q, Subscriber a, boolean } long r = qd.requested(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; while (e != r) { @@ -187,7 +186,7 @@ public static void drainLoop(Queue q, Subscriber a, boolean } } - if (e != 0L && !unbounded) { + if (e != 0L && r != Long.MAX_VALUE) { qd.produced(e); } diff --git a/src/main/java/io/reactivex/observables/BlockingObservable.java b/src/main/java/io/reactivex/observables/BlockingObservable.java index 2572697dbd..a8443fdff5 100644 --- a/src/main/java/io/reactivex/observables/BlockingObservable.java +++ b/src/main/java/io/reactivex/observables/BlockingObservable.java @@ -58,7 +58,7 @@ public void forEach(Consumer action) { } } - static final BlockingIterator iterate(Observable p) { + static BlockingIterator iterate(Observable p) { final BlockingQueue queue = new LinkedBlockingQueue(); NbpLambdaSubscriber ls = new NbpLambdaSubscriber( @@ -420,7 +420,7 @@ private void awaitForComplete(CountDownLatch latch, Disposable subscription) { // for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780 Thread.currentThread().interrupt(); // using Runtime so it is not checked - throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); + throw new IllegalStateException("Interrupted while waiting for subscription to complete.", e); } } @@ -450,11 +450,7 @@ public void run() { awaitForComplete(cdl, ls); Throwable e = error[0]; if (e != null) { - if (e instanceof RuntimeException) { - throw (RuntimeException)e; - } else { - throw new RuntimeException(e); - } + Exceptions.propagate(e); } } diff --git a/src/main/java/io/reactivex/observers/TestObserver.java b/src/main/java/io/reactivex/observers/TestObserver.java index ab86250956..5e8db8fd3f 100644 --- a/src/main/java/io/reactivex/observers/TestObserver.java +++ b/src/main/java/io/reactivex/observers/TestObserver.java @@ -300,7 +300,7 @@ private void fail(String prefix, String message, Iterable e } else { ce.suppress(e); } - }; + } if (!ce.isEmpty()) { ae.initCause(ce); } diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 988ed0fda1..b9e241da59 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -188,14 +188,14 @@ public static void onError(Throwable error) { if (error == null) { error = new NullPointerException(); } - e.printStackTrace(); + e.printStackTrace(); // NOPMD } } else { if (error == null) { error = new NullPointerException(); } } - error.printStackTrace(); + error.printStackTrace(); // NOPMD } public static Scheduler onIoScheduler(Scheduler defaultScheduler) { @@ -416,7 +416,7 @@ public void accept(Throwable e) { * Returns a consumer which relays the received Throwable to RxJavaPlugins.onError(). * @return the consumer */ - public static final Consumer errorConsumer() { + public static Consumer errorConsumer() { return CONSUME_BY_RXJAVA_PLUGIN; } diff --git a/src/main/java/io/reactivex/processors/AsyncProcessor.java b/src/main/java/io/reactivex/processors/AsyncProcessor.java index db919882c2..67fe7280fe 100644 --- a/src/main/java/io/reactivex/processors/AsyncProcessor.java +++ b/src/main/java/io/reactivex/processors/AsyncProcessor.java @@ -34,6 +34,14 @@ * @param the value type */ public final class AsyncProcessor extends FlowProcessor { + /** The state holding onto the latest value or error and the array of subscribers. */ + final State state; + /** + * Indicates the subject has been terminated. It is checked in the onXXX methods in + * a relaxed matter: concurrent calls may not properly see it (which shouldn't happen if + * the reactive-streams contract is held). + */ + boolean done; /** * Constructs an empty AsyncSubject. @@ -44,15 +52,6 @@ public static AsyncProcessor create() { return new AsyncProcessor(); } - /** The state holding onto the latest value or error and the array of subscribers. */ - final State state; - /** - * Indicates the subject has been terminated. It is checked in the onXXX methods in - * a relaxed matter: concurrent calls may not properly see it (which shouldn't happen if - * the reactive-streams contract is held). - */ - boolean done; - protected AsyncProcessor() { this.state = new State(); } diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index 69c7d054f7..2677f1469f 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -26,6 +26,7 @@ import io.reactivex.plugins.RxJavaPlugins; public final class BehaviorProcessor extends FlowProcessor { + final State state; public static BehaviorProcessor create() { return new BehaviorProcessor(new State()); @@ -39,8 +40,6 @@ public static BehaviorProcessor createDefault(T defaultValue) { return new BehaviorProcessor(state); } - final State state; - protected BehaviorProcessor(State state) { this.state = state; } diff --git a/src/main/java/io/reactivex/processors/FlowProcessor.java b/src/main/java/io/reactivex/processors/FlowProcessor.java index 9eff6eaaf4..ecabc4ca54 100644 --- a/src/main/java/io/reactivex/processors/FlowProcessor.java +++ b/src/main/java/io/reactivex/processors/FlowProcessor.java @@ -26,6 +26,9 @@ * @param the item value type */ public abstract class FlowProcessor extends Flowable implements Processor { + /** An empty array to avoid allocation in getValues(). */ + private static final Object[] EMPTY = new Object[0]; + /** * Returns true if the subject has subscribers. @@ -97,9 +100,6 @@ public final FlowProcessor toSerialized() { return new SerializedProcessor(this); } - /** An empty array to avoid allocation in getValues(). */ - private static final Object[] EMPTY = new Object[0]; - /** * Returns an Object array containing snapshot all values of the Subject. *

The method is thread-safe. diff --git a/src/main/java/io/reactivex/processors/PublishProcessor.java b/src/main/java/io/reactivex/processors/PublishProcessor.java index 8bf606e9b6..557bf126a4 100644 --- a/src/main/java/io/reactivex/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/processors/PublishProcessor.java @@ -40,15 +40,6 @@ */ public final class PublishProcessor extends FlowProcessor { - /** - * Constructs a PublishSubject. - * @param the value type - * @return the new PublishSubject - */ - public static PublishProcessor create() { - return new PublishProcessor(); - } - /** Holds the terminal event and manages the array of subscribers. */ final State state; /** @@ -58,6 +49,15 @@ public static PublishProcessor create() { */ boolean done; + /** + * Constructs a PublishSubject. + * @param the value type + * @return the new PublishSubject + */ + public static PublishProcessor create() { + return new PublishProcessor(); + } + protected PublishProcessor() { this.state = new State(); } diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 7dfcb0f5da..5181bad438 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -35,7 +35,8 @@ * @param the value type */ public final class ReplayProcessor extends FlowProcessor { - + final State state; + public static ReplayProcessor create() { return create(16); } @@ -85,8 +86,6 @@ static ReplayProcessor createWithBuffer(ReplayBuffer buffer) { } - final State state; - protected ReplayProcessor(State state) { this.state = state; } @@ -165,7 +164,7 @@ public boolean hasThrowable() { @Override public boolean hasValue() { - return state.buffer.size() != 0; + return state.buffer.size() != 0; // NOPMD } /* test*/ int size() { @@ -486,7 +485,6 @@ public void replay(ReplaySubscription rs) { int s = size; long r = rs.requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; while (s != index) { @@ -528,7 +526,7 @@ public void replay(ReplaySubscription rs) { } if (e != 0L) { - if (!unbounded) { + if (rs.requested.get() != Long.MAX_VALUE) { r = rs.requested.addAndGet(e); } } @@ -718,7 +716,6 @@ public void replay(ReplaySubscription rs) { } long r = rs.requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0; for (;;) { @@ -764,7 +761,7 @@ public void replay(ReplaySubscription rs) { } if (e != 0L) { - if (!unbounded) { + if (rs.requested.get() != Long.MAX_VALUE) { r = rs.requested.addAndGet(e); } } @@ -999,7 +996,6 @@ public void replay(ReplaySubscription rs) { } long r = rs.requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0; for (;;) { @@ -1045,7 +1041,7 @@ public void replay(ReplaySubscription rs) { } if (e != 0L) { - if (!unbounded) { + if (rs.requested.get() != Long.MAX_VALUE) { r = rs.requested.addAndGet(e); } } diff --git a/src/main/java/io/reactivex/processors/UnicastProcessor.java b/src/main/java/io/reactivex/processors/UnicastProcessor.java index 4bee959fcd..c5e47b6f70 100644 --- a/src/main/java/io/reactivex/processors/UnicastProcessor.java +++ b/src/main/java/io/reactivex/processors/UnicastProcessor.java @@ -37,7 +37,10 @@ * @param the value type unicasted */ public final class UnicastProcessor extends FlowProcessor { - + + /** The subject state. */ + final State state; + /** * Creates an UnicastSubject with an internal buffer capacity hint 16. * @param the value type @@ -74,8 +77,6 @@ public static UnicastProcessor create(int capacityHint, Runnable onCancel return new UnicastProcessor(state); } - /** The subject state. */ - final State state; /** * Constructs the Observable base class. * @param state the subject state @@ -298,10 +299,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0L) { + while (e != r) { if (cancelled) { clear(q); @@ -330,12 +330,11 @@ void drain() { a.onNext(v); - r--; - e--; + e++; } - if (e != 0 && !unbounded) { - requested.getAndAdd(e); + if (e != 0 && r != Long.MAX_VALUE) { + requested.getAndAdd(-e); } } diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index 7df7ad1767..cad17396e9 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -20,16 +20,6 @@ import io.reactivex.plugins.RxJavaPlugins; public final class Schedulers { - - /* - * TODO I started to like enums for singletons and non-instantiatable - * utility classes, but since this is part of the public API, - * that would act quite unorthodoxically. - */ - private Schedulers() { - throw new IllegalStateException("No instances"); - } - static final Scheduler SINGLE; static final Scheduler COMPUTATION; @@ -53,6 +43,15 @@ private Schedulers() { NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(NewThreadScheduler.instance()); } + /* + * TODO I started to like enums for singletons and non-instantiatable + * utility classes, but since this is part of the public API, + * that would act quite unorthodoxically. + */ + private Schedulers() { + throw new IllegalStateException("No instances"); + } + public static Scheduler computation() { return RxJavaPlugins.onComputationScheduler(COMPUTATION); } diff --git a/src/main/java/io/reactivex/schedulers/TestScheduler.java b/src/main/java/io/reactivex/schedulers/TestScheduler.java index 3550335749..96a15664b5 100644 --- a/src/main/java/io/reactivex/schedulers/TestScheduler.java +++ b/src/main/java/io/reactivex/schedulers/TestScheduler.java @@ -33,15 +33,17 @@ public final class TestScheduler extends Scheduler { private final Queue queue = new PriorityBlockingQueue(11); /** The per-scheduler global order counter. */ long counter; + // Storing time in nanoseconds internally. + private volatile long time; - private static final class TimedRunnable implements Comparable { + static final class TimedRunnable implements Comparable { private final long time; private final Runnable run; private final TestWorker scheduler; private final long count; // for differentiating tasks at same time - private TimedRunnable(TestWorker scheduler, long time, Runnable run, long count) { + TimedRunnable(TestWorker scheduler, long time, Runnable run, long count) { this.time = time; this.run = run; this.scheduler = scheduler; @@ -62,9 +64,6 @@ public int compareTo(TimedRunnable o) { } } - // Storing time in nanoseconds internally. - private volatile long time; - @Override public long now(TimeUnit unit) { return unit.convert(time, TimeUnit.NANOSECONDS); diff --git a/src/main/java/io/reactivex/schedulers/Timed.java b/src/main/java/io/reactivex/schedulers/Timed.java index f1bba85a87..c8e473553e 100644 --- a/src/main/java/io/reactivex/schedulers/Timed.java +++ b/src/main/java/io/reactivex/schedulers/Timed.java @@ -59,6 +59,14 @@ public boolean equals(Object other) { return false; } + @Override + public int hashCode() { + int h = value != null ? value.hashCode() : 0; + h = h * 31 + (int)((time >>> 31) ^ (time & 0xFFFFFFFF)); + h = h * 31 + unit.hashCode(); + return h; + } + @Override public String toString() { return "Timed[time=" + time + ", unit=" + unit + ", value=" + value + "]"; diff --git a/src/main/java/io/reactivex/subjects/AsyncSubject.java b/src/main/java/io/reactivex/subjects/AsyncSubject.java index 779f41906a..38984a5e94 100644 --- a/src/main/java/io/reactivex/subjects/AsyncSubject.java +++ b/src/main/java/io/reactivex/subjects/AsyncSubject.java @@ -34,12 +34,12 @@ */ public final class AsyncSubject extends Subject { + final State state; + public static AsyncSubject create() { return new AsyncSubject(); } - final State state; - protected AsyncSubject() { this.state = new State(); } diff --git a/src/main/java/io/reactivex/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/subjects/BehaviorSubject.java index 2240edbf7b..b314a02da5 100644 --- a/src/main/java/io/reactivex/subjects/BehaviorSubject.java +++ b/src/main/java/io/reactivex/subjects/BehaviorSubject.java @@ -25,6 +25,7 @@ import io.reactivex.plugins.RxJavaPlugins; public final class BehaviorSubject extends Subject { + final State state; public static BehaviorSubject create() { State state = new State(); @@ -39,7 +40,6 @@ public static BehaviorSubject createDefault(T defaultValue) { return new BehaviorSubject(state); } - final State state; protected BehaviorSubject(State state) { this.state = state; } diff --git a/src/main/java/io/reactivex/subjects/PublishSubject.java b/src/main/java/io/reactivex/subjects/PublishSubject.java index 28493999ff..0a5dc04549 100644 --- a/src/main/java/io/reactivex/subjects/PublishSubject.java +++ b/src/main/java/io/reactivex/subjects/PublishSubject.java @@ -22,11 +22,11 @@ import io.reactivex.plugins.RxJavaPlugins; public final class PublishSubject extends Subject { + final State state; public static PublishSubject create() { return new PublishSubject(); } - final State state; protected PublishSubject() { this.state = new State(); } diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 650f8cf1d7..e85572f9e4 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -34,6 +34,7 @@ * @param the value type */ public final class ReplaySubject extends Subject { + final State state; public static ReplaySubject create() { return create(16); @@ -84,8 +85,6 @@ static ReplaySubject createWithBuffer(ReplayBuffer buffer) { } - final State state; - protected ReplaySubject(State state) { this.state = state; } @@ -164,7 +163,7 @@ public boolean hasThrowable() { @Override public boolean hasValue() { - return state.buffer.size() != 0; + return state.buffer.size() != 0; // NOPMD } /* test*/ int size() { diff --git a/src/main/java/io/reactivex/subjects/Subject.java b/src/main/java/io/reactivex/subjects/Subject.java index 9e8ea8c1f3..8f95b938ff 100644 --- a/src/main/java/io/reactivex/subjects/Subject.java +++ b/src/main/java/io/reactivex/subjects/Subject.java @@ -24,6 +24,8 @@ * @param the item value type */ public abstract class Subject extends Observable implements Observer { + /** An empty array to avoid allocation in getValues(). */ + private static final Object[] EMPTY = new Object[0]; /** * Returns true if the subject has subscribers. @@ -95,9 +97,6 @@ public final Subject toSerialized() { return new SerializedSubject(this); } - /** An empty array to avoid allocation in getValues(). */ - private static final Object[] EMPTY = new Object[0]; - /** * Returns an Object array containing snapshot all values of the Subject. *

The method is thread-safe. diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 70ad65b37f..55a02a1c85 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -36,6 +36,8 @@ * @param the value type unicasted */ public final class UnicastSubject extends Subject { + /** The subject state. */ + final State state; /** * Creates an UnicastSubject with an internal buffer capacity hint 16. @@ -73,8 +75,6 @@ public static UnicastSubject create(int capacityHint, Runnable onCancelle return new UnicastSubject(state); } - /** The subject state. */ - final State state; /** * Constructs the Observable base class. * @param state the subject state diff --git a/src/main/java/io/reactivex/subscribers/SafeSubscriber.java b/src/main/java/io/reactivex/subscribers/SafeSubscriber.java index f4f5adb56d..04c4694316 100644 --- a/src/main/java/io/reactivex/subscribers/SafeSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/SafeSubscriber.java @@ -65,7 +65,7 @@ public void onSubscribe(Subscription s) { try { s.cancel(); } catch (Throwable e1) { - CompositeException ce = new CompositeException(); + CompositeException ce = new CompositeException(); // NOPMD ce.suppress(e1); ce.suppress(e); e = ce; diff --git a/src/main/java/io/reactivex/subscribers/TestSubscriber.java b/src/main/java/io/reactivex/subscribers/TestSubscriber.java index 58faddadcc..0fbd6af99d 100644 --- a/src/main/java/io/reactivex/subscribers/TestSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/TestSubscriber.java @@ -360,7 +360,7 @@ private void fail(String prefix, String message, Iterable e } else { ce.suppress(e); } - }; + } if (!ce.isEmpty()) { ae.initCause(ce); } diff --git a/src/test/java/io/reactivex/flowable/BackpressureTests.java b/src/test/java/io/reactivex/flowable/BackpressureTests.java index 398e9ac41e..2b2f52a5e3 100644 --- a/src/test/java/io/reactivex/flowable/BackpressureTests.java +++ b/src/test/java/io/reactivex/flowable/BackpressureTests.java @@ -33,7 +33,7 @@ public class BackpressureTests { - private static final class FirehoseNoBackpressure extends AtomicBoolean implements Subscription { + static final class FirehoseNoBackpressure extends AtomicBoolean implements Subscription { /** */ private static final long serialVersionUID = -669931580197884015L; final Subscriber s; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index 50ef483d72..f6612cba05 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -287,7 +287,7 @@ public void testLongTimeAction() throws InterruptedException { assertFalse(action.fail); } - private static class LongTimeAction implements Consumer> { + static final class LongTimeAction implements Consumer> { CountDownLatch latch; boolean fail = false;