Skip to content

Commit e0fdc19

Browse files
authored
Merge pull request #982: [beam] OSS RetractPCollection prototype
2 parents b64ce7a + 3f81b96 commit e0fdc19

File tree

10 files changed

+1833
-0
lines changed

10 files changed

+1833
-0
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2017-2025 O2 Czech Republic, a.s.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package cz.o2.proxima.beam.core.transform.retract;
17+
18+
import cz.o2.proxima.core.functional.UnaryFunction;
19+
import cz.o2.proxima.core.util.ExceptionUtils;
20+
import java.util.Map;
21+
import lombok.AccessLevel;
22+
import lombok.Getter;
23+
import org.apache.beam.sdk.Pipeline;
24+
import org.apache.beam.sdk.coders.Coder;
25+
import org.apache.beam.sdk.coders.KvCoder;
26+
import org.apache.beam.sdk.transforms.MapElements;
27+
import org.apache.beam.sdk.transforms.PTransform;
28+
import org.apache.beam.sdk.values.KV;
29+
import org.apache.beam.sdk.values.PCollection;
30+
import org.apache.beam.sdk.values.PValue;
31+
import org.apache.beam.sdk.values.PValueBase;
32+
import org.apache.beam.sdk.values.TupleTag;
33+
import org.apache.beam.sdk.values.TypeDescriptor;
34+
import org.apache.beam.sdk.values.TypeParameter;
35+
36+
public class KeyedRetractPCollection<K, V> extends PValueBase implements PValue {
37+
38+
private final PCollection<RetractElement<V>> unwrapped;
39+
private final TupleTag<RetractElement<V>> tag;
40+
@Getter private final TypeDescriptor<K> keyDescriptor;
41+
@Getter private final TypeDescriptor<V> valueDescriptor;
42+
43+
@Getter(AccessLevel.PACKAGE)
44+
private final UnaryFunction<V, K> keyExtractor;
45+
46+
KeyedRetractPCollection(
47+
PCollection<RetractElement<V>> parent,
48+
Pipeline pipeline,
49+
TypeDescriptor<V> valueType,
50+
TypeDescriptor<K> keyType,
51+
UnaryFunction<V, K> keyExtractor,
52+
TupleTag<RetractElement<V>> tag) {
53+
54+
super(pipeline);
55+
this.unwrapped = parent;
56+
this.keyDescriptor = keyType;
57+
this.valueDescriptor = valueType;
58+
this.tag = tag;
59+
this.keyExtractor = keyExtractor;
60+
}
61+
62+
public <OUT extends PValue> OUT apply(PTransform<KeyedRetractPCollection<K, V>, OUT> transform) {
63+
return Pipeline.applyTransform(this, transform);
64+
}
65+
66+
public PCollection<RetractElement<KV<K, V>>> unwrapped() {
67+
TypeDescriptor<RetractElement<KV<K, V>>> type =
68+
getRetractElementTypeDescriptor(keyDescriptor, valueDescriptor);
69+
70+
Coder<K> keyCoder =
71+
ExceptionUtils.uncheckedFactory(
72+
() -> unwrapped.getPipeline().getCoderRegistry().getCoder(keyDescriptor));
73+
Coder<V> valueCoder =
74+
ExceptionUtils.uncheckedFactory(
75+
() -> unwrapped.getPipeline().getCoderRegistry().getCoder(valueDescriptor));
76+
KvCoder<K, V> kvCoder = KvCoder.of(keyCoder, valueCoder);
77+
Coder<RetractElement<KV<K, V>>> outputCoder = RetractElement.Coder.of(kvCoder);
78+
return unwrapped.apply(mapToUnwrapped(keyExtractor, type)).setCoder(outputCoder);
79+
}
80+
81+
public PCollection<RetractElement<V>> unwrappedValues() {
82+
return unwrapped;
83+
}
84+
85+
private static <K, V> MapElements<RetractElement<V>, RetractElement<KV<K, V>>> mapToUnwrapped(
86+
UnaryFunction<V, K> keyExtractor, TypeDescriptor<RetractElement<KV<K, V>>> type) {
87+
88+
return MapElements.into(type)
89+
.via(
90+
e ->
91+
e.isAddition()
92+
? RetractElement.ofAddition(
93+
KV.of(keyExtractor.apply(e.getValue()), e.getValue()), e.getSeqId())
94+
: RetractElement.ofDeletion(
95+
KV.of(keyExtractor.apply(e.getValue()), e.getValue()), e.getSeqId()));
96+
}
97+
98+
private static <K, V>
99+
MapElements<RetractElement<V>, RetractElement<KV<K, V>>> mapToUnwrappedValue(
100+
UnaryFunction<V, K> keyExtractor, TypeDescriptor<RetractElement<KV<K, V>>> type) {
101+
102+
return MapElements.into(type)
103+
.via(
104+
e ->
105+
e.isAddition()
106+
? RetractElement.ofAddition(
107+
KV.of(keyExtractor.apply(e.getValue()), e.getValue()), e.getSeqId())
108+
: RetractElement.ofDeletion(
109+
KV.of(keyExtractor.apply(e.getValue()), e.getValue()), e.getSeqId()));
110+
}
111+
112+
private static <K, V> TypeDescriptor<RetractElement<KV<K, V>>> getRetractElementTypeDescriptor(
113+
TypeDescriptor<K> keyDesc, TypeDescriptor<V> valueDesc) {
114+
115+
return new TypeDescriptor<RetractElement<KV<K, V>>>() {}.where(
116+
new TypeParameter<>() {}, keyDesc)
117+
.where(new TypeParameter<>() {}, valueDesc);
118+
}
119+
120+
@Override
121+
public Map<TupleTag<?>, PValue> expand() {
122+
return Map.of(tag, unwrapped);
123+
}
124+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2017-2025 O2 Czech Republic, a.s.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package cz.o2.proxima.beam.core.transform.retract;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.io.OutputStream;
21+
import lombok.EqualsAndHashCode;
22+
import lombok.ToString;
23+
import lombok.Value;
24+
import org.apache.beam.sdk.coders.BooleanCoder;
25+
import org.apache.beam.sdk.coders.Coder;
26+
import org.apache.beam.sdk.coders.CustomCoder;
27+
import org.checkerframework.checker.nullness.qual.Nullable;
28+
29+
@Value
30+
public class LeftOrRight<L, R> {
31+
32+
@ToString
33+
@EqualsAndHashCode(callSuper = false)
34+
static class LeftOrRightCoder<L, R> extends CustomCoder<LeftOrRight<L, R>> {
35+
private static final BooleanCoder boolCoder = BooleanCoder.of();
36+
private final Coder<L> leftCoder;
37+
private final Coder<R> rightCoder;
38+
39+
LeftOrRightCoder(Coder<L> leftCoder, Coder<R> rightCoder) {
40+
this.leftCoder = leftCoder;
41+
this.rightCoder = rightCoder;
42+
}
43+
44+
@Override
45+
public void encode(LeftOrRight<L, R> value, OutputStream outStream) throws IOException {
46+
if (value.isLeft()) {
47+
boolCoder.encode(true, outStream);
48+
leftCoder.encode(value.getLeft(), outStream);
49+
} else {
50+
boolCoder.encode(false, outStream);
51+
rightCoder.encode(value.getRight(), outStream);
52+
}
53+
}
54+
55+
@Override
56+
public LeftOrRight<L, R> decode(InputStream inStream) throws IOException {
57+
boolean isLeft = boolCoder.decode(inStream);
58+
if (isLeft) {
59+
return new LeftOrRight<>(leftCoder.decode(inStream), null);
60+
}
61+
return new LeftOrRight<>(null, rightCoder.decode(inStream));
62+
}
63+
}
64+
65+
public static <L, R> LeftOrRight<L, R> left(L left) {
66+
return new LeftOrRight<>(left, null);
67+
}
68+
69+
public static <L, R> LeftOrRight<L, R> right(R right) {
70+
return new LeftOrRight<>(null, right);
71+
}
72+
73+
@Nullable L left;
74+
@Nullable R right;
75+
76+
boolean isLeft() {
77+
return left != null;
78+
}
79+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2017-2025 O2 Czech Republic, a.s.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package cz.o2.proxima.beam.core.transform.retract;
17+
18+
import cz.o2.proxima.core.functional.UnaryFunction;
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
import java.util.List;
23+
import lombok.EqualsAndHashCode;
24+
import lombok.Getter;
25+
import lombok.ToString;
26+
import org.apache.beam.sdk.coders.BooleanCoder;
27+
import org.apache.beam.sdk.coders.Coder;
28+
import org.apache.beam.sdk.coders.CustomCoder;
29+
import org.apache.beam.sdk.coders.VarLongCoder;
30+
import org.apache.beam.sdk.values.TypeDescriptor;
31+
import org.apache.beam.sdk.values.TypeParameter;
32+
33+
@ToString
34+
@EqualsAndHashCode
35+
@Getter
36+
public class RetractElement<T> {
37+
38+
@EqualsAndHashCode(callSuper = false)
39+
public static class Coder<T> extends CustomCoder<RetractElement<T>> {
40+
41+
public static <T> Coder<T> of(org.apache.beam.sdk.coders.Coder<T> valueCoder) {
42+
return new Coder<>(valueCoder);
43+
}
44+
45+
private static final VarLongCoder longCoder = VarLongCoder.of();
46+
private static final BooleanCoder boolCoder = BooleanCoder.of();
47+
48+
@Getter private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
49+
50+
private Coder(org.apache.beam.sdk.coders.Coder<T> valueCoder) {
51+
this.valueCoder = valueCoder;
52+
}
53+
54+
@Override
55+
public void encode(RetractElement<T> value, OutputStream outStream) throws IOException {
56+
valueCoder.encode(value.getValue(), outStream);
57+
longCoder.encode(value.getSeqId(), outStream);
58+
boolCoder.encode(value.isAddition(), outStream);
59+
}
60+
61+
@Override
62+
public RetractElement<T> decode(InputStream inStream) throws IOException {
63+
return new RetractElement<>(
64+
valueCoder.decode(inStream), longCoder.decode(inStream), boolCoder.decode(inStream));
65+
}
66+
67+
@Override
68+
public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
69+
return List.of(valueCoder);
70+
}
71+
72+
@Override
73+
public TypeDescriptor<RetractElement<T>> getEncodedTypeDescriptor() {
74+
return new TypeDescriptor<RetractElement<T>>() {}.where(
75+
new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
76+
}
77+
}
78+
79+
public static <T> RetractElement<T> ofAddition(T value, long seqId) {
80+
return new RetractElement<>(value, seqId, true);
81+
}
82+
83+
public static <T> RetractElement<T> ofDeletion(T value, long seqId) {
84+
return new RetractElement<>(value, seqId, false);
85+
}
86+
87+
private final T value;
88+
private final long seqId;
89+
private final boolean isAddition;
90+
91+
public RetractElement(T value, long seqId, boolean isAddition) {
92+
this.value = value;
93+
this.seqId = seqId;
94+
this.isAddition = isAddition;
95+
}
96+
97+
public <M> RetractElement<M> mapped(UnaryFunction<T, M> map) {
98+
return isAddition() ? ofAddition(map.apply(value), seqId) : ofDeletion(map.apply(value), seqId);
99+
}
100+
}

0 commit comments

Comments
 (0)