Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 6e5d743

Browse files
tgrohdavorbonaci
authored andcommitted
Implement Counter#merge
Merge is referenced in isCompatibleWith but is not implemented via the Counter interface. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112723417
1 parent a923d77 commit 6e5d743

File tree

2 files changed

+178
-1
lines changed

2 files changed

+178
-1
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.AND;
2020
import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.MEAN;
2121
import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.OR;
22+
import static com.google.common.base.Preconditions.checkArgument;
2223

2324
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
2425
import com.google.common.util.concurrent.AtomicDouble;
@@ -328,6 +329,13 @@ public boolean isCompatibleWith(Counter<?> that) {
328329
&& this.getClass().equals(that.getClass());
329330
}
330331

332+
/**
333+
* Merges this counter with the provided counter, returning this counter with the combined value
334+
* of both counters. This may reset the delta of this counter.
335+
*
336+
* @throws IllegalArgumentException if the provided Counter is not compatible with this Counter
337+
*/
338+
public abstract Counter<T> merge(Counter<T> that);
331339

332340
//////////////////////////////////////////////////////////////////////////////
333341

@@ -494,6 +502,25 @@ public CounterMean<Long> getMean() {
494502
return mean.get();
495503
}
496504

505+
@Override
506+
public Counter<Long> merge(Counter<Long> that) {
507+
checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
508+
switch (kind) {
509+
case SUM:
510+
case MIN:
511+
case MAX:
512+
return addValue(that.getAggregate());
513+
case MEAN:
514+
CounterMean<Long> thisCounterMean = this.getMean();
515+
CounterMean<Long> thatCounterMean = that.getMean();
516+
return resetMeanToValue(
517+
thisCounterMean.getCount() + thatCounterMean.getCount(),
518+
thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
519+
default:
520+
throw illegalArgumentException();
521+
}
522+
}
523+
497524
private static class LongCounterMean implements CounterMean<Long> {
498525
private final long aggregate;
499526
private final long count;
@@ -670,6 +697,25 @@ public CounterMean<Double> getMean() {
670697
return mean.get();
671698
}
672699

700+
@Override
701+
public Counter<Double> merge(Counter<Double> that) {
702+
checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
703+
switch (kind) {
704+
case SUM:
705+
case MIN:
706+
case MAX:
707+
return addValue(that.getAggregate());
708+
case MEAN:
709+
CounterMean<Double> thisCounterMean = this.getMean();
710+
CounterMean<Double> thatCounterMean = that.getMean();
711+
return resetMeanToValue(
712+
thisCounterMean.getCount() + thatCounterMean.getCount(),
713+
thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
714+
default:
715+
throw illegalArgumentException();
716+
}
717+
}
718+
673719
private static class DoubleCounterMean implements CounterMean<Double> {
674720
private final double aggregate;
675721
private final long count;
@@ -763,13 +809,18 @@ public Boolean getAggregate() {
763809
public CounterMean<Boolean> getMean() {
764810
throw illegalArgumentException();
765811
}
812+
813+
@Override
814+
public Counter<Boolean> merge(Counter<Boolean> that) {
815+
checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
816+
return addValue(that.getAggregate());
817+
}
766818
}
767819

768820
/**
769821
* Implements a {@link Counter} for {@link String} values.
770822
*/
771823
private static class StringCounter extends Counter<String> {
772-
773824
/** Initializes a new {@link Counter} for {@link String} values. */
774825
private StringCounter(String name, AggregationKind kind) {
775826
super(name, kind);
@@ -833,6 +884,15 @@ public CounterMean<String> getMean() {
833884
throw illegalArgumentException();
834885
}
835886
}
887+
888+
@Override
889+
public Counter<String> merge(Counter<String> that) {
890+
checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
891+
switch (kind) {
892+
default:
893+
throw illegalArgumentException();
894+
}
895+
}
836896
}
837897

838898
/**
@@ -985,6 +1045,25 @@ public CounterMean<Integer> getMean() {
9851045
return mean.get();
9861046
}
9871047

1048+
@Override
1049+
public Counter<Integer> merge(Counter<Integer> that) {
1050+
checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
1051+
switch (kind) {
1052+
case SUM:
1053+
case MIN:
1054+
case MAX:
1055+
return addValue(that.getAggregate());
1056+
case MEAN:
1057+
CounterMean<Integer> thisCounterMean = this.getMean();
1058+
CounterMean<Integer> thatCounterMean = that.getMean();
1059+
return resetMeanToValue(
1060+
thisCounterMean.getCount() + thatCounterMean.getCount(),
1061+
thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
1062+
default:
1063+
throw illegalArgumentException();
1064+
}
1065+
}
1066+
9881067
private static class IntegerCounterMean implements CounterMean<Integer> {
9891068
private final int aggregate;
9901069
private final long count;

sdk/src/test/java/com/google/cloud/dataflow/sdk/util/common/CounterTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@
3333
import com.google.cloud.dataflow.sdk.util.CloudCounterUtils;
3434
import com.google.cloud.dataflow.sdk.util.common.Counter.CounterMean;
3535

36+
import org.junit.Rule;
3637
import org.junit.Test;
38+
import org.junit.rules.ExpectedException;
3739
import org.junit.runner.RunWith;
3840
import org.junit.runners.JUnit4;
3941

4042
import java.util.Arrays;
4143
import java.util.HashSet;
44+
import java.util.List;
4245
import java.util.Set;
4346

4447
/**
@@ -47,6 +50,9 @@
4750
@RunWith(JUnit4.class)
4851
public class CounterTest {
4952

53+
@Rule
54+
public ExpectedException thrown = ExpectedException.none();
55+
5056
private static MetricUpdate flush(Counter<?> c) {
5157
// TODO: Move this out into a separate Counter test.
5258
return CounterTestUtils.extractCounterUpdate(c, true);
@@ -211,6 +217,13 @@ public void testSumLong() {
211217
c.resetToValue(100L).addValue(17L).addValue(49L);
212218
expectedTotal = expectedDelta = 166;
213219
assertOK(expectedTotal, expectedDelta, c);
220+
221+
Counter<Long> other = Counter.longs("sum-long", SUM);
222+
other.addValue(12L);
223+
expectedDelta = 12L;
224+
expectedTotal += 12L;
225+
c.merge(other);
226+
assertOK(expectedTotal, expectedDelta, c);
214227
}
215228

216229
@Test
@@ -241,6 +254,13 @@ public void testSumDouble() {
241254
c.resetToValue(Math.sqrt(17)).addValue(17.0).addValue(49.0);
242255
expectedTotal = expectedDelta = Math.sqrt(17.0) + 17.0 + 49.0;
243256
assertOK(expectedTotal, expectedDelta, c);
257+
258+
Counter<Double> other = Counter.doubles("sum-double", SUM);
259+
other.addValue(12 * Math.PI);
260+
expectedDelta = 12 * Math.PI;
261+
expectedTotal += 12 * Math.PI;
262+
c.merge(other);
263+
assertOK(expectedTotal, expectedDelta, c);
244264
}
245265

246266

@@ -272,6 +292,12 @@ public void testMaxLong() {
272292
c.resetToValue(100L).addValue(171L).addValue(49L);
273293
expectedTotal = expectedDelta = 171;
274294
assertOK(expectedTotal, expectedDelta, c);
295+
296+
Counter<Long> other = Counter.longs("max-long", MAX);
297+
other.addValue(12L);
298+
expectedDelta = 12L;
299+
c.merge(other);
300+
assertOK(expectedTotal, expectedDelta, c);
275301
}
276302

277303
@Test
@@ -300,6 +326,12 @@ public void testMaxDouble() {
300326
c.resetToValue(Math.sqrt(17)).addValue(171.0).addValue(49.0);
301327
expectedTotal = expectedDelta = 171.0;
302328
assertOK(expectedTotal, expectedDelta, c);
329+
330+
Counter<Double> other = Counter.doubles("max-double", MAX);
331+
other.addValue(12 * Math.PI);
332+
expectedDelta = 12 * Math.PI;
333+
c.merge(other);
334+
assertOK(expectedTotal, expectedDelta, c);
303335
}
304336

305337

@@ -331,6 +363,12 @@ public void testMinLong() {
331363
c.resetToValue(100L).addValue(171L).addValue(49L);
332364
expectedTotal = expectedDelta = 49;
333365
assertOK(expectedTotal, expectedDelta, c);
366+
367+
Counter<Long> other = Counter.longs("min-long", MIN);
368+
other.addValue(42L);
369+
expectedTotal = expectedDelta = 42L;
370+
c.merge(other);
371+
assertOK(expectedTotal, expectedDelta, c);
334372
}
335373

336374
@Test
@@ -359,6 +397,12 @@ public void testMinDouble() {
359397
c.resetToValue(Math.sqrt(17)).addValue(171.0).addValue(0.0);
360398
expectedTotal = expectedDelta = 0.0;
361399
assertOK(expectedTotal, expectedDelta, c);
400+
401+
Counter<Double> other = Counter.doubles("min-double", MIN);
402+
other.addValue(42 * Math.E);
403+
expectedDelta = 42 * Math.E;
404+
c.merge(other);
405+
assertOK(expectedTotal, expectedDelta, c);
362406
}
363407

364408

@@ -419,6 +463,15 @@ public void testMeanLong() {
419463
expTotal = expDelta = 166;
420464
expCountTotal = expCountDelta = 5;
421465
assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
466+
467+
Counter<Long> other = Counter.longs("mean-long", MEAN);
468+
other.addValue(12L).addValue(44L).addValue(-5L);
469+
expTotal += 12L + 44L - 5L;
470+
expDelta += 12L + 44L - 5L;
471+
expCountTotal += 3;
472+
expCountDelta += 3;
473+
c.merge(other);
474+
assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
422475
}
423476

424477
@Test
@@ -458,6 +511,15 @@ public void testMeanDouble() {
458511
expTotal = expDelta = Math.sqrt(17.0) + 17.0 + 49.0;
459512
expCountTotal = expCountDelta = 5;
460513
assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
514+
515+
Counter<Double> other = Counter.doubles("mean-double", MEAN);
516+
other.addValue(3 * Math.PI).addValue(12 * Math.E);
517+
expTotal += 3 * Math.PI + 12 * Math.E;
518+
expDelta += 3 * Math.PI + 12 * Math.E;
519+
expCountTotal += 2;
520+
expCountDelta += 2;
521+
c.merge(other);
522+
assertMean(expTotal, expDelta, expCountTotal, expCountDelta, c);
461523
}
462524

463525

@@ -590,4 +652,40 @@ public void testExtraction() {
590652
assertEquals(cloudCountersFromSet, cloudCountersFromArray);
591653
assertEquals(2, cloudCountersFromSet.size());
592654
}
655+
656+
@Test
657+
public void testMergeIncompatibleCounters() {
658+
Counter<Long> longSums = Counter.longs("longsums", SUM);
659+
Counter<Long> longMean = Counter.longs("longmean", MEAN);
660+
Counter<Long> longMin = Counter.longs("longmin", MIN);
661+
662+
Counter<Long> otherLongSums = Counter.longs("othersums", SUM);
663+
Counter<Long> otherLongMean = Counter.longs("otherlongmean", MEAN);
664+
665+
Counter<Double> doubleSums = Counter.doubles("doublesums", SUM);
666+
Counter<Double> doubleMean = Counter.doubles("doublemean", MEAN);
667+
668+
Counter<Boolean> boolAnd = Counter.booleans("and", AND);
669+
Counter<Boolean> boolOr = Counter.booleans("or", OR);
670+
671+
List<Counter<Long>> longCounters =
672+
Arrays.asList(longSums, longMean, longMin, otherLongSums, otherLongMean);
673+
for (Counter<Long> left : longCounters) {
674+
for (Counter<Long> right : longCounters) {
675+
if (left != right) {
676+
assertIncompatibleMerge(left, right);
677+
}
678+
}
679+
}
680+
681+
assertIncompatibleMerge(doubleSums, doubleMean);
682+
assertIncompatibleMerge(boolAnd, boolOr);
683+
}
684+
685+
private <T> void assertIncompatibleMerge(Counter<T> left, Counter<T> right) {
686+
thrown.expect(IllegalArgumentException.class);
687+
thrown.expectMessage("Counters");
688+
thrown.expectMessage("are incompatible");
689+
left.merge(right);
690+
}
593691
}

0 commit comments

Comments
 (0)