From 58ac674ef1e69dea9647a1f3265015e1d3a29b81 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 9 Oct 2025 16:37:04 +1100 Subject: [PATCH 01/10] Aggregation Mode: Distribution addition --- src/transforms/aggregate.rs | 180 +++++++++++++++++++++++++++++++++--- 1 file changed, 169 insertions(+), 11 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 6c3f522f10a27..f11cbcf1213b7 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -11,7 +11,7 @@ use vector_lib::{ configurable::configurable_component, event::{ MetricValue, - metric::{Metric, MetricData, MetricKind, MetricSeries}, + metric::{Metric, MetricData, MetricKind, MetricSeries, Sample, StatisticKind}, }, }; @@ -73,6 +73,9 @@ pub enum AggregationMode { /// Stdev value of absolute metric, ignores incremental Stdev, + + /// Aggregates absolute metrics into a distribution, ignores incremental + Distribution, } const fn default_mode() -> AggregationMode { @@ -149,18 +152,17 @@ impl Aggregate { AggregationMode::Max | AggregationMode::Min => { self.record_comparison(series, data, metadata) } - AggregationMode::Mean | AggregationMode::Stdev => match data.kind { + AggregationMode::Mean | AggregationMode::Stdev | AggregationMode::Distribution => match data.kind { MetricKind::Incremental => (), MetricKind::Absolute => { - if matches!(data.value, MetricValue::Gauge { value: _ }) { - match self.multi_map.entry(series) { - Entry::Occupied(mut entry) => { - let existing = entry.get_mut(); - existing.push((data, metadata)); - } - Entry::Vacant(entry) => { - entry.insert(vec![(data, metadata)]); - } + // For Distribution mode, we accept any metric value type + match self.multi_map.entry(series) { + Entry::Occupied(mut entry) => { + let existing = entry.get_mut(); + existing.push((data, metadata)); + } + Entry::Vacant(entry) => { + entry.insert(vec![(data, metadata)]); } } } @@ -313,6 +315,11 @@ impl Aggregate { let metric = Metric::from_parts(series, final_stdev, final_metadata); output.push(Event::Metric(metric)); } + AggregationMode::Distribution => { + let distribution_data = self.create_distribution_from_entries(&entries); + let metric = Metric::from_parts(series, distribution_data, final_metadata); + output.push(Event::Metric(metric)); + } _ => (), } } @@ -320,6 +327,74 @@ impl Aggregate { self.prev_map = map; emit!(AggregateFlushed); } + + fn create_distribution_from_entries(&self, entries: &[MetricEntry]) -> MetricData { + let mut samples = Vec::new(); + + for (data, _) in entries { + match data.value() { + MetricValue::Gauge { value } => { + samples.push(Sample { + value: *value, + rate: 1, + }); + } + MetricValue::Counter { value } => { + samples.push(Sample { + value: *value, + rate: 1, + }); + } + MetricValue::Set { values } => { + // For sets, create a sample for each unique value with rate 1 + for _value in values { + samples.push(Sample { + value: 1.0, // Count each set element as 1 + rate: 1, + }); + } + } + MetricValue::Distribution { samples: dist_samples, .. } => { + // If already a distribution, merge the samples + samples.extend(dist_samples.clone()); + } + MetricValue::AggregatedHistogram { count, sum, .. } => { + // Convert histogram to samples using average value + if *count > 0 { + let avg_value = sum / (*count as f64); + for _ in 0..*count { + samples.push(Sample { + value: avg_value, + rate: 1, + }); + } + } + } + MetricValue::AggregatedSummary { count, sum, .. } => { + // Convert summary to samples using average value + if *count > 0 { + let avg_value = sum / (*count as f64); + for _ in 0..*count { + samples.push(Sample { + value: avg_value, + rate: 1, + }); + } + } + } + _ => (), + } + } + + MetricData { + time: entries.first().unwrap().0.time, + kind: MetricKind::Absolute, + value: MetricValue::Distribution { + samples, + statistic: StatisticKind::Histogram, + }, + } + } } impl TaskTransform for Aggregate { @@ -1142,4 +1217,87 @@ interval_ms = 999999 }) .await; } + + #[test] + fn absolute_distribution() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let gauge_a_1 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 25.0 }, + ); + let gauge_a_2 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 30.0 }, + ); + let gauge_a_3 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 35.0 }, + ); + let gauge_a_4 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 36.0 }, + ); + let gauge_a_5 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 37.0 }, + ); + let gauge_a_6 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 38.0 }, + ); + let gauge_a_7 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 39.0 }, + ); + let counter_a_1 = make_metric( + "counter_a", + MetricKind::Incremental, + MetricValue::Counter { value: 110.0 }, + ); + // Record four gauge values + agg.record(gauge_a_1); + agg.record(gauge_a_2); + agg.record(gauge_a_3); + agg.record(gauge_a_4); + agg.record(gauge_a_5); + agg.record(gauge_a_6); + agg.record(gauge_a_7); + // validating that if we add a counter it doesn't end up in the distribution. + agg.record(counter_a_1); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Verify it's a distribution with 7 metrics + if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { + assert_eq!(samples.len(), 7); + assert_eq!(*statistic, StatisticKind::Histogram); + + // Check sample values + let values: Vec = samples.iter().map(|s| s.value).collect(); + assert!(values.contains(&25.0)); + assert!(values.contains(&30.0)); + assert!(values.contains(&35.0)); + assert!(values.contains(&36.0)); + assert!(values.contains(&37.0)); + assert!(values.contains(&38.0)); + assert!(values.contains(&39.0)); + assert!(!values.contains(&110.0)); + } else { + panic!("Expected Distribution metric value"); + } + } } From 92878d11855e45a24b29d2d0dafe6f67dc535ab1 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 9 Oct 2025 18:14:23 +1100 Subject: [PATCH 02/10] add test for creating a distribution of distributions --- src/transforms/aggregate.rs | 108 +++++++++++++++++++++++++++++++----- 1 file changed, 93 insertions(+), 15 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index f11cbcf1213b7..a987959e6efad 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -152,21 +152,23 @@ impl Aggregate { AggregationMode::Max | AggregationMode::Min => { self.record_comparison(series, data, metadata) } - AggregationMode::Mean | AggregationMode::Stdev | AggregationMode::Distribution => match data.kind { - MetricKind::Incremental => (), - MetricKind::Absolute => { - // For Distribution mode, we accept any metric value type - match self.multi_map.entry(series) { - Entry::Occupied(mut entry) => { - let existing = entry.get_mut(); - existing.push((data, metadata)); - } - Entry::Vacant(entry) => { - entry.insert(vec![(data, metadata)]); + AggregationMode::Mean | AggregationMode::Stdev | AggregationMode::Distribution => { + match data.kind { + MetricKind::Incremental => (), + MetricKind::Absolute => { + // For Distribution mode, we accept any metric value type + match self.multi_map.entry(series) { + Entry::Occupied(mut entry) => { + let existing = entry.get_mut(); + existing.push((data, metadata)); + } + Entry::Vacant(entry) => { + entry.insert(vec![(data, metadata)]); + } } } } - }, + } } emit!(AggregateEventRecorded); @@ -330,7 +332,7 @@ impl Aggregate { fn create_distribution_from_entries(&self, entries: &[MetricEntry]) -> MetricData { let mut samples = Vec::new(); - + for (data, _) in entries { match data.value() { MetricValue::Gauge { value } => { @@ -354,7 +356,10 @@ impl Aggregate { }); } } - MetricValue::Distribution { samples: dist_samples, .. } => { + MetricValue::Distribution { + samples: dist_samples, + .. + } => { // If already a distribution, merge the samples samples.extend(dist_samples.clone()); } @@ -1283,9 +1288,10 @@ interval_ms = 999999 // Verify it's a distribution with 7 metrics if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { + println!("Samples: {:#?}", samples); assert_eq!(samples.len(), 7); assert_eq!(*statistic, StatisticKind::Histogram); - + // Check sample values let values: Vec = samples.iter().map(|s| s.value).collect(); assert!(values.contains(&25.0)); @@ -1300,4 +1306,76 @@ interval_ms = 999999 panic!("Expected Distribution metric value"); } } + + #[test] + fn absolute_distribution_distributions() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let distribution_a_1 = make_metric( + "dist_a", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![25.0, 30.0, 35.0, 36.0] + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + let distribution_a_2 = make_metric( + "dist_a", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![37.0, 38.0, 39.0] + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + let distribution_a_3 = make_metric( + "dist_a", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![99.0, 113.0, 456.2] + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + // Record four gauge values + agg.record(distribution_a_1); + agg.record(distribution_a_2); + agg.record(distribution_a_3); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Verify it's a distribution with 10 metrics + if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { + assert_eq!(samples.len(), 10, "expected 10 samples in distribution but got {:#?}", samples.len()); + assert_eq!(*statistic, StatisticKind::Histogram); + + // Check sample values + let values: Vec = samples.iter().map(|s| s.value).collect(); + assert!(values.contains(&25.0), "expected value: 25.0 to be present in aggregated value"); + assert!(values.contains(&30.0), "expected value: 30.0 to be present in aggregated value"); + assert!(values.contains(&35.0), "expected value: 35.0 to be present in aggregated value"); + assert!(values.contains(&36.0), "expected value: 36.0 to be present in aggregated value"); + assert!(values.contains(&37.0), "expected value: 37.0 to be present in aggregated value"); + assert!(values.contains(&38.0), "expected value: 38.0 to be present in aggregated value"); + assert!(values.contains(&39.0), "expected value: 39.0 to be present in aggregated value"); + assert!(values.contains(&99.0), "expected value: 99.0 to be present in aggregated value"); + assert!(values.contains(&113.0), "expected value: 113.0 to be present in aggregated value"); + assert!(values.contains(&456.2), "expected value: 456.2 to be present in aggregated value"); + } else { + panic!("Expected Distribution metric value"); + } + } } From 699ecb2678bd1b4e44d7dd9edda92a6cd022209b Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 10 Oct 2025 14:30:16 +1100 Subject: [PATCH 03/10] Update tests, improve logic --- src/transforms/aggregate.rs | 298 +++++++++++++++--- .../components/transforms/aggregate.cue | 4 +- 2 files changed, 257 insertions(+), 45 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index a987959e6efad..8f0f6ba61829c 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -341,19 +341,16 @@ impl Aggregate { rate: 1, }); } - MetricValue::Counter { value } => { - samples.push(Sample { - value: *value, - rate: 1, - }); - } MetricValue::Set { values } => { // For sets, create a sample for each unique value with rate 1 - for _value in values { - samples.push(Sample { - value: 1.0, // Count each set element as 1 - rate: 1, - }); + // sets use strings, that means we need to parse them to f64 + for value in values { + if let Ok(parsed_value) = value.parse::() { + samples.push(Sample { + value: parsed_value, + rate: 1, + }); + } } } MetricValue::Distribution { @@ -363,25 +360,13 @@ impl Aggregate { // If already a distribution, merge the samples samples.extend(dist_samples.clone()); } - MetricValue::AggregatedHistogram { count, sum, .. } => { - // Convert histogram to samples using average value - if *count > 0 { - let avg_value = sum / (*count as f64); - for _ in 0..*count { + MetricValue::AggregatedHistogram { buckets, .. } => { + for bucket in buckets { + // For each count in the bucket, add a sample at the bucket's upper limit + // This preserves the distribution shape + for _ in 0..bucket.count { samples.push(Sample { - value: avg_value, - rate: 1, - }); - } - } - } - MetricValue::AggregatedSummary { count, sum, .. } => { - // Convert summary to samples using average value - if *count > 0 { - let avg_value = sum / (*count as f64); - for _ in 0..*count { - samples.push(Sample { - value: avg_value, + value: bucket.upper_limit, rate: 1, }); } @@ -390,7 +375,11 @@ impl Aggregate { _ => (), } } - + samples.sort_by(|a, b| { + a.value + .partial_cmp(&b.value) + .unwrap_or(std::cmp::Ordering::Equal) + }); MetricData { time: entries.first().unwrap().0.time, kind: MetricKind::Absolute, @@ -458,6 +447,7 @@ mod tests { test_util::components::assert_transform_compliance, transforms::test::create_topology, }; + use vector_lib::event::metric::Bucket; #[test] fn generate_config() { @@ -1288,8 +1278,12 @@ interval_ms = 999999 // Verify it's a distribution with 7 metrics if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { - println!("Samples: {:#?}", samples); - assert_eq!(samples.len(), 7); + assert_eq!( + samples.len(), + 7, + "expected 7 samples in distribution but got {:#?}", + samples.len() + ); assert_eq!(*statistic, StatisticKind::Histogram); // Check sample values @@ -1348,7 +1342,7 @@ interval_ms = 999999 statistic: StatisticKind::Summary, }, ); - // Record four gauge values + // Record four Distribution values agg.record(distribution_a_1); agg.record(distribution_a_2); agg.record(distribution_a_3); @@ -1359,21 +1353,237 @@ interval_ms = 999999 // Verify it's a distribution with 10 metrics if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { - assert_eq!(samples.len(), 10, "expected 10 samples in distribution but got {:#?}", samples.len()); + assert_eq!( + samples.len(), + 10, + "expected 10 samples in distribution but got {:#?}", + samples.len() + ); + assert_eq!(*statistic, StatisticKind::Histogram); + + // Check sample values + let values: Vec = samples.iter().map(|s| s.value).collect(); + assert!(values.contains(&25.0)); + assert!(values.contains(&30.0)); + assert!(values.contains(&35.0)); + assert!(values.contains(&36.0)); + assert!(values.contains(&37.0)); + assert!(values.contains(&38.0)); + assert!(values.contains(&39.0)); + assert!(values.contains(&99.0)); + assert!(values.contains(&113.0)); + assert!(values.contains(&456.2)); + } else { + panic!("Expected Distribution metric value"); + } + } + + #[test] + fn distribution_histogram() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let histogram_a_1 = make_metric( + "histogram_a", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 5, + sum: 18.0, + buckets: vec![ + Bucket { + upper_limit: 1.0, + count: 1, + }, + Bucket { + upper_limit: 2.0, + count: 2, + }, + Bucket { + upper_limit: 5.0, + count: 1, + }, + Bucket { + upper_limit: 10.0, + count: 1, + }, + ], + }, + ); + let histogram_a_2 = make_metric( + "histogram_a", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 5, + sum: 18.0, + buckets: vec![ + Bucket { + upper_limit: 1.0, + count: 1, + }, + Bucket { + upper_limit: 2.0, + count: 2, + }, + Bucket { + upper_limit: 5.0, + count: 1, + }, + Bucket { + upper_limit: 10.0, + count: 1, + }, + ], + }, + ); + let histogram_a_3 = make_metric( + "histogram_a", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 5, + sum: 18.0, + buckets: vec![ + Bucket { + upper_limit: 1.0, + count: 1, + }, + Bucket { + upper_limit: 2.0, + count: 2, + }, + Bucket { + upper_limit: 5.0, + count: 1, + }, + Bucket { + upper_limit: 10.0, + count: 1, + }, + ], + }, + ); + + let expected_vals = vec![ + 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 5.0, 5.0, 5.0, 10.0, 10.0, 10.0, + ]; + + // Record Metrics Values + agg.record(histogram_a_1); + agg.record(histogram_a_2); + agg.record(histogram_a_3); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Verify it's a distribution with 15 metrics + if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { + assert_eq!( + samples.len(), + 15, + "expected 15 samples in distribution but got {:#?}", + samples.len() + ); + assert_eq!(*statistic, StatisticKind::Histogram); + + // Check sample values + let values: Vec = samples.iter().map(|s| s.value).collect(); + assert_eq!(values, expected_vals); + } else { + panic!("Expected Distribution metric value"); + } + } + + #[test] + fn distribution_set() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let set_a_1 = make_metric( + "set_a", + MetricKind::Absolute, + MetricValue::Set { + values: vec![ + "25".into(), + "30".into(), + "35".into(), + "36".into(), + "37".into(), + "38".into(), + "39".into(), + "39".into(), // duplicate to verify set behavior + "99".into(), + "113".into(), + "456.2".into(), + ] + .into_iter() + .collect(), + }, + ); + + let set_a_2 = make_metric( + "set_a", + MetricKind::Absolute, + MetricValue::Set { + values: vec![ + "26".into(), + "31".into(), + "36".into(), + "37".into(), + "38".into(), + "39".into(), + "40".into(), + "40".into(), // duplicate to verify set behavior + "100".into(), + "114".into(), + "556.2".into(), + ] + .into_iter() + .collect(), + }, + ); + // Record two set values + agg.record(set_a_1); + agg.record(set_a_2); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!( + 1, + out.len(), + "expected 1 output metric but got {:#?}", + out.len() + ); + + // Verify it's a distribution with 10 metrics + if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { + assert_eq!( + samples.len(), + 20, + "expected 20 samples in distribution but got {:#?}", + samples.len() + ); assert_eq!(*statistic, StatisticKind::Histogram); // Check sample values let values: Vec = samples.iter().map(|s| s.value).collect(); - assert!(values.contains(&25.0), "expected value: 25.0 to be present in aggregated value"); - assert!(values.contains(&30.0), "expected value: 30.0 to be present in aggregated value"); - assert!(values.contains(&35.0), "expected value: 35.0 to be present in aggregated value"); - assert!(values.contains(&36.0), "expected value: 36.0 to be present in aggregated value"); - assert!(values.contains(&37.0), "expected value: 37.0 to be present in aggregated value"); - assert!(values.contains(&38.0), "expected value: 38.0 to be present in aggregated value"); - assert!(values.contains(&39.0), "expected value: 39.0 to be present in aggregated value"); - assert!(values.contains(&99.0), "expected value: 99.0 to be present in aggregated value"); - assert!(values.contains(&113.0), "expected value: 113.0 to be present in aggregated value"); - assert!(values.contains(&456.2), "expected value: 456.2 to be present in aggregated value"); + assert!(values.contains(&25.0)); + assert!(values.contains(&30.0)); + assert!(values.contains(&35.0)); + assert!(values.contains(&36.0)); + assert!(values.contains(&37.0)); + assert!(values.contains(&38.0)); + assert!(values.contains(&39.0)); + assert!(values.contains(&40.0)); + assert!(values.contains(&99.0)); + assert!(values.contains(&113.0)); + assert!(values.contains(&456.2)); + assert!(values.contains(&556.2)); } else { panic!("Expected Distribution metric value"); } diff --git a/website/cue/reference/components/transforms/aggregate.cue b/website/cue/reference/components/transforms/aggregate.cue index 37980aa252e23..f064117be3665 100644 --- a/website/cue/reference/components/transforms/aggregate.cue +++ b/website/cue/reference/components/transforms/aggregate.cue @@ -175,7 +175,9 @@ components: transforms: aggregate: { aggregated into a single `incremental` `counter` with a value of 23. Two `absolute` `gauge` metrics with values 93 and 95 would result in a single `absolute` `gauge` with the value of 95. More complex types like `distribution`, `histogram`, `set`, and `summary` behave similarly with `incremental` - values being combined in a manner that makes sense based on their type. + values being combined in a manner that makes sense based on their type. When using distribution as the + aggregation mode, the individual samples are collected and their values are combined into a single + sorted distribution representation. """ } From c65f40b049d1d55e435fdd2ca2463f48b36e427a Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 13 Oct 2025 15:11:19 +1100 Subject: [PATCH 04/10] add changelog for aggregate transforms into distributions --- changelog.d/23994_aggregate_transform_enhancement.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/23994_aggregate_transform_enhancement.md diff --git a/changelog.d/23994_aggregate_transform_enhancement.md b/changelog.d/23994_aggregate_transform_enhancement.md new file mode 100644 index 0000000000000..1bbd1a2b41313 --- /dev/null +++ b/changelog.d/23994_aggregate_transform_enhancement.md @@ -0,0 +1,3 @@ +The aggregate transform now supports aggregating metrics into Distribution metric types, enabling statistical analysis of metric values over time windows. + +authors: jlambatl \ No newline at end of file From 45ccfcad23ba3c0e3428f00a40f42803d95e6cab Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 13 Oct 2025 16:04:51 +1100 Subject: [PATCH 05/10] rename file due to _ vs . in original filename --- ...rm_enhancement.md => 23994_aggregate_transform.enhancement.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{23994_aggregate_transform_enhancement.md => 23994_aggregate_transform.enhancement.md} (100%) diff --git a/changelog.d/23994_aggregate_transform_enhancement.md b/changelog.d/23994_aggregate_transform.enhancement.md similarity index 100% rename from changelog.d/23994_aggregate_transform_enhancement.md rename to changelog.d/23994_aggregate_transform.enhancement.md From 8d7c5a09523dfb18226dd52e35f09c269487b67a Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 14 Oct 2025 09:04:24 +1100 Subject: [PATCH 06/10] Update rustdoc lines, changelog error with nonewline on checkdoc and improve the documentation that is generated for the distribution aggregation mode in the configuration parameter --- .../23994_aggregate_transform.enhancement.md | 2 +- src/transforms/aggregate.rs | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/changelog.d/23994_aggregate_transform.enhancement.md b/changelog.d/23994_aggregate_transform.enhancement.md index 1bbd1a2b41313..0f824c6fe28e4 100644 --- a/changelog.d/23994_aggregate_transform.enhancement.md +++ b/changelog.d/23994_aggregate_transform.enhancement.md @@ -1,3 +1,3 @@ The aggregate transform now supports aggregating metrics into Distribution metric types, enabling statistical analysis of metric values over time windows. -authors: jlambatl \ No newline at end of file +authors: jlambatl diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 8f0f6ba61829c..94b8e087b28c2 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -74,7 +74,9 @@ pub enum AggregationMode { /// Stdev value of absolute metric, ignores incremental Stdev, - /// Aggregates absolute metrics into a distribution, ignores incremental + /// Aggregates absolute metrics into a distribution, ignores incremental. + /// Histograms: For each count in the bucket, add a sample at the bucket's upper limit + /// This preserves the distribution shape. Distribution, } @@ -330,6 +332,13 @@ impl Aggregate { emit!(AggregateFlushed); } + // Creates a distribution metric from a collection of metric entries by converting various metric types into samples. + // This function handles Gauge, Set, Distribution, and AggregatedHistogram metric types. + // For Gauge and Set types, it creates samples directly from their values. + // For Distribution types, it merges existing samples. + // For AggregatedHistogram types, it generates samples based on bucket counts and upper limits. + // The resulting samples are sorted and used to create a new Distribution metric. + // The distribution is sorted. fn create_distribution_from_entries(&self, entries: &[MetricEntry]) -> MetricData { let mut samples = Vec::new(); @@ -385,7 +394,7 @@ impl Aggregate { kind: MetricKind::Absolute, value: MetricValue::Distribution { samples, - statistic: StatisticKind::Histogram, + statistic: StatisticKind::Summary, }, } } From de3663ba3ca4a095e85203e573dfbf413d68a2d3 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 14 Oct 2025 09:15:56 +1100 Subject: [PATCH 07/10] reword docs for Distribution aggregation mode to be more clear in english --- src/transforms/aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 94b8e087b28c2..80cd213c53d36 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -75,7 +75,7 @@ pub enum AggregationMode { Stdev, /// Aggregates absolute metrics into a distribution, ignores incremental. - /// Histograms: For each count in the bucket, add a sample at the bucket's upper limit + /// Histograms: For each count in the bucket, a sample is added at the bucket's upper limit /// This preserves the distribution shape. Distribution, } From d1b7121226d59467324f442158b1d94b1867e76b Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 24 Oct 2025 14:29:02 -0400 Subject: [PATCH 08/10] make generate-component-docs --- .../components/transforms/generated/aggregate.cue | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/website/cue/reference/components/transforms/generated/aggregate.cue b/website/cue/reference/components/transforms/generated/aggregate.cue index 725285a35281b..b1b3360135feb 100644 --- a/website/cue/reference/components/transforms/generated/aggregate.cue +++ b/website/cue/reference/components/transforms/generated/aggregate.cue @@ -20,9 +20,14 @@ generated: components: transforms: aggregate: configuration: { type: string: { default: "Auto" enum: { - Auto: "Default mode. Sums incremental metrics and uses the latest value for absolute metrics." - Count: "Counts metrics for incremental and absolute metrics" - Diff: "Returns difference between latest value for absolute, ignores incremental" + Auto: "Default mode. Sums incremental metrics and uses the latest value for absolute metrics." + Count: "Counts metrics for incremental and absolute metrics" + Diff: "Returns difference between latest value for absolute, ignores incremental" + Distribution: """ + Aggregates absolute metrics into a distribution, ignores incremental. + Histograms: For each count in the bucket, a sample is added at the bucket's upper limit + This preserves the distribution shape. + """ Latest: "Returns the latest value for absolute metrics, ignores incremental" Max: "Max value of absolute metric, ignores incremental" Mean: "Mean value of absolute metric, ignores incremental" From d1e6455c06e73319a27037926e4b7213130453fa Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 24 Oct 2025 14:54:18 -0400 Subject: [PATCH 09/10] fix and strengthen tests --- src/transforms/aggregate.rs | 321 +++++++++++++++++++++++------------- 1 file changed, 205 insertions(+), 116 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 80cd213c53d36..ced44e30fa541 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -394,7 +394,7 @@ impl Aggregate { kind: MetricKind::Absolute, value: MetricValue::Distribution { samples, - statistic: StatisticKind::Summary, + statistic: StatisticKind::Histogram, }, } } @@ -476,6 +476,32 @@ mod tests { event } + // Helper function to verify distribution metric properties + fn assert_distribution_samples( + event: &Event, + expected_samples: &[f64], + expected_statistic: StatisticKind, + ) { + if let MetricValue::Distribution { samples, statistic } = event.as_metric().value() { + assert_eq!( + samples.len(), + expected_samples.len(), + "expected {} samples in distribution but got {}", + expected_samples.len(), + samples.len() + ); + assert_eq!(*statistic, expected_statistic); + + let values: Vec = samples.iter().map(|s| s.value).collect(); + assert_eq!( + values, expected_samples, + "samples should match expected values and be sorted" + ); + } else { + panic!("Expected Distribution metric value"); + } + } + #[test] fn incremental_auto() { let mut agg = Aggregate::new(&AggregateConfig { @@ -1270,44 +1296,27 @@ interval_ms = 999999 MetricKind::Incremental, MetricValue::Counter { value: 110.0 }, ); - // Record four gauge values - agg.record(gauge_a_1); - agg.record(gauge_a_2); - agg.record(gauge_a_3); - agg.record(gauge_a_4); - agg.record(gauge_a_5); - agg.record(gauge_a_6); - agg.record(gauge_a_7); - // validating that if we add a counter it doesn't end up in the distribution. + // Record seven gauge values in non-sorted order to verify sorting + agg.record(gauge_a_4); // 36.0 + agg.record(gauge_a_1); // 25.0 + agg.record(gauge_a_7); // 39.0 + agg.record(gauge_a_2); // 30.0 + agg.record(gauge_a_5); // 37.0 + agg.record(gauge_a_3); // 35.0 + agg.record(gauge_a_6); // 38.0 + // Verify that incremental metrics are ignored agg.record(counter_a_1); let mut out = vec![]; agg.flush_into(&mut out); assert_eq!(1, out.len()); - // Verify it's a distribution with 7 metrics - if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { - assert_eq!( - samples.len(), - 7, - "expected 7 samples in distribution but got {:#?}", - samples.len() - ); - assert_eq!(*statistic, StatisticKind::Histogram); - - // Check sample values - let values: Vec = samples.iter().map(|s| s.value).collect(); - assert!(values.contains(&25.0)); - assert!(values.contains(&30.0)); - assert!(values.contains(&35.0)); - assert!(values.contains(&36.0)); - assert!(values.contains(&37.0)); - assert!(values.contains(&38.0)); - assert!(values.contains(&39.0)); - assert!(!values.contains(&110.0)); - } else { - panic!("Expected Distribution metric value"); - } + // Verify it's a distribution with 7 samples, properly sorted + assert_distribution_samples( + &out[0], + &[25.0, 30.0, 35.0, 36.0, 37.0, 38.0, 39.0], + StatisticKind::Histogram, + ); } #[test] @@ -1322,7 +1331,7 @@ interval_ms = 999999 "dist_a", MetricKind::Absolute, MetricValue::Distribution { - samples: vec![25.0, 30.0, 35.0, 36.0] + samples: vec![35.0, 25.0, 36.0, 30.0] // Intentionally unsorted .into_iter() .map(|v| Sample { value: v, rate: 1 }) .collect(), @@ -1333,7 +1342,7 @@ interval_ms = 999999 "dist_a", MetricKind::Absolute, MetricValue::Distribution { - samples: vec![37.0, 38.0, 39.0] + samples: vec![39.0, 37.0, 38.0] // Intentionally unsorted .into_iter() .map(|v| Sample { value: v, rate: 1 }) .collect(), @@ -1344,14 +1353,14 @@ interval_ms = 999999 "dist_a", MetricKind::Absolute, MetricValue::Distribution { - samples: vec![99.0, 113.0, 456.2] + samples: vec![456.2, 99.0, 113.0] // Intentionally unsorted .into_iter() .map(|v| Sample { value: v, rate: 1 }) .collect(), statistic: StatisticKind::Summary, }, ); - // Record four Distribution values + // Record three Distribution values agg.record(distribution_a_1); agg.record(distribution_a_2); agg.record(distribution_a_3); @@ -1360,31 +1369,12 @@ interval_ms = 999999 agg.flush_into(&mut out); assert_eq!(1, out.len()); - // Verify it's a distribution with 10 metrics - if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { - assert_eq!( - samples.len(), - 10, - "expected 10 samples in distribution but got {:#?}", - samples.len() - ); - assert_eq!(*statistic, StatisticKind::Histogram); - - // Check sample values - let values: Vec = samples.iter().map(|s| s.value).collect(); - assert!(values.contains(&25.0)); - assert!(values.contains(&30.0)); - assert!(values.contains(&35.0)); - assert!(values.contains(&36.0)); - assert!(values.contains(&37.0)); - assert!(values.contains(&38.0)); - assert!(values.contains(&39.0)); - assert!(values.contains(&99.0)); - assert!(values.contains(&113.0)); - assert!(values.contains(&456.2)); - } else { - panic!("Expected Distribution metric value"); - } + // Verify it's a distribution with 10 samples, properly sorted + assert_distribution_samples( + &out[0], + &[25.0, 30.0, 35.0, 36.0, 37.0, 38.0, 39.0, 99.0, 113.0, 456.2], + StatisticKind::Histogram, + ); } #[test] @@ -1487,22 +1477,8 @@ interval_ms = 999999 agg.flush_into(&mut out); assert_eq!(1, out.len()); - // Verify it's a distribution with 15 metrics - if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { - assert_eq!( - samples.len(), - 15, - "expected 15 samples in distribution but got {:#?}", - samples.len() - ); - assert_eq!(*statistic, StatisticKind::Histogram); - - // Check sample values - let values: Vec = samples.iter().map(|s| s.value).collect(); - assert_eq!(values, expected_vals); - } else { - panic!("Expected Distribution metric value"); - } + // Verify it's a distribution with 15 samples + assert_distribution_samples(&out[0], &expected_vals, StatisticKind::Histogram); } #[test] @@ -1518,17 +1494,16 @@ interval_ms = 999999 MetricKind::Absolute, MetricValue::Set { values: vec![ + "39".into(), "25".into(), - "30".into(), + "99".into(), "35".into(), + "113".into(), + "30".into(), + "456.2".into(), "36".into(), "37".into(), "38".into(), - "39".into(), - "39".into(), // duplicate to verify set behavior - "99".into(), - "113".into(), - "456.2".into(), ] .into_iter() .collect(), @@ -1540,17 +1515,16 @@ interval_ms = 999999 MetricKind::Absolute, MetricValue::Set { values: vec![ + "40".into(), "26".into(), - "31".into(), + "100".into(), "36".into(), + "114".into(), + "31".into(), + "556.2".into(), "37".into(), "38".into(), "39".into(), - "40".into(), - "40".into(), // duplicate to verify set behavior - "100".into(), - "114".into(), - "556.2".into(), ] .into_iter() .collect(), @@ -1565,36 +1539,151 @@ interval_ms = 999999 assert_eq!( 1, out.len(), - "expected 1 output metric but got {:#?}", + "expected 1 output metric but got {}", out.len() ); - // Verify it's a distribution with 10 metrics - if let MetricValue::Distribution { samples, statistic } = out[0].as_metric().value() { - assert_eq!( - samples.len(), - 20, - "expected 20 samples in distribution but got {:#?}", - samples.len() - ); - assert_eq!(*statistic, StatisticKind::Histogram); + // Verify it's a distribution with 20 samples (10 from each set), properly sorted + let expected = vec![ + 25.0, 26.0, 30.0, 31.0, 35.0, 36.0, 36.0, 37.0, 37.0, 38.0, 38.0, 39.0, 39.0, 40.0, + 99.0, 100.0, 113.0, 114.0, 456.2, 556.2, + ]; + assert_distribution_samples(&out[0], &expected, StatisticKind::Histogram); + } - // Check sample values - let values: Vec = samples.iter().map(|s| s.value).collect(); - assert!(values.contains(&25.0)); - assert!(values.contains(&30.0)); - assert!(values.contains(&35.0)); - assert!(values.contains(&36.0)); - assert!(values.contains(&37.0)); - assert!(values.contains(&38.0)); - assert!(values.contains(&39.0)); - assert!(values.contains(&40.0)); - assert!(values.contains(&99.0)); - assert!(values.contains(&113.0)); - assert!(values.contains(&456.2)); - assert!(values.contains(&556.2)); - } else { - panic!("Expected Distribution metric value"); + #[test] + fn distribution_mixed_types() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + // Test that different metric series can each become distributions + let gauge_a = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 10.0 }, + ); + + let distribution_b = make_metric( + "dist_b", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![20.0, 30.0] + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + + let histogram_c = make_metric( + "histogram_c", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 2, + sum: 90.0, + buckets: vec![ + Bucket { + upper_limit: 40.0, + count: 1, + }, + Bucket { + upper_limit: 50.0, + count: 1, + }, + ], + }, + ); + + agg.record(gauge_a); + agg.record(distribution_b); + agg.record(histogram_c); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(3, out.len()); // Each different series produces its own distribution + + // Verify all outputs are distributions + for event in out { + if let MetricValue::Distribution { samples, statistic } = event.as_metric().value() { + assert_eq!(*statistic, StatisticKind::Histogram); + assert!(samples.len() >= 1); + } else { + panic!("Expected Distribution metric value"); + } + } + } + + #[test] + fn distribution_set_with_invalid_values() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let set_with_invalid = make_metric( + "set_a", + MetricKind::Absolute, + MetricValue::Set { + values: vec![ + "10.0".into(), + "not_a_number".into(), // Should be skipped + "20.0".into(), + "invalid".into(), // Should be skipped + "30.0".into(), + ] + .into_iter() + .collect(), + }, + ); + + agg.record(set_with_invalid); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Only the 3 valid numeric values should be included + assert_distribution_samples(&out[0], &[10.0, 20.0, 30.0], StatisticKind::Histogram); + } + + #[test] + fn distribution_multiple_series() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let gauge_a = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 10.0 }, + ); + let gauge_b = make_metric( + "gauge_b", + MetricKind::Absolute, + MetricValue::Gauge { value: 20.0 }, + ); + + agg.record(gauge_a); + agg.record(gauge_b); + + let mut out = vec![]; + agg.flush_into(&mut out); + + // Should produce 2 separate distributions + assert_eq!(2, out.len()); + + for event in out { + if let MetricValue::Distribution { samples, .. } = event.as_metric().value() { + assert_eq!(samples.len(), 1); + } else { + panic!("Expected Distribution metric value"); + } } } } From bc4d9845d6a028a4d0f3713b6c683076e85c4895 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Wed, 5 Nov 2025 19:03:47 +1100 Subject: [PATCH 10/10] fix clippy error is_empty --- src/transforms/aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index ced44e30fa541..85858c4a0d0ce 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -1609,7 +1609,7 @@ interval_ms = 999999 for event in out { if let MetricValue::Distribution { samples, statistic } = event.as_metric().value() { assert_eq!(*statistic, StatisticKind::Histogram); - assert!(samples.len() >= 1); + assert!(!samples.is_empty()); } else { panic!("Expected Distribution metric value"); }