Skip to content

Commit

Permalink
Implement ParetoSampling (gstamatelat#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
gstamatelat committed Jul 23, 2020
1 parent 747dd5f commit f0075b9
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 1 deletion.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ System.out.println(sample);
| `EfraimidisSampling` | Algorithm A-Res by Efraimidis | `O(k)` | ✔ |
| `ChaoSampling` | Algorithm by Chao | `O(k)` | ✔ |
| `SequentialPoissonSampling` | Algorithm by Ohlsson | `O(k)` | ✔ |
| `ParetoSampling` | Algorithm by Rosén | `O(k)` | ✔ |

### 1 Algorithm R by Waterman

Expand Down Expand Up @@ -166,6 +167,14 @@ Signature: `SequentialPoissonSampling` implements `WeightedRandomSampling`
#### References
- [Ohlsson, Esbjörn. "Sequential poisson sampling." Journal of official Statistics 14.2 (1998): 149.](https://www.mendeley.com/catalogue/95bcff1f-86be-389c-ab3f-717796d22abd/)

### 7 Algorithm by Rosén

Signature: `ParetoSampling` implements `WeightedRandomSampling`

#### References
- [Rosén, Bengt. "Asymptotic theory for order sampling." Journal of Statistical Planning and Inference 62.2 (1997): 135-158.](https://doi.org/10.1016/S0378-3758(96)00185-1)
- [Rosén, Bengt. "On sampling with probability proportional to size." Journal of Statistical Planning and Inference 62.2 (1997): 159-191.](https://doi.org/10.1016/S0378-3758(96)00186-3)

## References

[1] [Wikipedia contributors. "Reservoir sampling." Wikipedia, The Free Encyclopedia. Wikipedia, The Free Encyclopedia, 17 Oct. 2017. Web. 21 Nov. 2017.](https://en.wikipedia.org/wiki/Reservoir_sampling)
Expand Down
261 changes: 261 additions & 0 deletions src/main/java/gr/james/sampling/ParetoSampling.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package gr.james.sampling;

import java.util.*;

/**
 * Implementation of the algorithm by Rosén in <b>On sampling with probability proportional to size</b>.
 * <p>
 * Weights are not assigned a particular meaning or physical interpretation, but the resulting inclusion
 * probabilities are an approximation of the exact model ({@link ChaoSampling}). Weights must be in the range (0,+Inf)
 * but not the value {@code 1.0}, otherwise an {@link IllegalWeightException} is thrown. {@code NaN} is not a member
 * of that range and is rejected as well. A side effect of this is that the signatures {@link #feed(Object)},
 * {@link #feed(Iterable)} and {@link #feed(Iterator)} will always throw {@link IllegalWeightException}.
 * <p>
 * This implementation never throws {@link StreamOverflowException}.
 * <p>
 * The space complexity of this class is {@code O(k)}, where {@code k} is the sample size.
 *
 * @param <T> the item type
 * @author Giorgos Stamatelatos
 * @see <a href="https://doi.org/10.1016/S0378-3758(96)00185-1">Asymptotic theory for order sampling</a>
 * @see <a href="https://doi.org/10.1016/S0378-3758(96)00186-3">On sampling with probability proportional to size</a>
 */
public class ParetoSampling<T> implements WeightedRandomSampling<T> {
    /** The maximum number of items kept in the reservoir; always positive. */
    private final int sampleSize;
    /** The source of randomness used to draw one value per fed item. */
    private final Random random;
    /** The reservoir, ordered so that the head is the item with the LARGEST key (the next one to evict). */
    private final PriorityQueue<Weighted<T>> pq;
    /** A live, read-only view over the items currently held in {@link #pq}. */
    private final Collection<T> unmodifiableSample;
    /** The number of items accepted by {@link #feed(Object, double)} so far; wraps around on overflow. */
    private long streamSize;

    /**
     * Construct a new instance of {@link ParetoSampling} using the specified sample size and RNG. The implementation
     * assumes that {@code random} conforms to the contract of {@link Random} and will perform no checks to ensure
     * that. If this contract is violated, the behavior is undefined.
     *
     * @param sampleSize the sample size
     * @param random     the RNG to use
     * @throws NullPointerException     if {@code random} is {@code null}
     * @throws IllegalArgumentException if {@code sampleSize} is less than 1
     */
    public ParetoSampling(int sampleSize, Random random) {
        if (random == null) {
            throw new NullPointerException("Random was null");
        }
        if (sampleSize < 1) {
            throw new IllegalArgumentException("Sample size was less than 1");
        }
        this.random = random;
        this.sampleSize = sampleSize;
        this.streamSize = 0;
        // Reverse ordering turns the queue into a max-heap on the key, so peek()/poll() expose the worst kept item.
        this.pq = new PriorityQueue<>(sampleSize, Comparator.reverseOrder());
        // Read-only view: the wrapping iterator does not override remove(), so removal is unsupported.
        this.unmodifiableSample = new AbstractCollection<T>() {
            @Override
            public Iterator<T> iterator() {
                return new Iterator<T>() {
                    final Iterator<Weighted<T>> it = pq.iterator();

                    @Override
                    public boolean hasNext() {
                        return it.hasNext();
                    }

                    @Override
                    public T next() {
                        return it.next().object;
                    }
                };
            }

            @Override
            public int size() {
                return pq.size();
            }
        };
    }

    /**
     * Construct a new instance of {@link ParetoSampling} using the specified sample size and a default source of
     * randomness.
     *
     * @param sampleSize the sample size
     * @throws IllegalArgumentException if {@code sampleSize} is less than 1
     */
    public ParetoSampling(int sampleSize) {
        this(sampleSize, new Random());
    }

    /**
     * Get a {@link RandomSamplingCollector} from this class.
     * <p>
     * NOTE(review): the returned collector presumably feeds items through the unweighted {@code feed} signatures,
     * which this class documents as always throwing {@link IllegalWeightException} - confirm before relying on it.
     *
     * @param sampleSize the sample size
     * @param random     the RNG to use
     * @param <E>        the type of elements
     * @return a {@link RandomSamplingCollector} from this class
     */
    public static <E> RandomSamplingCollector<E> collector(int sampleSize, Random random) {
        return new RandomSamplingCollector<>(() -> new ParetoSampling<>(sampleSize, random));
    }

    /**
     * Get a {@link WeightedRandomSamplingCollector} from this class.
     *
     * @param sampleSize the sample size
     * @param random     the RNG to use
     * @param <E>        the type of elements
     * @return a {@link WeightedRandomSamplingCollector} from this class
     */
    public static <E> WeightedRandomSamplingCollector<E> weightedCollector(int sampleSize, Random random) {
        return new WeightedRandomSamplingCollector<>(() -> new ParetoSampling<>(sampleSize, random));
    }

    /**
     * {@inheritDoc}
     *
     * @param item   {@inheritDoc}
     * @param weight {@inheritDoc}
     * @return {@inheritDoc}
     * @throws NullPointerException   {@inheritDoc}
     * @throws IllegalWeightException if {@code weight} is {@code NaN}, is outside the range (0,+Inf) or is equal to
     *                                {@code 1.0}
     */
    @Override
    public boolean feed(T item, double weight) {
        // Checks
        if (item == null) {
            throw new NullPointerException("Item was null");
        }
        // NaN compares false against everything, so it would slip through the range checks below and
        // end up as an unordered key inside the priority queue.
        if (Double.isNaN(weight)) {
            throw new IllegalWeightException("Weight was NaN, must be in (0,+Inf) but not 1");
        }
        if (weight <= 0) {
            throw new IllegalWeightException("Weight was not positive, must be in (0,+Inf) but not 1");
        }
        if (Double.isInfinite(weight)) {
            throw new IllegalWeightException("Weight was infinite, must be in (0,+Inf) but not 1");
        }
        if (weight == 1.0) {
            throw new IllegalWeightException("Weight was 1, must be in (0,+Inf) but not 1");
        }

        // Produce a random value
        final double r = RandomSamplingUtils.randomExclusive(random);

        // Increase stream size
        this.streamSize++;

        // Calculate the ranking key of the item: (r / (1 - r)) / (weight / (1 - weight))
        final Weighted<T> newItem = new Weighted<>(item, (r * (1 - weight)) / ((1 - r) * weight));
        // For weights below 1 the key is non-negative (it can be 0.0 because of double precision).
        // For weights above 1 the factor (1 - weight) is negative, so the key is legitimately negative.
        // NOTE(review): the referenced papers define the ranking variable for target inclusion probabilities;
        // confirm whether weights above 1 are really intended to be accepted.
        assert weight > 1.0 || newItem.weight >= 0.0;

        // Add item to reservoir
        if (pq.size() < sampleSize) {
            // Reservoir not full yet: every item is accepted
            pq.add(newItem);
            return true;
        } else if (pq.peek().weight > newItem.weight) {
            // The new key is strictly smaller than the largest kept key: replace that item.
            // Seems unfair for equal weight items to not have a chance to get in the sample
            // Of course in the long run it hardly matters
            assert pq.size() == sampleSize();
            pq.poll();
            pq.add(newItem);
            return true;
        }

        return false;
    }

    /**
     * {@inheritDoc}
     *
     * @param items   {@inheritDoc}
     * @param weights {@inheritDoc}
     * @return {@inheritDoc}
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     * @throws IllegalWeightException   {@inheritDoc}
     */
    @Override
    public boolean feed(Iterator<T> items, Iterator<Double> weights) {
        return WeightedRandomSampling.super.feed(items, weights);
    }

    /**
     * {@inheritDoc}
     *
     * @param items {@inheritDoc}
     * @return {@inheritDoc}
     * @throws NullPointerException   {@inheritDoc}
     * @throws IllegalWeightException {@inheritDoc}
     */
    @Override
    public boolean feed(Map<T, Double> items) {
        return WeightedRandomSampling.super.feed(items);
    }

    /**
     * {@inheritDoc}
     *
     * @return {@inheritDoc}
     */
    @Override
    public Collection<T> sample() {
        return this.unmodifiableSample;
    }

    /**
     * {@inheritDoc}
     *
     * @return {@inheritDoc}
     */
    @Override
    public final int sampleSize() {
        assert this.sampleSize > 0;
        return this.sampleSize;
    }

    /**
     * Get the number of items that have been fed to the algorithm during the lifetime of this instance.
     * <p>
     * If more than {@link Long#MAX_VALUE} items have been fed to the instance, {@code streamSize()} will cycle the
     * long values, continuing from {@link Long#MIN_VALUE}.
     * <p>
     * This method runs in constant time.
     *
     * @return the number of items that have been fed to the algorithm
     */
    @Override
    public final long streamSize() {
        return this.streamSize;
    }

    /**
     * {@inheritDoc}
     * <p>
     * This class documents the unweighted signatures as always throwing {@link IllegalWeightException}, because the
     * weight they imply is not accepted by {@link #feed(Object, double)}.
     *
     * @param item {@inheritDoc}
     * @return {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean feed(T item) {
        return WeightedRandomSampling.super.feed(item);
    }

    /**
     * {@inheritDoc}
     * <p>
     * This class documents the unweighted signatures as always throwing {@link IllegalWeightException}, because the
     * weight they imply is not accepted by {@link #feed(Object, double)}.
     *
     * @param items {@inheritDoc}
     * @return {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean feed(Iterator<T> items) {
        return WeightedRandomSampling.super.feed(items);
    }

    /**
     * {@inheritDoc}
     * <p>
     * This class documents the unweighted signatures as always throwing {@link IllegalWeightException}, because the
     * weight they imply is not accepted by {@link #feed(Object, double)}.
     *
     * @param items {@inheritDoc}
     * @return {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean feed(Iterable<T> items) {
        return WeightedRandomSampling.super.feed(items);
    }
}
12 changes: 12 additions & 0 deletions src/main/java/gr/james/sampling/package-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@
* <td>(0, +&infin;)</td>
* <td>-</td>
* </tr>
* <tr>
* <td>{@link gr.james.sampling.ParetoSampling}</td>
* <td>Algorithm by Rosén [9][10]</td>
* <td>{@code O(k)}</td>
* <td>ND</td>
* <td>(0, +&infin;) except 1.0</td>
* <td>-</td>
* </tr>
* </tbody>
* </table>
* <h3>References</h3>
Expand All @@ -142,6 +150,10 @@
* sampling with a reservoir." Information Processing Letters 97.5 (2006): 181-185.</a></li>
* <li><a href="https://www.mendeley.com/catalogue/95bcff1f-86be-389c-ab3f-717796d22abd/">Ohlsson, Esbjörn. "Sequential
* poisson sampling." Journal of official Statistics 14.2 (1998): 149.</a></li>
* <li><a href="https://doi.org/10.1016/S0378-3758(96)00185-1">Rosén, Bengt. "Asymptotic theory for order sampling."
* Journal of Statistical Planning and Inference 62.2 (1997): 135-158.</a></li>
* <li><a href="https://doi.org/10.1016/S0378-3758(96)00186-3">Rosén, Bengt. "On sampling with probability proportional
* to size." Journal of Statistical Planning and Inference 62.2 (1997): 159-191.</a></li>
* </ol>
*/
package gr.james.sampling;
2 changes: 2 additions & 0 deletions src/test/java/gr/james/sampling/Benchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class Benchmark {
private static final EfraimidisSampling<Object> efraimidis = new EfraimidisSampling<>(sample, random);
private static final ChaoSampling<Object> chao = new ChaoSampling<>(sample, random);
private static final SequentialPoissonSampling<Object> sequentialPoisson = new SequentialPoissonSampling<>(sample, random);
private static final ParetoSampling<Object> pareto = new ParetoSampling<>(sample, random);

public static void main(String[] args) {
System.out.printf("%18s %5d ms%n", "Waterman", performance(waterman) / 1000000);
Expand All @@ -25,6 +26,7 @@ public static void main(String[] args) {
System.out.printf("%18s %5d ms%n", "Efraimidis", performance(efraimidis) / 1000000);
System.out.printf("%18s %5d ms%n", "Chao", performance(chao) / 1000000);
System.out.printf("%18s %5d ms%n", "Sequential Poisson", performance(sequentialPoisson) / 1000000);
System.out.printf("%18s %5d ms%n", "Pareto", performance(pareto) / 1000000);
}

private static long performance(RandomSampling<Object> alg) {
Expand Down
9 changes: 8 additions & 1 deletion src/test/java/gr/james/sampling/RandomSamplingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static Collection<Supplier<RandomSampling<Integer>>> implementations() {
implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new SequentialPoissonSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new ParetoSampling<>(SAMPLE, RANDOM));
return implementations;
}

Expand Down Expand Up @@ -72,7 +73,11 @@ public void correctness() {
final RandomSampling<Integer> alg = impl.get();

for (int i = 0; i < STREAM; i++) {
alg.feed(i);
if (alg instanceof ParetoSampling) {
((ParetoSampling<Integer>) alg).feed(i, 0.5);
} else {
alg.feed(i);
}
}

for (int s : alg.sample()) {
Expand Down Expand Up @@ -136,6 +141,8 @@ public void stream20() {
collector = LiLSamplingThreadSafe.collector(SAMPLE, RANDOM);
} else if (alg instanceof SequentialPoissonSampling) {
collector = SequentialPoissonSampling.collector(SAMPLE, RANDOM);
} else if (alg instanceof ParetoSampling) {
collector = ParetoSampling.collector(SAMPLE, RANDOM);
} else {
throw new AssertionError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static Collection<Supplier<WeightedRandomSampling<Integer>>> implementati
implementations.add(() -> new EfraimidisSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new ChaoSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new SequentialPoissonSampling<>(SAMPLE, RANDOM));
implementations.add(() -> new ParetoSampling<>(SAMPLE, RANDOM));
return implementations;
}

Expand Down Expand Up @@ -88,6 +89,8 @@ public void stream20() {
collector = ChaoSampling.weightedCollector(SAMPLE, RANDOM);
} else if (alg instanceof SequentialPoissonSampling) {
collector = SequentialPoissonSampling.weightedCollector(SAMPLE, RANDOM);
} else if (alg instanceof ParetoSampling) {
collector = ParetoSampling.weightedCollector(SAMPLE, RANDOM);
} else {
throw new AssertionError();
}
Expand Down

0 comments on commit f0075b9

Please sign in to comment.