From f0075b912456e240ee654e189fb67c7b0d506d5e Mon Sep 17 00:00:00 2001 From: Giorgos Stamatelatos Date: Thu, 23 Jul 2020 15:00:30 +0300 Subject: [PATCH] Implement ParetoSampling (#36) --- README.md | 9 + .../gr/james/sampling/ParetoSampling.java | 261 ++++++++++++++++++ .../java/gr/james/sampling/package-info.java | 12 + .../java/gr/james/sampling/Benchmark.java | 2 + .../gr/james/sampling/RandomSamplingTest.java | 9 +- .../sampling/WeightedRandomSamplingTest.java | 3 + 6 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 src/main/java/gr/james/sampling/ParetoSampling.java diff --git a/README.md b/README.md index 7da0c61..d9d085c 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ System.out.println(sample); | `EfraimidisSampling` | Algorithm A-Res by Efraimidis | `O(k)` | ✔ | | `ChaoSampling` | Algorithm by Chao | `O(k)` | ✔ | | `SequentialPoissonSampling` | Algorithm by Ohlsson | `O(k)` | ✔ | +| `ParetoSampling` | Algorithm by Rosén | `O(k)` | ✔ | ### 1 Algorithm R by Waterman @@ -166,6 +167,14 @@ Signature: `SequentialPoissonSampling` implements `WeightedRandomSampling` #### References - [Ohlsson, Esbjörn. "Sequential poisson sampling." Journal of official Statistics 14.2 (1998): 149.](https://www.mendeley.com/catalogue/95bcff1f-86be-389c-ab3f-717796d22abd/) +### 7 Algorithm by Rosén + +Signature: `ParetoSampling` implements `WeightedRandomSampling` + +#### References +- [Rosén, Bengt. "Asymptotic theory for order sampling." Journal of Statistical Planning and Inference 62.2 (1997): 135-158.](https://doi.org/10.1016/S0378-3758(96)00185-1) +- [Rosén, Bengt. "On sampling with probability proportional to size." Journal of statistical planning and inference 62.2 (1997): 159-191.](https://doi.org/10.1016/S0378-3758(96)00186-3) + ## References [1] [Wikipedia contributors. "Reservoir sampling." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 17 Oct. 2017. Web. 21 Nov. 
2017.](https://en.wikipedia.org/wiki/Reservoir_sampling) diff --git a/src/main/java/gr/james/sampling/ParetoSampling.java b/src/main/java/gr/james/sampling/ParetoSampling.java new file mode 100644 index 0000000..bba21be --- /dev/null +++ b/src/main/java/gr/james/sampling/ParetoSampling.java @@ -0,0 +1,261 @@ +package gr.james.sampling; + +import java.util.*; + +/** + * Implementation of the algorithm by Rosén in On sampling with probability proportional to size. + *

+ * Weights are not assigned a particular meaning or physical interpretation, but the resulting inclusion + * probabilities are an approximation of the exact model ({@link ChaoSampling}). Weights must be in the range (0,+Inf) + * but not the value {@code 1.0}, otherwise an {@link IllegalWeightException} is thrown. A side effect of this is that + * the signatures {@link #feed(Object)}, {@link #feed(Iterable)} and {@link #feed(Iterator)} will always throw + * {@link IllegalWeightException}. + *

+ * This implementation never throws {@link StreamOverflowException}. + *

+ * The space complexity of this class is {@code O(k)}, where {@code k} is the sample size. + * + * @param the item type + * @author Giorgos Stamatelatos + * @see Asymptotic theory for order sampling + * @see On sampling with probability proportional to size + */ +public class ParetoSampling implements WeightedRandomSampling { + private final int sampleSize; + private final Random random; + private final PriorityQueue> pq; + private final Collection unmodifiableSample; + private long streamSize; + + /** + * Construct a new instance of {@link ParetoSampling} using the specified sample size and RNG. The implementation + * assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure + * that. If this contract is violated, the behavior is undefined. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @throws NullPointerException if {@code random} is {@code null} + * @throws IllegalArgumentException if {@code sampleSize} is less than 1 + */ + public ParetoSampling(int sampleSize, Random random) { + if (random == null) { + throw new NullPointerException("Random was null"); + } + if (sampleSize < 1) { + throw new IllegalArgumentException("Sample size was less than 1"); + } + this.random = random; + this.sampleSize = sampleSize; + this.streamSize = 0; + this.pq = new PriorityQueue<>(sampleSize, Comparator.reverseOrder()); + this.unmodifiableSample = new AbstractCollection() { + @Override + public Iterator iterator() { + return new Iterator() { + final Iterator> it = pq.iterator(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() { + return it.next().object; + } + }; + } + + @Override + public int size() { + return pq.size(); + } + }; + } + + /** + * Construct a new instance of {@link ParetoSampling} using the specified sample size and a default source of + * randomness. 
+ * + * @param sampleSize the sample size + * @throws IllegalArgumentException if {@code sampleSize} is less than 1 + */ + public ParetoSampling(int sampleSize) { + this(sampleSize, new Random()); + } + + /** + * Get a {@link RandomSamplingCollector} from this class. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @param the type of elements + * @return a {@link RandomSamplingCollector} from this class + */ + public static RandomSamplingCollector collector(int sampleSize, Random random) { + return new RandomSamplingCollector<>(() -> new ParetoSampling<>(sampleSize, random)); + } + + /** + * Get a {@link WeightedRandomSamplingCollector} from this class. + * + * @param sampleSize the sample size + * @param random the RNG to use + * @param the type of elements + * @return a {@link WeightedRandomSamplingCollector} from this class + */ + public static WeightedRandomSamplingCollector weightedCollector(int sampleSize, Random random) { + return new WeightedRandomSamplingCollector<>(() -> new ParetoSampling<>(sampleSize, random)); + } + + /** + * {@inheritDoc} + * + * @param item {@inheritDoc} + * @param weight {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalWeightException if {@code weight} is outside the range (0,+Inf) + */ + @Override + public boolean feed(T item, double weight) { + // Checks + if (item == null) { + throw new NullPointerException("Item was null"); + } + if (weight <= 0) { + throw new IllegalWeightException("Weight was not positive, must be in (0,+Inf) but not 1"); + } + if (Double.isInfinite(weight)) { + throw new IllegalWeightException("Weight was infinite, must be in (0,+Inf) but not 1"); + } + if (weight == 1.0) { + throw new IllegalWeightException("Weight was 1, must be in (0,+Inf) but not 1"); + } + + // Produce a random value + final double r = RandomSamplingUtils.randomExclusive(random); + + // Increase stream size + this.streamSize++; + + // Calculate item 
weight + final Weighted newItem = new Weighted<>(item, (r * (1 - weight)) / ((1 - r) * weight)); + assert newItem.weight >= 0.0; // weight can also be 0.0 because of double precision + + // Add item to reservoir + if (pq.size() < sampleSize) { + pq.add(newItem); + return true; + } else if (pq.peek().weight > newItem.weight) { + // Seems unfair for equal weight items to not have a chance to get in the sample + // Of course in the long run it hardly matters + assert pq.size() == sampleSize(); + pq.poll(); + pq.add(newItem); + return true; + } + + return false; + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @param weights {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + * @throws IllegalWeightException {@inheritDoc} + */ + @Override + public boolean feed(Iterator items, Iterator weights) { + return WeightedRandomSampling.super.feed(items, weights); + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalWeightException {@inheritDoc} + */ + @Override + public boolean feed(Map items) { + return WeightedRandomSampling.super.feed(items); + } + + /** + * {@inheritDoc} + * + * @return {@inheritDoc} + */ + @Override + public Collection sample() { + return this.unmodifiableSample; + } + + /** + * {@inheritDoc} + * + * @return {@inheritDoc} + */ + @Override + public final int sampleSize() { + assert this.sampleSize > 0; + return this.sampleSize; + } + + /** + * Get the number of items that have been feeded to the algorithm during the lifetime of this instance. + *

+ * If more than {@link Long#MAX_VALUE} items have been fed to the instance, {@code streamSize()} will cycle the + * long values, continuing from {@link Long#MIN_VALUE}. + *

+ * This method runs in constant time. + * + * @return the number of items that have been feeded to the algorithm + */ + @Override + public final long streamSize() { + return this.streamSize; + } + + /** + * {@inheritDoc} + * + * @param item {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean feed(T item) { + return WeightedRandomSampling.super.feed(item); + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean feed(Iterator items) { + return WeightedRandomSampling.super.feed(items); + } + + /** + * {@inheritDoc} + * + * @param items {@inheritDoc} + * @return {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean feed(Iterable items) { + return WeightedRandomSampling.super.feed(items); + } +} diff --git a/src/main/java/gr/james/sampling/package-info.java b/src/main/java/gr/james/sampling/package-info.java index 2089fb2..308a7fe 100644 --- a/src/main/java/gr/james/sampling/package-info.java +++ b/src/main/java/gr/james/sampling/package-info.java @@ -123,6 +123,14 @@ * (0, +∞) * - * + * + * {@link gr.james.sampling.ParetoSampling} + * Algorithm by Rosén [9][10] + * {@code O(k)} + * ND + * (0, +∞) except 1.0 + * - + * * * *

References

@@ -142,6 +150,10 @@ * sampling with a reservoir." Information Processing Letters 97.5 (2006): 181-185. *
  • Ohlsson, Esbjörn. "Sequential * poisson sampling." Journal of official Statistics 14.2 (1998): 149.
  • + *
  • Rosén, Bengt. "Asymptotic theory for order sampling." + * Journal of Statistical Planning and Inference 62.2 (1997): 135-158.
  • + *
  • Rosén, Bengt. "On sampling with probability proportional + * to size." Journal of statistical planning and inference 62.2 (1997): 159-191.
  • * */ package gr.james.sampling; diff --git a/src/test/java/gr/james/sampling/Benchmark.java b/src/test/java/gr/james/sampling/Benchmark.java index c5c36a0..ca20f90 100644 --- a/src/test/java/gr/james/sampling/Benchmark.java +++ b/src/test/java/gr/james/sampling/Benchmark.java @@ -15,6 +15,7 @@ public class Benchmark { private static final EfraimidisSampling efraimidis = new EfraimidisSampling<>(sample, random); private static final ChaoSampling chao = new ChaoSampling<>(sample, random); private static final SequentialPoissonSampling sequentialPoisson = new SequentialPoissonSampling<>(sample, random); + private static final ParetoSampling pareto = new ParetoSampling<>(sample, random); public static void main(String[] args) { System.out.printf("%18s %5d ms%n", "Waterman", performance(waterman) / 1000000); @@ -25,6 +26,7 @@ public static void main(String[] args) { System.out.printf("%18s %5d ms%n", "Efraimidis", performance(efraimidis) / 1000000); System.out.printf("%18s %5d ms%n", "Chao", performance(chao) / 1000000); System.out.printf("%18s %5d ms%n", "Sequential Poisson", performance(sequentialPoisson) / 1000000); + System.out.printf("%18s %5d ms%n", "Pareto", performance(pareto) / 1000000); } private static long performance(RandomSampling alg) { diff --git a/src/test/java/gr/james/sampling/RandomSamplingTest.java b/src/test/java/gr/james/sampling/RandomSamplingTest.java index 678a6a8..c094591 100644 --- a/src/test/java/gr/james/sampling/RandomSamplingTest.java +++ b/src/test/java/gr/james/sampling/RandomSamplingTest.java @@ -42,6 +42,7 @@ public static Collection>> implementations() { implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new SequentialPoissonSampling<>(SAMPLE, RANDOM)); + implementations.add(() -> new ParetoSampling<>(SAMPLE, RANDOM)); return implementations; } @@ -72,7 +73,11 @@ public void correctness() { final RandomSampling alg = 
impl.get(); for (int i = 0; i < STREAM; i++) { - alg.feed(i); + if (alg instanceof ParetoSampling) { + ((ParetoSampling) alg).feed(i, 0.5); + } else { + alg.feed(i); + } } for (int s : alg.sample()) { @@ -136,6 +141,8 @@ public void stream20() { collector = LiLSamplingThreadSafe.collector(SAMPLE, RANDOM); } else if (alg instanceof SequentialPoissonSampling) { collector = SequentialPoissonSampling.collector(SAMPLE, RANDOM); + } else if (alg instanceof ParetoSampling) { + collector = ParetoSampling.collector(SAMPLE, RANDOM); } else { throw new AssertionError(); } diff --git a/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java b/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java index 2091119..ce70077 100644 --- a/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java +++ b/src/test/java/gr/james/sampling/WeightedRandomSamplingTest.java @@ -31,6 +31,7 @@ public static Collection>> implementati implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM)); implementations.add(() -> new SequentialPoissonSampling<>(SAMPLE, RANDOM)); + implementations.add(() -> new ParetoSampling<>(SAMPLE, RANDOM)); return implementations; } @@ -88,6 +89,8 @@ public void stream20() { collector = ChaoSampling.weightedCollector(SAMPLE, RANDOM); } else if (alg instanceof SequentialPoissonSampling) { collector = SequentialPoissonSampling.weightedCollector(SAMPLE, RANDOM); + } else if (alg instanceof ParetoSampling) { + collector = ParetoSampling.weightedCollector(SAMPLE, RANDOM); } else { throw new AssertionError(); }