Skip to content

Commit 23ffd0a

Browse files
committed
refactor: PushMetricExpoter interface
1 parent 4ff8e02 commit 23ffd0a

File tree

12 files changed

+164
-55
lines changed

12 files changed

+164
-55
lines changed

opentelemetry-otlp/src/exporter/http/metrics.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use crate::metric::MetricsClient;
44
use http::{header::CONTENT_TYPE, Method};
55
use opentelemetry::otel_debug;
66
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
7-
use opentelemetry_sdk::metrics::data::ResourceMetrics;
7+
use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef;
88

99
use super::OtlpHttpClient;
1010

1111
impl MetricsClient for OtlpHttpClient {
12-
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
12+
async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult {
1313
let client = self
1414
.client
1515
.lock()

opentelemetry-otlp/src/exporter/http/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::time::Duration;
2727
mod metrics;
2828

2929
#[cfg(feature = "metrics")]
30-
use opentelemetry_sdk::metrics::data::ResourceMetrics;
30+
use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef;
3131

3232
#[cfg(feature = "logs")]
3333
pub(crate) mod logs;
@@ -326,11 +326,11 @@ impl OtlpHttpClient {
326326
#[cfg(feature = "metrics")]
327327
fn build_metrics_export_body(
328328
&self,
329-
metrics: &mut ResourceMetrics,
329+
metrics: ResourceMetricsRef<'_>,
330330
) -> Option<(Vec<u8>, &'static str)> {
331331
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
332332

333-
let req: ExportMetricsServiceRequest = (&*metrics).into();
333+
let req: ExportMetricsServiceRequest = metrics.into();
334334

335335
match self.protocol {
336336
#[cfg(feature = "http-json")]

opentelemetry-otlp/src/exporter/tonic/metrics.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{
66
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
77
};
88
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
9-
use opentelemetry_sdk::metrics::data::ResourceMetrics;
9+
use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef;
1010
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
1111

1212
use super::BoxInterceptor;
@@ -52,7 +52,7 @@ impl TonicMetricsClient {
5252
}
5353

5454
impl MetricsClient for TonicMetricsClient {
55-
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
55+
async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult {
5656
let (mut client, metadata, extensions) = self
5757
.inner
5858
.lock()
@@ -81,7 +81,7 @@ impl MetricsClient for TonicMetricsClient {
8181
.export(Request::from_parts(
8282
metadata,
8383
extensions,
84-
ExportMetricsServiceRequest::from(&*metrics),
84+
ExportMetricsServiceRequest::from(metrics),
8585
))
8686
.await
8787
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;

opentelemetry-otlp/src/metric.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ use crate::{ExporterBuildError, NoExporterBuilderSet};
1717
use core::fmt;
1818
use opentelemetry_sdk::error::OTelSdkResult;
1919

20-
use opentelemetry_sdk::metrics::{
21-
data::ResourceMetrics, exporter::PushMetricExporter, Temporality,
22-
};
20+
use opentelemetry_sdk::metrics::exporter::ResourceMetricsRef;
21+
use opentelemetry_sdk::metrics::{exporter::PushMetricExporter, Temporality};
2322
use std::fmt::{Debug, Formatter};
2423
use std::time::Duration;
2524

@@ -123,7 +122,7 @@ impl HasHttpConfig for MetricExporterBuilder<HttpExporterBuilderSet> {
123122
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
124123
fn export(
125124
&self,
126-
metrics: &mut ResourceMetrics,
125+
metrics: ResourceMetricsRef<'_>,
127126
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
128127
fn shutdown(&self) -> OTelSdkResult;
129128
}
@@ -149,7 +148,7 @@ impl Debug for MetricExporter {
149148
}
150149

151150
impl PushMetricExporter for MetricExporter {
152-
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
151+
async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult {
153152
match &self.client {
154153
#[cfg(feature = "grpc-tonic")]
155154
SupportedTransportClient::Tonic(client) => client.export(metrics).await,

opentelemetry-proto/src/transform/metrics.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ pub mod tonic {
1111
use opentelemetry_sdk::metrics::data::{
1212
AggregatedMetrics, Exemplar as SdkExemplar,
1313
ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge,
14-
Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics,
15-
ScopeMetrics as SdkScopeMetrics, Sum as SdkSum,
14+
Histogram as SdkHistogram, Metric as SdkMetric, MetricData, Sum as SdkSum,
1615
};
16+
use opentelemetry_sdk::metrics::exporter::{ResourceMetricsRef, ScopeMetricsRef};
1717
use opentelemetry_sdk::metrics::Temporality;
1818
use opentelemetry_sdk::Resource as SdkResource;
1919

@@ -110,12 +110,12 @@ pub mod tonic {
110110
}
111111
}
112112

113-
impl From<&ResourceMetrics> for ExportMetricsServiceRequest {
114-
fn from(rm: &ResourceMetrics) -> Self {
113+
impl From<ResourceMetricsRef<'_>> for ExportMetricsServiceRequest {
114+
fn from(rm: ResourceMetricsRef<'_>) -> Self {
115115
ExportMetricsServiceRequest {
116116
resource_metrics: vec![TonicResourceMetrics {
117-
resource: Some((&rm.resource).into()),
118-
scope_metrics: rm.scope_metrics.iter().map(Into::into).collect(),
117+
resource: Some(rm.resource.into()),
118+
scope_metrics: rm.scope_metrics.map(Into::into).collect(),
119119
schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(),
120120
}],
121121
}
@@ -131,11 +131,11 @@ pub mod tonic {
131131
}
132132
}
133133

134-
impl From<&SdkScopeMetrics> for TonicScopeMetrics {
135-
fn from(sm: &SdkScopeMetrics) -> Self {
134+
impl From<ScopeMetricsRef<'_>> for TonicScopeMetrics {
135+
fn from(sm: ScopeMetricsRef<'_>) -> Self {
136136
TonicScopeMetrics {
137-
scope: Some((&sm.scope, None).into()),
138-
metrics: sm.metrics.iter().map(Into::into).collect(),
137+
scope: Some((sm.scope, None).into()),
138+
metrics: sm.metrics.map(Into::into).collect(),
139139
schema_url: sm
140140
.scope
141141
.schema_url()

opentelemetry-sdk/src/metrics/data/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct ResourceMetrics {
1818
}
1919

2020
/// A collection of metrics produced by a meter.
21-
#[derive(Default, Debug)]
21+
#[derive(Debug, Default)]
2222
pub struct ScopeMetrics {
2323
/// The [InstrumentationScope] that the meter was created with.
2424
pub scope: InstrumentationScope,

opentelemetry-sdk/src/metrics/exporter.rs

+86-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,92 @@
11
//! Interfaces for exporting metrics
22
3-
use crate::error::OTelSdkResult;
4-
use std::time::Duration;
3+
use opentelemetry::InstrumentationScope;
54

6-
use crate::metrics::data::ResourceMetrics;
5+
use crate::{error::OTelSdkResult, Resource};
6+
use std::{fmt::Debug, slice::Iter, time::Duration};
77

8-
use super::Temporality;
8+
use super::{
9+
data::{Metric, ResourceMetrics, ScopeMetrics},
10+
Temporality,
11+
};
12+
13+
/// A collection of [`BatchScopeMetrics`] and the associated [Resource] that created them.
14+
#[derive(Debug)]
15+
pub struct ResourceMetricsRef<'a> {
16+
/// The entity that collected the metrics.
17+
pub resource: &'a Resource,
18+
/// The collection of metrics with unique [InstrumentationScope]s.
19+
pub scope_metrics: BatchScopeMetrics<'a>,
20+
}
21+
22+
/// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics.
23+
pub struct BatchScopeMetrics<'a> {
24+
iter: Iter<'a, ScopeMetrics>,
25+
}
26+
27+
/// A collection of metrics produced by a [`InstrumentationScope`] meter.
28+
#[derive(Debug)]
29+
pub struct ScopeMetricsRef<'a> {
30+
/// The [InstrumentationScope] that the meter was created with.
31+
pub scope: &'a InstrumentationScope,
32+
/// The list of aggregations created by the meter.
33+
pub metrics: BatchMetrics<'a>,
34+
}
35+
36+
/// Iterator over aggregations created by the meter.
37+
pub struct BatchMetrics<'a> {
38+
iter: Iter<'a, Metric>,
39+
}
40+
41+
impl<'a> ResourceMetricsRef<'a> {
42+
pub(crate) fn new(rm: &'a ResourceMetrics) -> Self {
43+
Self {
44+
resource: &rm.resource,
45+
scope_metrics: BatchScopeMetrics {
46+
iter: rm.scope_metrics.iter(),
47+
},
48+
}
49+
}
50+
}
51+
52+
impl<'a> ScopeMetricsRef<'a> {
53+
fn new(sm: &'a ScopeMetrics) -> Self {
54+
Self {
55+
scope: &sm.scope,
56+
metrics: BatchMetrics {
57+
iter: sm.metrics.iter(),
58+
},
59+
}
60+
}
61+
}
62+
63+
impl Debug for BatchScopeMetrics<'_> {
64+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65+
f.debug_struct("BatchScopeMetrics").finish()
66+
}
67+
}
68+
69+
impl<'a> Iterator for BatchScopeMetrics<'a> {
70+
type Item = ScopeMetricsRef<'a>;
71+
72+
fn next(&mut self) -> Option<Self::Item> {
73+
self.iter.next().map(ScopeMetricsRef::new)
74+
}
75+
}
76+
77+
impl<'a> Iterator for BatchMetrics<'a> {
78+
type Item = &'a Metric;
79+
80+
fn next(&mut self) -> Option<Self::Item> {
81+
self.iter.next()
82+
}
83+
}
84+
85+
impl Debug for BatchMetrics<'_> {
86+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87+
f.debug_struct("BatchMetrics").finish()
88+
}
89+
}
990

1091
/// Exporter handles the delivery of metric data to external receivers.
1192
///
@@ -18,7 +99,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
1899
/// considered unrecoverable and will be logged.
19100
fn export(
20101
&self,
21-
metrics: &mut ResourceMetrics,
102+
metrics: ResourceMetricsRef<'_>,
22103
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
23104

24105
/// Flushes any metric data held by an exporter.

opentelemetry-sdk/src/metrics/in_memory_exporter.rs

+30-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use crate::error::{OTelSdkError, OTelSdkResult};
2-
use crate::metrics::data::{
3-
ExponentialHistogram, Gauge, Histogram, MetricData, ResourceMetrics, Sum,
4-
};
2+
use crate::metrics::data::{ExponentialHistogram, Gauge, Histogram, MetricData, Sum};
53
use crate::metrics::exporter::PushMetricExporter;
64
use crate::metrics::Temporality;
75
use crate::InMemoryExporterError;
@@ -10,7 +8,8 @@ use std::fmt;
108
use std::sync::{Arc, Mutex};
119
use std::time::Duration;
1210

13-
use super::data::{AggregatedMetrics, Metric, ScopeMetrics};
11+
use super::data::{AggregatedMetrics, Metric, ResourceMetrics, ScopeMetrics};
12+
use super::exporter::ResourceMetricsRef;
1413

1514
/// An in-memory metrics exporter that stores metrics data in memory.
1615
///
@@ -150,7 +149,31 @@ impl InMemoryMetricExporter {
150149
let metrics = self
151150
.metrics
152151
.lock()
153-
.map(|metrics_guard| metrics_guard.iter().map(Self::clone_metrics).collect())
152+
.map(|metrics_guard| {
153+
metrics_guard
154+
.iter()
155+
.map(|data| ResourceMetrics {
156+
resource: data.resource.clone(),
157+
scope_metrics: data
158+
.scope_metrics
159+
.iter()
160+
.map(|data| ScopeMetrics {
161+
scope: data.scope.clone(),
162+
metrics: data
163+
.metrics
164+
.iter()
165+
.map(|data| Metric {
166+
name: data.name.clone(),
167+
description: data.description.clone(),
168+
unit: data.unit.clone(),
169+
data: Self::clone_data(&data.data),
170+
})
171+
.collect(),
172+
})
173+
.collect(),
174+
})
175+
.collect()
176+
})
154177
.map_err(InMemoryExporterError::from)?;
155178
Ok(metrics)
156179
}
@@ -172,17 +195,15 @@ impl InMemoryMetricExporter {
172195
.map(|mut metrics_guard| metrics_guard.clear());
173196
}
174197

175-
fn clone_metrics(metric: &ResourceMetrics) -> ResourceMetrics {
198+
fn clone_metrics(metric: ResourceMetricsRef<'_>) -> ResourceMetrics {
176199
ResourceMetrics {
177200
resource: metric.resource.clone(),
178201
scope_metrics: metric
179202
.scope_metrics
180-
.iter()
181203
.map(|scope_metric| ScopeMetrics {
182204
scope: scope_metric.scope.clone(),
183205
metrics: scope_metric
184206
.metrics
185-
.iter()
186207
.map(|metric| Metric {
187208
name: metric.name.clone(),
188209
description: metric.description.clone(),
@@ -237,7 +258,7 @@ impl InMemoryMetricExporter {
237258
}
238259

239260
impl PushMetricExporter for InMemoryMetricExporter {
240-
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
261+
async fn export(&self, metrics: ResourceMetricsRef<'_>) -> OTelSdkResult {
241262
self.metrics
242263
.lock()
243264
.map(|mut metrics_guard| {

opentelemetry-sdk/src/metrics/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,12 @@ pub enum Temporality {
112112

113113
#[cfg(all(test, feature = "testing"))]
114114
mod tests {
115-
use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint};
115+
use self::data::{HistogramDataPoint, SumDataPoint};
116116
use super::data::MetricData;
117+
use super::data::ResourceMetrics;
118+
use super::data::ScopeMetrics;
117119
use super::internal::Number;
118120
use super::*;
119-
use crate::metrics::data::ResourceMetrics;
120121
use crate::metrics::internal::AggregatedMetricsAccess;
121122
use crate::metrics::InMemoryMetricExporter;
122123
use crate::metrics::InMemoryMetricExporterBuilder;

opentelemetry-sdk/src/metrics/periodic_reader.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
1212

1313
use crate::{
1414
error::{OTelSdkError, OTelSdkResult},
15-
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
15+
metrics::{
16+
exporter::{PushMetricExporter, ResourceMetricsRef},
17+
reader::SdkProducer,
18+
},
1619
Resource,
1720
};
1821

@@ -410,7 +413,7 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
410413

411414
// Relying on futures executor to execute async call.
412415
// TODO: Pass timeout to exporter
413-
futures_executor::block_on(self.exporter.export(&mut rm))
416+
futures_executor::block_on(self.exporter.export(ResourceMetricsRef::new(&rm)))
414417
}
415418

416419
fn force_flush(&self) -> OTelSdkResult {
@@ -515,7 +518,9 @@ mod tests {
515518
use crate::{
516519
error::{OTelSdkError, OTelSdkResult},
517520
metrics::{
518-
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
521+
data::ResourceMetrics,
522+
exporter::{PushMetricExporter, ResourceMetricsRef},
523+
reader::MetricReader,
519524
InMemoryMetricExporter, SdkMeterProvider, Temporality,
520525
},
521526
Resource,
@@ -552,7 +557,7 @@ mod tests {
552557
}
553558

554559
impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
555-
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
560+
async fn export(&self, _metrics: ResourceMetricsRef<'_>) -> OTelSdkResult {
556561
if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
557562
Err(OTelSdkError::InternalFailure("export failed".into()))
558563
} else {
@@ -583,7 +588,7 @@ mod tests {
583588
}
584589

585590
impl PushMetricExporter for MockMetricExporter {
586-
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
591+
async fn export(&self, _metrics: ResourceMetricsRef<'_>) -> OTelSdkResult {
587592
Ok(())
588593
}
589594

0 commit comments

Comments
 (0)