diff --git a/streams/pom.xml b/streams/pom.xml
index 2fce81f..ce7c473 100644
--- a/streams/pom.xml
+++ b/streams/pom.xml
@@ -37,6 +37,7 @@
api
tck
+ tck-arquillian
spec
@@ -67,6 +68,23 @@
org.osgi.annotation.versioning
1.0.0
+
+ org.jboss.arquillian
+ arquillian-bom
+ 1.1.15.Final
+ import
+ pom
+
+
+ org.testng
+ testng
+ 6.11
+
+
+ javax.enterprise
+ cdi-api
+ 2.0
+
diff --git a/streams/tck-arquillian/pom.xml b/streams/tck-arquillian/pom.xml
new file mode 100644
index 0000000..216c745
--- /dev/null
+++ b/streams/tck-arquillian/pom.xml
@@ -0,0 +1,131 @@
+
+
+
+
+
+ 4.0.0
+
+
+ org.eclipse.microprofile.reactive.streams
+ microprofile-reactive-streams-parent
+ 1.0-SNAPSHOT
+
+
+ microprofile-reactive-streams-operators-tck-arquillian
+ MicroProfile Reactive Streams Operators TCK Arquillian
+ MicroProfile Reactive Streams Operators :: TCK Arquillian runner
+
+
+
+ org.eclipse.microprofile.reactive.streams
+ microprofile-reactive-streams-operators
+ ${project.version}
+ provided
+
+
+ org.eclipse.microprofile.reactive.streams
+ microprofile-reactive-streams-operators-tck
+ ${project.version}
+
+
+ org.jboss.arquillian.test
+ arquillian-test-spi
+
+
+ org.jboss.arquillian.testng
+ arquillian-testng-container
+
+
+ javax.enterprise
+ cdi-api
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+ org.eclipse.microprofile.maven
+ microprofile-maven-build-extension
+ true
+
+
+
+
+
+
+ eclipse-jarsigner
+
+
+
+ org.eclipse.cbi.maven.plugins
+ eclipse-jarsigner-plugin
+
+
+
+ sign
+
+
+
+
+
+
+
+
+
+
diff --git a/streams/tck-arquillian/src/main/java/org/eclipse/microprofile/reactive/streams/tck/arquillian/ReactiveStreamsArquillianTck.java b/streams/tck-arquillian/src/main/java/org/eclipse/microprofile/reactive/streams/tck/arquillian/ReactiveStreamsArquillianTck.java
new file mode 100644
index 0000000..3b90309
--- /dev/null
+++ b/streams/tck-arquillian/src/main/java/org/eclipse/microprofile/reactive/streams/tck/arquillian/ReactiveStreamsArquillianTck.java
@@ -0,0 +1,177 @@
+/*******************************************************************************
+ * Copyright (c) 2018 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.eclipse.microprofile.reactive.streams.tck.arquillian;
+
+import org.eclipse.microprofile.reactive.streams.tck.ReactiveStreamsTck;
+import org.jboss.arquillian.container.test.api.Deployment;
+import org.jboss.arquillian.testng.Arquillian;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.asset.EmptyAsset;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.reactivestreams.tck.TestEnvironment;
+import org.testng.IClassListener;
+import org.testng.IMethodInstance;
+import org.testng.IMethodInterceptor;
+import org.testng.IObjectFactory;
+import org.testng.ITestClass;
+import org.testng.ITestContext;
+import org.testng.ITestListener;
+import org.testng.ITestNGListener;
+import org.testng.ITestResult;
+import org.testng.TestNG;
+import org.testng.annotations.Test;
+import org.testng.internal.ObjectFactoryImpl;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test runner for running the TCK in Arquillian.
+ *
+ * It would be nice if this was able to run the tests properly, and I did get something working, but it was so complex
+ * and fragile because Arquillian really isn't that flexible that I decided it simply wasn't worth it, this is much
+ * simpler.
+ */
+public class ReactiveStreamsArquillianTck extends Arquillian {
+ @Deployment
+ public static JavaArchive tckDeployment() {
+ return ShrinkWrap.create(JavaArchive.class)
+ // Add everything from the TCK
+ .addPackages(true, ReactiveStreamsTck.class.getPackage())
+ // And add the reactive streams TCK
+ .addPackages(true, TestEnvironment.class.getPackage())
+ // And we need a CDI descriptor
+ .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml");
+
+ }
+
+ @Inject
+ private ReactiveStreamsCdiTck tck;
+
+ @Test
+ public void runAllTckTests() throws Throwable {
+ TestNG testng = new TestNG();
+
+ ObjectFactoryImpl delegate = new ObjectFactoryImpl();
+ testng.setObjectFactory((IObjectFactory) (constructor, params) -> {
+ if (constructor.getDeclaringClass().equals(ReactiveStreamsCdiTck.class)) {
+ return tck;
+ }
+ else {
+ return delegate.newInstance(constructor, params);
+ }
+ });
+
+ testng.setUseDefaultListeners(false);
+ ResultListener resultListener = new ResultListener();
+ testng.addListener((ITestNGListener) resultListener);
+ testng.setTestClasses(new Class[]{ ReactiveStreamsCdiTck.class });
+ testng.setMethodInterceptor(new IMethodInterceptor() {
+ @Override
+ public List intercept(List methods, ITestContext context) {
+ methods.sort(Comparator.comparing(m -> m.getInstance().getClass().getName()));
+ return methods;
+ }
+ });
+ testng.run();
+ int total = resultListener.success.get() + resultListener.failed.get() + resultListener.skipped.get();
+ System.out.println(String.format("Ran %d tests, %d passed, %d failed, %d skipped.", total, resultListener.success.get(),
+ resultListener.failed.get(), resultListener.skipped.get()));
+ System.out.println("Failed tests:");
+ resultListener.failures.forEach(result -> {
+ System.out.println(result.getInstance().getClass().getName() + "." + result.getMethod().getMethodName());
+ });
+ if (resultListener.failed.get() > 0) {
+ if (resultListener.lastFailure.get() != null) {
+ throw resultListener.lastFailure.get();
+ }
+ else {
+ throw new Exception("Tests failed with no exception");
+ }
+ }
+ }
+
+ private static class ResultListener implements IClassListener, ITestListener {
+ private final AtomicInteger success = new AtomicInteger();
+ private final AtomicInteger failed = new AtomicInteger();
+ private final AtomicInteger skipped = new AtomicInteger();
+ private final AtomicReference lastFailure = new AtomicReference<>();
+ private final List failures = Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public void onBeforeClass(ITestClass testClass) {
+ System.out.println(testClass.getName() + ":");
+ }
+
+ @Override
+ public void onAfterClass(ITestClass testClass) {
+ }
+
+ @Override
+ public void onTestStart(ITestResult result) {
+ }
+
+ @Override
+ public void onTestSuccess(ITestResult result) {
+ printResult(result, "SUCCESS");
+ success.incrementAndGet();
+ }
+
+ @Override
+ public void onTestFailure(ITestResult result) {
+ printResult(result, "FAILED");
+ if (result.getThrowable() != null) {
+ result.getThrowable().printStackTrace(System.out);
+ lastFailure.set(result.getThrowable());
+ }
+ failures.add(result);
+ failed.incrementAndGet();
+ }
+
+ @Override
+ public void onTestSkipped(ITestResult result) {
+ printResult(result, "SKIPPED");
+ skipped.incrementAndGet();
+ }
+
+ @Override
+ public void onTestFailedButWithinSuccessPercentage(ITestResult result) {
+ }
+
+ @Override
+ public void onStart(ITestContext context) {
+ }
+
+ @Override
+ public void onFinish(ITestContext context) {
+
+ }
+
+ private static void printResult(ITestResult result, String status) {
+ String methodName = String.format("%-100s", result.getMethod().getMethodName()).replace(' ', '.');
+ System.out.println(" - " + methodName + "." + status);
+ }
+ }
+}
diff --git a/streams/tck-arquillian/src/main/java/org/eclipse/microprofile/reactive/streams/tck/arquillian/ReactiveStreamsCdiTck.java b/streams/tck-arquillian/src/main/java/org/eclipse/microprofile/reactive/streams/tck/arquillian/ReactiveStreamsCdiTck.java
new file mode 100644
index 0000000..a0b80f3
--- /dev/null
+++ b/streams/tck-arquillian/src/main/java/org/eclipse/microprofile/reactive/streams/tck/arquillian/ReactiveStreamsCdiTck.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Copyright (c) 2018 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.eclipse.microprofile.reactive.streams.tck.arquillian;
+
+import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
+import org.eclipse.microprofile.reactive.streams.tck.ReactiveStreamsTck;
+import org.reactivestreams.tck.TestEnvironment;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+@ApplicationScoped
+public class ReactiveStreamsCdiTck extends ReactiveStreamsTck {
+
+ public ReactiveStreamsCdiTck() {
+ super(new TestEnvironment());
+ }
+
+ @Inject
+ private ReactiveStreamsEngine engine;
+
+ @Override
+ protected ReactiveStreamsEngine createEngine() {
+ return engine;
+ }
+}
diff --git a/streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/spi/OnErrorResumeStageVerification.java b/streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/spi/OnErrorResumeStageVerification.java
index a3e93f6..d68ac42 100644
--- a/streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/spi/OnErrorResumeStageVerification.java
+++ b/streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/spi/OnErrorResumeStageVerification.java
@@ -76,7 +76,7 @@ public void onErrorResumeWithPublisherShouldCatchErrorFromSource() {
assertEquals(await(ReactiveStreams.failed(new QuietRuntimeException("failed"))
.onErrorResumeWithPublisher(err -> {
exception.set(err);
- return ReactiveStreams.of("foo", "bar").buildRs();
+ return ReactiveStreams.of("foo", "bar").buildRs(getEngine());
})
.toList()
.run(getEngine())), Arrays.asList("foo", "bar"));
@@ -133,7 +133,7 @@ public void onErrorResumeWithPublisherShouldCatchErrorFromStage() {
})
.onErrorResumeWithPublisher(err -> {
exception.set(err);
- return ReactiveStreams.of("foo", "bar").buildRs();
+ return ReactiveStreams.of("foo", "bar").buildRs(getEngine());
})
.toList()
.run(getEngine())), Arrays.asList("A", "foo", "bar"));
@@ -182,7 +182,7 @@ public void onErrorResumeWithShouldBeAbleToInjectAFailure() {
@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*boom.*")
public void onErrorResumeWithPublisherShouldBeAbleToInjectAFailure() {
await(ReactiveStreams.failed(new QuietRuntimeException("failed"))
- .onErrorResumeWithPublisher(err -> ReactiveStreams.failed(new QuietRuntimeException("boom")).buildRs())
+ .onErrorResumeWithPublisher(err -> ReactiveStreams.failed(new QuietRuntimeException("boom")).buildRs(getEngine()))
.toList()
.run(getEngine()));
}