From 4ad8dd447744fc26c26d283e7f4b40746179d2cc Mon Sep 17 00:00:00 2001 From: Giorgos Stamatelatos Date: Wed, 22 Jul 2020 23:25:32 +0300 Subject: [PATCH] Implement sequential Poisson sampling (#35) --- README.md | 7 + .../sampling/SequentialPoissonSampling.java | 255 ++++++++++++++++++ .../java/gr/james/sampling/package-info.java | 21 +- .../java/gr/james/sampling/Benchmark.java | 16 +- .../gr/james/sampling/RandomSamplingTest.java | 11 +- .../sampling/WeightedRandomSamplingTest.java | 3 + 6 files changed, 296 insertions(+), 17 deletions(-) create mode 100644 src/main/java/gr/james/sampling/SequentialPoissonSampling.java diff --git a/README.md b/README.md index 609067e..e9c3ba9 100644 --- a/README.md +++ b/README.md @@ -158,6 +158,13 @@ Signature: `ChaoSampling` implements `WeightedRandomSampling` - [Chao, M. T. "A general purpose unequal probability sampling plan." Biometrika 69.3 (1982): 653-656.](https://doi.org/10.2307/2336002) - [Sugden, R. A. "Chao's list sequential scheme for unequal probability sampling." Journal of Applied Statistics 23.4 (1996): 413-421.](https://doi.org/10.1080/02664769624152) +### 7 Algorithm by Ohlsson + +Signature: `SequentialPoissonSampling` implements `WeightedRandomSampling` + +#### References +- [Ohlsson, Esbjörn. "Sequential poisson sampling." Journal of official Statistics 14.2 (1998): 149.](https://www.mendeley.com/catalogue/95bcff1f-86be-389c-ab3f-717796d22abd/) + ## References [1] [Wikipedia contributors. "Reservoir sampling." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 17 Oct. 2017. Web. 21 Nov. 2017.](https://en.wikipedia.org/wiki/Reservoir_sampling) diff --git a/src/main/java/gr/james/sampling/SequentialPoissonSampling.java b/src/main/java/gr/james/sampling/SequentialPoissonSampling.java new file mode 100644 index 0000000..75d3071 --- /dev/null +++ b/src/main/java/gr/james/sampling/SequentialPoissonSampling.java @@ -0,0 +1,255 @@ +package gr.james.sampling; + +import java.util.*; + +/** + * Implementation of the algorithm by Ohlsson in Sequential Poisson Sampling. + *

+ * Weighted are not being assigned a particular meaning or have physical interpretation but the resulting inclusion + * probabilities are an approximation of the exact model ({@link ChaoSampling}). Weights are the range (0,+Inf), + * otherwise an {@link IllegalWeightException} is thrown. + *

+ * This implementation never throws {@link StreamOverflowException}. + *

+ * The space complexity of this class is {@code O(k)}, where {@code k} is the sample size. + * + * @param the item type + * @author Giorgos Stamatelatos + * @see Sequential poisson sampling + */ +public class SequentialPoissonSampling implements WeightedRandomSampling { + private final int sampleSize; + private final Random random; + private final PriorityQueue> pq; + private final Collection unmodifiableSample; + private long streamSize; + + /** + * Construct a new instance of {@link SequentialPoissonSampling} using the specified sample size and RNG. The + * implementation assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks + * to ensure that. If this contract is violated, the behavior is undefined. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @throws NullPointerException if {@code random} is {@code null} + * @throws IllegalArgumentException if {@code sampleSize} is less than 1 + */ + public SequentialPoissonSampling(int sampleSize, Random random) { + if (random == null) { + throw new NullPointerException("Random was null"); + } + if (sampleSize < 1) { + throw new IllegalArgumentException("Sample size was less than 1"); + } + this.random = random; + this.sampleSize = sampleSize; + this.streamSize = 0; + this.pq = new PriorityQueue<>(sampleSize, Comparator.reverseOrder()); + this.unmodifiableSample = new AbstractCollection() { + @Override + public Iterator iterator() { + return new Iterator() { + final Iterator> it = pq.iterator(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() { + return it.next().object; + } + }; + } + + @Override + public int size() { + return pq.size(); + } + }; + } + + /** + * Construct a new instance of {@link SequentialPoissonSampling} using the specified sample size and a default + * source of randomness. + * + * @param sampleSize the sample size + * @throws IllegalArgumentException if {@code sampleSize} is less than 1 + */ + public SequentialPoissonSampling(int sampleSize) { + this(sampleSize, new Random()); + } + + /** + * Get a {@link RandomSamplingCollector} from this class. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @param the type of elements + * @return a {@link RandomSamplingCollector} from this class + */ + public static RandomSamplingCollector collector(int sampleSize, Random random) { + return new RandomSamplingCollector<>(() -> new SequentialPoissonSampling<>(sampleSize, random)); + } + + /** + * Get a {@link WeightedRandomSamplingCollector} from this class. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @param the type of elements + * @return a {@link WeightedRandomSamplingCollector} from this class + */ + public static WeightedRandomSamplingCollector weightedCollector(int sampleSize, Random random) { + return new WeightedRandomSamplingCollector<>(() -> new SequentialPoissonSampling<>(sampleSize, random)); + } + + /** + * {@inheritDoc} + * + * @param item {@inheritDoc} + * @param weight {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalWeightException if {@code weight} is outside the range (0,+Inf) + */ + @Override + public boolean feed(T item, double weight) { + // Checks + if (item == null) { + throw new NullPointerException("Item was null"); + } + if (weight <= 0) { + throw new IllegalWeightException("Weight was not positive, must be in (0,+Inf)"); + } + if (Double.isInfinite(weight)) { + throw new IllegalWeightException("Weight was infinite, must be in (0,+Inf)"); + } + + // Produce a random value + final double r = RandomSamplingUtils.randomExclusive(random); + + // Increase stream size + this.streamSize++; + + // Calculate item weight + final Weighted newItem = new Weighted<>(item, r / weight); + assert newItem.weight >= 0.0; // weight can also be 0.0 because of double precision + + // Add item to reservoir + if (pq.size() < sampleSize) { + pq.add(newItem); + return true; + } else if (pq.peek().weight > newItem.weight) { + // Seems unfair for equal weight items to not have a chance to get in the sample + // Of course in the long run it hardly matters + assert pq.size() == sampleSize(); + pq.poll(); + pq.add(newItem); + return true; + } + + return false; + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @param weights {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + * @throws IllegalWeightException {@inheritDoc} + */ + @Override + public boolean feed(Iterator items, Iterator weights) { + return WeightedRandomSampling.super.feed(items, weights); + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalWeightException {@inheritDoc} + */ + @Override + public boolean feed(Map items) { + return WeightedRandomSampling.super.feed(items); + } + + /** + * {@inheritDoc} + * + * @return {@inheritDoc} + */ + @Override + public Collection sample() { + return this.unmodifiableSample; + } + + /** + * {@inheritDoc} + * + * @return {@inheritDoc} + */ + @Override + public final int sampleSize() { + assert this.sampleSize > 0; + return this.sampleSize; + } + + /** + * Get the number of items that have been feeded to the algorithm during the lifetime of this instance. + *

+ * If more than {@link Long#MAX_VALUE} items has been feeded to the instance, {@code streamSize()} will cycle the + * long values, continuing from {@link Long#MIN_VALUE}. + *

+ * This method runs in constant time. + * + * @return the number of items that have been feeded to the algorithm + */ + @Override + public final long streamSize() { + return this.streamSize; + } + + /** + * {@inheritDoc} + * + * @param item {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean feed(T item) { + return WeightedRandomSampling.super.feed(item); + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean feed(Iterator items) { + return WeightedRandomSampling.super.feed(items); + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean feed(Iterable items) { + return WeightedRandomSampling.super.feed(items); + } +} diff --git a/src/main/java/gr/james/sampling/package-info.java b/src/main/java/gr/james/sampling/package-info.java index f398f92..2089fb2 100644 --- a/src/main/java/gr/james/sampling/package-info.java +++ b/src/main/java/gr/james/sampling/package-info.java @@ -34,10 +34,9 @@ * selections of the sampling procedure (implemented in {@code EfraimidisSampling}). As a result, implementations of * this interface may not exhibit identical behavior, as opposed to the {@link gr.james.sampling.RandomSampling} * interface. The contract of this interface is, however, that a higher weight value suggests a higher probability for - * an item to be included in the sample. The first case is denoted as P and the second case as W in - * the implementation table below. Implementations may also define certain restrictions on the values of {@code weight} - * and violations will result in {@link gr.james.sampling.IllegalWeightException}. The weight ranges are also available - * in the table. + * an item to be included in the sample. Implementations may also define certain restrictions on the values of + * {@code weight} and violations will result in {@link gr.james.sampling.IllegalWeightException}. The weight ranges are + * available in the table below. *

Determinism

* Certain implementations rely on elements of the JRE that are not deterministic, for example * {@link java.util.PriorityQueue} and {@link java.util.TreeSet}. The side effect of this is that weighted algorithms @@ -105,7 +104,7 @@ * Algorithm by Chao [5][6] * {@code O(k)} * D - * P (0, +∞) + * (0, +∞) * - * * @@ -113,7 +112,15 @@ * Algorithm A-Res by Efraimidis [7] * {@code O(k)} * ND - * W (0, +∞) + * (0, +∞) + * - + * + * + * {@link gr.james.sampling.SequentialPoissonSampling} + * Algorithm by Ohlsson [8] + * {@code O(k)} + * ND + * (0, +∞) * - * * @@ -133,6 +140,8 @@ * probability sampling." Journal of Applied Statistics 23.4 (1996): 413-421. *
  • Efraimidis, Pavlos S., and Paul G. Spirakis. "Weighted random * sampling with a reservoir." Information Processing Letters 97.5 (2006): 181-185.
  • + *
  • Ohlsson, Esbjörn. "Sequential + * poisson sampling." Journal of official Statistics 14.2 (1998): 149.
  • * */ package gr.james.sampling; diff --git a/src/test/java/gr/james/sampling/Benchmark.java b/src/test/java/gr/james/sampling/Benchmark.java index b193d82..c5c36a0 100644 --- a/src/test/java/gr/james/sampling/Benchmark.java +++ b/src/test/java/gr/james/sampling/Benchmark.java @@ -14,15 +14,17 @@ public class Benchmark { private static final LiLSamplingThreadSafe lilThreadSafe = new LiLSamplingThreadSafe<>(sample, random); private static final EfraimidisSampling efraimidis = new EfraimidisSampling<>(sample, random); private static final ChaoSampling chao = new ChaoSampling<>(sample, random); + private static final SequentialPoissonSampling sequentialPoisson = new SequentialPoissonSampling<>(sample, random); public static void main(String[] args) { - System.out.printf("%10s %5d ms%n", "Waterman", performance(waterman) / 1000000); - System.out.printf("%10s %5d ms%n", "VitterX", performance(vitterx) / 1000000); - System.out.printf("%10s %5d ms%n", "VitterZ", performance(vitterz) / 1000000); - System.out.printf("%10s %5d ms%n", "LiL", performance(lil) / 1000000); - System.out.printf("%10s %5d ms%n", "LiL Thread Safe", performance(lilThreadSafe) / 1000000); - System.out.printf("%10s %5d ms%n", "Efraimidis", performance(efraimidis) / 1000000); - System.out.printf("%10s %5d ms%n", "Chao", performance(chao) / 1000000); + System.out.printf("%18s %5d ms%n", "Waterman", performance(waterman) / 1000000); + System.out.printf("%18s %5d ms%n", "VitterX", performance(vitterx) / 1000000); + System.out.printf("%18s %5d ms%n", "VitterZ", performance(vitterz) / 1000000); + System.out.printf("%18s %5d ms%n", "LiL", performance(lil) / 1000000); + System.out.printf("%18s %5d ms%n", "LiL Thread Safe", performance(lilThreadSafe) / 1000000); + System.out.printf("%18s %5d ms%n", "Efraimidis", performance(efraimidis) / 1000000); + System.out.printf("%18s %5d ms%n", "Chao", performance(chao) / 1000000); + System.out.printf("%18s %5d ms%n", "Sequential Poisson", performance(sequentialPoisson) / 1000000); } private static long performance(RandomSampling alg) { diff --git a/src/test/java/gr/james/sampling/RandomSamplingTest.java b/src/test/java/gr/james/sampling/RandomSamplingTest.java index 45a7175..678a6a8 100644 --- a/src/test/java/gr/james/sampling/RandomSamplingTest.java +++ b/src/test/java/gr/james/sampling/RandomSamplingTest.java @@ -1,7 +1,5 @@ package gr.james.sampling; -import static org.junit.Assert.assertEquals; - import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -16,6 +14,8 @@ import java.util.function.Supplier; import java.util.stream.IntStream; +import static org.junit.Assert.assertEquals; + /** * Tests for unweighted algorithms (and weighted used as unweighted). */ @@ -41,6 +41,7 @@ public static Collection>> implementations() { implementations.add(() -> new LiLSamplingThreadSafe<>(SAMPLE, RANDOM)); implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM)); + implementations.add(() -> new SequentialPoissonSampling<>(SAMPLE, RANDOM)); return implementations; } @@ -57,14 +58,14 @@ public void correctness() { for (int test = 0; test < streamSizes.length; test++) { final int STREAM = streamSizes[test]; final int numCores = (impl.get() instanceof ThreadSafeRandomSampling) ? - Runtime.getRuntime().availableProcessors() : 1; + Runtime.getRuntime().availableProcessors() : 1; final int REPS = repsSizes[test]; final AtomicIntegerArray d = new AtomicIntegerArray(STREAM); ExecutorService executorService = Executors.newFixedThreadPool(numCores); List> taskList = new ArrayList<>(numCores); - for(int core = 0; core < numCores; core++) { + for (int core = 0; core < numCores; core++) { taskList.add(() -> { for (int reps = 0; reps < (REPS / numCores); reps++) { @@ -133,6 +134,8 @@ public void stream20() { collector = LiLSampling.collector(SAMPLE, RANDOM); } else if (alg instanceof LiLSamplingThreadSafe) { collector = LiLSamplingThreadSafe.collector(SAMPLE, RANDOM); + } else if (alg instanceof SequentialPoissonSampling) { + collector = SequentialPoissonSampling.collector(SAMPLE, RANDOM); } else { throw new AssertionError(); } diff --git a/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java b/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java index febb076..2091119 100644 --- a/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java +++ b/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java @@ -30,6 +30,7 @@ public static Collection>> implementati final Collection>> implementations = new ArrayList<>(); implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM)); + implementations.add(() -> new SequentialPoissonSampling<>(SAMPLE, RANDOM)); return implementations; } @@ -85,6 +86,8 @@ public void stream20() { collector = EfraimidisSampling.weightedCollector(SAMPLE, RANDOM); } else if (alg instanceof ChaoSampling) { collector = ChaoSampling.weightedCollector(SAMPLE, RANDOM); + } else if (alg instanceof SequentialPoissonSampling) { + collector = SequentialPoissonSampling.weightedCollector(SAMPLE, RANDOM); } else { throw new AssertionError(); }