Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -101,20 +102,30 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {

private static PortableMetrics convertMonitoringInfosToMetricResults(
JobApi.MetricResults jobMetrics) {
List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
// TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics
monitoringInfoList.addAll(jobMetrics.getAttemptedList());
monitoringInfoList.addAll(jobMetrics.getCommittedList());
Iterable<MetricResult<Long>> countersFromJobMetrics =
extractCountersFromJobMetrics(monitoringInfoList);
// Deduplicate attempted + committed. Committed wins.
LinkedHashMap<String, MiAndCommitted> infoMap = new LinkedHashMap<>();

for (MetricsApi.MonitoringInfo attempted : jobMetrics.getAttemptedList()) {
String key = monitoringInfoKey(attempted);
infoMap.putIfAbsent(key, new MiAndCommitted(attempted, false));
}

for (MetricsApi.MonitoringInfo committed : jobMetrics.getCommittedList()) {
String key = monitoringInfoKey(committed);
infoMap.put(key, new MiAndCommitted(committed, true));
}

List<MiAndCommitted> merged = new ArrayList<>(infoMap.values());

Iterable<MetricResult<Long>> countersFromJobMetrics = extractCountersFromJobMetrics(merged);
Iterable<MetricResult<DistributionResult>> distributionsFromMetrics =
extractDistributionMetricsFromJobMetrics(monitoringInfoList);
extractDistributionMetricsFromJobMetrics(merged);
Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
extractGaugeMetricsFromJobMetrics(monitoringInfoList);
extractGaugeMetricsFromJobMetrics(merged);
Iterable<MetricResult<StringSetResult>> stringSetFromMetrics =
extractStringSetMetricsFromJobMetrics(monitoringInfoList);
extractStringSetMetricsFromJobMetrics(merged);
Iterable<MetricResult<BoundedTrieResult>> boundedTrieFromMetrics =
extractBoundedTrieMetricsFromJobMetrics(monitoringInfoList);
extractBoundedTrieMetricsFromJobMetrics(merged);
return new PortableMetrics(
countersFromJobMetrics,
distributionsFromMetrics,
Expand All @@ -123,26 +134,61 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
boundedTrieFromMetrics);
}

/**
* Build a stable deduplication key for a MonitoringInfo based on type and the metric identity
* labels.
*/
private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
StringBuilder sb = new StringBuilder();
sb.append(mi.getType()).append('|');
Map<String, String> labels = mi.getLabelsMap();
// Use canonical labels that form the metric identity
sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|');
sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|');
sb.append(labels.getOrDefault(METRIC_NAME_LABEL, ""));
return sb.toString();
}

private static class MiAndCommitted {
final MetricsApi.MonitoringInfo mi;
final boolean committed;

MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) {
this.mi = mi;
this.committed = committed;
}
}

private static Iterable<MetricResult<DistributionResult>>
extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
extractDistributionMetricsFromJobMetrics(List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.map(m -> m.mi)
.filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
.map(
item -> {
boolean isCommitted = findCommittedFlag(monitoringInfoList, item);
return convertDistributionMonitoringInfoToDistribution(item, isCommitted);
})
.collect(Collectors.toList());
}

private static Iterable<MetricResult<GaugeResult>> extractGaugeMetricsFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.map(m -> m.mi)
.filter(item -> LATEST_INT64_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
.map(
item -> {
boolean isCommitted = findCommittedFlag(monitoringInfoList, item);
return convertGaugeMonitoringInfoToGauge(item, isCommitted);
})
.collect(Collectors.toList());
}

private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
MetricsApi.MonitoringInfo monitoringInfo) {
MetricsApi.MonitoringInfo monitoringInfo, boolean isCommitted) {
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -151,29 +197,39 @@ private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(

GaugeData data = decodeInt64Gauge(monitoringInfo.getPayload());
GaugeResult result = GaugeResult.create(data.value(), data.timestamp());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static Iterable<MetricResult<StringSetResult>> extractStringSetMetricsFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.map(m -> m.mi)
.filter(item -> SET_STRING_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertStringSetMonitoringInfoToStringSet)
.map(
item -> {
boolean isCommitted = findCommittedFlag(monitoringInfoList, item);
return convertStringSetMonitoringInfoToStringSet(item, isCommitted);
})
.collect(Collectors.toList());
}

private static Iterable<MetricResult<BoundedTrieResult>> extractBoundedTrieMetricsFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.map(m -> m.mi)
.filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie)
.map(
item -> {
boolean isCommitted = findCommittedFlag(monitoringInfoList, item);
return convertBoundedTrieMonitoringInfoToBoundedTrie(item, isCommitted);
})
.collect(Collectors.toList());
}

private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStringSet(
MetricsApi.MonitoringInfo monitoringInfo) {
MetricsApi.MonitoringInfo monitoringInfo, boolean isCommitted) {
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -182,11 +238,11 @@ private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStr

StringSetData data = decodeStringSet(monitoringInfo.getPayload());
StringSetResult result = StringSetResult.create(data.stringSet());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoToBoundedTrie(
MetricsApi.MonitoringInfo monitoringInfo) {
MetricsApi.MonitoringInfo monitoringInfo, boolean isCommitted) {
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -195,11 +251,11 @@ private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoT

BoundedTrieData data = decodeBoundedTrie(monitoringInfo.getPayload());
BoundedTrieResult result = BoundedTrieResult.create(data.extractResult().getResult());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static MetricResult<DistributionResult> convertDistributionMonitoringInfoToDistribution(
MetricsApi.MonitoringInfo monitoringInfo) {
MetricsApi.MonitoringInfo monitoringInfo, boolean isCommitted) {
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -208,27 +264,45 @@ private static MetricResult<DistributionResult> convertDistributionMonitoringInf
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
DistributionResult result =
DistributionResult.create(data.sum(), data.count(), data.min(), data.max());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static Iterable<MetricResult<Long>> extractCountersFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.map(m -> m.mi)
.filter(item -> SUM_INT64_TYPE.equals(item.getType()))
.filter(
item ->
item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter out pcollection metrics
.map(PortableMetrics::convertCounterMonitoringInfoToCounter)
.map(
item -> {
boolean isCommitted = findCommittedFlag(monitoringInfoList, item);
return convertCounterMonitoringInfoToCounter(item, isCommitted);
})
.collect(Collectors.toList());
}

private static MetricResult<Long> convertCounterMonitoringInfoToCounter(
MetricsApi.MonitoringInfo counterMonInfo) {
MetricsApi.MonitoringInfo counterMonInfo, boolean isCommitted) {
Map<String, String> labelsMap = counterMonInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
labelsMap.get(STEP_NAME_LABEL),
MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
return MetricResult.create(key, false, decodeInt64Counter(counterMonInfo.getPayload()));
return MetricResult.create(key, isCommitted, decodeInt64Counter(counterMonInfo.getPayload()));
}

/** Helper to retrieve the committed flag for a MonitoringInfo from the merged list. */
private static boolean findCommittedFlag(
List<MiAndCommitted> merged, MetricsApi.MonitoringInfo mi) {
// Reconstruct the key and look up in the merged map entries.
String key = monitoringInfoKey(mi);
for (MiAndCommitted entry : merged) {
if (monitoringInfoKey(entry.mi).equals(key)) {
return entry.committed;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,52 @@ public void removeStagedArtifacts(String stagingToken) {}
server.start();
}

@Test
public void deduplicatesAttemptedAndCommittedMetrics() throws Exception {
Map<String, String> labelMap = new HashMap<>();
labelMap.put(NAMESPACE_LABEL, NAMESPACE);
labelMap.put(METRIC_NAME_LABEL, METRIC_NAME);
labelMap.put(STEP_NAME_LABEL, STEP_NAME);

// attempted counter (value 7) and committed counter (value 10) with same identity
MetricsApi.MonitoringInfo attemptedCounter =
MetricsApi.MonitoringInfo.newBuilder()
.setType(COUNTER_TYPE)
.putAllLabels(labelMap)
.setPayload(encodeInt64Counter(7L))
.build();

MetricsApi.MonitoringInfo committedCounter =
MetricsApi.MonitoringInfo.newBuilder()
.setType(COUNTER_TYPE)
.putAllLabels(labelMap)
.setPayload(encodeInt64Counter(10L))
.build();

JobApi.MetricResults metricResults =
JobApi.MetricResults.newBuilder()
.addAttempted(attemptedCounter)
.addCommitted(committedCounter)
.build();

createJobServer(JobState.Enum.DONE, metricResults);
PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess());
PipelineResult result = runner.run(p);
result.waitUntilFinish();

Iterable<org.apache.beam.sdk.metrics.MetricResult<Long>> counters =
result.metrics().allMetrics().getCounters();
ImmutableList<org.apache.beam.sdk.metrics.MetricResult<Long>> list =
ImmutableList.copyOf(counters);

// Only one MetricResult should be present for the same identity.
assertThat(list.size(), is(1));
org.apache.beam.sdk.metrics.MetricResult<Long> r = list.get(0);

// Committed value should be present and equal to the committed payload (10).
assertThat(r.getCommitted(), is(10L));
}

private static PipelineOptions createPipelineOptions() {
PortablePipelineOptions options =
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
Expand Down
Loading