From 4903fac4abd1629757f8d5814f5f893a8d2d7980 Mon Sep 17 00:00:00 2001 From: Giorgos Stamatelatos Date: Thu, 8 Sep 2022 23:49:56 +0300 Subject: [PATCH] Incorporate skip function in abstract classes (#53) * Implement SkipFunctionFactory in AbstractRandomSampling constructor * Utilize skip function instead of abstract skipLength method * Remove init and skipLength methods * Make SkipFunction public --- .../sampling/AbstractRandomSampling.java | 47 ++++---------- .../AbstractThreadSafeRandomSampling.java | 47 ++++---------- .../java/gr/james/sampling/LiLSampling.java | 39 +----------- .../james/sampling/LiLSamplingThreadSafe.java | 41 +------------ .../java/gr/james/sampling/SkipFunction.java | 3 +- .../james/sampling/SkipFunctionFactory.java | 18 ++++++ .../gr/james/sampling/VitterXSampling.java | 28 +-------- .../gr/james/sampling/VitterZSampling.java | 61 +------------------ .../gr/james/sampling/WatermanSampling.java | 22 +------ 9 files changed, 48 insertions(+), 258 deletions(-) create mode 100644 src/main/java/gr/james/sampling/SkipFunctionFactory.java diff --git a/src/main/java/gr/james/sampling/AbstractRandomSampling.java b/src/main/java/gr/james/sampling/AbstractRandomSampling.java index d822587..997295f 100644 --- a/src/main/java/gr/james/sampling/AbstractRandomSampling.java +++ b/src/main/java/gr/james/sampling/AbstractRandomSampling.java @@ -5,12 +5,6 @@ /** * This class provides a skeletal implementation of the {@link RandomSampling} interface to minimize the effort required * to implement that interface. - *

- * This class requires the implementation of 2 methods: - *

* * @param the item type * @author Giorgos Stamatelatos @@ -46,30 +40,36 @@ public abstract class AbstractRandomSampling implements RandomSampling { */ protected long skip; + /** + * The skip function. + */ + protected final SkipFunction skipFunction; + /** * Construct a new instance of this class using the specified sample size and RNG. The implementation assumes that * {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure that. If this * contract is violated, the behavior is undefined. * - * @param sampleSize the sample size - * @param random the RNG to use + * @param sampleSize the sample size + * @param random the RNG to use + * @param skipFunctionFactory the factory for the skip function * @throws NullPointerException if {@code random} is {@code null} * @throws IllegalArgumentException if {@code sampleSize} is less than 1 */ - protected AbstractRandomSampling(int sampleSize, Random random) { + protected AbstractRandomSampling(int sampleSize, Random random, SkipFunctionFactory skipFunctionFactory) { if (random == null) { throw new NullPointerException("Random was null"); } if (sampleSize < 1) { throw new IllegalArgumentException("Sample size was less than 1"); } - init(sampleSize, random); this.random = random; this.sampleSize = sampleSize; this.streamSize = 0; this.sample = new ArrayList<>(sampleSize); - this.skip = skipLength(sampleSize, sampleSize, random); this.unmodifiableSample = Collections.unmodifiableList(sample); + this.skipFunction = skipFunctionFactory.create(sampleSize, random); + this.skip = skipFunction.skip(); } /** @@ -111,7 +111,7 @@ public boolean feed(T item) { // Accept and generate new skip assert skip == 0; sample.set(random.nextInt(sampleSize), item); - skip = skipLength(streamSize, sampleSize, random); + skip = skipFunction.skip(); assert this.skip >= 0; return true; } @@ -176,27 +176,4 @@ public long streamSize() { public Collection sample() { return this.unmodifiableSample; } - - /** - * Returns how many items should the algorithm skip given its state. - *

- * The implementation of this method relies on the given arguments and not on the state of the instance. - * - * @param streamSize how many items have been fed to the sampler - * @param sampleSize expected sample size - * @param random the {@link Random} instance to use - * @return how many items to skip - */ - protected abstract long skipLength(long streamSize, int sampleSize, Random random); - - /** - * Performs initialization logic. - *

- * This method is invoked in the constructor. - * - * @param sampleSize expected sample size - * @param random the {@link Random} instance assigned to this instance - */ - protected void init(int sampleSize, Random random) { - } } diff --git a/src/main/java/gr/james/sampling/AbstractThreadSafeRandomSampling.java b/src/main/java/gr/james/sampling/AbstractThreadSafeRandomSampling.java index de2330c..81ff25c 100644 --- a/src/main/java/gr/james/sampling/AbstractThreadSafeRandomSampling.java +++ b/src/main/java/gr/james/sampling/AbstractThreadSafeRandomSampling.java @@ -8,12 +8,6 @@ /** * This class provides a skeletal implementation of the thread-safe variant of the {@link RandomSampling} interface to * minimize the effort required to implement that interface. - *

- * This class requires the implementation of 2 methods: - *

    - *
  • {@link #skipLength(long, int, Random)}
  • - *
  • {@link #init(int, Random)}
  • - *
* * @param the item type * @author Giorgos Stamatelatos @@ -55,31 +49,37 @@ public abstract class AbstractThreadSafeRandomSampling implements RandomSampl */ protected AtomicLong skip; + /** + * The skip function. + */ + protected final SkipFunction skipFunction; + /** * Construct a new instance of this class using the specified sample size and RNG. The implementation assumes that * {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure that. If this * contract is violated, the behavior is undefined. * - * @param sampleSize the sample size - * @param random the RNG to use + * @param sampleSize the sample size + * @param random the RNG to use + * @param skipFunctionFactory the factory for the skip function * @throws NullPointerException if {@code random} is {@code null} * @throws IllegalArgumentException if {@code sampleSize} is less than 1 */ - protected AbstractThreadSafeRandomSampling(int sampleSize, Random random) { + protected AbstractThreadSafeRandomSampling(int sampleSize, Random random, SkipFunctionFactory skipFunctionFactory) { if (random == null) { throw new NullPointerException("Random was null"); } if (sampleSize < 1) { throw new IllegalArgumentException("Sample size was less than 1"); } - init(sampleSize, random); this.random = random; this.sampleSize = sampleSize; this.streamSize = new AtomicLong(0); this.sample = new AtomicReferenceArray<>(sampleSize); this.samplesCount = new AtomicInteger(0); - this.skip = new AtomicLong(skipLength(sampleSize, sampleSize, random)); this.unmodifiableSample = new AtomicReferenceArrayList<>(sample, samplesCount); + this.skipFunction = skipFunctionFactory.create(sampleSize, random); + this.skip = new AtomicLong(skipFunction.skip()); } /** @@ -125,7 +125,7 @@ public final boolean feed(T item) { } } else { assert currentSkipValue == 0; - long nextSkipValue = skipLength(streamSize, sampleSize, random); + long nextSkipValue = skipFunction.skip(); boolean skipCountUpdated = skip.compareAndSet(currentSkipValue, nextSkipValue); if (skipCountUpdated) { sample.set(random.nextInt(sampleSize), item); @@ -197,29 +197,6 @@ public final Collection sample() { return this.unmodifiableSample; } - /** - * Returns how many items should the algorithm skip given its state. - *

- * The implementation of this method must only rely on the given arguments and not on the state of the instance. - * - * @param streamSize how many items have been fed to the sampler - * @param sampleSize expected sample size - * @param random the {@link Random} instance to use - * @return how many items to skip - */ - protected abstract long skipLength(long streamSize, int sampleSize, Random random); - - /** - * Performs initialization logic. - *

- * This method is invoked in the constructor. - * - * @param sampleSize expected sample size - * @param random the {@link Random} instance assigned to this instance - */ - protected void init(int sampleSize, Random random) { - } - static class AtomicReferenceArrayList extends AbstractList implements List, RandomAccess { private final AtomicReferenceArray array; diff --git a/src/main/java/gr/james/sampling/LiLSampling.java b/src/main/java/gr/james/sampling/LiLSampling.java index 6800f45..ea7efa6 100644 --- a/src/main/java/gr/james/sampling/LiLSampling.java +++ b/src/main/java/gr/james/sampling/LiLSampling.java @@ -21,8 +21,6 @@ * O(n(1 + log(N/n))) */ public class LiLSampling extends AbstractRandomSampling { - private double W; - /** * Construct a new instance of {@link LiLSampling} using the specified sample size and RNG. The implementation * assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure that. @@ -34,7 +32,7 @@ public class LiLSampling extends AbstractRandomSampling { * @throws IllegalArgumentException if {@code sampleSize} is less than 1 */ public LiLSampling(int sampleSize, Random random) { - super(sampleSize, random); + super(sampleSize, random, LiLSkipFunction::new); } /** @@ -60,41 +58,6 @@ public static RandomSamplingCollector collector(int sampleSize, Random ra return new RandomSamplingCollector<>(() -> new LiLSampling<>(sampleSize, random)); } - /** - * {@inheritDoc} - * - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - */ - @Override - protected void init(int sampleSize, Random random) { - // W = Math.exp(Math.log(RandomSamplingUtils.randomExclusive(random)) / sampleSize); - W = Math.pow(RandomSamplingUtils.randomExclusive(random), 1.0 / sampleSize); - } - - /** - * {@inheritDoc} - * - * @param streamSize {@inheritDoc} - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - * @return {@inheritDoc} - */ - @Override - protected long skipLength(long streamSize, int sampleSize, Random random) { - final double random1 = RandomSamplingUtils.randomExclusive(random); - final double random2 = RandomSamplingUtils.randomExclusive(random); - long skip = (long) (Math.log(random1) / Math.log(1 - W)); - assert skip >= 0 || skip == Long.MIN_VALUE; - if (skip == Long.MIN_VALUE) { // Sometimes when W is very small, 1 - W = 1 and Math.log(1) = +0 instead of -0 - skip = Long.MAX_VALUE; // This results in negative infinity skip - } - // W = W * Math.exp(Math.log(random2) / sampleSize); - W = W * Math.pow(random2, 1.0 / sampleSize); - return skip; - } - - @Deprecated private static class LiLSkipFunction implements SkipFunction { private final int sampleSize; private final Random random; diff --git a/src/main/java/gr/james/sampling/LiLSamplingThreadSafe.java b/src/main/java/gr/james/sampling/LiLSamplingThreadSafe.java index 5a1a90a..c0c7972 100644 --- a/src/main/java/gr/james/sampling/LiLSamplingThreadSafe.java +++ b/src/main/java/gr/james/sampling/LiLSamplingThreadSafe.java @@ -27,8 +27,6 @@ */ public class LiLSamplingThreadSafe extends AbstractThreadSafeRandomSampling { - private AtomicLong W; - /** * Construct a new instance of {@link LiLSamplingThreadSafe} using the specified sample size and RNG. The * implementation assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks @@ -40,7 +38,7 @@ public class LiLSamplingThreadSafe extends AbstractThreadSafeRandomSampling RandomSamplingCollector collector(int sampleSize, Random ra return new RandomSamplingCollector<>(() -> new LiLSamplingThreadSafe<>(sampleSize, random)); } - /** - * {@inheritDoc} - * - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - */ - @Override - protected void init(int sampleSize, Random random) { - //W = Math.pow(RandomSamplingUtils.randomExclusive(random), 1.0 / sampleSize); - W = new AtomicLong(); - W.set(Double.doubleToLongBits(Math.pow(RandomSamplingUtils.randomExclusive(random), 1.0 / sampleSize))); - } - - /** - * {@inheritDoc} - * - * @param streamSize {@inheritDoc} - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - * @return {@inheritDoc} - */ - @Override - protected long skipLength(long streamSize, int sampleSize, Random random) { - final double random1 = RandomSamplingUtils.randomExclusive(random); - final double random2 = RandomSamplingUtils.randomExclusive(random); - double w = Double.longBitsToDouble(W.get()); - long skip = (long) (Math.log(random1) / Math.log(1 - w)); - assert skip >= 0 || skip == Long.MIN_VALUE; - if (skip == Long.MIN_VALUE) { // Sometimes when W is very small, 1 - W = 1 and Math.log(1) = +0 instead of -0 - skip = Long.MAX_VALUE; // This results in negative infinity skip - } - // W = W * Math.pow(random2, 1.0 / sampleSize); - W.set(Double.doubleToLongBits(w * Math.pow(random2, 1.0 / sampleSize))); - return skip; - } - - @Deprecated private static class LiLThreadSafeSkipFunction implements SkipFunction { private final int sampleSize; private final Random random; diff --git a/src/main/java/gr/james/sampling/SkipFunction.java b/src/main/java/gr/james/sampling/SkipFunction.java index bc4d7f0..629bf9b 100644 --- a/src/main/java/gr/james/sampling/SkipFunction.java +++ b/src/main/java/gr/james/sampling/SkipFunction.java @@ -5,9 +5,8 @@ * reservoir. The {@code SkipFunction} works similarly to an iterator: it's {@link #skip()} method returns the skip * counts in temporal order as the stream increases. */ -@Deprecated @FunctionalInterface -interface SkipFunction { +public interface SkipFunction { /** * Returns a {@code long} indicating how many elements the algorithm must skip. *

diff --git a/src/main/java/gr/james/sampling/SkipFunctionFactory.java b/src/main/java/gr/james/sampling/SkipFunctionFactory.java new file mode 100644 index 0000000..faad7f2 --- /dev/null +++ b/src/main/java/gr/james/sampling/SkipFunctionFactory.java @@ -0,0 +1,18 @@ +package gr.james.sampling; + +import java.util.Random; + +/** + * A factory of {@link SkipFunction} instances. + */ +@FunctionalInterface +public interface SkipFunctionFactory { + /** + * Create a new {@link SkipFunction} instance. + * + * @param sampleSize the given sample size + * @param random the given {@link Random} instance + * @return the new {@link SkipFunction} instance + */ + SkipFunction create(int sampleSize, Random random); +} diff --git a/src/main/java/gr/james/sampling/VitterXSampling.java b/src/main/java/gr/james/sampling/VitterXSampling.java index 1080703..3a4f1d9 100644 --- a/src/main/java/gr/james/sampling/VitterXSampling.java +++ b/src/main/java/gr/james/sampling/VitterXSampling.java @@ -30,7 +30,7 @@ public class VitterXSampling extends AbstractRandomSampling { * @throws IllegalArgumentException if {@code sampleSize} is less than 1 */ public VitterXSampling(int sampleSize, Random random) { - super(sampleSize, random); + super(sampleSize, random, VitterXSkipFunction::new); } /** @@ -56,32 +56,6 @@ public static RandomSamplingCollector collector(int sampleSize, Random ra return new RandomSamplingCollector<>(() -> new VitterXSampling<>(sampleSize, random)); } - /** - * {@inheritDoc} - * - * @param streamSize {@inheritDoc} - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - * @return {@inheritDoc} - */ - @Override - protected long skipLength(long streamSize, int sampleSize, Random random) { - streamSize++; - - final double r = random.nextDouble(); - long skipCount = 0; - - double quot = (streamSize - sampleSize) / (double) streamSize; - while (quot > r && streamSize > 0) { - skipCount++; - streamSize++; - quot = (quot * (streamSize - sampleSize)) / (double) streamSize; - } - - return skipCount; - } - - @Deprecated private static class VitterXSkipFunction implements SkipFunction { private final int sampleSize; private final Random random; diff --git a/src/main/java/gr/james/sampling/VitterZSampling.java b/src/main/java/gr/james/sampling/VitterZSampling.java index d1de0aa..3b3c1dc 100644 --- a/src/main/java/gr/james/sampling/VitterZSampling.java +++ b/src/main/java/gr/james/sampling/VitterZSampling.java @@ -19,8 +19,6 @@ * @see Random Sampling with a Reservoir */ public class VitterZSampling extends AbstractRandomSampling { - private double W; - /** * Construct a new instance of {@link VitterZSampling} using the specified sample size and RNG. The implementation * assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure that. @@ -32,7 +30,7 @@ public class VitterZSampling extends AbstractRandomSampling { * @throws IllegalArgumentException if {@code sampleSize} is less than 1 */ public VitterZSampling(int sampleSize, Random random) { - super(sampleSize, random); + super(sampleSize, random, VitterZSkipFunction::new); } /** @@ -58,63 +56,6 @@ public static RandomSamplingCollector collector(int sampleSize, Random ra return new RandomSamplingCollector<>(() -> new VitterZSampling<>(sampleSize, random)); } - /** - * {@inheritDoc} - * - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - */ - @Override - protected void init(int sampleSize, Random random) { - W = Math.pow(random.nextDouble(), -1.0 / sampleSize); - } - - /** - * {@inheritDoc} - * - * @param streamSize {@inheritDoc} - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - * @return {@inheritDoc} - */ - @Override - protected long skipLength(long streamSize, int sampleSize, Random random) { - double term = streamSize - sampleSize + 1; - while (true) { - // Generate U and X - double U = RandomSamplingUtils.randomExclusive(random); - double X = streamSize * (this.W - 1.0); - long G = (long) X; - // Test if U <= h(G) / cg(X) - double lhs = Math.pow(((U * Math.pow(((streamSize + 1) / term), 2)) * (term + G)) / (streamSize + X), 1.0 / sampleSize); - double rhs = (((streamSize + X) / (term + G)) * term) / streamSize; - if (lhs < rhs) { - this.W = rhs / lhs; - return G; - } - // Test if U <= f(G) / cg(X) - double y = (((U * (streamSize + 1)) / term) * (streamSize + G + 1)) / (streamSize + X); - double denom; - double numer_lim; - if (sampleSize < G) { - denom = streamSize; - numer_lim = term + G; - } else { - denom = streamSize - sampleSize + G; - numer_lim = streamSize + 1; - } - for (long numer = streamSize + G; numer >= numer_lim; numer--) { - y = (y * numer) / denom; - denom = denom - 1; - } - this.W = Math.pow(random.nextDouble(), -1.0 / sampleSize); - if (Math.pow(y, 1.0 / sampleSize) <= (streamSize + X) / streamSize) { - return G; - } - } - } - - @Deprecated private static class VitterZSkipFunction implements SkipFunction { private final int sampleSize; private final Random random; diff --git a/src/main/java/gr/james/sampling/WatermanSampling.java b/src/main/java/gr/james/sampling/WatermanSampling.java index f941dd6..43de387 100644 --- a/src/main/java/gr/james/sampling/WatermanSampling.java +++ b/src/main/java/gr/james/sampling/WatermanSampling.java @@ -30,7 +30,7 @@ public class WatermanSampling extends AbstractRandomSampling { * @throws IllegalArgumentException if {@code sampleSize} is less than 1 */ public WatermanSampling(int sampleSize, Random random) { - super(sampleSize, random); + super(sampleSize, random, WatermanSkipFunction::new); } /** @@ -56,26 +56,6 @@ public static RandomSamplingCollector collector(int sampleSize, Random ra return new RandomSamplingCollector<>(() -> new WatermanSampling<>(sampleSize, random)); } - /** - * {@inheritDoc} - * - * @param streamSize {@inheritDoc} - * @param sampleSize {@inheritDoc} - * @param random {@inheritDoc} - * @return {@inheritDoc} - */ - @Override - protected long skipLength(long streamSize, int sampleSize, Random random) { - streamSize++; - long skipCount = 0; - while (random.nextDouble() * streamSize >= sampleSize && streamSize > 0) { - streamSize++; - skipCount++; - } - return skipCount; - } - - @Deprecated private static class WatermanSkipFunction implements SkipFunction { private final int sampleSize; private final Random random;