From 76fb595203ea0ec38580fa049b6522c0c6f7004f Mon Sep 17 00:00:00 2001 From: James Roper Date: Thu, 2 Aug 2018 15:20:33 +1000 Subject: [PATCH] Updated to newer TCK and fixed a number of bugs --- .../reactive/streams/akka/AkkaEngine.java | 64 ++++++----- .../akka/AkkaReactiveStreamsTckTest.java | 8 -- bin/buildDeps.sh | 2 +- .../reactive/streams/zerodep/BuiltGraph.java | 68 +++++------ .../streams/zerodep/CollectStage.java | 2 +- .../reactive/streams/zerodep/ConcatStage.java | 107 ++++++++---------- .../zerodep/FlatMapCompletionStage.java | 26 ++--- .../streams/zerodep/FlatMapIterableStage.java | 29 ++--- .../streams/zerodep/FlatMapStage.java | 15 +-- .../reactive/streams/zerodep/OfStage.java | 12 +- 10 files changed, 138 insertions(+), 195 deletions(-) diff --git a/akka/src/main/java/com/lightbend/microprofile/reactive/streams/akka/AkkaEngine.java b/akka/src/main/java/com/lightbend/microprofile/reactive/streams/akka/AkkaEngine.java index f550a5c..e9904c4 100644 --- a/akka/src/main/java/com/lightbend/microprofile/reactive/streams/akka/AkkaEngine.java +++ b/akka/src/main/java/com/lightbend/microprofile/reactive/streams/akka/AkkaEngine.java @@ -197,7 +197,8 @@ else if (size == 1) { return Source.from(stage.getElements()); }); addSourceStage(Stage.PublisherStage.class, stage -> Source.fromPublisher(stage.getRsPublisher())); - addSourceStage(Stage.Concat.class, stage -> buildSource(stage.getFirst()).concat(buildSource(stage.getSecond()))); + addSourceStage(Stage.Concat.class, stage -> buildSource(stage.getFirst()) + .concat(buildSource(stage.getSecond()))); addSourceStage(Stage.Failed.class, stage -> Source.failed(stage.getError())); // Flows @@ -281,35 +282,38 @@ public akka.stream.Graph, NotUsed> apply(Throwable x, boolea // Sinks addSinkStage(Stage.FindFirst.class, stage -> Sink.headOption()); addSinkStage(Stage.Collect.class, stage -> { - Collector collector = stage.getCollector(); - BiConsumer accumulator = collector.accumulator(); - Object firstContainer = collector.supplier().get(); - if (firstContainer == null) { - firstContainer = NULL; - } - Sink> sink = Sink.fold(firstContainer, (resultContainer, in) -> { - if (resultContainer == NULL) { - accumulator.accept(null, in); - } - else { - accumulator.accept(resultContainer, in); - } - return resultContainer; - }); - if (collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) && firstContainer != NULL) { - return sink; - } - else { - return sink.mapMaterializedValue(result -> result.thenApply(r -> { - if (r == NULL) { - return collector.finisher().apply(null); - } - else { - return collector.finisher().apply(r); - } - })); - } - }); + Collector collector = stage.getCollector(); + // Lazy inited sink so that exceptions thrown by supplier get propagated through completion stage, not directly. + return Sink.lazyInitAsync(() -> { + BiConsumer accumulator = collector.accumulator(); + Object firstContainer = collector.supplier().get(); + if (firstContainer == null) { + firstContainer = NULL; + } + return CompletableFuture.completedFuture(Sink.fold(firstContainer, (resultContainer, in) -> { + if (resultContainer == NULL) { + accumulator.accept(null, in); + } else { + accumulator.accept(resultContainer, in); + } + return resultContainer; + })); + }).mapMaterializedValue(asyncMaybeResult -> asyncMaybeResult.thenCompose(maybeResult -> { + CompletionStage resultContainer; + if (maybeResult.isPresent()) { + resultContainer = maybeResult.get(); + } else { + resultContainer = CompletableFuture.completedFuture(collector.supplier().get()); + } + return resultContainer.thenApply(container -> { + if (collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) && resultContainer != NULL) { + return container; + } else { + return collector.finisher().apply(container); + } + }); + })); + }); addSinkStage(Stage.SubscriberStage.class, stage -> Flow.create() .viaMat(new TerminationWatcher(), Keep.right()) diff --git a/akka/src/test/java/com/lightbend/microprofile/reactive/streams/akka/AkkaReactiveStreamsTckTest.java b/akka/src/test/java/com/lightbend/microprofile/reactive/streams/akka/AkkaReactiveStreamsTckTest.java index 8ccf919..8482914 100644 --- a/akka/src/test/java/com/lightbend/microprofile/reactive/streams/akka/AkkaReactiveStreamsTckTest.java +++ b/akka/src/test/java/com/lightbend/microprofile/reactive/streams/akka/AkkaReactiveStreamsTckTest.java @@ -8,10 +8,7 @@ import akka.stream.ActorMaterializer; import akka.stream.Materializer; import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine; -import org.eclipse.microprofile.reactive.streams.tck.CancelStageVerification; -import org.eclipse.microprofile.reactive.streams.tck.FlatMapStageVerification; import org.eclipse.microprofile.reactive.streams.tck.ReactiveStreamsTck; -import org.reactivestreams.tck.IdentityProcessorVerification; import org.reactivestreams.tck.TestEnvironment; import org.testng.annotations.AfterSuite; @@ -40,9 +37,4 @@ protected AkkaEngine createEngine() { materializer = ActorMaterializer.create(system); return new AkkaEngine(materializer); } - - @Override - protected boolean isEnabled(Object test) { - return true; - } } diff --git a/bin/buildDeps.sh b/bin/buildDeps.sh index 7dcab2d..34f69ce 100755 --- a/bin/buildDeps.sh +++ b/bin/buildDeps.sh @@ -1,7 +1,7 @@ #!/bin/bash # This should be set to the commit hash that is being tracked. Needed even if TRACKING_PR is set. -TRACKING_COMMIT="886a91d5" +TRACKING_COMMIT="0d0c4f50" # To track a particular pull request, put it's number here, otherwise comment it out. # TRACKING_PR="67" diff --git a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java index 402750e..be13408 100644 --- a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java +++ b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java @@ -269,9 +269,6 @@ private Builder buildGraph(Graph graph, Shape shape) { } } - ports.addAll(builderPorts); - stages.addAll(builderStages); - return this; } @@ -284,8 +281,22 @@ private void verifyReady() { for (Port port : builderPorts) { port.verifyReady(); } + ports.addAll(builderPorts); + } + + /** + * Start the stages on this listener + */ + private void startGraph() { + execute(() -> { + for (GraphStage stage : builderStages) { + stages.add(stage); + stage.postStart(); + } + }); } + private SubStageInlet inlet() { Objects.requireNonNull(lastInlet, "Not an inlet graph"); assert result == null; @@ -360,18 +371,8 @@ private void addStage(Stage stage, StageInlet inlet, Publisher publisher, StageO addStage(new OfStage(BuiltGraph.this, outlet, ((Stage.Of) stage).getElements())); } else if (stage instanceof Stage.Concat) { - - // Use this builder to build each of the sub stages that are being concatenated as an inlet graph, and then - // capture the last inlet of each to pass to the concat stage. - buildGraph(((Stage.Concat) stage).getFirst(), Shape.INLET); - StageInlet firstInlet = lastInlet; - lastInlet = null; - - buildGraph(((Stage.Concat) stage).getSecond(), Shape.INLET); - StageInlet secondInlet = lastInlet; - lastInlet = null; - - addStage(new ConcatStage(BuiltGraph.this, firstInlet, secondInlet, outlet)); + Stage.Concat concat = (Stage.Concat) stage; + addStage(new ConcatStage(BuiltGraph.this, buildSubInlet(concat.getFirst()), buildSubInlet(concat.getSecond()), outlet)); } else if (stage instanceof Stage.PublisherStage) { addStage(new ConnectorStage(BuiltGraph.this, ((Stage.PublisherStage) stage).getRsPublisher(), subscriber)); } else if (stage instanceof Stage.Failed) { @@ -517,17 +518,6 @@ public void execute(Runnable command) { }); } - /** - * Start the whole graph. - */ - private void startGraph() { - execute(() -> { - for (GraphStage stage : stages) { - stage.postStart(); - } - }); - } - private void streamFailure(Throwable error) { // todo handle better error.printStackTrace(); @@ -558,8 +548,6 @@ final class SubStageInlet implements StageInlet { private final List subStages; private final List subStagePorts; - private boolean started = false; - private SubStageInlet(StageInlet delegate, List subStages, List subStagePorts) { this.delegate = delegate; this.subStages = subStages; @@ -568,20 +556,24 @@ private SubStageInlet(StageInlet delegate, List subStages, List

{ + stages.removeAll(subStages); + ports.removeAll(subStagePorts); + }); } @Override public void pull() { - if (!started) { - throw new IllegalStateException("Pull before the sub stream has been started."); - } delegate.pull(); } @@ -602,17 +594,11 @@ public boolean isClosed() { @Override public void cancel() { - if (!started) { - throw new IllegalStateException("Cancel before the sub stream has been started."); - } delegate.cancel(); } @Override public T grab() { - if (!started) { - throw new IllegalStateException("Grab before the sub stream has been started."); - } return delegate.grab(); } diff --git a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/CollectStage.java b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/CollectStage.java index ad69366..99bda1b 100644 --- a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/CollectStage.java +++ b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/CollectStage.java @@ -23,12 +23,12 @@ public CollectStage(BuiltGraph builtGraph, StageInlet inlet, this.result = result; this.collector = collector; - container = collector.supplier().get(); inlet.setListener(this); } @Override protected void postStart() { + container = collector.supplier().get(); inlet.pull(); } diff --git a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/ConcatStage.java b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/ConcatStage.java index 21e559f..2b5df3d 100644 --- a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/ConcatStage.java +++ b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/ConcatStage.java @@ -4,92 +4,79 @@ package com.lightbend.microprofile.reactive.streams.zerodep; -public class ConcatStage extends GraphStage implements OutletListener { +public class ConcatStage extends GraphStage implements InletListener, OutletListener { - private final StageInlet first; - private final StageInlet second; + private final BuiltGraph.SubStageInlet first; + private final BuiltGraph.SubStageInlet second; private final StageOutlet outlet; - private Throwable secondError; - - public ConcatStage(BuiltGraph builtGraph, StageInlet first, StageInlet second, StageOutlet outlet) { + public ConcatStage(BuiltGraph builtGraph, BuiltGraph.SubStageInlet first, BuiltGraph.SubStageInlet second, StageOutlet outlet) { super(builtGraph); this.first = first; this.second = second; this.outlet = outlet; - first.setListener(new FirstInletListener()); - second.setListener(new SecondInletListener()); + first.setListener(this); outlet.setListener(this); } + @Override + protected void postStart() { + first.start(); + } + @Override public void onPull() { - if (first.isClosed()) { - second.pull(); - } else { - first.pull(); - } + first.pull(); } @Override public void onDownstreamFinish() { - if (!first.isClosed()) { - first.cancel(); - } - if (!second.isClosed()) { - second.cancel(); - } + first.cancel(); + // Start up second so we can shut it down, in case it holds any resources. + startAndCancelSecond(); } - private class FirstInletListener implements InletListener { - @Override - public void onPush() { - outlet.push(first.grab()); - } + @Override + public void onPush() { + outlet.push(first.grab()); + } - @Override - public void onUpstreamFinish() { - if (second.isClosed()) { - if (secondError != null) { - outlet.fail(secondError); - } else { - outlet.complete(); - } - } else if (outlet.isAvailable()) { - second.pull(); - } + @Override + public void onUpstreamFinish() { + second.forwardTo(outlet); + outlet.forwardTo(second); + second.start(); + if (outlet.isAvailable()) { + second.pull(); } + } - @Override - public void onUpstreamFailure(Throwable error) { - outlet.fail(error); - if (!second.isClosed()) { - second.cancel(); - } - } + @Override + public void onUpstreamFailure(Throwable error) { + outlet.fail(error); + startAndCancelSecond(); } - private class SecondInletListener implements InletListener { - @Override - public void onPush() { - outlet.push(second.grab()); - } + private void startAndCancelSecond() { + try { + second.setListener(new InletListener() { + @Override + public void onPush() { + } - @Override - public void onUpstreamFinish() { - if (first.isClosed()) { - outlet.complete(); - } - } + @Override + public void onUpstreamFinish() { + } - @Override - public void onUpstreamFailure(Throwable error) { - if (first.isClosed()) { - outlet.fail(error); - } else { - secondError = error; - } + @Override + public void onUpstreamFailure(Throwable error) { + } + }); + second.start(); + second.cancel(); + } catch (Exception e) { + // Ignore exceptions } } } diff --git a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapCompletionStage.java b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapCompletionStage.java index ee8c267..3f8448f 100644 --- a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapCompletionStage.java +++ b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapCompletionStage.java @@ -15,7 +15,7 @@ class FlatMapCompletionStage extends GraphStage implements InletListener { private final StageOutlet outlet; private final Function> mapper; - private Throwable error; + private CompletionStage activeCompletionStage; FlatMapCompletionStage(BuiltGraph builtGraph, StageInlet inlet, StageOutlet outlet, Function> mapper) { super(builtGraph); @@ -29,20 +29,16 @@ class FlatMapCompletionStage extends GraphStage implements InletListener { @Override public void onPush() { - CompletionStage future = mapper.apply(inlet.grab()); - future.whenCompleteAsync((result, error) -> { + activeCompletionStage = mapper.apply(inlet.grab()); + activeCompletionStage.whenCompleteAsync((result, error) -> { + activeCompletionStage = null; if (!outlet.isClosed()) { if (error == null) { outlet.push(result); if (inlet.isClosed()) { - if (this.error != null) { - outlet.fail(this.error); - } else { - outlet.complete(); - } + outlet.complete(); } } else { - outlet.fail(error); if (!inlet.isClosed()) { inlet.cancel(); @@ -54,21 +50,13 @@ public void onPush() { @Override public void onUpstreamFinish() { - if (!activeCompletionStage()) { + if (activeCompletionStage == null) { outlet.complete(); } } @Override public void onUpstreamFailure(Throwable error) { - if (activeCompletionStage()) { - this.error = error; - } else { - outlet.fail(error); - } - } - - private boolean activeCompletionStage() { - return outlet.isAvailable() && !inlet.isPulled(); + outlet.fail(error); } } diff --git a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapIterableStage.java b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapIterableStage.java index 06d475a..1eab0c5 100644 --- a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapIterableStage.java +++ b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapIterableStage.java @@ -15,7 +15,6 @@ class FlatMapIterableStage extends GraphStage implements InletListener, Ou private final StageOutlet outlet; private final Function> mapper; - private Throwable error; private Iterator iterator; FlatMapIterableStage(BuiltGraph builtGraph, StageInlet inlet, StageOutlet outlet, Function> mapper) { @@ -30,17 +29,15 @@ class FlatMapIterableStage extends GraphStage implements InletListener, Ou @Override public void onPush() { - Iterator iterator = mapper.apply(inlet.grab()).iterator(); + iterator = mapper.apply(inlet.grab()).iterator(); if (iterator.hasNext()) { - this.iterator = iterator; - outlet.push(iterator.next()); - // Make sure we're still on the same iterator in case a recursive call changed things - if (!iterator.hasNext() && this.iterator == iterator) { - this.iterator = null; + if (!iterator.hasNext()) { + iterator = null; } } else { + iterator = null; inlet.pull(); } } @@ -54,11 +51,8 @@ public void onUpstreamFinish() { @Override public void onUpstreamFailure(Throwable error) { - if (iterator == null) { - outlet.fail(error); - } else { - this.error = error; - } + // Allow failures to overtake elements here + outlet.fail(error); } @Override @@ -66,16 +60,11 @@ public void onPull() { if (iterator == null) { inlet.pull(); } else { - Iterator iterator = this.iterator; outlet.push(iterator.next()); - if (!iterator.hasNext() && this.iterator == iterator) { - this.iterator = null; + if (!iterator.hasNext()) { + iterator = null; if (inlet.isClosed()) { - if (error != null) { - outlet.fail(error); - } else { - outlet.complete(); - } + outlet.complete(); } } } diff --git a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapStage.java b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapStage.java index f43018e..637470d 100644 --- a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapStage.java +++ b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FlatMapStage.java @@ -14,7 +14,6 @@ class FlatMapStage extends GraphStage implements InletListener, OutletList private final Function mapper; private BuiltGraph.SubStageInlet substream; - private Throwable error; FlatMapStage(BuiltGraph builtGraph, StageInlet inlet, StageOutlet outlet, Function mapper) { super(builtGraph); @@ -40,11 +39,7 @@ public void onPush() { public void onUpstreamFinish() { substream = null; if (inlet.isClosed()) { - if (error != null) { - outlet.fail(error); - } else { - outlet.complete(); - } + outlet.complete(); } else if (outlet.isAvailable()) { inlet.pull(); } @@ -52,6 +47,7 @@ public void onUpstreamFinish() { @Override public void onUpstreamFailure(Throwable error) { + substream = null; outlet.fail(error); if (!inlet.isClosed()) { inlet.cancel(); @@ -71,10 +67,9 @@ public void onUpstreamFinish() { @Override public void onUpstreamFailure(Throwable error) { - if (substream == null) { - outlet.fail(error); - } else { - this.error = error; + outlet.fail(error); + if (substream != null) { + substream.cancel(); } } diff --git a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/OfStage.java b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/OfStage.java index cdfca04..df5f041 100644 --- a/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/OfStage.java +++ b/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/OfStage.java @@ -11,20 +11,22 @@ */ class OfStage extends GraphStage implements OutletListener { private final StageOutlet outlet; - private Iterator elements; + private final Iterable elements; + private Iterator iterator; public OfStage(BuiltGraph builtGraph, StageOutlet outlet, Iterable elements) { super(builtGraph); this.outlet = outlet; - this.elements = elements.iterator(); + this.elements = elements; outlet.setListener(this); } @Override protected void postStart() { + iterator = elements.iterator(); try { - if (!elements.hasNext()) { + if (!iterator.hasNext()) { outlet.complete(); } } catch (Exception e) { @@ -34,8 +36,8 @@ protected void postStart() { @Override public void onPull() { - outlet.push(elements.next()); - if (!elements.hasNext()) { + outlet.push(iterator.next()); + if (!iterator.hasNext()) { outlet.complete(); } }