Skip to content

Commit 7b13d42

Browse files
authored
Deduplicate MonitoringInfo in PortableMetrics (#37066)
* deduplicate * addressing gemini comments * changes * fixes * use correct method * fix * spotless fix
1 parent f7a9b26 commit 7b13d42

File tree

2 files changed

+132
-44
lines changed

2 files changed

+132
-44
lines changed

runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java

Lines changed: 86 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.ArrayList;
3232
import java.util.Collections;
33+
import java.util.LinkedHashMap;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.stream.Collectors;
@@ -101,20 +102,30 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
101102

102103
private static PortableMetrics convertMonitoringInfosToMetricResults(
103104
JobApi.MetricResults jobMetrics) {
104-
List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
105-
// TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics
106-
monitoringInfoList.addAll(jobMetrics.getAttemptedList());
107-
monitoringInfoList.addAll(jobMetrics.getCommittedList());
108-
Iterable<MetricResult<Long>> countersFromJobMetrics =
109-
extractCountersFromJobMetrics(monitoringInfoList);
105+
// Deduplicate attempted + committed. Committed wins.
106+
LinkedHashMap<String, MiAndCommitted> infoMap = new LinkedHashMap<>();
107+
108+
for (MetricsApi.MonitoringInfo attempted : jobMetrics.getAttemptedList()) {
109+
String key = monitoringInfoKey(attempted);
110+
infoMap.putIfAbsent(key, new MiAndCommitted(attempted, false));
111+
}
112+
113+
for (MetricsApi.MonitoringInfo committed : jobMetrics.getCommittedList()) {
114+
String key = monitoringInfoKey(committed);
115+
infoMap.put(key, new MiAndCommitted(committed, true));
116+
}
117+
118+
List<MiAndCommitted> merged = new ArrayList<>(infoMap.values());
119+
120+
Iterable<MetricResult<Long>> countersFromJobMetrics = extractCountersFromJobMetrics(merged);
110121
Iterable<MetricResult<DistributionResult>> distributionsFromMetrics =
111-
extractDistributionMetricsFromJobMetrics(monitoringInfoList);
122+
extractDistributionMetricsFromJobMetrics(merged);
112123
Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
113-
extractGaugeMetricsFromJobMetrics(monitoringInfoList);
124+
extractGaugeMetricsFromJobMetrics(merged);
114125
Iterable<MetricResult<StringSetResult>> stringSetFromMetrics =
115-
extractStringSetMetricsFromJobMetrics(monitoringInfoList);
126+
extractStringSetMetricsFromJobMetrics(merged);
116127
Iterable<MetricResult<BoundedTrieResult>> boundedTrieFromMetrics =
117-
extractBoundedTrieMetricsFromJobMetrics(monitoringInfoList);
128+
extractBoundedTrieMetricsFromJobMetrics(merged);
118129
return new PortableMetrics(
119130
countersFromJobMetrics,
120131
distributionsFromMetrics,
@@ -123,26 +134,52 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
123134
boundedTrieFromMetrics);
124135
}
125136

137+
/**
138+
* Build a stable deduplication key for a MonitoringInfo based on type and the metric identity
139+
* labels.
140+
*/
141+
private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
142+
StringBuilder sb = new StringBuilder();
143+
sb.append(mi.getType()).append('|');
144+
Map<String, String> labels = mi.getLabelsMap();
145+
// Use canonical labels that form the metric identity
146+
sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|');
147+
sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|');
148+
sb.append(labels.getOrDefault(METRIC_NAME_LABEL, ""));
149+
return sb.toString();
150+
}
151+
152+
private static class MiAndCommitted {
153+
final MetricsApi.MonitoringInfo mi;
154+
final boolean committed;
155+
156+
MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) {
157+
this.mi = mi;
158+
this.committed = committed;
159+
}
160+
}
161+
126162
private static Iterable<MetricResult<DistributionResult>>
127-
extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
163+
extractDistributionMetricsFromJobMetrics(List<MiAndCommitted> monitoringInfoList) {
128164
return monitoringInfoList.stream()
129-
.filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
130-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
131-
.map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
165+
.filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
166+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
167+
.map(m -> convertDistributionMonitoringInfoToDistribution(m))
132168
.collect(Collectors.toList());
133169
}
134170

135171
private static Iterable<MetricResult<GaugeResult>> extractGaugeMetricsFromJobMetrics(
136-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
172+
List<MiAndCommitted> monitoringInfoList) {
137173
return monitoringInfoList.stream()
138-
.filter(item -> LATEST_INT64_TYPE.equals(item.getType()))
139-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
140-
.map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
174+
.filter(m -> LATEST_INT64_TYPE.equals(m.mi.getType()))
175+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
176+
.map(m -> convertGaugeMonitoringInfoToGauge(m))
141177
.collect(Collectors.toList());
142178
}
143179

144-
private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
145-
MetricsApi.MonitoringInfo monitoringInfo) {
180+
private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(MiAndCommitted m) {
181+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
182+
boolean isCommitted = m.committed;
146183
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
147184
MetricKey key =
148185
MetricKey.create(
@@ -151,29 +188,31 @@ private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
151188

152189
GaugeData data = decodeInt64Gauge(monitoringInfo.getPayload());
153190
GaugeResult result = GaugeResult.create(data.value(), data.timestamp());
154-
return MetricResult.create(key, false, result);
191+
return MetricResult.create(key, isCommitted, result);
155192
}
156193

157194
private static Iterable<MetricResult<StringSetResult>> extractStringSetMetricsFromJobMetrics(
158-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
195+
List<MiAndCommitted> monitoringInfoList) {
159196
return monitoringInfoList.stream()
160-
.filter(item -> SET_STRING_TYPE.equals(item.getType()))
161-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
162-
.map(PortableMetrics::convertStringSetMonitoringInfoToStringSet)
197+
.filter(m -> SET_STRING_TYPE.equals(m.mi.getType()))
198+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
199+
.map(m -> convertStringSetMonitoringInfoToStringSet(m))
163200
.collect(Collectors.toList());
164201
}
165202

166203
private static Iterable<MetricResult<BoundedTrieResult>> extractBoundedTrieMetricsFromJobMetrics(
167-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
204+
List<MiAndCommitted> monitoringInfoList) {
168205
return monitoringInfoList.stream()
169-
.filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType()))
170-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
171-
.map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie)
206+
.filter(m -> BOUNDED_TRIE_TYPE.equals(m.mi.getType()))
207+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
208+
.map(m -> convertBoundedTrieMonitoringInfoToBoundedTrie(m))
172209
.collect(Collectors.toList());
173210
}
174211

175212
private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStringSet(
176-
MetricsApi.MonitoringInfo monitoringInfo) {
213+
MiAndCommitted m) {
214+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
215+
boolean isCommitted = m.committed;
177216
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
178217
MetricKey key =
179218
MetricKey.create(
@@ -182,11 +221,13 @@ private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStr
182221

183222
StringSetData data = decodeStringSet(monitoringInfo.getPayload());
184223
StringSetResult result = StringSetResult.create(data.stringSet());
185-
return MetricResult.create(key, false, result);
224+
return MetricResult.create(key, isCommitted, result);
186225
}
187226

188227
private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoToBoundedTrie(
189-
MetricsApi.MonitoringInfo monitoringInfo) {
228+
MiAndCommitted m) {
229+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
230+
boolean isCommitted = m.committed;
190231
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
191232
MetricKey key =
192233
MetricKey.create(
@@ -195,11 +236,13 @@ private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoT
195236

196237
BoundedTrieData data = decodeBoundedTrie(monitoringInfo.getPayload());
197238
BoundedTrieResult result = BoundedTrieResult.create(data.extractResult().getResult());
198-
return MetricResult.create(key, false, result);
239+
return MetricResult.create(key, isCommitted, result);
199240
}
200241

201242
private static MetricResult<DistributionResult> convertDistributionMonitoringInfoToDistribution(
202-
MetricsApi.MonitoringInfo monitoringInfo) {
243+
MiAndCommitted m) {
244+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
245+
boolean isCommitted = m.committed;
203246
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
204247
MetricKey key =
205248
MetricKey.create(
@@ -208,27 +251,26 @@ private static MetricResult<DistributionResult> convertDistributionMonitoringInf
208251
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
209252
DistributionResult result =
210253
DistributionResult.create(data.sum(), data.count(), data.min(), data.max());
211-
return MetricResult.create(key, false, result);
254+
return MetricResult.create(key, isCommitted, result);
212255
}
213256

214257
private static Iterable<MetricResult<Long>> extractCountersFromJobMetrics(
215-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
258+
List<MiAndCommitted> monitoringInfoList) {
216259
return monitoringInfoList.stream()
217-
.filter(item -> SUM_INT64_TYPE.equals(item.getType()))
218-
.filter(
219-
item ->
220-
item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter out pcollection metrics
221-
.map(PortableMetrics::convertCounterMonitoringInfoToCounter)
260+
.filter(m -> SUM_INT64_TYPE.equals(m.mi.getType()))
261+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
262+
.map(m -> convertCounterMonitoringInfoToCounter(m))
222263
.collect(Collectors.toList());
223264
}
224265

225-
private static MetricResult<Long> convertCounterMonitoringInfoToCounter(
226-
MetricsApi.MonitoringInfo counterMonInfo) {
266+
private static MetricResult<Long> convertCounterMonitoringInfoToCounter(MiAndCommitted m) {
267+
MetricsApi.MonitoringInfo counterMonInfo = m.mi;
268+
boolean isCommitted = m.committed;
227269
Map<String, String> labelsMap = counterMonInfo.getLabelsMap();
228270
MetricKey key =
229271
MetricKey.create(
230272
labelsMap.get(STEP_NAME_LABEL),
231273
MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
232-
return MetricResult.create(key, false, decodeInt64Counter(counterMonInfo.getPayload()));
274+
return MetricResult.create(key, isCommitted, decodeInt64Counter(counterMonInfo.getPayload()));
233275
}
234276
}

runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,52 @@ public void removeStagedArtifacts(String stagingToken) {}
222222
server.start();
223223
}
224224

225+
@Test
226+
public void deduplicatesAttemptedAndCommittedMetrics() throws Exception {
227+
Map<String, String> labelMap = new HashMap<>();
228+
labelMap.put(NAMESPACE_LABEL, NAMESPACE);
229+
labelMap.put(METRIC_NAME_LABEL, METRIC_NAME);
230+
labelMap.put(STEP_NAME_LABEL, STEP_NAME);
231+
232+
// attempted counter (value 7) and committed counter (value 10) with same identity
233+
MetricsApi.MonitoringInfo attemptedCounter =
234+
MetricsApi.MonitoringInfo.newBuilder()
235+
.setType(COUNTER_TYPE)
236+
.putAllLabels(labelMap)
237+
.setPayload(encodeInt64Counter(7L))
238+
.build();
239+
240+
MetricsApi.MonitoringInfo committedCounter =
241+
MetricsApi.MonitoringInfo.newBuilder()
242+
.setType(COUNTER_TYPE)
243+
.putAllLabels(labelMap)
244+
.setPayload(encodeInt64Counter(10L))
245+
.build();
246+
247+
JobApi.MetricResults metricResults =
248+
JobApi.MetricResults.newBuilder()
249+
.addAttempted(attemptedCounter)
250+
.addCommitted(committedCounter)
251+
.build();
252+
253+
createJobServer(JobState.Enum.DONE, metricResults);
254+
PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess());
255+
PipelineResult result = runner.run(p);
256+
result.waitUntilFinish();
257+
258+
Iterable<org.apache.beam.sdk.metrics.MetricResult<Long>> counters =
259+
result.metrics().allMetrics().getCounters();
260+
ImmutableList<org.apache.beam.sdk.metrics.MetricResult<Long>> list =
261+
ImmutableList.copyOf(counters);
262+
263+
// Only one MetricResult should be present for the same identity.
264+
assertThat(list.size(), is(1));
265+
org.apache.beam.sdk.metrics.MetricResult<Long> r = list.get(0);
266+
267+
// Committed value should be present and equal to the committed payload (10).
268+
assertThat(r.getCommitted(), is(10L));
269+
}
270+
225271
private static PipelineOptions createPipelineOptions() {
226272
PortablePipelineOptions options =
227273
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);

0 commit comments

Comments
 (0)