Skip to content

Commit

Permalink
Merge pull request #83 from jroper/streams-tck-massive-overhaul
Browse files Browse the repository at this point in the history
Massive overhaul of streams TCK
  • Loading branch information
jroper authored Aug 2, 2018
2 parents 6147150 + 0d0c4f5 commit 0c31068
Show file tree
Hide file tree
Showing 45 changed files with 2,991 additions and 645 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

/**
* Internal stages, used to capture the graph while being built, but never passed to a
* {@link org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine}.
* {@link org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine}. These exist for performance reasons,
* allowing the builder to hold the graph as an immutable linked tree where multiple stages can be appended in constant
* time, rather than needing to copy an array each time. However, when it comes to building the graph, it is first
* flattened out to an array, removing any of the internal stages that held nested stages, etc.
*/
class InternalStages {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.reactivestreams.Subscriber;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -242,6 +242,7 @@ public ProcessorBuilder<T, R> dropWhile(Predicate<? super R> predicate) {
* @return A new subscriber builder.
*/
public SubscriberBuilder<T, Void> forEach(Consumer<? super R> action) {
Objects.requireNonNull(action, "Action must not be null");
return collect(Collector.<R, Void, Void>of(
() -> null,
(n, r) -> action.accept(r),
Expand Down Expand Up @@ -303,24 +304,6 @@ public SubscriberBuilder<T, Optional<R>> reduce(BinaryOperator<R> accumulator) {
return addTerminalStage(new Stage.Collect(Reductions.reduce(accumulator)));
}

/**
* Perform a reduction on the elements of this stream, using the provided identity value, accumulation function and
* combiner function.
* <p>
* The result of the reduction is returned in the {@link CompletionSubscriber}.
*
* @param identity The identity value.
* @param accumulator The accumulator function.
* @param combiner The combiner function.
* @return A new subscriber builder.
*/
public <S> SubscriberBuilder<T, S> reduce(S identity,
BiFunction<S, ? super R, S> accumulator,
BinaryOperator<S> combiner) {

return addTerminalStage(new Stage.Collect(Reductions.reduce(identity, accumulator, combiner)));
}

/**
* Collect the elements emitted by this processor builder using the given {@link Collector}.
* <p>
Expand Down Expand Up @@ -536,6 +519,7 @@ public Processor<T, R> buildRs() {
* @return A {@link Processor} that will run this stream.
*/
public Processor<T, R> buildRs(ReactiveStreamsEngine engine) {
Objects.requireNonNull(engine, "Engine must not be null");
return engine.buildProcessor(toGraph());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.reactivestreams.Subscriber;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -241,6 +241,7 @@ public PublisherBuilder<T> dropWhile(Predicate<? super T> predicate) {
* @return A new completion builder.
*/
public CompletionRunner<Void> forEach(Consumer<? super T> action) {
Objects.requireNonNull(action, "Action must not be null");
return collect(Collector.<T, Void, Void>of(
() -> null,
(n, t) -> action.accept(t),
Expand Down Expand Up @@ -301,24 +302,6 @@ public CompletionRunner<Optional<T>> reduce(BinaryOperator<T> accumulator) {
return addTerminalStage(new Stage.Collect(Reductions.reduce(accumulator)));
}

/**
* Perform a reduction on the elements of this stream, using the provided identity value, accumulation function and
* combiner function.
* <p>
* The result of the reduction is returned in the {@link CompletionStage}.
*
* @param identity The identity value.
* @param accumulator The accumulator function.
* @param combiner The combiner function.
* @return A new completion builder.
*/
public <S> CompletionRunner<S> reduce(S identity,
BiFunction<S, ? super T, S> accumulator,
BinaryOperator<S> combiner) {

return addTerminalStage(new Stage.Collect(Reductions.reduce(identity, accumulator, combiner)));
}

/**
* Find the first element emitted by the {@link Publisher}, and return it in a
* {@link CompletionStage}.
Expand Down Expand Up @@ -390,6 +373,7 @@ public CompletionRunner<Void> to(Subscriber<T> subscriber) {
* @return A {@link CompletionRunner} that completes when the stream completes.
*/
public <R> CompletionRunner<R> to(SubscriberBuilder<T, R> subscriber) {
Objects.requireNonNull(subscriber, "Subscriber must not be null");
return addTerminalStage(new InternalStages.Nested(subscriber.getGraphBuilder()));
}

Expand Down Expand Up @@ -536,6 +520,7 @@ public Publisher<T> buildRs() {
* @return A {@link Publisher} that will run this stream.
*/
public Publisher<T> buildRs(ReactiveStreamsEngine engine) {
Objects.requireNonNull(engine, "Engine must not be null");
return engine.buildPublisher(toGraph());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
Expand Down Expand Up @@ -163,6 +164,7 @@ public static <T> SubscriberBuilder<T, Void> fromSubscriber(Subscriber<? extends
* @return A publisher builder.
*/
public static <T> PublisherBuilder<T> iterate(T seed, UnaryOperator<T> f) {
Objects.requireNonNull(f, "Operator must not be null");
return fromIterable(() -> Stream.iterate(seed, f).iterator());
}

Expand All @@ -174,6 +176,7 @@ public static <T> PublisherBuilder<T> iterate(T seed, UnaryOperator<T> f) {
* @return A publisher builder.
*/
public static <T> PublisherBuilder<T> generate(Supplier<? extends T> s) {
Objects.requireNonNull(s, "Supplier must not be null");
return fromIterable(() -> Stream.<T>generate((Supplier) s).iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.eclipse.microprofile.reactive.streams;

import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;

Expand All @@ -33,7 +33,7 @@ private Reductions() {
}

static <T> Collector<T, ?, Optional<T>> reduce(BinaryOperator<T> reducer) {

Objects.requireNonNull(reducer, "Reduction function must not be null");
return Collector.of(Reduction<T>::new,
(r, t) -> {
if (r.value == null) {
Expand All @@ -59,25 +59,14 @@ else if (s.value != null) {
}

static <T> Collector<T, ?, T> reduce(T identity, BinaryOperator<T> reducer) {

Objects.requireNonNull(reducer, "Reduction function must not be null");
return Collector.of(() -> new Reduction<>(identity),
(r, t) -> r.value = reducer.apply(r.value, t),
(r, s) -> r.replace(reducer.apply(r.value, s.value)),
r -> r.value
);
}

static <T, S> Collector<T, ?, S> reduce(S identity,
BiFunction<S, ? super T, S> accumulator,
BinaryOperator<S> combiner) {

return Collector.of(() -> new Reduction<>(identity),
(r, t) -> r.value = accumulator.apply(r.value, t),
(r, s) -> r.replace(combiner.apply(r.value, s.value)),
r -> r.value
);
}

private static class Reduction<T> {
private T value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
import org.eclipse.microprofile.reactive.streams.spi.Stage;

import java.util.Objects;

/**
* A builder for a {@link org.reactivestreams.Subscriber} and its result.
* <p>
Expand Down Expand Up @@ -63,6 +65,7 @@ public CompletionSubscriber<T, R> build() {
* @return A {@link CompletionSubscriber} that will run this stream.
*/
public CompletionSubscriber<T, R> build(ReactiveStreamsEngine engine) {
Objects.requireNonNull(engine, "Engine must not be null");
return engine.buildSubscriber(toGraph());
}

Expand Down
Loading

0 comments on commit 0c31068

Please sign in to comment.