Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -101,20 +102,30 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {

private static PortableMetrics convertMonitoringInfosToMetricResults(
JobApi.MetricResults jobMetrics) {
List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
// TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics
monitoringInfoList.addAll(jobMetrics.getAttemptedList());
monitoringInfoList.addAll(jobMetrics.getCommittedList());
Iterable<MetricResult<Long>> countersFromJobMetrics =
extractCountersFromJobMetrics(monitoringInfoList);
// Deduplicate attempted + committed. Committed wins.
LinkedHashMap<String, MiAndCommitted> infoMap = new LinkedHashMap<>();

for (MetricsApi.MonitoringInfo attempted : jobMetrics.getAttemptedList()) {
String key = monitoringInfoKey(attempted);
infoMap.putIfAbsent(key, new MiAndCommitted(attempted, false));
}

for (MetricsApi.MonitoringInfo committed : jobMetrics.getCommittedList()) {
String key = monitoringInfoKey(committed);
infoMap.put(key, new MiAndCommitted(committed, true));
}

List<MiAndCommitted> merged = new ArrayList<>(infoMap.values());

Iterable<MetricResult<Long>> countersFromJobMetrics = extractCountersFromJobMetrics(merged);
Iterable<MetricResult<DistributionResult>> distributionsFromMetrics =
extractDistributionMetricsFromJobMetrics(monitoringInfoList);
extractDistributionMetricsFromJobMetrics(merged);
Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
extractGaugeMetricsFromJobMetrics(monitoringInfoList);
extractGaugeMetricsFromJobMetrics(merged);
Iterable<MetricResult<StringSetResult>> stringSetFromMetrics =
extractStringSetMetricsFromJobMetrics(monitoringInfoList);
extractStringSetMetricsFromJobMetrics(merged);
Iterable<MetricResult<BoundedTrieResult>> boundedTrieFromMetrics =
extractBoundedTrieMetricsFromJobMetrics(monitoringInfoList);
extractBoundedTrieMetricsFromJobMetrics(merged);
return new PortableMetrics(
countersFromJobMetrics,
distributionsFromMetrics,
Expand All @@ -123,26 +134,52 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
boundedTrieFromMetrics);
}

/**
 * Builds the deduplication key for a MonitoringInfo from its type and the three labels that make
 * up the metric identity: step name, namespace and metric name.
 *
 * <p>Every component is written as {@code <length>:<value>|}. The length prefix keeps the key
 * unambiguous even when a component itself contains the separator character, so two different
 * identities can never collapse onto the same key (for example step {@code "a|b"} with namespace
 * {@code "c"} versus step {@code "a"} with namespace {@code "b|c"}).
 *
 * <p>A label that is absent from the MonitoringInfo is treated as the empty string.
 *
 * @param mi the MonitoringInfo to derive the key from
 * @return a string that is equal for two MonitoringInfos exactly when their type, step name,
 *     namespace and metric name are all equal (with absent labels compared as empty strings)
 */
private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
  Map<String, String> labels = mi.getLabelsMap();
  // Order matters: type first, then the canonical identity labels.
  String[] parts = {
    mi.getType(),
    labels.getOrDefault(STEP_NAME_LABEL, ""),
    labels.getOrDefault(NAMESPACE_LABEL, ""),
    labels.getOrDefault(METRIC_NAME_LABEL, "")
  };

  StringBuilder sb = new StringBuilder();
  for (String part : parts) {
    // Length-prefix each part so a '|' inside a value cannot be mistaken for a separator.
    sb.append(part.length()).append(':').append(part).append('|');
  }
  return sb.toString();
}

/**
 * Immutable pair of a MonitoringInfo and a flag recording whether it was constructed as a
 * committed ({@code true}) or an attempted ({@code false}) entry.
 */
private static class MiAndCommitted {
  /** The wrapped MonitoringInfo. */
  final MetricsApi.MonitoringInfo mi;

  /** The committed flag supplied at construction time. */
  final boolean committed;

  /**
   * @param monitoringInfo the MonitoringInfo to wrap
   * @param isCommitted value to store in {@link #committed}
   */
  MiAndCommitted(MetricsApi.MonitoringInfo monitoringInfo, boolean isCommitted) {
    this.committed = isCommitted;
    this.mi = monitoringInfo;
  }
}

private static Iterable<MetricResult<DistributionResult>>
extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
extractDistributionMetricsFromJobMetrics(List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
.filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(m -> convertDistributionMonitoringInfoToDistribution(m))
.collect(Collectors.toList());
}

private static Iterable<MetricResult<GaugeResult>> extractGaugeMetricsFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.filter(item -> LATEST_INT64_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
.filter(m -> LATEST_INT64_TYPE.equals(m.mi.getType()))
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(m -> convertGaugeMonitoringInfoToGauge(m))
.collect(Collectors.toList());
}

private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
MetricsApi.MonitoringInfo monitoringInfo) {
private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(MiAndCommitted m) {
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -151,29 +188,31 @@ private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(

GaugeData data = decodeInt64Gauge(monitoringInfo.getPayload());
GaugeResult result = GaugeResult.create(data.value(), data.timestamp());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static Iterable<MetricResult<StringSetResult>> extractStringSetMetricsFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.filter(item -> SET_STRING_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertStringSetMonitoringInfoToStringSet)
.filter(m -> SET_STRING_TYPE.equals(m.mi.getType()))
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(m -> convertStringSetMonitoringInfoToStringSet(m))
.collect(Collectors.toList());
}

private static Iterable<MetricResult<BoundedTrieResult>> extractBoundedTrieMetricsFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType()))
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie)
.filter(m -> BOUNDED_TRIE_TYPE.equals(m.mi.getType()))
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(m -> convertBoundedTrieMonitoringInfoToBoundedTrie(m))
.collect(Collectors.toList());
}

private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStringSet(
MetricsApi.MonitoringInfo monitoringInfo) {
MiAndCommitted m) {
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -182,11 +221,13 @@ private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStr

StringSetData data = decodeStringSet(monitoringInfo.getPayload());
StringSetResult result = StringSetResult.create(data.stringSet());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoToBoundedTrie(
MetricsApi.MonitoringInfo monitoringInfo) {
MiAndCommitted m) {
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -195,11 +236,13 @@ private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoT

BoundedTrieData data = decodeBoundedTrie(monitoringInfo.getPayload());
BoundedTrieResult result = BoundedTrieResult.create(data.extractResult().getResult());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static MetricResult<DistributionResult> convertDistributionMonitoringInfoToDistribution(
MetricsApi.MonitoringInfo monitoringInfo) {
MiAndCommitted m) {
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
Expand All @@ -208,27 +251,26 @@ private static MetricResult<DistributionResult> convertDistributionMonitoringInf
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
DistributionResult result =
DistributionResult.create(data.sum(), data.count(), data.min(), data.max());
return MetricResult.create(key, false, result);
return MetricResult.create(key, isCommitted, result);
}

private static Iterable<MetricResult<Long>> extractCountersFromJobMetrics(
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
.filter(item -> SUM_INT64_TYPE.equals(item.getType()))
.filter(
item ->
item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter out pcollection metrics
.map(PortableMetrics::convertCounterMonitoringInfoToCounter)
.filter(m -> SUM_INT64_TYPE.equals(m.mi.getType()))
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
.map(m -> convertCounterMonitoringInfoToCounter(m))
.collect(Collectors.toList());
}

private static MetricResult<Long> convertCounterMonitoringInfoToCounter(
MetricsApi.MonitoringInfo counterMonInfo) {
private static MetricResult<Long> convertCounterMonitoringInfoToCounter(MiAndCommitted m) {
MetricsApi.MonitoringInfo counterMonInfo = m.mi;
boolean isCommitted = m.committed;
Map<String, String> labelsMap = counterMonInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
labelsMap.get(STEP_NAME_LABEL),
MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
return MetricResult.create(key, false, decodeInt64Counter(counterMonInfo.getPayload()));
return MetricResult.create(key, isCommitted, decodeInt64Counter(counterMonInfo.getPayload()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,52 @@ public void removeStagedArtifacts(String stagingToken) {}
server.start();
}

/**
 * Reports one counter twice under the same type and labels, once as attempted (payload 7) and
 * once as committed (payload 10), and checks that the pipeline result exposes a single counter
 * whose committed value is 10.
 */
@Test
public void deduplicatesAttemptedAndCommittedMetrics() throws Exception {
  // Identity labels shared by both MonitoringInfos.
  Map<String, String> identityLabels = new HashMap<>();
  identityLabels.put(STEP_NAME_LABEL, STEP_NAME);
  identityLabels.put(NAMESPACE_LABEL, NAMESPACE);
  identityLabels.put(METRIC_NAME_LABEL, METRIC_NAME);

  // Same type and labels; only the payload differs.
  MetricsApi.MonitoringInfo attemptedInfo =
      MetricsApi.MonitoringInfo.newBuilder()
          .setType(COUNTER_TYPE)
          .putAllLabels(identityLabels)
          .setPayload(encodeInt64Counter(7L))
          .build();
  MetricsApi.MonitoringInfo committedInfo =
      MetricsApi.MonitoringInfo.newBuilder()
          .setType(COUNTER_TYPE)
          .putAllLabels(identityLabels)
          .setPayload(encodeInt64Counter(10L))
          .build();

  JobApi.MetricResults jobMetrics =
      JobApi.MetricResults.newBuilder()
          .addAttempted(attemptedInfo)
          .addCommitted(committedInfo)
          .build();

  // Run the pipeline against a job server that reports the metrics above.
  createJobServer(JobState.Enum.DONE, jobMetrics);
  PortableRunner portableRunner =
      PortableRunner.create(options, ManagedChannelFactory.createInProcess());
  PipelineResult pipelineResult = portableRunner.run(p);
  pipelineResult.waitUntilFinish();

  ImmutableList<org.apache.beam.sdk.metrics.MetricResult<Long>> counterResults =
      ImmutableList.copyOf(pipelineResult.metrics().allMetrics().getCounters());

  // The shared identity must yield exactly one MetricResult.
  assertThat(counterResults.size(), is(1));

  // The surviving result must carry the committed payload (10).
  org.apache.beam.sdk.metrics.MetricResult<Long> onlyCounter = counterResults.get(0);
  assertThat(onlyCounter.getCommitted(), is(10L));
}

private static PipelineOptions createPipelineOptions() {
PortablePipelineOptions options =
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
Expand Down
Loading