Skip to content

Commit

Permalink
Implement sequential Poisson sampling (gstamatelat#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
gstamatelat committed Jul 22, 2020
1 parent 54bb595 commit 4ad8dd4
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 17 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ Signature: `ChaoSampling` implements `WeightedRandomSampling`
- [Chao, M. T. "A general purpose unequal probability sampling plan." Biometrika 69.3 (1982): 653-656.](https://doi.org/10.2307/2336002)
- [Sugden, R. A. "Chao's list sequential scheme for unequal probability sampling." Journal of Applied Statistics 23.4 (1996): 413-421.](https://doi.org/10.1080/02664769624152)

### 7 Algorithm by Ohlsson

Signature: `SequentialPoissonSampling` implements `WeightedRandomSampling`

#### References
- [Ohlsson, Esbjörn. "Sequential poisson sampling." Journal of official Statistics 14.2 (1998): 149.](https://www.mendeley.com/catalogue/95bcff1f-86be-389c-ab3f-717796d22abd/)

## References

[1] [Wikipedia contributors. "Reservoir sampling." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 17 Oct. 2017. Web. 21 Nov. 2017.](https://en.wikipedia.org/wiki/Reservoir_sampling)
Expand Down
255 changes: 255 additions & 0 deletions src/main/java/gr/james/sampling/SequentialPoissonSampling.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package gr.james.sampling;

import java.util.*;

/**
* Implementation of the algorithm by Ohlsson in <b>Sequential Poisson Sampling</b>.
* <p>
* Weighted are not being assigned a particular meaning or have physical interpretation but the resulting inclusion
* probabilities are an approximation of the exact model ({@link ChaoSampling}). Weights are the range (0,+Inf),
* otherwise an {@link IllegalWeightException} is thrown.
* <p>
* This implementation never throws {@link StreamOverflowException}.
* <p>
* The space complexity of this class is {@code O(k)}, where {@code k} is the sample size.
*
* @param <T> the item type
* @author Giorgos Stamatelatos
* @see <a href="https://www.mendeley.com/catalogue/95bcff1f-86be-389c-ab3f-717796d22abd/">Sequential poisson sampling</a>
*/
public class SequentialPoissonSampling<T> implements WeightedRandomSampling<T> {
private final int sampleSize;
private final Random random;
private final PriorityQueue<Weighted<T>> pq;
private final Collection<T> unmodifiableSample;
private long streamSize;

/**
* Construct a new instance of {@link SequentialPoissonSampling} using the specified sample size and RNG. The
* implementation assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks
* to ensure that. If this contract is violated, the behavior is undefined.
*
* @param sampleSize the sample size
* @param random the RNG to use
* @throws NullPointerException if {@code random} is {@code null}
* @throws IllegalArgumentException if {@code sampleSize} is less than 1
*/
public SequentialPoissonSampling(int sampleSize, Random random) {
if (random == null) {
throw new NullPointerException("Random was null");
}
if (sampleSize < 1) {
throw new IllegalArgumentException("Sample size was less than 1");
}
this.random = random;
this.sampleSize = sampleSize;
this.streamSize = 0;
this.pq = new PriorityQueue<>(sampleSize, Comparator.reverseOrder());
this.unmodifiableSample = new AbstractCollection<T>() {
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
final Iterator<Weighted<T>> it = pq.iterator();

@Override
public boolean hasNext() {
return it.hasNext();
}

@Override
public T next() {
return it.next().object;
}
};
}

@Override
public int size() {
return pq.size();
}
};
}

/**
* Construct a new instance of {@link SequentialPoissonSampling} using the specified sample size and a default
* source of randomness.
*
* @param sampleSize the sample size
* @throws IllegalArgumentException if {@code sampleSize} is less than 1
*/
public SequentialPoissonSampling(int sampleSize) {
this(sampleSize, new Random());
}

/**
* Get a {@link RandomSamplingCollector} from this class.
*
* @param sampleSize the sample size
* @param random the RNG to use
* @param <E> the type of elements
* @return a {@link RandomSamplingCollector} from this class
*/
public static <E> RandomSamplingCollector<E> collector(int sampleSize, Random random) {
return new RandomSamplingCollector<>(() -> new SequentialPoissonSampling<>(sampleSize, random));
}

/**
* Get a {@link WeightedRandomSamplingCollector} from this class.
*
* @param sampleSize the sample size
* @param random the RNG to use
* @param <E> the type of elements
* @return a {@link WeightedRandomSamplingCollector} from this class
*/
public static <E> WeightedRandomSamplingCollector<E> weightedCollector(int sampleSize, Random random) {
return new WeightedRandomSamplingCollector<>(() -> new SequentialPoissonSampling<>(sampleSize, random));
}

/**
* {@inheritDoc}
*
* @param item {@inheritDoc}
* @param weight {@inheritDoc}
* @return {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalWeightException if {@code weight} is outside the range (0,+Inf)
*/
@Override
public boolean feed(T item, double weight) {
// Checks
if (item == null) {
throw new NullPointerException("Item was null");
}
if (weight <= 0) {
throw new IllegalWeightException("Weight was not positive, must be in (0,+Inf)");
}
if (Double.isInfinite(weight)) {
throw new IllegalWeightException("Weight was infinite, must be in (0,+Inf)");
}

// Produce a random value
final double r = RandomSamplingUtils.randomExclusive(random);

// Increase stream size
this.streamSize++;

// Calculate item weight
final Weighted<T> newItem = new Weighted<>(item, r / weight);
assert newItem.weight >= 0.0; // weight can also be 0.0 because of double precision

// Add item to reservoir
if (pq.size() < sampleSize) {
pq.add(newItem);
return true;
} else if (pq.peek().weight > newItem.weight) {
// Seems unfair for equal weight items to not have a chance to get in the sample
// Of course in the long run it hardly matters
assert pq.size() == sampleSize();
pq.poll();
pq.add(newItem);
return true;
}

return false;
}

/**
* {@inheritDoc}
*
* @param items {@inheritDoc}
* @param weights {@inheritDoc}
* @return {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
* @throws IllegalWeightException {@inheritDoc}
*/
@Override
public boolean feed(Iterator<T> items, Iterator<Double> weights) {
return WeightedRandomSampling.super.feed(items, weights);
}

/**
* {@inheritDoc}
*
* @param items {@inheritDoc}
* @return {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalWeightException {@inheritDoc}
*/
@Override
public boolean feed(Map<T, Double> items) {
return WeightedRandomSampling.super.feed(items);
}

/**
* {@inheritDoc}
*
* @return {@inheritDoc}
*/
@Override
public Collection<T> sample() {
return this.unmodifiableSample;
}

/**
* {@inheritDoc}
*
* @return {@inheritDoc}
*/
@Override
public final int sampleSize() {
assert this.sampleSize > 0;
return this.sampleSize;
}

/**
* Get the number of items that have been feeded to the algorithm during the lifetime of this instance.
* <p>
* If more than {@link Long#MAX_VALUE} items has been feeded to the instance, {@code streamSize()} will cycle the
* long values, continuing from {@link Long#MIN_VALUE}.
* <p>
* This method runs in constant time.
*
* @return the number of items that have been feeded to the algorithm
*/
@Override
public final long streamSize() {
return this.streamSize;
}

/**
* {@inheritDoc}
*
* @param item {@inheritDoc}
* @return {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
@Override
public boolean feed(T item) {
return WeightedRandomSampling.super.feed(item);
}

/**
* {@inheritDoc}
*
* @param items {@inheritDoc}
* @return {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
@Override
public boolean feed(Iterator<T> items) {
return WeightedRandomSampling.super.feed(items);
}

/**
* {@inheritDoc}
*
* @param items {@inheritDoc}
* @return {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
@Override
public boolean feed(Iterable<T> items) {
return WeightedRandomSampling.super.feed(items);
}
}
21 changes: 15 additions & 6 deletions src/main/java/gr/james/sampling/package-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
* selections of the sampling procedure (implemented in {@code EfraimidisSampling}). As a result, implementations of
* this interface may not exhibit identical behavior, as opposed to the {@link gr.james.sampling.RandomSampling}
* interface. The contract of this interface is, however, that a higher weight value suggests a higher probability for
* an item to be included in the sample. The first case is denoted as <em>P</em> and the second case as <em>W</em> in
* the implementation table below. Implementations may also define certain restrictions on the values of {@code weight}
* and violations will result in {@link gr.james.sampling.IllegalWeightException}. The weight ranges are also available
* in the table.
* an item to be included in the sample. Implementations may also define certain restrictions on the values of
* {@code weight} and violations will result in {@link gr.james.sampling.IllegalWeightException}. The weight ranges are
* available in the table below.
* <h4>Determinism</h4>
* Certain implementations rely on elements of the JRE that are not deterministic, for example
* {@link java.util.PriorityQueue} and {@link java.util.TreeSet}. The side effect of this is that weighted algorithms
Expand Down Expand Up @@ -105,15 +104,23 @@
* <td>Algorithm by Chao [5][6]</td>
* <td>{@code O(k)}</td>
* <td>D</td>
* <td>P (0, +&infin;)</td>
* <td>(0, +&infin;)</td>
* <td>-</td>
* </tr>
* <tr>
* <td>{@link gr.james.sampling.EfraimidisSampling}</td>
* <td>Algorithm A-Res by Efraimidis [7]</td>
* <td>{@code O(k)}</td>
* <td>ND</td>
* <td>W (0, +&infin;)</td>
* <td>(0, +&infin;)</td>
* <td>-</td>
* </tr>
* <tr>
* <td>{@link gr.james.sampling.SequentialPoissonSampling}</td>
* <td>Algorithm by Ohlsson [8]</td>
* <td>{@code O(k)}</td>
* <td>ND</td>
* <td>(0, +&infin;)</td>
* <td>-</td>
* </tr>
* </tbody>
Expand All @@ -133,6 +140,8 @@
* probability sampling." Journal of Applied Statistics 23.4 (1996): 413-421.</a></li>
* <li><a href="https://doi.org/10.1016/j.ipl.2005.11.003">Efraimidis, Pavlos S., and Paul G. Spirakis. "Weighted random
* sampling with a reservoir." Information Processing Letters 97.5 (2006): 181-185.</a></li>
* <li><a href="https://www.mendeley.com/catalogue/95bcff1f-86be-389c-ab3f-717796d22abd/">Ohlsson, Esbjörn. "Sequential
* poisson sampling." Journal of official Statistics 14.2 (1998): 149.</a></li>
* </ol>
*/
package gr.james.sampling;
16 changes: 9 additions & 7 deletions src/test/java/gr/james/sampling/Benchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ public class Benchmark {
private static final LiLSamplingThreadSafe<Object> lilThreadSafe = new LiLSamplingThreadSafe<>(sample, random);
private static final EfraimidisSampling<Object> efraimidis = new EfraimidisSampling<>(sample, random);
private static final ChaoSampling<Object> chao = new ChaoSampling<>(sample, random);
private static final SequentialPoissonSampling<Object> sequentialPoisson = new SequentialPoissonSampling<>(sample, random);

public static void main(String[] args) {
System.out.printf("%10s %5d ms%n", "Waterman", performance(waterman) / 1000000);
System.out.printf("%10s %5d ms%n", "VitterX", performance(vitterx) / 1000000);
System.out.printf("%10s %5d ms%n", "VitterZ", performance(vitterz) / 1000000);
System.out.printf("%10s %5d ms%n", "LiL", performance(lil) / 1000000);
System.out.printf("%10s %5d ms%n", "LiL Thread Safe", performance(lilThreadSafe) / 1000000);
System.out.printf("%10s %5d ms%n", "Efraimidis", performance(efraimidis) / 1000000);
System.out.printf("%10s %5d ms%n", "Chao", performance(chao) / 1000000);
System.out.printf("%18s %5d ms%n", "Waterman", performance(waterman) / 1000000);
System.out.printf("%18s %5d ms%n", "VitterX", performance(vitterx) / 1000000);
System.out.printf("%18s %5d ms%n", "VitterZ", performance(vitterz) / 1000000);
System.out.printf("%18s %5d ms%n", "LiL", performance(lil) / 1000000);
System.out.printf("%18s %5d ms%n", "LiL Thread Safe", performance(lilThreadSafe) / 1000000);
System.out.printf("%18s %5d ms%n", "Efraimidis", performance(efraimidis) / 1000000);
System.out.printf("%18s %5d ms%n", "Chao", performance(chao) / 1000000);
System.out.printf("%18s %5d ms%n", "Sequential Poisson", performance(sequentialPoisson) / 1000000);
}

private static long performance(RandomSampling<Object> alg) {
Expand Down
11 changes: 7 additions & 4 deletions src/test/java/gr/james/sampling/RandomSamplingTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package gr.james.sampling;

import static org.junit.Assert.assertEquals;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -16,6 +14,8 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;

/**
* Tests for unweighted algorithms (and weighted used as unweighted).
*/
Expand All @@ -41,6 +41,7 @@ public static Collection<Supplier<RandomSampling<Integer>>> implementations() {
implementations.add(() -> new LiLSamplingThreadSafe<>(SAMPLE, RANDOM));
implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new SequentialPoissonSampling<>(SAMPLE, RANDOM));
return implementations;
}

Expand All @@ -57,14 +58,14 @@ public void correctness() {
for (int test = 0; test < streamSizes.length; test++) {
final int STREAM = streamSizes[test];
final int numCores = (impl.get() instanceof ThreadSafeRandomSampling) ?
Runtime.getRuntime().availableProcessors() : 1;
Runtime.getRuntime().availableProcessors() : 1;
final int REPS = repsSizes[test];

final AtomicIntegerArray d = new AtomicIntegerArray(STREAM);
ExecutorService executorService = Executors.newFixedThreadPool(numCores);
List<Callable<Void>> taskList = new ArrayList<>(numCores);

for(int core = 0; core < numCores; core++) {
for (int core = 0; core < numCores; core++) {
taskList.add(() -> {

for (int reps = 0; reps < (REPS / numCores); reps++) {
Expand Down Expand Up @@ -133,6 +134,8 @@ public void stream20() {
collector = LiLSampling.collector(SAMPLE, RANDOM);
} else if (alg instanceof LiLSamplingThreadSafe) {
collector = LiLSamplingThreadSafe.collector(SAMPLE, RANDOM);
} else if (alg instanceof SequentialPoissonSampling) {
collector = SequentialPoissonSampling.collector(SAMPLE, RANDOM);
} else {
throw new AssertionError();
}
Expand Down
Loading

0 comments on commit 4ad8dd4

Please sign in to comment.