Skip to content

Refactor SdkMeterProvider with Inner Structure for Better Lifecycle Control #1663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 66 additions & 27 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ use super::{meter::SdkMeter, pipeline::Pipelines, reader::MetricReader, view::Vi
/// [Meter]: opentelemetry::metrics::Meter
#[derive(Clone, Debug)]
pub struct SdkMeterProvider {
inner: Arc<SdkMeterProviderInner>,
}

#[derive(Clone, Debug)]
struct SdkMeterProviderInner {
pipes: Arc<Pipelines>,
meters: Arc<Mutex<HashMap<Scope, Arc<SdkMeter>>>>,
is_shutdown: Arc<AtomicBool>,
Expand Down Expand Up @@ -84,7 +89,7 @@ impl SdkMeterProvider {
/// }
/// ```
pub fn force_flush(&self) -> Result<()> {
self.pipes.force_flush()
self.inner.force_flush()
}

/// Shuts down the meter provider flushing all pending telemetry and releasing
Expand All @@ -100,6 +105,16 @@ impl SdkMeterProvider {
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
pub fn shutdown(&self) -> Result<()> {
self.inner.shutdown()
}
}

impl SdkMeterProviderInner {
fn force_flush(&self) -> Result<()> {
self.pipes.force_flush()
}

fn shutdown(&self) -> Result<()> {
if self
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
Expand All @@ -114,7 +129,7 @@ impl SdkMeterProvider {
}
}

impl Drop for SdkMeterProvider {
impl Drop for SdkMeterProviderInner {
fn drop(&mut self) {
if let Err(err) = self.shutdown() {
global::handle_error(err);
Expand All @@ -129,17 +144,17 @@ impl MeterProvider for SdkMeterProvider {
schema_url: Option<impl Into<Cow<'static, str>>>,
attributes: Option<Vec<KeyValue>>,
) -> Meter {
if self.is_shutdown.load(Ordering::Relaxed) {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return Meter::new(Arc::new(NoopMeterCore::new()));
}

let scope = Scope::new(name, version, schema_url, attributes);

if let Ok(mut meters) = self.meters.lock() {
if let Ok(mut meters) = self.inner.meters.lock() {
let meter = meters
.entry(scope)
.or_insert_with_key(|scope| {
Arc::new(SdkMeter::new(scope.clone(), self.pipes.clone()))
Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone()))
})
.clone();
Meter::new(meter)
Expand Down Expand Up @@ -193,15 +208,18 @@ impl MeterProviderBuilder {
}

/// Construct a new [MeterProvider] with this configuration.

pub fn build(self) -> SdkMeterProvider {
SdkMeterProvider {
pipes: Arc::new(Pipelines::new(
self.resource.unwrap_or_default(),
self.readers,
self.views,
)),
meters: Default::default(),
is_shutdown: Arc::new(AtomicBool::new(false)),
inner: Arc::new(SdkMeterProviderInner {
pipes: Arc::new(Pipelines::new(
self.resource.unwrap_or_default(),
self.readers,
self.views,
)),
meters: Default::default(),
is_shutdown: Arc::new(AtomicBool::new(false)),
}),
}
}
}
Expand Down Expand Up @@ -232,7 +250,7 @@ mod tests {
resource_key: &'static str,
expect: Option<&'static str>| {
assert_eq!(
provider.pipes.0[0]
provider.inner.pipes.0[0]
.resource
.get(Key::from_static_str(resource_key))
.map(|v| v.to_string()),
Expand All @@ -241,17 +259,19 @@ mod tests {
};
let assert_telemetry_resource = |provider: &super::SdkMeterProvider| {
assert_eq!(
provider.pipes.0[0]
provider.inner.pipes.0[0]
.resource
.get(TELEMETRY_SDK_LANGUAGE.into()),
Some(Value::from("rust"))
);
assert_eq!(
provider.pipes.0[0].resource.get(TELEMETRY_SDK_NAME.into()),
provider.inner.pipes.0[0]
.resource
.get(TELEMETRY_SDK_NAME.into()),
Some(Value::from("opentelemetry"))
);
assert_eq!(
provider.pipes.0[0]
provider.inner.pipes.0[0]
.resource
.get(TELEMETRY_SDK_VERSION.into()),
Some(Value::from(env!("CARGO_PKG_VERSION")))
Expand Down Expand Up @@ -282,7 +302,7 @@ mod tests {
)]))
.build();
assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service"));
assert_eq!(custom_meter_provider.pipes.0[0].resource.len(), 1);
assert_eq!(custom_meter_provider.inner.pipes.0[0].resource.len(), 1);

temp_env::with_var(
"OTEL_RESOURCE_ATTRIBUTES",
Expand All @@ -301,7 +321,7 @@ mod tests {
assert_resource(&env_resource_provider, "key1", Some("value1"));
assert_resource(&env_resource_provider, "k3", Some("value2"));
assert_telemetry_resource(&env_resource_provider);
assert_eq!(env_resource_provider.pipes.0[0].resource.len(), 6);
assert_eq!(env_resource_provider.inner.pipes.0[0].resource.len(), 6);
},
);

Expand Down Expand Up @@ -340,7 +360,7 @@ mod tests {
);
assert_telemetry_resource(&user_provided_resource_config_provider);
assert_eq!(
user_provided_resource_config_provider.pipes.0[0]
user_provided_resource_config_provider.inner.pipes.0[0]
.resource
.len(),
7
Expand All @@ -355,7 +375,7 @@ mod tests {
.with_resource(Resource::empty())
.build();

assert_eq!(no_service_name.pipes.0[0].resource.len(), 0)
assert_eq!(no_service_name.inner.pipes.0[0].resource.len(), 0)
}

#[test]
Expand All @@ -364,22 +384,41 @@ mod tests {
let provider = super::SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
global::set_meter_provider(provider.clone());
assert!(!provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
global::set_meter_provider(provider);
assert!(!reader.is_shutdown());
// create a meter and an instrument
let meter = global::meter("test");
let counter = meter.u64_counter("test_counter").init();
// no need to drop a meter for meter_provider shutdown
global::shutdown_meter_provider();
assert!(provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
assert!(reader.is_shutdown());
// TODO Fix: the instrument is still available, and can be used.
// While the reader is shutdown, and no collect is happening
counter.add(1, &[]);
}
#[test]
fn test_shutdown_invoked_on_last_drop() {
let reader = TestMetricReader::new();
let provider = super::SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
let clone1 = provider.clone();
let clone2 = provider.clone();

// Initially, shutdown should not be called
assert!(!reader.is_shutdown());

// Drop the first clone
drop(clone1);
assert!(!reader.is_shutdown());

// Drop the second clone
drop(clone2);
assert!(!reader.is_shutdown());

// Drop the last original provider
drop(provider);
// Now the shutdown should be invoked
assert!(reader.is_shutdown());
}
}
Loading