Skip to content

Commit 143ba3c

Browse files
authored
Merge pull request #980: Configurable transformer naming
2 parents 5ab0284 + 70aec77 commit 143ba3c

File tree

10 files changed

+91
-45
lines changed

10 files changed

+91
-45
lines changed

core/src/main/java/cz/o2/proxima/core/repository/AttributeFamilyDescriptor.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static Builder newBuilder() {
7272
@Getter
7373
private final ConsumerNameFactory<AttributeFamilyDescriptor> replicationConsumerNameFactory;
7474

75-
private final Map<String, Object> cfg;
75+
@Getter private final Map<String, Object> cfg;
7676

7777
AttributeFamilyDescriptor(
7878
String name,
@@ -117,10 +117,6 @@ private ConsumerNameFactory<AttributeFamilyDescriptor> constructConsumerNameFact
117117
return factory;
118118
}
119119

120-
public Map<String, Object> getCfg() {
121-
return Collections.unmodifiableMap(cfg);
122-
}
123-
124120
public List<AttributeDescriptor<?>> getAttributes() {
125121
return Collections.unmodifiableList(attributes);
126122
}

core/src/main/java/cz/o2/proxima/core/repository/ConfigRepository.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,6 +1577,7 @@ private void createRemoteReadTransform(
15771577
.setName(transform)
15781578
.addAttributes(source.getAttributes())
15791579
.setFilter(replicated.getFilter())
1580+
.setCfg(Collections.emptyMap())
15801581
.setTransformation(
15811582
renameTransform(
15821583
sourceMapping::get,
@@ -1625,6 +1626,7 @@ private void createTargetTransform(
16251626
.setName(transform)
16261627
.addAttributes(write.getAttributes())
16271628
.setFilter(targetFamily.getFilter())
1629+
.setCfg(Collections.emptyMap())
16281630
.setTransformation(
16291631
renameTransform(
16301632
sourceMapping::get,
@@ -1658,6 +1660,7 @@ private void createLocalWriteTransform(
16581660
.setName(transform)
16591661
.addAttributes(write.getAttributes())
16601662
.setFilter(replicated.getFilter())
1663+
.setCfg(Collections.emptyMap())
16611664
.setTransformation(
16621665
renameTransform(
16631666
src ->
@@ -2013,9 +2016,11 @@ private void readTransformations(Config cfg) {
20132016
return;
20142017
}
20152018

2019+
log.info("Will process {} loaded transformations.", cfgTransforms.keySet());
2020+
20162021
cfgTransforms.forEach(
20172022
(name, v) -> {
2018-
Map<String, Object> transformation = toMap(name, v);
2023+
Map<String, Object> transformation = flatten(toMap(name, v));
20192024

20202025
boolean disabled =
20212026
Optional.ofNullable(transformation.get(DISABLED))
@@ -2047,6 +2052,7 @@ private void readTransformations(Config cfg) {
20472052
TransformationDescriptor.newBuilder()
20482053
.setName(name)
20492054
.addAttributes(attrs)
2055+
.setCfg(transformation)
20502056
.setTransformation(t);
20512057

20522058
Optional.ofNullable(transformation.get(FILTER))
@@ -2059,7 +2065,7 @@ private void readTransformations(Config cfg) {
20592065
});
20602066

20612067
TransformationDescriptor transformationDescriptor = desc.build();
2062-
setupTransform(transformationDescriptor.getTransformation(), transformation);
2068+
setupTransform(transformationDescriptor, transformation);
20632069
this.transformations.put(name, transformationDescriptor);
20642070
});
20652071
}
@@ -2073,9 +2079,10 @@ private void createTransactionCommitTransformation() {
20732079
.setTransformation(provider.create())
20742080
.addAttributes(transaction.getAttribute(COMMIT_ATTRIBUTE))
20752081
.setName(name)
2082+
.setCfg(Collections.emptyMap())
20762083
.disableOutputTransactions()
20772084
.build();
2078-
setupTransform(descriptor.getTransformation(), Collections.emptyMap());
2085+
setupTransform(descriptor, Collections.emptyMap());
20792086
this.transformations.put(name, descriptor);
20802087
}
20812088

@@ -2099,13 +2106,16 @@ private TransactionTransformProvider getTransactionTransformProvider() {
20992106
return Iterables.get(providers, 0);
21002107
}
21012108

2102-
private void setupTransform(Transformation transformation, Map<String, Object> cfg) {
2109+
private void setupTransform(
2110+
TransformationDescriptor transformationDesc, Map<String, Object> cfg) {
2111+
Transformation transformation = transformationDesc.getTransformation();
21032112
if (transformation.isContextual()) {
21042113
final DataOperator op = getDataOperatorForDelegate(transformation);
21052114
transformation.asContextualTransform().setup(this, op, cfg);
21062115
} else {
21072116
transformation.asElementWiseTransform().setup(this, cfg);
21082117
}
2118+
transformationDesc.getConsumerNameFactory().setup(transformationDesc);
21092119
}
21102120

21112121
private DataOperator getDataOperatorForDelegate(DataOperatorAware delegate) {

core/src/main/java/cz/o2/proxima/core/repository/DefaultConsumerNameFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,22 @@ public void setup(AttributeFamilyDescriptor descriptor) {
8888
this.suffix = descriptor.getCfg().getOrDefault(getSuffixCfgKey(), "").toString();
8989
}
9090
}
91+
92+
public static class DefaultTransformerConsumerNameFactory
93+
extends DefaultConsumerNameFactory<TransformationDescriptor> {
94+
95+
public DefaultTransformerConsumerNameFactory() {
96+
super(
97+
"transformer-",
98+
CFG_TRANSFORMER_CONSUMER_NAME_PREFIX,
99+
CFG_TRANSFORMER_CONSUMER_NAME_SUFFIX);
100+
}
101+
102+
@Override
103+
public void setup(TransformationDescriptor descriptor) {
104+
this.name = descriptor.getName();
105+
this.prefix = descriptor.getCfg().getOrDefault(getPrefixCfgKey(), "").toString();
106+
this.suffix = descriptor.getCfg().getOrDefault(getSuffixCfgKey(), "").toString();
107+
}
108+
}
91109
}

core/src/main/java/cz/o2/proxima/core/repository/TransformationDescriptor.java

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
package cz.o2.proxima.core.repository;
1717

1818
import cz.o2.proxima.core.annotations.Evolving;
19+
import cz.o2.proxima.core.repository.DefaultConsumerNameFactory.DefaultTransformerConsumerNameFactory;
1920
import cz.o2.proxima.core.storage.PassthroughFilter;
2021
import cz.o2.proxima.core.storage.StorageFilter;
2122
import cz.o2.proxima.core.transform.Transformation;
22-
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
2323
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
2424
import cz.o2.proxima.internal.com.google.common.collect.Iterables;
2525
import java.io.Serializable;
@@ -32,9 +32,11 @@
3232
import java.util.stream.Collectors;
3333
import javax.annotation.Nullable;
3434
import lombok.Getter;
35+
import lombok.ToString;
3536

3637
/** Descriptor of single transformation specified in {@code transformations}. */
3738
@Evolving
39+
@ToString
3840
public class TransformationDescriptor implements Serializable {
3941

4042
private static final long serialVersionUID = 1L;
@@ -50,6 +52,7 @@ static class Builder {
5052
Transformation transformation;
5153
StorageFilter filter;
5254
boolean outputTransactions = true;
55+
Map<String, Object> cfg;
5356

5457
Builder setName(String name) {
5558
this.name = name;
@@ -71,6 +74,11 @@ Builder addAttributes(AttributeDescriptor<?>... attrs) {
7174
return this;
7275
}
7376

77+
Builder setCfg(Map<String, Object> cfg) {
78+
this.cfg = Collections.unmodifiableMap(cfg);
79+
return this;
80+
}
81+
7482
Builder addAttributes(Iterable<AttributeDescriptor<?>> attrs) {
7583
attrs.forEach(this.attrs::add);
7684
return this;
@@ -82,11 +90,12 @@ Builder disableOutputTransactions() {
8290
}
8391

8492
TransformationDescriptor build() {
85-
8693
Preconditions.checkArgument(!attrs.isEmpty(), "Please specify at least one attribute");
8794
Preconditions.checkArgument(transformation != null, "Please specify transformation function");
95+
Preconditions.checkArgument(cfg != null);
8896

89-
return new TransformationDescriptor(name, attrs, transformation, outputTransactions, filter);
97+
return new TransformationDescriptor(
98+
name, attrs, transformation, outputTransactions, cfg, filter);
9099
}
91100
}
92101

@@ -115,30 +124,38 @@ public enum OutputTransactionMode {
115124
/** The (stateless) mapping function. */
116125
@Getter private final Transformation transformation;
117126

127+
@Getter private final Map<String, Object> cfg;
128+
118129
/** Input filter. */
119130
@Getter private final StorageFilter filter;
120131

121132
@Getter private final InputTransactionMode inputTransactionMode;
122133

123134
@Getter private final OutputTransactionMode outputTransactionMode;
124135

136+
@Getter private final ConsumerNameFactory<TransformationDescriptor> consumerNameFactory;
137+
125138
private TransformationDescriptor(
126139
String name,
127140
List<AttributeDescriptor<?>> attributes,
128141
Transformation transformation,
129142
boolean supportOutputTransactions,
143+
Map<String, Object> cfg,
130144
@Nullable StorageFilter filter) {
131145

132146
this.name = Objects.requireNonNull(name);
133147
this.attributes = Objects.requireNonNull(attributes);
134148
this.transformation = Objects.requireNonNull(transformation);
135149
this.outputTransactionMode =
136150
supportOutputTransactions ? OutputTransactionMode.ENABLED : OutputTransactionMode.DISABLED;
151+
this.cfg = cfg;
137152
this.filter = filter == null ? new PassthroughFilter() : filter;
138153
this.inputTransactionMode =
139154
requireSingleTransactionMode(name, attributes) != TransactionMode.NONE
140155
? InputTransactionMode.TRANSACTIONAL
141156
: InputTransactionMode.NON_TRANSACTIONAL;
157+
158+
this.consumerNameFactory = new DefaultTransformerConsumerNameFactory();
142159
}
143160

144161
private TransactionMode requireSingleTransactionMode(
@@ -163,32 +180,4 @@ void replaceAttribute(AttributeDescriptor<?> attr) {
163180
attributes.remove(attr);
164181
attributes.add(attr);
165182
}
166-
167-
public ConsumerNameFactory<TransformationDescriptor> getConsumerNameFactory() {
168-
return new ConsumerNameFactory<TransformationDescriptor>() {
169-
170-
private static final long serialVersionUID = 1L;
171-
172-
@Override
173-
public void setup(TransformationDescriptor descriptor) {
174-
// nop
175-
}
176-
177-
@Override
178-
public String apply() {
179-
return "transformer-" + getName();
180-
}
181-
};
182-
}
183-
184-
@Override
185-
public String toString() {
186-
return MoreObjects.toStringHelper(this)
187-
.add("name", name)
188-
.add("attributes", attributes)
189-
.add("inputTransactionMode", inputTransactionMode)
190-
.add("outputTransactionMode", outputTransactionMode)
191-
.add("filter", filter)
192-
.toString();
193-
}
194183
}

core/src/main/java/cz/o2/proxima/core/util/Classpath.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ public static <T> Class<? extends T> findClass(String name, Class<T> superClass)
5858
return clz;
5959
}
6060
}
61-
throw new RuntimeException("Cannot find class " + name);
61+
throw new RuntimeException(
62+
String.format(
63+
"Cannot find class %s using %s", name, Thread.currentThread().getContextClassLoader()));
6264
}
6365

6466
@SuppressWarnings("unchecked")

core/src/test/java/cz/o2/proxima/core/repository/TransformationDescriptorTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
package cz.o2.proxima.core.repository;
1717

1818
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.mock;
1920

21+
import cz.o2.proxima.core.storage.PassthroughFilter;
22+
import cz.o2.proxima.core.transform.Transformation;
2023
import cz.o2.proxima.typesafe.config.ConfigFactory;
2124
import java.util.Map;
2225
import org.junit.Test;
@@ -31,4 +34,20 @@ public void testDefaultNamingOfTransformationConsumers() {
3134
transformations.forEach(
3235
(name, t) -> assertEquals("transformer-" + name, t.getConsumerNameFactory().apply()));
3336
}
37+
38+
@Test
39+
public void testConfigurableNamingConvention() {
40+
Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
41+
TransformationDescriptor desc =
42+
TransformationDescriptor.newBuilder()
43+
.setCfg(
44+
Map.of(DefaultConsumerNameFactory.CFG_TRANSFORMER_CONSUMER_NAME_PREFIX, "prefix-"))
45+
.setName("name")
46+
.addAttributes(repo.getEntity("gateway").getAttribute("status"))
47+
.setTransformation(mock(Transformation.class))
48+
.setFilter(new PassthroughFilter())
49+
.build();
50+
desc.getConsumerNameFactory().setup(desc);
51+
assertEquals("prefix-transformer-name", desc.getConsumerNameFactory().apply());
52+
}
3453
}

direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/ReplicationController.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,12 @@ public static void main(String[] args) throws Throwable {
8181
} else {
8282
repo = Repository.of(ConfigFactory.parseFile(new File(args[0])).resolve());
8383
}
84-
ReplicationController.of(repo).runReplicationThreads().get();
84+
try {
85+
ReplicationController.of(repo).runReplicationThreads().get();
86+
} catch (Throwable err) {
87+
log.error("Error running replication controller.", err);
88+
System.exit(1);
89+
}
8590
}
8691

8792
/**
@@ -225,7 +230,9 @@ public CompletableFuture<Void> runReplicationThreads() {
225230
});
226231

227232
// execute transformer threads
228-
repository.getTransformations().forEach(this::runTransformer);
233+
Map<String, TransformationDescriptor> transformations = repository.getTransformations();
234+
log.info("Starting transformations {}", transformations);
235+
transformations.forEach(this::runTransformer);
229236

230237
scheduler.scheduleAtFixedRate(this::checkLiveness, 0, 1, TimeUnit.SECONDS);
231238

@@ -357,7 +364,7 @@ private void consumeLog(
357364
private void runTransformer(String name, TransformationDescriptor transform) {
358365
if (transform.getInputTransactionMode() == InputTransactionMode.TRANSACTIONAL) {
359366
log.info(
360-
"Skipping run of transformation {} which read from transactional attributes {}. "
367+
"Skipping run of transformation {} which reads from transactional attributes {}. "
361368
+ "Will be executed during transaction commit.",
362369
name,
363370
transform.getAttributes());

flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ void finishAndMarkAsIdle(SourceContext<?> sourceContext) {
303303

304304
@Override
305305
public void cancel() {
306+
Optional.ofNullable(observeHandle).ifPresent(UnifiedObserveHandle::close);
306307
cancelled.countDown();
307308
}
308309

flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,11 @@ UnifiedObserveHandle<Offset> observePartitions(
111111
List<Partition> partitions,
112112
List<AttributeDescriptor<?>> attributeDescriptors,
113113
LogObserver<OutputT> observer) {
114+
114115
final ObserveHandle commitLogHandle =
115116
reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
116-
return new UnifiedObserveHandle<Offset>() {
117+
118+
return new UnifiedObserveHandle<>() {
117119

118120
@Override
119121
public List<Offset> getConsumedOffsets() {

flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
4747
import org.junit.jupiter.api.Assertions;
4848
import org.junit.jupiter.api.Test;
49+
import org.junit.jupiter.api.Timeout;
4950

5051
class CommitLogSourceFunctionTest {
5152

@@ -93,6 +94,7 @@ private static StreamElement newData(
9394
}
9495

9596
@Test
97+
@Timeout(60)
9698
void testRunAndClose() throws Exception {
9799
final Repository repository = Repository.ofTest(ConfigFactory.parseString(MODEL));
98100
final AttributeDescriptor<?> attribute = repository.getEntity("test").getAttribute("data");

0 commit comments

Comments
 (0)