Skip to content

Commit

Permalink
2.x: Fix mergeWith not canceling other when the main fails (ReactiveX…
Browse files Browse the repository at this point in the history
…#6599)

* 2.x: Fix mergeWith not canceling other when the main fails

* Switch to OpenJDK compilation as OracleJDK is not available

* Add more time to refCount testing

* More time again

* Looks like 250ms is still not enough, let's loop
  • Loading branch information
akarnokd authored Jul 30, 2019
1 parent 5550b63 commit e784201
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: java
jdk:
- oraclejdk8
- openjdk8

# force upgrade Java8 as per https://github.com/travis-ci/travis-ci/issues/4042 (fixes compilation issue)
#addons:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable ex) {
SubscriptionHelper.cancel(mainSubscription);
DisposableHelper.dispose(otherObserver);
HalfSerializer.onError(downstream, ex, this, error);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable ex) {
if (error.addThrowable(ex)) {
SubscriptionHelper.cancel(mainSubscription);
DisposableHelper.dispose(otherObserver);
drain();
} else {
RxJavaPlugins.onError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable ex) {
if (error.addThrowable(ex)) {
SubscriptionHelper.cancel(mainSubscription);
DisposableHelper.dispose(otherObserver);
drain();
} else {
RxJavaPlugins.onError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable ex) {
DisposableHelper.dispose(mainDisposable);
DisposableHelper.dispose(otherObserver);
HalfSerializer.onError(downstream, ex, this, error);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable ex) {
if (error.addThrowable(ex)) {
DisposableHelper.dispose(mainDisposable);
DisposableHelper.dispose(otherObserver);
drain();
} else {
RxJavaPlugins.onError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable ex) {
if (error.addThrowable(ex)) {
DisposableHelper.dispose(mainDisposable);
DisposableHelper.dispose(otherObserver);
drain();
} else {
RxJavaPlugins.onError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,40 @@ public void run() {
ts.assertResult(1);
}
}

@Test
public void cancelOtherOnMainError() {
PublishProcessor<Integer> pp = PublishProcessor.create();
CompletableSubject cs = CompletableSubject.create();

TestSubscriber<Integer> ts = pp.mergeWith(cs).test();

assertTrue(pp.hasSubscribers());
assertTrue(cs.hasObservers());

pp.onError(new TestException());

ts.assertFailure(TestException.class);

assertFalse("main has observers!", pp.hasSubscribers());
assertFalse("other has observers", cs.hasObservers());
}

@Test
public void cancelMainOnOtherError() {
PublishProcessor<Integer> pp = PublishProcessor.create();
CompletableSubject cs = CompletableSubject.create();

TestSubscriber<Integer> ts = pp.mergeWith(cs).test();

assertTrue(pp.hasSubscribers());
assertTrue(cs.hasObservers());

cs.onError(new TestException());

ts.assertFailure(TestException.class);

assertFalse("main has observers!", pp.hasSubscribers());
assertFalse("other has observers", cs.hasObservers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,40 @@ public void onNext(Integer t) {
ts.assertValueCount(Flowable.bufferSize());
ts.assertComplete();
}

@Test
public void cancelOtherOnMainError() {
PublishProcessor<Integer> pp = PublishProcessor.create();
MaybeSubject<Integer> ms = MaybeSubject.create();

TestSubscriber<Integer> ts = pp.mergeWith(ms).test();

assertTrue(pp.hasSubscribers());
assertTrue(ms.hasObservers());

pp.onError(new TestException());

ts.assertFailure(TestException.class);

assertFalse("main has observers!", pp.hasSubscribers());
assertFalse("other has observers", ms.hasObservers());
}

@Test
public void cancelMainOnOtherError() {
PublishProcessor<Integer> pp = PublishProcessor.create();
MaybeSubject<Integer> ms = MaybeSubject.create();

TestSubscriber<Integer> ts = pp.mergeWith(ms).test();

assertTrue(pp.hasSubscribers());
assertTrue(ms.hasObservers());

ms.onError(new TestException());

ts.assertFailure(TestException.class);

assertFalse("main has observers!", pp.hasSubscribers());
assertFalse("other has observers", ms.hasObservers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,40 @@ public void onNext(Integer t) {
ts.assertValueCount(Flowable.bufferSize());
ts.assertComplete();
}

@Test
public void cancelOtherOnMainError() {
PublishProcessor<Integer> pp = PublishProcessor.create();
SingleSubject<Integer> ss = SingleSubject.create();

TestSubscriber<Integer> ts = pp.mergeWith(ss).test();

assertTrue(pp.hasSubscribers());
assertTrue(ss.hasObservers());

pp.onError(new TestException());

ts.assertFailure(TestException.class);

assertFalse("main has observers!", pp.hasSubscribers());
assertFalse("other has observers", ss.hasObservers());
}

@Test
public void cancelMainOnOtherError() {
PublishProcessor<Integer> pp = PublishProcessor.create();
SingleSubject<Integer> ss = SingleSubject.create();

TestSubscriber<Integer> ts = pp.mergeWith(ss).test();

assertTrue(pp.hasSubscribers());
assertTrue(ss.hasObservers());

ss.onError(new TestException());

ts.assertFailure(TestException.class);

assertFalse("main has observers!", pp.hasSubscribers());
assertFalse("other has observers", ss.hasObservers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,40 @@ protected void subscribeActual(Observer<? super Integer> observer) {
.test()
.assertResult(1);
}

@Test
public void cancelOtherOnMainError() {
PublishSubject<Integer> ps = PublishSubject.create();
CompletableSubject cs = CompletableSubject.create();

TestObserver<Integer> to = ps.mergeWith(cs).test();

assertTrue(ps.hasObservers());
assertTrue(cs.hasObservers());

ps.onError(new TestException());

to.assertFailure(TestException.class);

assertFalse("main has observers!", ps.hasObservers());
assertFalse("other has observers", cs.hasObservers());
}

@Test
public void cancelMainOnOtherError() {
PublishSubject<Integer> ps = PublishSubject.create();
CompletableSubject cs = CompletableSubject.create();

TestObserver<Integer> to = ps.mergeWith(cs).test();

assertTrue(ps.hasObservers());
assertTrue(cs.hasObservers());

cs.onError(new TestException());

to.assertFailure(TestException.class);

assertFalse("main has observers!", ps.hasObservers());
assertFalse("other has observers", cs.hasObservers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,39 @@ public void onNext(Integer t) {
to.assertResult(0, 1, 2, 3, 4);
}

@Test
public void cancelOtherOnMainError() {
PublishSubject<Integer> ps = PublishSubject.create();
MaybeSubject<Integer> ms = MaybeSubject.create();

TestObserver<Integer> to = ps.mergeWith(ms).test();

assertTrue(ps.hasObservers());
assertTrue(ms.hasObservers());

ps.onError(new TestException());

to.assertFailure(TestException.class);

assertFalse("main has observers!", ps.hasObservers());
assertFalse("other has observers", ms.hasObservers());
}

@Test
public void cancelMainOnOtherError() {
PublishSubject<Integer> ps = PublishSubject.create();
MaybeSubject<Integer> ms = MaybeSubject.create();

TestObserver<Integer> to = ps.mergeWith(ms).test();

assertTrue(ps.hasObservers());
assertTrue(ms.hasObservers());

ms.onError(new TestException());

to.assertFailure(TestException.class);

assertFalse("main has observers!", ps.hasObservers());
assertFalse("other has observers", ms.hasObservers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,40 @@ public void onNext(Integer t) {

to.assertResult(0, 1, 2, 3, 4);
}

@Test
public void cancelOtherOnMainError() {
PublishSubject<Integer> ps = PublishSubject.create();
SingleSubject<Integer> ss = SingleSubject.create();

TestObserver<Integer> to = ps.mergeWith(ss).test();

assertTrue(ps.hasObservers());
assertTrue(ss.hasObservers());

ps.onError(new TestException());

to.assertFailure(TestException.class);

assertFalse("main has observers!", ps.hasObservers());
assertFalse("other has observers", ss.hasObservers());
}

@Test
public void cancelMainOnOtherError() {
PublishSubject<Integer> ps = PublishSubject.create();
SingleSubject<Integer> ss = SingleSubject.create();

TestObserver<Integer> to = ps.mergeWith(ss).test();

assertTrue(ps.hasObservers());
assertTrue(ss.hasObservers());

ss.onError(new TestException());

to.assertFailure(TestException.class);

assertFalse("main has observers!", ps.hasObservers());
assertFalse("other has observers", ss.hasObservers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ protected void subscribeActual(Observer<? super Integer> observer) {
@Test
public void replayNoLeak() throws Exception {
System.gc();
Thread.sleep(100);
Thread.sleep(250);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

Expand All @@ -646,7 +646,7 @@ public Object call() throws Exception {
source.subscribe();

System.gc();
Thread.sleep(100);
Thread.sleep(250);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

Expand All @@ -657,7 +657,7 @@ public Object call() throws Exception {
@Test
public void replayNoLeak2() throws Exception {
System.gc();
Thread.sleep(100);
Thread.sleep(250);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

Expand All @@ -680,7 +680,7 @@ public Object call() throws Exception {
d2 = null;

System.gc();
Thread.sleep(100);
Thread.sleep(250);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

Expand All @@ -701,7 +701,7 @@ static final class ExceptionData extends Exception {
@Test
public void publishNoLeak() throws Exception {
System.gc();
Thread.sleep(100);
Thread.sleep(250);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

Expand All @@ -716,10 +716,19 @@ public Object call() throws Exception {

source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer());

System.gc();
Thread.sleep(100);
long after = 0L;

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
for (int i = 0; i < 10; i++) {
System.gc();

after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

if (start + 20 * 1000 * 1000 > after) {
break;
}

Thread.sleep(100);
}

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
Expand All @@ -728,7 +737,7 @@ public Object call() throws Exception {
@Test
public void publishNoLeak2() throws Exception {
System.gc();
Thread.sleep(100);
Thread.sleep(250);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

Expand All @@ -751,7 +760,7 @@ public Object call() throws Exception {
d2 = null;

System.gc();
Thread.sleep(100);
Thread.sleep(250);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

Expand Down
Loading

0 comments on commit e784201

Please sign in to comment.