diff --git a/build.gradle b/build.gradle index 2f33e66093..48565d07d0 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ targetCompatibility = JavaVersion.VERSION_1_6 // --------------------------------------- def junitVersion = "4.12" -def reactiveStreamsVersion = "1.0.2" +def reactiveStreamsVersion = "1.0.3" def mockitoVersion = "2.1.0" def jmhLibVersion = "1.20" def testNgVersion = "6.11" diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java index 20d2332214..ff7b6cced5 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java @@ -84,7 +84,7 @@ static final class NextObserver implements CompletableObserver { final CompletableObserver downstream; - public NextObserver(AtomicReference parent, CompletableObserver downstream) { + NextObserver(AtomicReference parent, CompletableObserver downstream) { this.parent = parent; this.downstream = downstream; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java index e573f3daf3..b325d2b78d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java @@ -79,6 +79,7 @@ public Publisher source() { } /** + * The internal buffer size of this FloawblePublish operator. * @return The internal buffer size of this FloawblePublish operator. */ @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishAlt.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishAlt.java index d58ba84503..932e0bf003 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishAlt.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishAlt.java @@ -62,6 +62,7 @@ public Publisher source() { } /** + * The internal buffer size of this FloawblePublishAlt operator. * @return The internal buffer size of this FloawblePublishAlt operator. */ public int publishBufferSize() { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishClassic.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishClassic.java index 0cbf70efad..27ded26592 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishClassic.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishClassic.java @@ -30,11 +30,13 @@ public interface FlowablePublishClassic { /** + * The upstream source of this publish operator. * @return the upstream source of this publish operator */ Publisher publishSource(); /** + * The internal buffer size of this publish operator. * @return the internal buffer size of this publish operator */ int publishBufferSize(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishAlt.java b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishAlt.java index 771e58dda8..a9e62babde 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishAlt.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishAlt.java @@ -146,7 +146,7 @@ static final class PublishConnection Throwable error; @SuppressWarnings("unchecked") - public PublishConnection(AtomicReference> current) { + PublishConnection(AtomicReference> current) { this.connect = new AtomicBoolean(); this.current = current; this.upstream = new AtomicReference(); @@ -261,7 +261,7 @@ static final class InnerDisposable final Observer downstream; - public InnerDisposable(Observer downstream, PublishConnection parent) { + InnerDisposable(Observer downstream, PublishConnection parent) { this.downstream = downstream; lazySet(parent); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishClassic.java b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishClassic.java index f072779930..b2bfe87f71 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishClassic.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservablePublishClassic.java @@ -30,6 +30,7 @@ public interface ObservablePublishClassic { /** + * The upstream source of this publish operator. * @return the upstream source of this publish operator */ ObservableSource publishSource(); diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java index b0b1339d29..ab465046aa 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java @@ -20,6 +20,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.functions.Function; + /** * Manages the creating of ScheduledExecutorServices and sets up purging. */ @@ -90,40 +92,48 @@ public static void shutdown() { } static { - Properties properties = System.getProperties(); - - PurgeProperties pp = new PurgeProperties(); - pp.load(properties); - - PURGE_ENABLED = pp.purgeEnable; - PURGE_PERIOD_SECONDS = pp.purgePeriod; + SystemPropertyAccessor propertyAccessor = new SystemPropertyAccessor(); + PURGE_ENABLED = getBooleanProperty(true, PURGE_ENABLED_KEY, true, true, propertyAccessor); + PURGE_PERIOD_SECONDS = getIntProperty(PURGE_ENABLED, PURGE_PERIOD_SECONDS_KEY, 1, 1, propertyAccessor); start(); } - static final class PurgeProperties { - - boolean purgeEnable; - - int purgePeriod; - - void load(Properties properties) { - if (properties.containsKey(PURGE_ENABLED_KEY)) { - purgeEnable = Boolean.parseBoolean(properties.getProperty(PURGE_ENABLED_KEY)); - } else { - purgeEnable = true; + static int getIntProperty(boolean enabled, String key, int defaultNotFound, int defaultNotEnabled, Function propertyAccessor) { + if (enabled) { + try { + String value = propertyAccessor.apply(key); + if (value == null) { + return defaultNotFound; + } + return Integer.parseInt(value); + } catch (Throwable ex) { + return defaultNotFound; } + } + return defaultNotEnabled; + } - if (purgeEnable && properties.containsKey(PURGE_PERIOD_SECONDS_KEY)) { - try { - purgePeriod = Integer.parseInt(properties.getProperty(PURGE_PERIOD_SECONDS_KEY)); - } catch (NumberFormatException ex) { - purgePeriod = 1; + static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNotFound, boolean defaultNotEnabled, Function propertyAccessor) { + if (enabled) { + try { + String value = propertyAccessor.apply(key); + if (value == null) { + return defaultNotFound; } - } else { - purgePeriod = 1; + return "true".equals(value); + } catch (Throwable ex) { + return defaultNotFound; } } + return defaultNotEnabled; + } + + static final class SystemPropertyAccessor implements Function { + @Override + public String apply(String t) throws Exception { + return System.getProperty(t); + } } /** diff --git a/src/main/java/io/reactivex/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/observers/BaseTestConsumer.java index 61db5a2356..4c92c27468 100644 --- a/src/main/java/io/reactivex/observers/BaseTestConsumer.java +++ b/src/main/java/io/reactivex/observers/BaseTestConsumer.java @@ -555,7 +555,6 @@ public final U assertValues(T... values) { * @return this * @since 2.2 */ - @SuppressWarnings("unchecked") public final U assertValuesOnly(T... values) { return assertSubscribed() .assertValues(values) diff --git a/src/test/java/io/reactivex/internal/schedulers/SchedulerPoolFactoryTest.java b/src/test/java/io/reactivex/internal/schedulers/SchedulerPoolFactoryTest.java index 603929ddb4..24b3daa801 100644 --- a/src/test/java/io/reactivex/internal/schedulers/SchedulerPoolFactoryTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/SchedulerPoolFactoryTest.java @@ -18,12 +18,11 @@ import static org.junit.Assert.*; -import java.util.Properties; - import org.junit.Test; import io.reactivex.TestHelper; -import io.reactivex.internal.schedulers.SchedulerPoolFactory.PurgeProperties; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.schedulers.Schedulers; public class SchedulerPoolFactoryTest { @@ -78,53 +77,66 @@ public void run() { } @Test - public void loadPurgeProperties() { - Properties props1 = new Properties(); - - PurgeProperties pp = new PurgeProperties(); - pp.load(props1); - - assertTrue(pp.purgeEnable); - assertEquals(pp.purgePeriod, 1); + public void boolPropertiesDisabledReturnsDefaultDisabled() throws Throwable { + assertTrue(SchedulerPoolFactory.getBooleanProperty(false, "key", false, true, failingPropertiesAccessor)); + assertFalse(SchedulerPoolFactory.getBooleanProperty(false, "key", true, false, failingPropertiesAccessor)); } @Test - public void loadPurgePropertiesDisabled() { - Properties props1 = new Properties(); - props1.setProperty(SchedulerPoolFactory.PURGE_ENABLED_KEY, "false"); + public void boolPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable { + assertTrue(SchedulerPoolFactory.getBooleanProperty(true, "key", true, false, missingPropertiesAccessor)); + assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "key", false, true, missingPropertiesAccessor)); + } - PurgeProperties pp = new PurgeProperties(); - pp.load(props1); + @Test + public void boolPropertiesFailureReturnsDefaultMissing() throws Throwable { + assertTrue(SchedulerPoolFactory.getBooleanProperty(true, "key", true, false, failingPropertiesAccessor)); + assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "key", false, true, failingPropertiesAccessor)); + } - assertFalse(pp.purgeEnable); - assertEquals(pp.purgePeriod, 1); + @Test + public void boolPropertiesReturnsValue() throws Throwable { + assertTrue(SchedulerPoolFactory.getBooleanProperty(true, "true", true, false, Functions.identity())); + assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "false", false, true, Functions.identity())); } @Test - public void loadPurgePropertiesEnabledCustomPeriod() { - Properties props1 = new Properties(); - props1.setProperty(SchedulerPoolFactory.PURGE_ENABLED_KEY, "true"); - props1.setProperty(SchedulerPoolFactory.PURGE_PERIOD_SECONDS_KEY, "2"); + public void intPropertiesDisabledReturnsDefaultDisabled() throws Throwable { + assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 0, -1, failingPropertiesAccessor)); + assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 1, -1, failingPropertiesAccessor)); + } - PurgeProperties pp = new PurgeProperties(); - pp.load(props1); + @Test + public void intPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable { + assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, missingPropertiesAccessor)); + assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, missingPropertiesAccessor)); + } - assertTrue(pp.purgeEnable); - assertEquals(pp.purgePeriod, 2); + @Test + public void intPropertiesFailureReturnsDefaultMissing() throws Throwable { + assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, failingPropertiesAccessor)); + assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, failingPropertiesAccessor)); } @Test - public void loadPurgePropertiesEnabledCustomPeriodNaN() { - Properties props1 = new Properties(); - props1.setProperty(SchedulerPoolFactory.PURGE_ENABLED_KEY, "true"); - props1.setProperty(SchedulerPoolFactory.PURGE_PERIOD_SECONDS_KEY, "abc"); + public void intPropertiesReturnsValue() throws Throwable { + assertEquals(1, SchedulerPoolFactory.getIntProperty(true, "1", 0, 4, Functions.identity())); + assertEquals(2, SchedulerPoolFactory.getIntProperty(true, "2", 3, 5, Functions.identity())); + } - PurgeProperties pp = new PurgeProperties(); - pp.load(props1); + static final Function failingPropertiesAccessor = new Function() { + @Override + public String apply(String v) throws Exception { + throw new SecurityException(); + } + }; - assertTrue(pp.purgeEnable); - assertEquals(pp.purgePeriod, 1); - } + static final Function missingPropertiesAccessor = new Function() { + @Override + public String apply(String v) throws Exception { + return null; + } + }; @Test public void putIntoPoolNoPurge() {