From 326e64d2dc27cabced4de58eedc55ee33c68cbd4 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 02:36:19 -0700 Subject: [PATCH 01/18] initial commit --- opentelemetry-otlp/src/exporter/http/logs.rs | 2 +- opentelemetry-otlp/src/exporter/http/mod.rs | 2 +- opentelemetry-otlp/src/exporter/tonic/logs.rs | 2 +- opentelemetry-otlp/src/logs.rs | 2 +- opentelemetry-proto/src/transform/logs.rs | 12 +++++------ opentelemetry-sdk/src/export/logs/mod.rs | 2 +- opentelemetry-sdk/src/logs/log_processor.rs | 21 +++++++------------ .../src/testing/logs/in_memory_exporter.rs | 8 +++---- opentelemetry-stdout/src/logs/exporter.rs | 2 +- 9 files changed, 24 insertions(+), 29 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 83f25c1f9f..f06a0a79ce 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -11,7 +11,7 @@ use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { let client = self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 1b60971d76..973366be22 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -330,7 +330,7 @@ impl OtlpHttpClient { #[cfg(feature = "logs")] fn build_logs_export_body( &self, - logs: Vec<(&LogRecord, &InstrumentationLibrary)>, + logs: &[(&LogRecord, &InstrumentationLibrary)], ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index e6d9661b8d..acc3e7c206 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -56,7 +56,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 65b913d11b..b6156126f4 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -99,7 +99,7 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index b3b4b895fd..513d31184f 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -177,10 +177,10 @@ pub mod tonic { } pub fn group_logs_by_resource_and_scope( - logs: Vec<( + logs: &[( &opentelemetry_sdk::logs::LogRecord, &opentelemetry::InstrumentationLibrary, - )>, + )], resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -258,11 +258,11 @@ mod tests { let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1"); let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2"); - let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&logs, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -278,10 +278,10 @@ mod tests { let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1"); let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2"); - let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&logs, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index afa6df0ee2..25fdb145dc 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -14,7 +14,7 @@ use std::fmt::Debug; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogRecord`, `InstrumentationLibrary`]. - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index ce1032ec62..e5d8acc197 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -106,7 +106,8 @@ impl LogProcessor for SimpleLogProcessor { .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { - futures_executor::block_on(exporter.export(vec![(record, instrumentation)])) + let log_batch = &[(record as &LogRecord, instrumentation)][..]; + futures_executor::block_on(exporter.export(log_batch)) }); if let Err(err) = result { global::handle_error(err); @@ -222,7 +223,7 @@ impl BatchLogProcessor { config.max_export_timeout, exporter.as_mut(), &timeout_runtime, - logs.split_off(0), + &logs.iter().map(|log| (&log.0, &log.1)).collect::>(), ) .await; @@ -237,7 +238,7 @@ impl BatchLogProcessor { config.max_export_timeout, exporter.as_mut(), &timeout_runtime, - logs.split_off(0), + &logs.iter().map(|log| (&log.0, &log.1)).collect::>(), ) .await; @@ -258,7 +259,7 @@ impl BatchLogProcessor { config.max_export_timeout, exporter.as_mut(), &timeout_runtime, - logs.split_off(0), + &logs.iter().map(|log| (&log.0, &log.1)).collect::>(), ) .await; @@ -303,7 +304,7 @@ async fn export_with_timeout( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec<(LogRecord, InstrumentationLibrary)>, + batch: &[(&LogRecord, &InstrumentationLibrary)], ) -> ExportResult where R: RuntimeChannel, @@ -312,14 +313,8 @@ where if batch.is_empty() { return Ok(()); } - // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)> - // TBD - Can we avoid this conversion as it involves heap allocation with new vector? - let export_batch = batch - .iter() - .map(|log_data| (&log_data.0, &log_data.1)) - .collect(); - let export = exporter.export(export_batch); + let export = exporter.export(batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -549,7 +544,7 @@ mod tests { impl LogExporter for MockLogExporter { async fn export( &mut self, - _batch: Vec<(&LogRecord, &InstrumentationLibrary)>, + _batch: &[(&LogRecord, &InstrumentationLibrary)], ) -> LogResult<()> { Ok(()) } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 170139bf9d..f2614acb12 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -184,12 +184,12 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; - for (log_record, instrumentation) in batch.into_iter() { + for (log_record, instrumentation) in batch.iter() { let owned_log = OwnedLogData { - record: log_record.clone(), - instrumentation: instrumentation.clone(), + record: (*log_record).clone(), + instrumentation: (*instrumentation).clone(), }; logs_guard.push(owned_log); } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index fd59701c0b..2e61f126c6 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -45,7 +45,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { if let Some(writer) = &mut self.writer { let result = (self.encoder)(writer, (batch, &self.resource).into()) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) From ff3aae4eda2d9bd0d51112eec34406ef663d7d87 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 02:43:24 -0700 Subject: [PATCH 02/18] update stdout --- opentelemetry-stdout/src/logs/exporter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index f44627585d..ebb3fe4ff1 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -66,7 +66,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } -fn print_logs(batch: Vec<(&LogRecord, &InstrumentationLibrary)>) { +fn print_logs(batch: &[(&LogRecord, &InstrumentationLibrary)]) { for (i, log) in batch.into_iter().enumerate() { println!("Log #{}", i); let (record, _library) = log; From 4a28fdcd3faea6e9d2697b26e86922d66bbc554a Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 03:52:15 -0700 Subject: [PATCH 03/18] lint errors --- opentelemetry-appender-tracing/benches/logs.rs | 2 +- opentelemetry-stdout/src/logs/exporter.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 1252e73e79..4d6866e6f9 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -34,7 +34,7 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export(&mut self, _: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { + async fn export(&mut self, _: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { LogResult::Ok(()) } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index ebb3fe4ff1..9fbb87c735 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -67,7 +67,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } fn print_logs(batch: &[(&LogRecord, &InstrumentationLibrary)]) { - for (i, log) in batch.into_iter().enumerate() { + for (i, log) in batch.iter().enumerate() { println!("Log #{}", i); let (record, _library) = log; if let Some(event_name) = record.event_name { From b69aa8ae3841e437662a36010ef31523e50c77c1 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 10:49:30 -0700 Subject: [PATCH 04/18] update bench --- opentelemetry-sdk/benches/log_exporter.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 40e8dda846..9047b38381 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -29,11 +29,11 @@ use std::fmt::Debug; // cargo bench --bench log_exporter #[async_trait] pub trait LogExporterWithFuture: Send + Sync + Debug { - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]); } pub trait LogExporterWithoutFuture: Send + Sync + Debug { - fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); + fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]); } #[derive(Debug)] @@ -41,13 +41,13 @@ struct NoOpExporterWithFuture {} #[async_trait] impl LogExporterWithFuture for NoOpExporterWithFuture { - async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} + async fn export(&mut self, _batch: &[(&LogRecord, &InstrumentationLibrary)]) {} } #[derive(Debug)] struct NoOpExporterWithoutFuture {} impl LogExporterWithoutFuture for NoOpExporterWithoutFuture { - fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} + fn export(&mut self, _batch: &[(&LogRecord, &InstrumentationLibrary)]) {} } #[derive(Debug)] @@ -66,7 +66,7 @@ impl ExportingProcessorWithFuture { impl LogProcessor for ExportingProcessorWithFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { let mut exporter = self.exporter.lock().expect("lock error"); - futures_executor::block_on(exporter.export(vec![(record, library)])); + futures_executor::block_on(exporter.export(&[(record, library)])); } fn force_flush(&self) -> LogResult<()> { @@ -96,7 +96,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture { self.exporter .lock() .expect("lock error") - .export(vec![(record, library)]); + .export(&[(record, library)]); } fn force_flush(&self) -> LogResult<()> { From f66925752d15bce00e6c7333bc765657abb8c962 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 11:12:28 -0700 Subject: [PATCH 05/18] update bench --- opentelemetry-appender-tracing/benches/logs.rs | 2 +- opentelemetry-sdk/benches/log_exporter.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 4d6866e6f9..fae9b8fbe5 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -10,7 +10,7 @@ | noop_layer_disabled | 12 ns | | noop_layer_enabled | 25 ns | | ot_layer_disabled | 19 ns | - | ot_layer_enabled | 203 ns | + | ot_layer_enabled | 196 ns | */ use async_trait::async_trait; diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 9047b38381..53eb17cc9a 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -6,8 +6,8 @@ RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | LogExporterWithFuture | 280 ns | - | LogExporterWithoutFuture | 255 ns | + | LogExporterWithFuture | 109 ns | + | LogExporterWithoutFuture | 88 ns | */ use std::sync::Mutex; From d1d24f0569a30c8fee243d7c5a9c81f09e5e61d2 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 22:32:57 -0700 Subject: [PATCH 06/18] fix batch --- opentelemetry-sdk/src/logs/log_processor.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index e5d8acc197..e6081f4e28 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -223,7 +223,7 @@ impl BatchLogProcessor { config.max_export_timeout, exporter.as_mut(), &timeout_runtime, - &logs.iter().map(|log| (&log.0, &log.1)).collect::>(), + logs.split_off(0), ) .await; @@ -238,7 +238,7 @@ impl BatchLogProcessor { config.max_export_timeout, exporter.as_mut(), &timeout_runtime, - &logs.iter().map(|log| (&log.0, &log.1)).collect::>(), + logs.split_off(0), ) .await; @@ -259,7 +259,7 @@ impl BatchLogProcessor { config.max_export_timeout, exporter.as_mut(), &timeout_runtime, - &logs.iter().map(|log| (&log.0, &log.1)).collect::>(), + logs.split_off(0), ) .await; @@ -304,7 +304,7 @@ async fn export_with_timeout( time_out: Duration, exporter: &mut E, runtime: &R, - batch: &[(&LogRecord, &InstrumentationLibrary)], + batch: Vec<(LogRecord, InstrumentationLibrary)>, ) -> ExportResult where R: RuntimeChannel, @@ -314,7 +314,14 @@ where return Ok(()); } - let export = exporter.export(batch); + // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)> + // TBD - Can we avoid this conversion as it involves heap allocation with new vector? + let export_batch: Vec<(&LogRecord, &InstrumentationLibrary)> = batch + .iter() + .map(|log_data| (&log_data.0, &log_data.1)) + .collect(); + + let export = exporter.export(export_batch.as_slice()); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); From ce2e53b90b1cec38fb37ec1235fb7760b5406874 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 23:09:40 -0700 Subject: [PATCH 07/18] revert benchmark updats --- opentelemetry-appender-tracing/benches/logs.rs | 2 +- opentelemetry-sdk/benches/log_exporter.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index fae9b8fbe5..4d6866e6f9 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -10,7 +10,7 @@ | noop_layer_disabled | 12 ns | | noop_layer_enabled | 25 ns | | ot_layer_disabled | 19 ns | - | ot_layer_enabled | 196 ns | + | ot_layer_enabled | 203 ns | */ use async_trait::async_trait; diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 53eb17cc9a..9047b38381 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -6,8 +6,8 @@ RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | LogExporterWithFuture | 109 ns | - | LogExporterWithoutFuture | 88 ns | + | LogExporterWithFuture | 280 ns | + | LogExporterWithoutFuture | 255 ns | */ use std::sync::Mutex; From c7c7f0dc6347e75267716284744c70b8cef3307b Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 23:26:31 -0700 Subject: [PATCH 08/18] add changelog --- opentelemetry-sdk/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 283db572c3..5f9c47f027 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -26,6 +26,7 @@ - Provide default implementation for `event_enabled` method in `LogProcessor` trait that returns `true` always. - **Breaking** [#2041](https://github.com/open-telemetry/opentelemetry-rust/pull/2041) + and [#2057](https://github.com/open-telemetry/opentelemetry-rust/pull/2057) - The Exporter::export() interface is modified as below: Previous Signature: ```rust @@ -34,7 +35,7 @@ Updated Signature: ```rust - async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; + async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()>; ``` This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. From 3bc0725bdc391c0c839046a234646a8d754f08aa Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Aug 2024 23:41:25 -0700 Subject: [PATCH 09/18] update bench --- opentelemetry-sdk/CHANGELOG.md | 2 +- opentelemetry-sdk/benches/log_exporter.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 5f9c47f027..93aa2e25b2 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -37,7 +37,7 @@ ```rust async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()>; ``` - This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. + This change enhances performance by reducing unnecessary heap allocations and maintains object safety, allowing for more efficient handling of log records. It also simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. ## v0.24.1 diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 9047b38381..3e86ac5159 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -6,8 +6,8 @@ RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | LogExporterWithFuture | 280 ns | - | LogExporterWithoutFuture | 255 ns | + | LogExporterWithFuture | 108 ns | + | LogExporterWithoutFuture | 88 ns | */ use std::sync::Mutex; From 0ec6101b17c8b8fe50146d4114acd01334d66686 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 29 Aug 2024 07:03:44 +0000 Subject: [PATCH 10/18] add black_box to log-exporter bench --- opentelemetry-sdk/benches/log_exporter.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 3e86ac5159..b51f31a318 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -14,7 +14,7 @@ use std::sync::Mutex; use std::time::SystemTime; use async_trait::async_trait; -use criterion::{criterion_group, criterion_main, Criterion}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; @@ -66,7 +66,9 @@ impl ExportingProcessorWithFuture { impl LogProcessor for ExportingProcessorWithFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { let mut exporter = self.exporter.lock().expect("lock error"); - futures_executor::block_on(exporter.export(&[(record, library)])); + black_box(futures_executor::block_on( + exporter.export(&[(record, library)]), + )); } fn force_flush(&self) -> LogResult<()> { @@ -93,10 +95,12 @@ impl ExportingProcessorWithoutFuture { impl LogProcessor for ExportingProcessorWithoutFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { - self.exporter - .lock() - .expect("lock error") - .export(&[(record, library)]); + black_box( + self.exporter + .lock() + .expect("lock error") + .export(&[(record, library)]), + ); } fn force_flush(&self) -> LogResult<()> { From a52c2c093aea78e916197ebe7a0b7873325df4c9 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 29 Aug 2024 00:25:22 -0700 Subject: [PATCH 11/18] remove blackbox, as there is no effect --- opentelemetry-sdk/benches/log_exporter.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index b51f31a318..5fa2727ed5 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -66,9 +66,7 @@ impl ExportingProcessorWithFuture { impl LogProcessor for ExportingProcessorWithFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { let mut exporter = self.exporter.lock().expect("lock error"); - black_box(futures_executor::block_on( - exporter.export(&[(record, library)]), - )); + futures_executor::block_on(exporter.export(&[(record, library)])); } fn force_flush(&self) -> LogResult<()> { @@ -95,12 +93,10 @@ impl ExportingProcessorWithoutFuture { impl LogProcessor for ExportingProcessorWithoutFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { - black_box( - self.exporter - .lock() - .expect("lock error") - .export(&[(record, library)]), - ); + self.exporter + .lock() + .expect("lock error") + .export(&[(record, library)]); } fn force_flush(&self) -> LogResult<()> { From e91c7522d295405f45c459474bf9cbbb731a328e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 29 Aug 2024 00:51:06 -0700 Subject: [PATCH 12/18] fix --- opentelemetry-sdk/benches/log_exporter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 5fa2727ed5..3e86ac5159 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -14,7 +14,7 @@ use std::sync::Mutex; use std::time::SystemTime; use async_trait::async_trait; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; From a4731c3604e260101b2be39921f3b3cbbec9d109 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 30 Aug 2024 02:13:12 -0700 Subject: [PATCH 13/18] use LogBatch --- opentelemetry-otlp/src/exporter/http/logs.rs | 6 +- opentelemetry-otlp/src/exporter/http/mod.rs | 8 +-- opentelemetry-otlp/src/exporter/tonic/logs.rs | 6 +- opentelemetry-otlp/src/logs.rs | 6 +- opentelemetry-proto/src/transform/logs.rs | 14 ++-- opentelemetry-sdk/src/export/logs/mod.rs | 72 ++++++++++++++++++- opentelemetry-sdk/src/logs/log_processor.rs | 19 +++-- .../src/testing/logs/in_memory_exporter.rs | 4 +- opentelemetry-stdout/src/logs/exporter.rs | 5 +- 9 files changed, 100 insertions(+), 40 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index f06a0a79ce..db1932868b 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,15 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::export::logs::LogExporter; -use opentelemetry_sdk::logs::LogRecord; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { let client = self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 973366be22..3ccefa4caa 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -7,18 +7,16 @@ use crate::{ OTEL_EXPORTER_OTLP_TIMEOUT, }; use http::{HeaderName, HeaderValue, Uri}; -#[cfg(feature = "logs")] -use opentelemetry::InstrumentationLibrary; use opentelemetry_http::HttpClient; use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; #[cfg(feature = "logs")] use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; #[cfg(feature = "trace")] use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; +#[cfg(feature = "logs")] +use opentelemetry_sdk::export::logs::LogBatch; #[cfg(feature = "trace")] use opentelemetry_sdk::export::trace::SpanData; -#[cfg(feature = "logs")] -use opentelemetry_sdk::logs::LogRecord; #[cfg(feature = "metrics")] use opentelemetry_sdk::metrics::data::ResourceMetrics; use prost::Message; @@ -330,7 +328,7 @@ impl OtlpHttpClient { #[cfg(feature = "logs")] fn build_logs_export_body( &self, - logs: &[(&LogRecord, &InstrumentationLibrary)], + logs: LogBatch<'_>, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index acc3e7c206..bf9b6c9ed3 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -4,14 +4,12 @@ use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; -use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::logs::LogRecord; pub(crate) struct TonicLogsClient { inner: Option, @@ -56,7 +54,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index b6156126f4..71f5a34b3d 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -13,9 +13,9 @@ use async_trait::async_trait; use std::fmt::Debug; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::{logs::LogRecord, runtime::RuntimeChannel, Resource}; +use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::{runtime::RuntimeChannel, Resource}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -99,7 +99,7 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index b94eb19f1d..4cd2c1617b 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -12,6 +12,7 @@ pub mod tonic { transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; + use opentelemetry_sdk::export::logs::LogBatch; use std::borrow::Cow; use std::collections::HashMap; @@ -177,10 +178,7 @@ pub mod tonic { } pub fn group_logs_by_resource_and_scope( - logs: &[( - &opentelemetry_sdk::logs::LogRecord, - &opentelemetry::InstrumentationLibrary, - )], + logs: LogBatch<'_>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -237,7 +235,7 @@ mod tests { use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::logs::LogRecord as _; use opentelemetry::InstrumentationLibrary; - use opentelemetry_sdk::{logs::LogRecord, Resource}; + use opentelemetry_sdk::{export::logs::LogBatch, logs::LogRecord, Resource}; use std::time::SystemTime; fn create_test_log_data( @@ -259,10 +257,11 @@ mod tests { let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2"); let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(&logs, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -279,9 +278,10 @@ mod tests { let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2"); let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(&logs, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 25fdb145dc..d10c06b593 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -10,11 +10,79 @@ use opentelemetry::{ }; use std::fmt::Debug; +/// A batch of log records to be exported by a `LogExporter`. +/// +/// The `LogBatch` struct holds a collection of log records along with their associated +/// instrumentation libraries. This structure is used to group log records together for efficient +/// export operations. +/// +/// # Type Parameters +/// - `'a`: The lifetime of the references to the log records and instrumentation libraries. +/// +#[derive(Debug)] +pub struct LogBatch<'a> { + /// The data field contains a slice of tuples, where each tuple consists of a reference to + /// a `LogRecord` and a reference to an `InstrumentationLibrary`. + data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)], +} + +impl<'a> LogBatch<'a> { + /// Creates a new instance of `LogBatch`. + /// + /// # Arguments + /// + /// * `data` - A slice of tuples, where each tuple consists of a reference to a `LogRecord` + /// and a reference to an `InstrumentationLibrary`. These tuples represent the log records + /// and their associated instrumentation libraries to be exported. + /// + /// # Returns + /// + /// A `LogBatch` instance containing the provided log records and instrumentation libraries. + /// + + pub fn new(data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)]) -> LogBatch<'a> { + LogBatch { data } + } +} + +impl LogBatch<'_> { + /// Returns an iterator over the log records and instrumentation libraries in the batch. + /// + /// Each item yielded by the iterator is a tuple containing references to a `LogRecord` + /// and an `InstrumentationLibrary`. + /// + /// # Returns + /// + /// An iterator that yields references to the `LogRecord` and `InstrumentationLibrary` in the batch. + /// + pub fn iter(&self) -> impl Iterator { + self.data + .iter() + .map(|(record, library)| (*record, *library)) + } +} + /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] pub trait LogExporter: Send + Sync + Debug { - /// Exports a batch of [`LogRecord`, `InstrumentationLibrary`]. - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()>; + /// Exports a batch of log records and their associated instrumentation libraries. + /// + /// The `export` method is responsible for sending a batch of log records to an external + /// destination. It takes a `LogBatch` as an argument, which contains references to the + /// log records and their corresponding instrumentation libraries. The method returns + /// a `LogResult` indicating the success or failure of the export operation. + /// + /// # Arguments + /// + /// * `batch` - A `LogBatch` containing the log records and instrumentation libraries + /// to be exported. + /// + /// # Returns + /// + /// A `LogResult<()>`, which is a result type indicating either a successful export (with + /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. + /// + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index e6081f4e28..040250a609 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,5 +1,5 @@ use crate::{ - export::logs::{ExportResult, LogExporter}, + export::logs::{ExportResult, LogBatch, LogExporter}, logs::LogRecord, runtime::{RuntimeChannel, TrySend}, Resource, @@ -106,7 +106,8 @@ impl LogProcessor for SimpleLogProcessor { .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { - let log_batch = &[(record as &LogRecord, instrumentation)][..]; + let log_tuple = &[(record as &LogRecord, instrumentation)]; + let log_batch = LogBatch::new(log_tuple); futures_executor::block_on(exporter.export(log_batch)) }); if let Err(err) = result { @@ -314,14 +315,13 @@ where return Ok(()); } - // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)> // TBD - Can we avoid this conversion as it involves heap allocation with new vector? - let export_batch: Vec<(&LogRecord, &InstrumentationLibrary)> = batch + let log_vec: Vec<(&LogRecord, &InstrumentationLibrary)> = batch .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - - let export = exporter.export(export_batch.as_slice()); + let log_batch: LogBatch<'_> = LogBatch::new(log_vec.as_slice()); + let export = exporter.export(log_batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -517,7 +517,7 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; - use crate::export::logs::LogExporter; + use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ @@ -549,10 +549,7 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export( - &mut self, - _batch: &[(&LogRecord, &InstrumentationLibrary)], - ) -> LogResult<()> { + async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> { Ok(()) } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index f2614acb12..958ab11fe1 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::logs::LogExporter; +use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; use crate::Resource; use async_trait::async_trait; @@ -184,7 +184,7 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for (log_record, instrumentation) in batch.iter() { let owned_log = OwnedLogData { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 9fbb87c735..c4e3f1b338 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -3,6 +3,7 @@ use chrono::{DateTime, Utc}; use core::fmt; use opentelemetry::logs::LogResult; use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::export::logs::LogBatch; use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::Resource; use std::sync::atomic; @@ -33,7 +34,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { if self.is_shutdown.load(atomic::Ordering::SeqCst) { return Err("exporter is shut down".into()); } else { @@ -66,7 +67,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } -fn print_logs(batch: &[(&LogRecord, &InstrumentationLibrary)]) { +fn print_logs(batch: LogBatch<'_>) { for (i, log) in batch.iter().enumerate() { println!("Log #{}", i); let (record, _library) = log; From 0d078f8f90b860db00915e4ea2148747ca4780a1 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 30 Aug 2024 02:43:19 -0700 Subject: [PATCH 14/18] lint error --- opentelemetry-stdout/src/logs/exporter.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index c4e3f1b338..48a8b1a120 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -2,9 +2,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::fmt; use opentelemetry::logs::LogResult; -use opentelemetry::InstrumentationLibrary; use opentelemetry_sdk::export::logs::LogBatch; -use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::Resource; use std::sync::atomic; From bdee3d2ccecfdc87afcbe888d2bc0b3263774c7c Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 30 Aug 2024 02:53:43 -0700 Subject: [PATCH 15/18] update log_exporter bench --- opentelemetry-sdk/benches/log_exporter.rs | 15 +++++++++------ opentelemetry-sdk/src/logs/log_processor.rs | 6 ++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 3e86ac5159..cce5820c4c 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -19,6 +19,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::export::logs::LogBatch; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::logs::LoggerProvider; @@ -29,11 +30,11 @@ use std::fmt::Debug; // cargo bench --bench log_exporter #[async_trait] pub trait LogExporterWithFuture: Send + Sync + Debug { - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]); + async fn export(&mut self, batch: LogBatch<'_>); } pub trait LogExporterWithoutFuture: Send + Sync + Debug { - fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]); + fn export(&mut self, batch: LogBatch<'_>); } #[derive(Debug)] @@ -41,13 +42,13 @@ struct NoOpExporterWithFuture {} #[async_trait] impl LogExporterWithFuture for NoOpExporterWithFuture { - async fn export(&mut self, _batch: &[(&LogRecord, &InstrumentationLibrary)]) {} + async fn export(&mut self, _batch: LogBatch<'_>) {} } #[derive(Debug)] struct NoOpExporterWithoutFuture {} impl LogExporterWithoutFuture for NoOpExporterWithoutFuture { - fn export(&mut self, _batch: &[(&LogRecord, &InstrumentationLibrary)]) {} + fn export(&mut self, _batch: LogBatch<'_>) {} } #[derive(Debug)] @@ -66,7 +67,8 @@ impl ExportingProcessorWithFuture { impl LogProcessor for ExportingProcessorWithFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { let mut exporter = self.exporter.lock().expect("lock error"); - futures_executor::block_on(exporter.export(&[(record, library)])); + let logs = [(record as &LogRecord, library)]; + futures_executor::block_on(exporter.export(LogBatch::new(&logs))); } fn force_flush(&self) -> LogResult<()> { @@ -93,10 +95,11 @@ impl ExportingProcessorWithoutFuture { impl LogProcessor for ExportingProcessorWithoutFuture { fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) { + let logs = [(record as &LogRecord, library)]; self.exporter .lock() .expect("lock error") - .export(&[(record, library)]); + .export(LogBatch::new(&logs)); } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 040250a609..b615acb9b8 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -107,8 +107,7 @@ impl LogProcessor for SimpleLogProcessor { .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; - let log_batch = LogBatch::new(log_tuple); - futures_executor::block_on(exporter.export(log_batch)) + futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); if let Err(err) = result { global::handle_error(err); @@ -320,8 +319,7 @@ where .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - let log_batch: LogBatch<'_> = LogBatch::new(log_vec.as_slice()); - let export = exporter.export(log_batch); + let export = exporter.export(LogBatch::new(log_vec.as_slice())); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); From a250f885544be7cbb8af5e3babfbe2757a78f809 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 30 Aug 2024 02:58:47 -0700 Subject: [PATCH 16/18] update changelog --- opentelemetry-sdk/CHANGELOG.md | 10 +++++++++- opentelemetry-sdk/benches/log_exporter.rs | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 93aa2e25b2..2f44b29e7f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -35,7 +35,15 @@ Updated Signature: ```rust - async fn export(&mut self, batch: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()>; + async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>; + ``` + + where + ```rust + pub struct LogBatch<'a> { + + data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)], + } ``` This change enhances performance by reducing unnecessary heap allocations and maintains object safety, allowing for more efficient handling of log records. It also simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index cce5820c4c..3549c08af5 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -6,8 +6,8 @@ RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | LogExporterWithFuture | 108 ns | - | LogExporterWithoutFuture | 88 ns | + | LogExporterWithFuture | 111 ns | + | LogExporterWithoutFuture | 92 ns | */ use std::sync::Mutex; From 540f8077c6711a7575bbf846d7fa4eb6dc3bfded Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 30 Aug 2024 09:06:58 -0700 Subject: [PATCH 17/18] fix lint --- opentelemetry-appender-tracing/benches/logs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index fae9b8fbe5..a5ebb83249 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::LogResult; use opentelemetry::{InstrumentationLibrary, KeyValue}; use opentelemetry_appender_tracing::layer as tracing_layer; -use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider}; use opentelemetry_sdk::Resource; use pprof::criterion::{Output, PProfProfiler}; @@ -34,7 +34,7 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export(&mut self, _: &[(&LogRecord, &InstrumentationLibrary)]) -> LogResult<()> { + async fn export(&mut self, _: LogBatch<'_>) -> LogResult<()> { LogResult::Ok(()) } From f1ec18f11cda67236ca106cb9b2af8b21856b284 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 30 Aug 2024 09:43:04 -0700 Subject: [PATCH 18/18] add disclaimer for LogBatch::new --- opentelemetry-sdk/src/export/logs/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index d10c06b593..8056f28222 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -39,6 +39,8 @@ impl<'a> LogBatch<'a> { /// /// A `LogBatch` instance containing the provided log records and instrumentation libraries. /// + /// Note - this is not a public function, and should not be used directly. This would be + /// made private in the future. pub fn new(data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)]) -> LogBatch<'a> { LogBatch { data }