diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index a7aa09aac9..6367d49e60 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -145,7 +145,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor { } } - fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {} + fn on_end(&self, _span: &mut opentelemetry_sdk::trace::SpanData) {} } fn init_tracer() -> SdkTracerProvider { diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index b50c1fe1c4..6ed7475594 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -61,8 +61,8 @@ fn criterion_benchmark(c: &mut Criterion) { let span_processor = shared_span_processor.clone(); let spans = get_span_data(); handles.push(tokio::spawn(async move { - for span in spans { - span_processor.on_end(span); + for mut span in spans { + span_processor.on_end(&mut span); tokio::task::yield_now().await; } })); diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 6561da7d2a..cc8538acfa 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -136,7 +136,7 @@ mod tests { } } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut SpanData) { // TODO: Accessing Context::current() will panic today and hence commented out. // See https://github.com/open-telemetry/opentelemetry-rust/issues/2871 // let _c = Context::current(); diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 2773b8778b..a4efb8b419 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -516,7 +516,7 @@ mod tests { .fetch_add(1, Ordering::SeqCst); } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut SpanData) { // ignore } @@ -779,7 +779,7 @@ mod tests { // No operation needed for this processor } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut SpanData) { // No operation needed for this processor } diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 9cb8b88045..802a949288 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -200,7 +200,20 @@ impl Span { fn ensure_ended_and_exported(&mut self, timestamp: Option) { // skip if data has already been exported let mut data = match self.data.take() { - Some(data) => data, + Some(data) => crate::trace::SpanData { + span_context: self.span_context.clone(), + parent_span_id: data.parent_span_id, + span_kind: data.span_kind, + name: data.name, + start_time: data.start_time, + end_time: data.end_time, + attributes: data.attributes, + dropped_attributes_count: data.dropped_attributes_count, + events: data.events, + links: data.links, + status: data.status, + instrumentation_scope: self.tracer.instrumentation_scope().clone(), + }, None => return, }; @@ -219,20 +232,10 @@ impl Span { match provider.span_processors() { [] => {} - [processor] => { - processor.on_end(build_export_data( - data, - self.span_context.clone(), - &self.tracer, - )); - } + [processor] => processor.on_end(&mut data), processors => { for processor in processors { - processor.on_end(build_export_data( - data.clone(), - self.span_context.clone(), - &self.tracer, - )); + processor.on_end(&mut data); } } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 595099ef7f..0a53b2830e 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -82,8 +82,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. - /// TODO - This method should take reference to `SpanData` - fn on_end(&self, span: SpanData); + fn on_end(&self, span: &mut SpanData); /// Force the spans lying in the cache to be exported. fn force_flush(&self) -> OTelSdkResult; /// Shuts down the processor. Called when SDK is shut down. This is an @@ -129,7 +128,7 @@ impl SpanProcessor for SimpleSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { + fn on_end(&self, span: &mut SpanData) { if !span.span_context.is_sampled() { return; } @@ -138,7 +137,7 @@ impl SpanProcessor for SimpleSpanProcessor { .exporter .lock() .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into())) - .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span]))); + .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span.clone()]))); if let Err(err) = result { // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug` @@ -513,7 +512,7 @@ impl SpanProcessor for BatchSpanProcessor { } /// Handles span end. - fn on_end(&self, span: SpanData) { + fn on_end(&self, span: &mut SpanData) { if self.is_shutdown.load(Ordering::Relaxed) { // this is a warning, as the user is trying to emit after the processor has been shutdown otel_warn!( @@ -522,7 +521,7 @@ impl SpanProcessor for BatchSpanProcessor { ); return; } - let result = self.span_sender.try_send(span); + let result = self.span_sender.try_send(span.clone()); if result.is_err() { // Increment dropped span count. The first time we have to drop a span, @@ -876,8 +875,8 @@ mod tests { fn simple_span_processor_on_end_calls_export() { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); - let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + let mut span_data = new_test_export_span_data(); + processor.on_end(&mut span_data); assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data); let _result = processor.shutdown(); } @@ -886,7 +885,7 @@ mod tests { fn simple_span_processor_on_end_skips_export_if_not_sampled() { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); - let unsampled = SpanData { + let mut unsampled = SpanData { span_context: SpanContext::empty_context(), parent_span_id: SpanId::INVALID, span_kind: SpanKind::Internal, @@ -900,7 +899,7 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - processor.on_end(unsampled); + processor.on_end(&mut unsampled); assert!(exporter.get_finished_spans().unwrap().is_empty()); } @@ -908,8 +907,8 @@ mod tests { fn simple_span_processor_shutdown_calls_shutdown() { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); - let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + let mut span_data = new_test_export_span_data(); + processor.on_end(&mut span_data); assert!(!exporter.get_finished_spans().unwrap().is_empty()); let _result = processor.shutdown(); // Assume shutdown is called by ensuring spans are empty in the exporter @@ -1110,8 +1109,8 @@ mod tests { .build(); let processor = BatchSpanProcessor::new(exporter, config); - let test_span = create_test_span("test_span"); - processor.on_end(test_span.clone()); + let mut test_span = create_test_span("test_span"); + processor.on_end(&mut test_span); // Wait for flush interval to ensure the span is processed std::thread::sleep(Duration::from_secs(6)); @@ -1133,8 +1132,8 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); // Create a test span and send it to the processor - let test_span = create_test_span("force_flush_span"); - processor.on_end(test_span.clone()); + let mut test_span = create_test_span("force_flush_span"); + processor.on_end(&mut test_span); // Call force_flush to immediately export the spans let flush_result = processor.force_flush(); @@ -1162,8 +1161,8 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); // Create a test span and send it to the processor - let test_span = create_test_span("shutdown_span"); - processor.on_end(test_span.clone()); + let mut test_span = create_test_span("shutdown_span"); + processor.on_end(&mut test_span); // Call shutdown to flush and export all pending spans let shutdown_result = processor.shutdown(); @@ -1197,13 +1196,13 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); // Create test spans and send them to the processor - let span1 = create_test_span("span1"); - let span2 = create_test_span("span2"); - let span3 = create_test_span("span3"); // This span should be dropped + let mut span1 = create_test_span("span1"); + let mut span2 = create_test_span("span2"); + let mut span3 = create_test_span("span3"); // This span should be dropped - processor.on_end(span1.clone()); - processor.on_end(span2.clone()); - processor.on_end(span3.clone()); // This span exceeds the queue size + processor.on_end(&mut span1); + processor.on_end(&mut span2); + processor.on_end(&mut span3); // This span exceeds the queue size // Wait for the scheduled delay to expire std::thread::sleep(Duration::from_secs(3)); @@ -1243,7 +1242,7 @@ mod tests { KeyValue::new("key1", "value1"), KeyValue::new("key2", "value2"), ]; - processor.on_end(span_data.clone()); + processor.on_end(&mut span_data); // Force flush to export the span let _ = processor.force_flush(); @@ -1273,8 +1272,8 @@ mod tests { processor.set_resource(&resource); // Create a span and send it to the processor - let test_span = create_test_span("resource_test"); - processor.on_end(test_span.clone()); + let mut test_span = create_test_span("resource_test"); + processor.on_end(&mut test_span); // Force flush to ensure the span is exported let _ = processor.force_flush(); @@ -1308,8 +1307,8 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); for _ in 0..4 { - let span = new_test_export_span_data(); - processor.on_end(span); + let mut span = new_test_export_span_data(); + processor.on_end(&mut span); } processor.force_flush().unwrap(); @@ -1331,8 +1330,8 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); for _ in 0..4 { - let span = new_test_export_span_data(); - processor.on_end(span); + let mut span = new_test_export_span_data(); + processor.on_end(&mut span); } processor.force_flush().unwrap(); @@ -1358,8 +1357,8 @@ mod tests { for _ in 0..10 { let processor_clone = Arc::clone(&processor); let handle = tokio::spawn(async move { - let span = new_test_export_span_data(); - processor_clone.on_end(span); + let mut span = new_test_export_span_data(); + processor_clone.on_end(&mut span); }); handles.push(handle); } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index cb4d2fc14b..bcebafa2ae 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -102,12 +102,14 @@ impl SpanProcessor for BatchSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { + fn on_end(&self, span: &mut SpanData) { if !span.span_context.is_sampled() { return; } - let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); + let result = self + .message_sender + .try_send(BatchMessage::ExportSpan(span.clone())); // If the queue is full, and we can't buffer a span if result.is_err() { @@ -518,7 +520,7 @@ mod tests { } }); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end(&mut new_test_export_span_data()); let flush_res = processor.force_flush(); assert!(flush_res.is_ok()); let _shutdown_result = processor.shutdown(); @@ -545,7 +547,7 @@ mod tests { }; let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end(&mut new_test_export_span_data()); let flush_res = processor.force_flush(); if time_out { assert!(flush_res.is_err()); diff --git a/stress/src/traces.rs b/stress/src/traces.rs index 4cd713e4b2..10d6cf31e9 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -37,7 +37,7 @@ impl SpanProcessor for NoOpSpanProcessor { // No-op } - fn on_end(&self, _span: SpanData) { + fn on_end(&self, _span: &mut SpanData) { // No-op }