Skip to content

Commit 846fc1a

Browse files
authored
Merge pull request #983: [proxima-beam-core] fix packages of retract transforms
2 parents e0fdc19 + 7ce6b83 commit 846fc1a

File tree

10 files changed

+73
-498
lines changed

10 files changed

+73
-498
lines changed

beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/KeyedRetractPCollection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
1717

1818
import cz.o2.proxima.core.functional.UnaryFunction;
1919
import cz.o2.proxima.core.util.ExceptionUtils;

beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/LeftOrRight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
1717

1818
import java.io.IOException;
1919
import java.io.InputStream;

beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/RetractElement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
1717

1818
import cz.o2.proxima.core.functional.UnaryFunction;
1919
import java.io.IOException;

beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/RetractInnerJoin.java

Lines changed: 0 additions & 383 deletions
This file was deleted.

beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/RetractMostRecentJoin.java renamed to beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/RetractJoin.java

Lines changed: 57 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
1717

18-
import cz.o2.proxima.beam.core.transform.retract.LeftOrRight.LeftOrRightCoder;
18+
import cz.o2.proxima.beam.core.transforms.retract.LeftOrRight.LeftOrRightCoder;
1919
import cz.o2.proxima.beam.util.state.ExcludeExternal;
2020
import cz.o2.proxima.core.functional.UnaryFunction;
2121
import cz.o2.proxima.core.util.ExceptionUtils;
@@ -54,7 +54,7 @@
5454
import org.joda.time.Instant;
5555

5656
@Slf4j
57-
public class RetractMostRecentJoin {
57+
public class RetractJoin {
5858

5959
public static <K1, K2, V1, V2, JK> RetractPCollection<KV<KV<K1, K2>, KV<V1, V2>>> join(
6060
KeyedRetractPCollection<K1, V1> lhs,
@@ -206,9 +206,10 @@ public void process(
206206

207207
Instant cleanupStamp = cleanupTs.read();
208208
if (cleanupStamp == null) {
209-
// first element setup timer
209+
// first element set up timer
210210
cleanupTimer.offset(cleanupDuration).setRelative();
211-
cleanupTs.write(cleanupStamp = BoundedWindow.TIMESTAMP_MIN_VALUE);
211+
cleanupStamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
212+
cleanupTs.write(cleanupStamp);
212213
}
213214

214215
AtomicLong seq = new AtomicLong(MoreObjects.firstNonNull(seqState.read(), 0L));
@@ -257,8 +258,8 @@ private <PK, SK, PV, SV> void retractAndEmit(
257258
PK primaryKey,
258259
RetractElement<PV> primaryValue,
259260
Instant ts,
260-
OutputConsumer<PK, SK, PV, SV> add,
261-
OutputConsumer<PK, SK, PV, SV> retract) {
261+
OutputConsumer<PK, SK, PV, SV> addConsumer,
262+
OutputConsumer<PK, SK, PV, SV> retractConsumer) {
262263

263264
SequentialInstant thisInstant = new SequentialInstant(ts, primaryValue.getSeqId());
264265
@Nullable KV<SequentialInstant, RetractElement<PV>> oldPrimaryValue =
@@ -288,36 +289,55 @@ private <PK, SK, PV, SV> void retractAndEmit(
288289

289290
// infer time-range for updates
290291
for (Map.Entry<SK, KV<SequentialInstant, RetractElement<SV>>> e : secondaryKeyedElements) {
291-
SK secondaryKey = e.getKey();
292-
KV<SequentialInstant, RetractElement<SV>> secondaryValue = e.getValue();
293-
// if secondary value exists
294-
if (secondaryValue.getValue().isAddition()) {
295-
// remove old value, if exists
296-
if (oldPrimaryValue != null && oldPrimaryValue.getValue().isAddition()) {
297-
retract.apply(
298-
primaryKey,
299-
secondaryKey,
300-
oldPrimaryValue.getValue().getValue(),
301-
secondaryValue.getValue().getValue(),
302-
max(oldPrimaryValue.getKey(), secondaryValue.getKey()));
303-
}
304-
if (primaryValue.isAddition()) {
305-
// insert new value
306-
add.apply(
307-
primaryKey,
308-
secondaryKey,
309-
primaryValue.getValue(),
310-
secondaryValue.getValue().getValue(),
311-
max(thisInstant, secondaryValue.getKey()));
312-
} else {
313-
// retract old value
314-
retract.apply(
315-
primaryKey,
316-
secondaryKey,
317-
oldPrimaryValue.getValue().getValue(),
318-
secondaryValue.getValue().getValue(),
319-
max(thisInstant, secondaryValue.getKey()));
320-
}
292+
processSecondaryElement(
293+
primaryKey,
294+
primaryValue,
295+
e,
296+
thisInstant,
297+
oldPrimaryValue,
298+
addConsumer,
299+
retractConsumer);
300+
}
301+
}
302+
303+
private <PK, SK, PV, SV> void processSecondaryElement(
304+
PK primaryKey,
305+
RetractElement<PV> primaryValue,
306+
Entry<SK, KV<SequentialInstant, RetractElement<SV>>> e,
307+
SequentialInstant thisInstant,
308+
@Nullable KV<SequentialInstant, RetractElement<PV>> oldPrimaryValue,
309+
OutputConsumer<PK, SK, PV, SV> addConsumer,
310+
OutputConsumer<PK, SK, PV, SV> retractConsumer) {
311+
312+
SK secondaryKey = e.getKey();
313+
KV<SequentialInstant, RetractElement<SV>> secondaryValue = e.getValue();
314+
// if secondary value exists
315+
if (secondaryValue.getValue().isAddition()) {
316+
// remove old value, if exists
317+
if (oldPrimaryValue != null && oldPrimaryValue.getValue().isAddition()) {
318+
retractConsumer.apply(
319+
primaryKey,
320+
secondaryKey,
321+
oldPrimaryValue.getValue().getValue(),
322+
secondaryValue.getValue().getValue(),
323+
max(oldPrimaryValue.getKey(), secondaryValue.getKey()));
324+
}
325+
if (primaryValue.isAddition()) {
326+
// insert new value
327+
addConsumer.apply(
328+
primaryKey,
329+
secondaryKey,
330+
primaryValue.getValue(),
331+
secondaryValue.getValue().getValue(),
332+
max(thisInstant, secondaryValue.getKey()));
333+
} else {
334+
// retract old value
335+
retractConsumer.apply(
336+
primaryKey,
337+
secondaryKey,
338+
oldPrimaryValue.getValue().getValue(),
339+
secondaryValue.getValue().getValue(),
340+
max(thisInstant, secondaryValue.getKey()));
321341
}
322342
}
323343
}

beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/RetractPCollection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
1717

1818
import cz.o2.proxima.core.functional.UnaryFunction;
1919
import cz.o2.proxima.core.util.ExceptionUtils;

beam/core/src/main/java/cz/o2/proxima/beam/core/transforms/retract/SequentialInstant.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
1717

1818
import java.io.IOException;
1919
import java.io.InputStream;

beam/core/src/test/java/cz/o2/proxima/beam/core/transforms/retract/RetractInnerJoinTest.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

beam/core/src/test/java/cz/o2/proxima/beam/core/transforms/retract/RetractMostRecentJoinTest.java renamed to beam/core/src/test/java/cz/o2/proxima/beam/core/transforms/retract/RetractJoinTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
17+
18+
import static org.junit.Assert.assertNotNull;
1719

1820
import cz.o2.proxima.core.util.Pair;
1921
import java.util.ArrayList;
@@ -51,7 +53,7 @@
5153
import org.joda.time.Instant;
5254
import org.junit.Test;
5355

54-
public class RetractMostRecentJoinTest {
56+
public class RetractJoinTest {
5557

5658
final Duration cleanupDuration = Duration.standardDays(2);
5759

@@ -71,7 +73,7 @@ public void testSimple() {
7173
PCollection<
7274
RetractElement<KV<KV<String, Integer>, KV<KV<String, Integer>, KV<Integer, String>>>>>
7375
joined =
74-
RetractMostRecentJoin.join(
76+
RetractJoin.join(
7577
lhs, rhs, KV::getValue, KV::getKey, TypeDescriptors.integers(), cleanupDuration)
7678
.unwrapped();
7779
PCollection<String> res =
@@ -89,7 +91,7 @@ public void testSimple() {
8991
e.getValue().getValue().getValue().getValue())));
9092

9193
PAssert.that(res).containsInAnyOrder("b:2::b:2:2:c", "a:1::a:1:1:b");
92-
p.run();
94+
assertNotNull(p.run());
9395
}
9496

9597
@Test
@@ -124,7 +126,7 @@ public void testWithRetractions() {
124126
PCollection<
125127
RetractElement<KV<KV<String, Integer>, KV<KV<String, Integer>, KV<Integer, String>>>>>
126128
joined =
127-
RetractMostRecentJoin.join(
129+
RetractJoin.join(
128130
lhs, rhs, KV::getValue, KV::getKey, TypeDescriptors.integers(), cleanupDuration)
129131
.unwrapped();
130132
PCollection<String> res =
@@ -144,7 +146,7 @@ public void testWithRetractions() {
144146
e.getValue().getValue().getValue())));
145147

146148
PAssert.that(res).containsInAnyOrder("a:3::a:3:3:c", "b:4::b:4:4:x");
147-
p.run();
149+
assertNotNull(p.run());
148150
}
149151

150152
@Test
@@ -183,7 +185,7 @@ public void testWithManyRetractions() {
183185
PCollection<
184186
RetractElement<KV<KV<String, Integer>, KV<KV<String, Integer>, KV<Integer, String>>>>>
185187
joined =
186-
RetractMostRecentJoin.join(
188+
RetractJoin.join(
187189
lhs, rhs, KV::getValue, KV::getKey, TypeDescriptors.integers(), cleanupDuration)
188190
.unwrapped();
189191
PCollection<String> res =
@@ -203,7 +205,7 @@ public void testWithManyRetractions() {
203205
e.getValue().getValue().getValue())));
204206

205207
PAssert.that(res).containsInAnyOrder(computeOutputs(inputs));
206-
p.run();
208+
assertNotNull(p.run());
207209
}
208210

209211
private List<String> computeOutputs(

beam/core/src/test/java/cz/o2/proxima/beam/core/transforms/retract/RetractPCollectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package cz.o2.proxima.beam.core.transform.retract;
16+
package cz.o2.proxima.beam.core.transforms.retract;
1717

1818
import static org.junit.Assert.assertNotNull;
1919

0 commit comments

Comments
 (0)