From fdc4b58a27a4cb82ebc78e56dc0966edf8b6e4a2 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 2 Jun 2024 15:44:57 -0700 Subject: [PATCH 1/6] feat: add `shutdown` in `TracerProvider` --- opentelemetry-sdk/src/logs/log_emitter.rs | 2 +- opentelemetry-sdk/src/trace/provider.rs | 87 +++++++++++++++++++---- opentelemetry-sdk/src/trace/span.rs | 10 +-- opentelemetry-sdk/src/trace/tracer.rs | 16 ++--- 4 files changed, 89 insertions(+), 26 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 6ed216427e..22f5824b3a 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -77,7 +77,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider { fn library_logger(&self, library: Arc) -> Self::Logger { // If the provider is shutdown, new logger will refer a no-op logger provider. - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + if self.is_shutdown.load(Ordering::Relaxed) { return Logger::new(library, NOOP_LOGGER_PROVIDER.clone()); } Logger::new(library, self.clone()) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 701a784f90..313a8bcc37 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -9,18 +9,37 @@ //! not duplicate this data to avoid that different [`Tracer`] instances //! of the [`TracerProvider`] have different versions of these data. use crate::runtime::RuntimeChannel; -use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer}; +use crate::trace::{ + BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer, +}; use crate::{export::trace::SpanExporter, trace::SpanProcessor}; use crate::{InstrumentationLibrary, Resource}; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; +use opentelemetry::trace::TraceError; use opentelemetry::{global, trace::TraceResult}; use std::borrow::Cow; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; /// Default tracer name if empty string is provided. const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer"; static PROVIDER_RESOURCE: OnceCell = OnceCell::new(); +// a no nop logger provider used as placeholder when the provider is shutdown +static NOOP_TRACER_PROVIDER: Lazy = Lazy::new(|| TracerProvider { + inner: Arc::new(TracerProviderInner { + processors: Vec::new(), + config: Config { + // cannot use default here as the default resource is not empty + sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))), + id_generator: Box::::default(), + span_limits: SpanLimits::default(), + resource: Cow::Owned(Resource::empty()), + }, + }), + is_shutdown: Arc::new(AtomicBool::new(true)), +}); + /// TracerProvider inner type #[derive(Debug)] pub(crate) struct TracerProviderInner { @@ -39,9 +58,14 @@ impl Drop for TracerProviderInner { } /// Creator and registry of named [`Tracer`] instances. +/// +/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components. +/// Cloning and dropping them will not stop the span processing. To stop span processing, users +/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`. #[derive(Clone, Debug)] pub struct TracerProvider { inner: Arc, + is_shutdown: Arc, } impl Default for TracerProvider { @@ -52,8 +76,11 @@ impl Default for TracerProvider { impl TracerProvider { /// Build a new tracer provider - pub(crate) fn new(inner: Arc) -> Self { - TracerProvider { inner } + pub(crate) fn new(inner: TracerProviderInner) -> Self { + TracerProvider { + inner: Arc::new(inner), + is_shutdown: Arc::new(AtomicBool::new(false)), + } } /// Create a new [`TracerProvider`] builder. @@ -71,6 +98,12 @@ impl TracerProvider { &self.inner.config } + /// true if the provider has been shutdown + /// Don't start span or export spans when provider is shutdown + pub(crate) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::Relaxed) + } + /// Force flush all remaining spans in span processors and return results. /// /// # Examples @@ -114,11 +147,41 @@ impl TracerProvider { .map(|processor| processor.force_flush()) .collect() } + + /// Shuts down the current `TracerProvider`. + /// + /// Note that shut down doesn't means the TracerProvider has dropped + pub fn shutdown(&self) -> TraceResult<()> { + if self + .is_shutdown + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + // propagate the shutdown signal to processors + // it's up to the processor to properly block new logs after shutdown + let mut errs = vec![]; + for processor in &self.inner.processors { + if let Err(err) = processor.shutdown() { + errs.push(err); + } + } + + if errs.is_empty() { + Ok(()) + } else { + Err(TraceError::Other(format!("{errs:?}").into())) + } + } else { + Err(TraceError::Other( + "tracer provider already shut down".into(), + )) + } + } } impl opentelemetry::trace::TracerProvider for TracerProvider { /// This implementation of `TracerProvider` produces `Tracer` instances. - type Tracer = crate::trace::Tracer; + type Tracer = Tracer; /// Create a new versioned `Tracer` instance. fn versioned_tracer( @@ -152,7 +215,10 @@ impl opentelemetry::trace::TracerProvider for TracerProvider { } fn library_tracer(&self, library: Arc) -> Self::Tracer { - Tracer::new(library, Arc::downgrade(&self.inner)) + if self.is_shutdown.load(Ordering::Relaxed) { + return Tracer::new(library, NOOP_TRACER_PROVIDER.clone()); + } + Tracer::new(library, self.clone()) } } @@ -226,9 +292,7 @@ impl Builder { p.set_resource(config.resource.as_ref()); } - TracerProvider { - inner: Arc::new(TracerProviderInner { processors, config }), - } + TracerProvider::new(TracerProviderInner { processors, config }) } } @@ -245,7 +309,6 @@ mod tests { use opentelemetry::{Context, Key, KeyValue, Value}; use std::borrow::Cow; use std::env; - use std::sync::Arc; #[derive(Debug)] struct TestSpanProcessor { @@ -276,13 +339,13 @@ mod tests { #[test] fn test_force_flush() { - let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner { + let tracer_provider = super::TracerProvider::new(TracerProviderInner { processors: vec![ Box::from(TestSpanProcessor { success: true }), Box::from(TestSpanProcessor { success: false }), ], config: Default::default(), - })); + }); let results = tracer_provider.force_flush(); assert_eq!(results.len(), 2); diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index d672348885..ea03d9ab53 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -204,11 +204,11 @@ impl Span { None => return, }; + let provider = self.tracer.provider(); // skip if provider has been shut down - let provider = match self.tracer.provider() { - Some(provider) => provider, - None => return, - }; + if provider.is_shutdown() { + return; + } // ensure end time is set via explicit end or implicitly on drop if let Some(timestamp) = timestamp { @@ -719,7 +719,7 @@ mod tests { let exported_data = span.exported_data(); assert!(exported_data.is_some()); - drop(provider); + provider.shutdown().expect("shutdown panicked"); let dropped_span = tracer.start("span_with_dropped_provider"); // return none if the provider has already been dropped assert!(dropped_span.exported_data().is_none()); diff --git a/opentelemetry-sdk/src/trace/tracer.rs b/opentelemetry-sdk/src/trace/tracer.rs index 0749937cd7..ea56725d15 100644 --- a/opentelemetry-sdk/src/trace/tracer.rs +++ b/opentelemetry-sdk/src/trace/tracer.rs @@ -9,7 +9,7 @@ //! Docs: use crate::{ trace::{ - provider::{TracerProvider, TracerProviderInner}, + provider::TracerProvider, span::{Span, SpanData}, SpanLimits, SpanLinks, }, @@ -20,7 +20,7 @@ use opentelemetry::{ Context, KeyValue, }; use std::fmt; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use super::SpanEvents; @@ -28,7 +28,7 @@ use super::SpanEvents; #[derive(Clone)] pub struct Tracer { instrumentation_lib: Arc, - provider: Weak, + provider: TracerProvider, } impl fmt::Debug for Tracer { @@ -46,7 +46,7 @@ impl Tracer { /// Create a new tracer (used internally by `TracerProvider`s). pub(crate) fn new( instrumentation_lib: Arc, - provider: Weak, + provider: TracerProvider, ) -> Self { Tracer { instrumentation_lib, @@ -55,8 +55,8 @@ impl Tracer { } /// TracerProvider associated with this tracer. - pub(crate) fn provider(&self) -> Option { - self.provider.upgrade().map(TracerProvider::new) + pub(crate) fn provider(&self) -> &TracerProvider { + &self.provider } /// Instrumentation library information of this tracer. @@ -175,7 +175,8 @@ impl opentelemetry::trace::Tracer for Tracer { /// spans in the trace. fn build_with_context(&self, mut builder: SpanBuilder, parent_cx: &Context) -> Self::Span { let provider = self.provider(); - if provider.is_none() { + // no point start a span if the tracer provider has already being shutdown + if provider.is_shutdown() { return Span::new( SpanContext::empty_context(), None, @@ -184,7 +185,6 @@ impl opentelemetry::trace::Tracer for Tracer { ); } - let provider = provider.unwrap(); let config = provider.config(); let span_id = builder .span_id From 924e33c85e69d1402847acc9b7d330cd982ade2b Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 9 Jun 2024 15:19:22 -0700 Subject: [PATCH 2/6] Update opentelemetry-sdk/src/trace/provider.rs Co-authored-by: Lalit Kumar Bhasin --- opentelemetry-sdk/src/trace/provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 313a8bcc37..cac592a242 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -25,7 +25,7 @@ use std::sync::Arc; const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer"; static PROVIDER_RESOURCE: OnceCell = OnceCell::new(); -// a no nop logger provider used as placeholder when the provider is shutdown +// a no nop tracer provider used as placeholder when the provider is shutdown static NOOP_TRACER_PROVIDER: Lazy = Lazy::new(|| TracerProvider { inner: Arc::new(TracerProviderInner { processors: Vec::new(), From 31a0a575e90456d86acc78dc0eed09e596a51d12 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 9 Jun 2024 15:19:27 -0700 Subject: [PATCH 3/6] Update opentelemetry-sdk/src/trace/provider.rs Co-authored-by: Lalit Kumar Bhasin --- opentelemetry-sdk/src/trace/provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index cac592a242..87afdd99f3 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -158,7 +158,7 @@ impl TracerProvider { .is_ok() { // propagate the shutdown signal to processors - // it's up to the processor to properly block new logs after shutdown + // it's up to the processor to properly block new spans after shutdown let mut errs = vec![]; for processor in &self.inner.processors { if let Err(err) = processor.shutdown() { From 639cb2df8a920935b717098519bcfb1ebc21c69a Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 9 Jun 2024 16:10:23 -0700 Subject: [PATCH 4/6] unit tests --- opentelemetry-sdk/src/trace/provider.rs | 96 +++++++++++++++++-- opentelemetry-sdk/src/trace/span_processor.rs | 21 ++++ 2 files changed, 111 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 87afdd99f3..509d64c152 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -305,23 +305,59 @@ mod tests { use crate::trace::provider::TracerProviderInner; use crate::trace::{Config, Span, SpanProcessor}; use crate::Resource; - use opentelemetry::trace::{TraceError, TraceResult}; + use opentelemetry::trace::{TraceError, TraceResult, Tracer, TracerProvider}; use opentelemetry::{Context, Key, KeyValue, Value}; use std::borrow::Cow; use std::env; + use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; + use std::sync::Arc; + + // fields below is wrapped with Arc so we can assert it + #[derive(Default, Debug)] + struct AssertInfo { + started_span: AtomicU32, + is_shutdown: AtomicBool, + } + + #[derive(Default, Debug, Clone)] + struct SharedAssertInfo(Arc); + + impl SharedAssertInfo { + fn started_span_count(&self, count: u32) -> bool { + return self.0.started_span.load(Ordering::SeqCst) == count; + } + } #[derive(Debug)] struct TestSpanProcessor { success: bool, + assert_info: SharedAssertInfo, + } + + impl TestSpanProcessor { + fn new(success: bool) -> TestSpanProcessor { + TestSpanProcessor { + success, + assert_info: SharedAssertInfo::default(), + } + } + + // get handle to assert info + fn assert_info(&self) -> SharedAssertInfo { + self.assert_info.clone() + } } impl SpanProcessor for TestSpanProcessor { fn on_start(&self, _span: &mut Span, _cx: &Context) { - unimplemented!() + self.assert_info + .0 + .started_span + .fetch_add(1, Ordering::SeqCst); } fn on_end(&self, _span: SpanData) { - unimplemented!() + // ignore } fn force_flush(&self) -> TraceResult<()> { @@ -333,7 +369,17 @@ mod tests { } fn shutdown(&self) -> TraceResult<()> { - self.force_flush() + if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) { + return Ok(()); + } else { + let _ = self.assert_info.0.is_shutdown.compare_exchange( + false, + true, + Ordering::SeqCst, + Ordering::SeqCst, + ); + self.force_flush() + } } } @@ -341,8 +387,8 @@ mod tests { fn test_force_flush() { let tracer_provider = super::TracerProvider::new(TracerProviderInner { processors: vec![ - Box::from(TestSpanProcessor { success: true }), - Box::from(TestSpanProcessor { success: false }), + Box::from(TestSpanProcessor::new(true)), + Box::from(TestSpanProcessor::new(false)), ], config: Default::default(), }); @@ -480,4 +526,42 @@ mod tests { assert_eq!(no_service_name.config().resource.len(), 0) } + + #[test] + fn test_shutdown_noops() { + let processor = TestSpanProcessor::new(false); + let assert_handle = processor.assert_info(); + let tracer_provider = super::TracerProvider::new(TracerProviderInner { + processors: vec![Box::from(processor)], + config: Default::default(), + }); + + let test_tracer_1 = tracer_provider.tracer("test1"); + let _ = test_tracer_1.start("test"); + + assert!(assert_handle.started_span_count(1)); + + let _ = test_tracer_1.start("test"); + + assert!(assert_handle.started_span_count(2)); + + let shutdown = |tracer_provider: super::TracerProvider| { + let _ = tracer_provider.shutdown(); // shutdown once + }; + + // assert tracer provider can be shutdown using on a cloned version + shutdown(tracer_provider.clone()); + + // after shutdown we should get noop tracer + let noop_tracer = tracer_provider.tracer("noop"); + // noop tracer cannot start anything + let _ = noop_tracer.start("test"); + assert!(assert_handle.started_span_count(2)); + // noop tracer's tracer provider should be shutdown + assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst)); + + // existing tracer becomes noops after shutdown + let _ = test_tracer_1.start("test"); + assert!(assert_handle.started_span_count(2)); + } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index f9b634e850..df66eb7838 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -92,6 +92,8 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { fn force_flush(&self) -> TraceResult<()>; /// Shuts down the processor. Called when SDK is shut down. This is an /// opportunity for processors to do any cleanup required. + /// + /// Implementation should make sure shutdown can be called multiple times. fn shutdown(&self) -> TraceResult<()>; /// Set the resource for the log processor. fn set_resource(&mut self, _resource: &Resource) {} @@ -691,8 +693,10 @@ mod tests { }; use crate::export::trace::{ExportResult, SpanData, SpanExporter}; use crate::runtime; + use crate::runtime::Tokio; use crate::testing::trace::{ new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder, + NoopSpanExporter, }; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS, @@ -920,6 +924,23 @@ mod tests { ); } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_batch_span_processor_multiple_shutdown() { + let processor = BatchSpanProcessor::new( + Box::new(NoopSpanExporter::new()), + BatchConfig::default(), + Tokio, + ); + + let shutdown = |processor: &BatchSpanProcessor| { + let result = processor.shutdown(); + assert!(result.is_ok()); + }; + + shutdown(&processor); + shutdown(&processor); + } + struct BlockingExporter { delay_for: Duration, delay_fn: D, From 9f18239885539cddc8f813fe0c20116b941c04d5 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 9 Jun 2024 16:47:40 -0700 Subject: [PATCH 5/6] unit tests --- opentelemetry-sdk/src/trace/span_processor.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index df66eb7838..58ba00b77f 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -924,23 +924,6 @@ mod tests { ); } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_batch_span_processor_multiple_shutdown() { - let processor = BatchSpanProcessor::new( - Box::new(NoopSpanExporter::new()), - BatchConfig::default(), - Tokio, - ); - - let shutdown = |processor: &BatchSpanProcessor| { - let result = processor.shutdown(); - assert!(result.is_ok()); - }; - - shutdown(&processor); - shutdown(&processor); - } - struct BlockingExporter { delay_for: Duration, delay_fn: D, From d3057ac6b16370db578e3e62a2b89d1d0582376b Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 9 Jun 2024 17:05:13 -0700 Subject: [PATCH 6/6] make linter happy --- opentelemetry-sdk/src/trace/provider.rs | 4 ++-- opentelemetry-sdk/src/trace/span_processor.rs | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 509d64c152..9550ce11d2 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -324,7 +324,7 @@ mod tests { impl SharedAssertInfo { fn started_span_count(&self, count: u32) -> bool { - return self.0.started_span.load(Ordering::SeqCst) == count; + self.0.started_span.load(Ordering::SeqCst) == count } } @@ -370,7 +370,7 @@ mod tests { fn shutdown(&self) -> TraceResult<()> { if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) { - return Ok(()); + Ok(()) } else { let _ = self.assert_info.0.is_shutdown.compare_exchange( false, diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 58ba00b77f..214c0e5768 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -693,10 +693,8 @@ mod tests { }; use crate::export::trace::{ExportResult, SpanData, SpanExporter}; use crate::runtime; - use crate::runtime::Tokio; use crate::testing::trace::{ new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder, - NoopSpanExporter, }; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,