diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index ff6d7db5b7..d0ec5a463c 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -27,6 +27,11 @@ use super::{meter::SdkMeter, pipeline::Pipelines, reader::MetricReader, view::Vi /// [Meter]: opentelemetry::metrics::Meter #[derive(Clone, Debug)] pub struct SdkMeterProvider { + inner: Arc, +} + +#[derive(Clone, Debug)] +struct SdkMeterProviderInner { pipes: Arc, meters: Arc>>>, is_shutdown: Arc, @@ -84,7 +89,7 @@ impl SdkMeterProvider { /// } /// ``` pub fn force_flush(&self) -> Result<()> { - self.pipes.force_flush() + self.inner.force_flush() } /// Shuts down the meter provider flushing all pending telemetry and releasing @@ -100,6 +105,16 @@ impl SdkMeterProvider { /// There is no guaranteed that all telemetry be flushed or all resources have /// been released on error. pub fn shutdown(&self) -> Result<()> { + self.inner.shutdown() + } +} + +impl SdkMeterProviderInner { + fn force_flush(&self) -> Result<()> { + self.pipes.force_flush() + } + + fn shutdown(&self) -> Result<()> { if self .is_shutdown .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) @@ -114,7 +129,7 @@ impl SdkMeterProvider { } } -impl Drop for SdkMeterProvider { +impl Drop for SdkMeterProviderInner { fn drop(&mut self) { if let Err(err) = self.shutdown() { global::handle_error(err); @@ -129,17 +144,17 @@ impl MeterProvider for SdkMeterProvider { schema_url: Option>>, attributes: Option>, ) -> Meter { - if self.is_shutdown.load(Ordering::Relaxed) { + if self.inner.is_shutdown.load(Ordering::Relaxed) { return Meter::new(Arc::new(NoopMeterCore::new())); } let scope = Scope::new(name, version, schema_url, attributes); - if let Ok(mut meters) = self.meters.lock() { + if let Ok(mut meters) = self.inner.meters.lock() { let meter = meters .entry(scope) .or_insert_with_key(|scope| { - Arc::new(SdkMeter::new(scope.clone(), self.pipes.clone())) + Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone())) }) .clone(); Meter::new(meter) @@ -193,15 +208,18 @@ impl MeterProviderBuilder { } /// Construct a new [MeterProvider] with this configuration. + pub fn build(self) -> SdkMeterProvider { SdkMeterProvider { - pipes: Arc::new(Pipelines::new( - self.resource.unwrap_or_default(), - self.readers, - self.views, - )), - meters: Default::default(), - is_shutdown: Arc::new(AtomicBool::new(false)), + inner: Arc::new(SdkMeterProviderInner { + pipes: Arc::new(Pipelines::new( + self.resource.unwrap_or_default(), + self.readers, + self.views, + )), + meters: Default::default(), + is_shutdown: Arc::new(AtomicBool::new(false)), + }), } } } @@ -232,7 +250,7 @@ mod tests { resource_key: &'static str, expect: Option<&'static str>| { assert_eq!( - provider.pipes.0[0] + provider.inner.pipes.0[0] .resource .get(Key::from_static_str(resource_key)) .map(|v| v.to_string()), @@ -241,17 +259,19 @@ mod tests { }; let assert_telemetry_resource = |provider: &super::SdkMeterProvider| { assert_eq!( - provider.pipes.0[0] + provider.inner.pipes.0[0] .resource .get(TELEMETRY_SDK_LANGUAGE.into()), Some(Value::from("rust")) ); assert_eq!( - provider.pipes.0[0].resource.get(TELEMETRY_SDK_NAME.into()), + provider.inner.pipes.0[0] + .resource + .get(TELEMETRY_SDK_NAME.into()), Some(Value::from("opentelemetry")) ); assert_eq!( - provider.pipes.0[0] + provider.inner.pipes.0[0] .resource .get(TELEMETRY_SDK_VERSION.into()), Some(Value::from(env!("CARGO_PKG_VERSION"))) @@ -282,7 +302,7 @@ mod tests { )])) .build(); assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service")); - assert_eq!(custom_meter_provider.pipes.0[0].resource.len(), 1); + assert_eq!(custom_meter_provider.inner.pipes.0[0].resource.len(), 1); temp_env::with_var( "OTEL_RESOURCE_ATTRIBUTES", @@ -301,7 +321,7 @@ mod tests { assert_resource(&env_resource_provider, "key1", Some("value1")); assert_resource(&env_resource_provider, "k3", Some("value2")); assert_telemetry_resource(&env_resource_provider); - assert_eq!(env_resource_provider.pipes.0[0].resource.len(), 6); + assert_eq!(env_resource_provider.inner.pipes.0[0].resource.len(), 6); }, ); @@ -340,7 +360,7 @@ mod tests { ); assert_telemetry_resource(&user_provided_resource_config_provider); assert_eq!( - user_provided_resource_config_provider.pipes.0[0] + user_provided_resource_config_provider.inner.pipes.0[0] .resource .len(), 7 @@ -355,7 +375,7 @@ mod tests { .with_resource(Resource::empty()) .build(); - assert_eq!(no_service_name.pipes.0[0].resource.len(), 0) + assert_eq!(no_service_name.inner.pipes.0[0].resource.len(), 0) } #[test] @@ -364,22 +384,41 @@ mod tests { let provider = super::SdkMeterProvider::builder() .with_reader(reader.clone()) .build(); - global::set_meter_provider(provider.clone()); - assert!(!provider - .is_shutdown - .load(std::sync::atomic::Ordering::Relaxed)); + global::set_meter_provider(provider); assert!(!reader.is_shutdown()); // create a meter and an instrument let meter = global::meter("test"); let counter = meter.u64_counter("test_counter").init(); // no need to drop a meter for meter_provider shutdown global::shutdown_meter_provider(); - assert!(provider - .is_shutdown - .load(std::sync::atomic::Ordering::Relaxed)); assert!(reader.is_shutdown()); // TODO Fix: the instrument is still available, and can be used. // While the reader is shutdown, and no collect is happening counter.add(1, &[]); } + #[test] + fn test_shutdown_invoked_on_last_drop() { + let reader = TestMetricReader::new(); + let provider = super::SdkMeterProvider::builder() + .with_reader(reader.clone()) + .build(); + let clone1 = provider.clone(); + let clone2 = provider.clone(); + + // Initially, shutdown should not be called + assert!(!reader.is_shutdown()); + + // Drop the first clone + drop(clone1); + assert!(!reader.is_shutdown()); + + // Drop the second clone + drop(clone2); + assert!(!reader.is_shutdown()); + + // Drop the last original provider + drop(provider); + // Now the shutdown should be invoked + assert!(reader.is_shutdown()); + } }