diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ada6755 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/.idea/ +/build/ +/.gradle/ diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..87b738c Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..2cfaf45 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..af6708f --- /dev/null +++ b/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..0f8d593 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/src/main/java/gr/james/sampling/AbstractThreadSafeRandomSampling.java b/src/main/java/gr/james/sampling/AbstractThreadSafeRandomSampling.java new file mode 100644 index 0000000..dc07d9d --- /dev/null +++ b/src/main/java/gr/james/sampling/AbstractThreadSafeRandomSampling.java @@ -0,0 +1,217 @@ +package gr.james.sampling; + +import java.util.AbstractList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.RandomAccess; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * This class provides a skeletal implementation of the {@link RandomSampling} interface to minimize the effort required + * to implement that interface. + * + * @param the item type + * @author Giorgos Stamatelatos + * @author Michael Böckling + */ +public abstract class AbstractThreadSafeRandomSampling implements RandomSampling, ThreadSafeRandomSampling { + + private final int sampleSize; + private final Random random; + private final AtomicReferenceArray sample; + private final AtomicInteger samplesCount; + private final Collection unmodifiableSample; + private AtomicLong streamSize; + private AtomicLong skip; + + /** + * Construct a new instance of this class using the specified sample size and RNG. The implementation assumes that + * {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure that. If this + * contract is violated, the behavior is undefined. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @throws NullPointerException if {@code random} is {@code null} + * @throws IllegalArgumentException if {@code sampleSize} is less than 1 + */ + AbstractThreadSafeRandomSampling(int sampleSize, Random random) { + if (random == null) { + throw new NullPointerException("Random was null"); + } + if (sampleSize < 1) { + throw new IllegalArgumentException("Sample size was less than 1"); + } + init(sampleSize, random); + this.random = random; + this.sampleSize = sampleSize; + this.streamSize = new AtomicLong(0); + this.sample = new AtomicReferenceArray<>(sampleSize); + this.samplesCount = new AtomicInteger(0); + this.skip = new AtomicLong(skipLength(sampleSize, sampleSize, random)); + this.unmodifiableSample = new AtomicReferenceArrayList<>(sample, samplesCount); + } + + /** + * {@inheritDoc} + * + * @param item {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws StreamOverflowException if the number of items feeded exceeds {@link Long#MAX_VALUE} + */ + @Override + public final boolean feed(T item) { + // Checks + if (item == null) { + throw new NullPointerException("Item was null"); + } + if (streamSize.get() == Long.MAX_VALUE) { + throw new StreamOverflowException(); + } + + // Increase stream size + long streamSize = this.streamSize.incrementAndGet(); + assert streamSize > 0; + + + // attempt to add to samples while we don't have a full count yet, until successful or array is full + for(int samplesInArray = samplesCount.get(); samplesInArray < sampleSize;) { + boolean arrayWasModified = sample.compareAndSet(samplesInArray, null, item); + if(!arrayWasModified) + continue; + samplesInArray = samplesCount.incrementAndGet(); + assert samplesInArray == Math.min(sampleSize(), streamSize); + return true; + } + + // try to either decrement the skip count or calculate a new skip count value, until either succeeds + while(true) { + long currentSkipValue = skip.get(); + if(currentSkipValue > 0) { + boolean decrementSuccess = skip.compareAndSet(currentSkipValue, currentSkipValue - 1); + if(decrementSuccess) { + return false; + } + } else { + assert currentSkipValue == 0; + long nextSkipValue = skipLength(streamSize, sampleSize, random); + boolean skipCountUpdated = skip.compareAndSet(currentSkipValue, nextSkipValue); + if(skipCountUpdated) { + sample.set(random.nextInt(sampleSize), item); + assert nextSkipValue >= 0; + return true; + } + } + } + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws StreamOverflowException if the number of items feeded exceeds {@link Long#MAX_VALUE} + */ + @Override + public final boolean feed(Iterator items) { + return RandomSampling.super.feed(items); + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws StreamOverflowException if the number of items feeded exceeds {@link Long#MAX_VALUE} + */ + @Override + public final boolean feed(Iterable items) { + return RandomSampling.super.feed(items); + } + + /** + * {@inheritDoc} + * + * @return {@inheritDoc} + */ + @Override + public final int sampleSize() { + assert this.sampleSize > 0; + return this.sampleSize; + } + + /** + * Get the number of items that have been feeded to the algorithm during the lifetime of this instance, which is a + * non-negative {@code long} value. + *

+ * This method runs in constant time. + * + * @return the number of items that have been feeded to the algorithm + */ + @Override + public final long streamSize() { + assert this.streamSize.get() >= 0; + return this.streamSize.get(); + } + + /** + * {@inheritDoc} + * + * @return {@inheritDoc} + */ + @Override + public final Collection sample() { + return this.unmodifiableSample; + } + + /** + * Returns how many items should the algorithm skip given its state. + *

+ * The implementation of this method must only rely on the given arguments and not on the state of the instance. + * + * @param streamSize how many items have been feeded to the sampler + * @param sampleSize expected sample size + * @param random the {@link Random} instance to use + * @return how many items to skip + */ + abstract long skipLength(long streamSize, int sampleSize, Random random); + + /** + * Performs initialization logic. + *

+ * This method is invoked in the constructor. + * + * @param sampleSize expected sample size + * @param random the {@link Random} instance assigned to this instance + */ + void init(int sampleSize, Random random) { + } + + static class AtomicReferenceArrayList extends AbstractList implements List, RandomAccess { + + private final AtomicReferenceArray array; + private final AtomicInteger arrayLength; + + AtomicReferenceArrayList(AtomicReferenceArray array, AtomicInteger arrayLength) { + this.array = array; + this.arrayLength = arrayLength; + } + + @Override + public int size() { + return arrayLength.get(); + } + + @Override + public T get(int index) { + return array.get(index); + } + } + +} diff --git a/src/main/java/gr/james/sampling/LiLSamplingThreadSafe.java b/src/main/java/gr/james/sampling/LiLSamplingThreadSafe.java new file mode 100644 index 0000000..fb3e6f6 --- /dev/null +++ b/src/main/java/gr/james/sampling/LiLSamplingThreadSafe.java @@ -0,0 +1,89 @@ +package gr.james.sampling; + +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implementation of Algorithm L by Li in Reservoir-sampling algorithms of time complexity + * O(n(1 + log(N/n))). + *

+ * Unlike {@link WatermanSampling}, the {@link VitterXSampling}, {@link VitterZSampling} and {@code LiLSampling} + * algorithms decide how many items to skip, rather than deciding whether or not to skip an item each time it is feeded. + * This property allows these algorithms to perform better by efficiently calculating the number of items that need to + * be skipped, while making fewer calls to the RNG. + *

+ * This implementation throws {@link StreamOverflowException} if more than {@link Long#MAX_VALUE} items are feeded. + *

+ * The space complexity of this class is {@code O(k)}, where {@code k} is the sample size. + * + * @param the item type + * @author Giorgos Stamatelatos + * @author Michael Böckling + * @see Reservoir-sampling algorithms of time complexity + * O(n(1 + log(N/n))) + */ +public class LiLSamplingThreadSafe extends AbstractThreadSafeRandomSampling { + + private AtomicLong W; + + /** + * Construct a new instance of {@link LiLSamplingThreadSafe} using the specified sample size and RNG. The implementation + * assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure that. + * If this contract is violated, the behavior is undefined. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @throws NullPointerException if {@code random} is {@code null} + * @throws IllegalArgumentException if {@code sampleSize} is less than 1 + */ + public LiLSamplingThreadSafe(int sampleSize, Random random) { + super(sampleSize, random); + } + + /** + * Construct a new instance of {@link LiLSamplingThreadSafe} using the specified sample size and a default source of + * randomness. + * + * @param sampleSize the sample size + * @throws IllegalArgumentException if {@code sampleSize} is less than 1 + */ + public LiLSamplingThreadSafe(int sampleSize) { + this(sampleSize, ThreadLocalRandom.current()); + } + + /** + * Get a {@link RandomSamplingCollector} from this class. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @param the type of elements + * @return a {@link RandomSamplingCollector} from this class + */ + public static RandomSamplingCollector collector(int sampleSize, Random random) { + return new RandomSamplingCollector<>(() -> new LiLSamplingThreadSafe<>(sampleSize, random)); + } + + @Override + void init(int sampleSize, Random random) { + //W = Math.pow(RandomSamplingUtils.randomExclusive(random), 1.0 / sampleSize); + W = new AtomicLong(); + W.set(Double.doubleToLongBits(Math.pow(RandomSamplingUtils.randomExclusive(random), 1.0 / sampleSize))); + } + + @Override + long skipLength(long streamSize, int sampleSize, Random random) { + final double random1 = RandomSamplingUtils.randomExclusive(random); + final double random2 = RandomSamplingUtils.randomExclusive(random); + double w = Double.longBitsToDouble(W.get()); + long skip = (long) (Math.log(random1) / Math.log(1 - w)); + assert skip >= 0 || skip == Long.MIN_VALUE; + if (skip == Long.MIN_VALUE) { // Sometimes when W is very small, 1 - W = 1 and Math.log(1) = +0 instead of -0 + skip = Long.MAX_VALUE; // This results in negative infinity skip + } + // W = W * Math.pow(random2, 1.0 / sampleSize); + W.set(Double.doubleToLongBits(w * Math.pow(random2, 1.0 / sampleSize))); + return skip; + } + +} diff --git a/src/main/java/gr/james/sampling/ThreadSafeRandomSampling.java b/src/main/java/gr/james/sampling/ThreadSafeRandomSampling.java new file mode 100644 index 0000000..98aa4ca --- /dev/null +++ b/src/main/java/gr/james/sampling/ThreadSafeRandomSampling.java @@ -0,0 +1,6 @@ +package gr.james.sampling; + +/** + * Marker interface. + */ +public interface ThreadSafeRandomSampling {} diff --git a/src/test/java/gr/james/sampling/Benchmark.java b/src/test/java/gr/james/sampling/Benchmark.java index 9486b3d..b193d82 100644 --- a/src/test/java/gr/james/sampling/Benchmark.java +++ b/src/test/java/gr/james/sampling/Benchmark.java @@ -11,6 +11,7 @@ public class Benchmark { private static final VitterXSampling vitterx = new VitterXSampling<>(sample, random); private static final VitterZSampling vitterz = new VitterZSampling<>(sample, random); private static final LiLSampling lil = new LiLSampling<>(sample, random); + private static final LiLSamplingThreadSafe lilThreadSafe = new LiLSamplingThreadSafe<>(sample, random); private static final EfraimidisSampling efraimidis = new EfraimidisSampling<>(sample, random); private static final ChaoSampling chao = new ChaoSampling<>(sample, random); @@ -19,6 +20,7 @@ public static void main(String[] args) { System.out.printf("%10s %5d ms%n", "VitterX", performance(vitterx) / 1000000); System.out.printf("%10s %5d ms%n", "VitterZ", performance(vitterz) / 1000000); System.out.printf("%10s %5d ms%n", "LiL", performance(lil) / 1000000); + System.out.printf("%10s %5d ms%n", "LiL Thread Safe", performance(lilThreadSafe) / 1000000); System.out.printf("%10s %5d ms%n", "Efraimidis", performance(efraimidis) / 1000000); System.out.printf("%10s %5d ms%n", "Chao", performance(chao) / 1000000); } diff --git a/src/test/java/gr/james/sampling/RandomSamplingTest.java b/src/test/java/gr/james/sampling/RandomSamplingTest.java index a1971dc..45a7175 100644 --- a/src/test/java/gr/james/sampling/RandomSamplingTest.java +++ b/src/test/java/gr/james/sampling/RandomSamplingTest.java @@ -1,11 +1,18 @@ package gr.james.sampling; +import static org.junit.Assert.assertEquals; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -31,6 +38,7 @@ public static Collection>> implementations() { implementations.add(() -> new VitterXSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new VitterZSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new LiLSampling<>(SAMPLE, RANDOM)); + implementations.add(() -> new LiLSamplingThreadSafe<>(SAMPLE, RANDOM)); implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM)); return implementations; @@ -44,30 +52,55 @@ public void correctness() { final int[] streamSizes = {1, 20, 100}; final int[] repsSizes = {1000000, 1000000, 2000000}; - Assert.assertEquals(streamSizes.length, repsSizes.length); + assertEquals(streamSizes.length, repsSizes.length); for (int test = 0; test < streamSizes.length; test++) { final int STREAM = streamSizes[test]; + final int numCores = (impl.get() instanceof ThreadSafeRandomSampling) ? + Runtime.getRuntime().availableProcessors() : 1; final int REPS = repsSizes[test]; - final int[] d = new int[STREAM]; + final AtomicIntegerArray d = new AtomicIntegerArray(STREAM); + ExecutorService executorService = Executors.newFixedThreadPool(numCores); + List> taskList = new ArrayList<>(numCores); + + for(int core = 0; core < numCores; core++) { + taskList.add(() -> { + + for (int reps = 0; reps < (REPS / numCores); reps++) { + final RandomSampling alg = impl.get(); - for (int reps = 0; reps < REPS; reps++) { - final RandomSampling alg = impl.get(); + for (int i = 0; i < STREAM; i++) { + alg.feed(i); + } - for (int i = 0; i < STREAM; i++) { - alg.feed(i); - } + for (int s : alg.sample()) { + d.incrementAndGet(s); + } + } + + return null; + }); + } - for (int s : alg.sample()) { - d[s]++; - } + // wait until all threads are done + try { + executorService.invokeAll(taskList).stream().map(future -> { + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - for (int c : d) { + for (int i = 0; i < d.length(); i++) { + int c = d.get(i); final double expected = (double) REPS * Math.min(SAMPLE, STREAM) / STREAM; final double actual = (double) c; - Assert.assertEquals(1, actual / expected, 1e-2); + assertEquals(1, actual / expected, 1e-2); } } } @@ -98,6 +131,8 @@ public void stream20() { collector = ChaoSampling.collector(SAMPLE, RANDOM); } else if (alg instanceof LiLSampling) { collector = LiLSampling.collector(SAMPLE, RANDOM); + } else if (alg instanceof LiLSamplingThreadSafe) { + collector = LiLSamplingThreadSafe.collector(SAMPLE, RANDOM); } else { throw new AssertionError(); } @@ -112,7 +147,7 @@ public void stream20() { for (int c : d) { final double expected = (double) REPS * SAMPLE / STREAM; final double actual = (double) c; - Assert.assertEquals(1, actual / expected, 1e-2); + assertEquals(1, actual / expected, 1e-2); } } @@ -134,10 +169,10 @@ public void feedAlternative() { rs3.feed(set); Assert.assertTrue(RandomSamplingUtils.samplesEquals(rs1.sample(), rs2.sample())); Assert.assertTrue(RandomSamplingUtils.samplesEquals(rs2.sample(), rs3.sample())); - Assert.assertEquals(rs1.streamSize(), rs2.streamSize()); - Assert.assertEquals(rs2.streamSize(), rs3.streamSize()); - Assert.assertEquals(rs1.sample().size(), rs2.sample().size()); - Assert.assertEquals(rs2.sample().size(), rs3.sample().size()); + assertEquals(rs1.streamSize(), rs2.streamSize()); + assertEquals(rs2.streamSize(), rs3.streamSize()); + assertEquals(rs1.sample().size(), rs2.sample().size()); + assertEquals(rs2.sample().size(), rs3.sample().size()); } /** @@ -162,7 +197,7 @@ public void streamSize() { for (int i = 0; i < size; i++) { rs.feed(0); } - Assert.assertEquals(size, rs.streamSize()); + assertEquals(size, rs.streamSize()); } /** @@ -202,8 +237,8 @@ public void feedReturnValue() { Collection sample = new ArrayList<>(); for (int i = 0; i < 65536; i++) { final boolean changed = rs.feed(i); - Assert.assertEquals(changed, !RandomSamplingUtils.samplesEquals(sample, rs.sample())); - Assert.assertEquals(changed, rs.sample().contains(i)); + assertEquals(changed, !RandomSamplingUtils.samplesEquals(sample, rs.sample())); + assertEquals(changed, rs.sample().contains(i)); sample = new ArrayList<>(rs.sample()); } }