Skip to content

Commit 1c01581

Browse files
authored
Merge branch 'main' into http-json-example
2 parents f2bd44f + 4df74e8 commit 1c01581

File tree

11 files changed

+397
-38
lines changed

11 files changed

+397
-38
lines changed

opentelemetry-otlp/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using
1919
- Fixing the OTLP HTTP/JSON exporter. [#1882](https://github.com/open-telemetry/opentelemetry-rust/pull/1882) - The exporter was broken in the
2020
previous release.
2121
- **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated.
22+
- Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).
23+
2224

2325
## v0.16.0
2426

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl LogExporter for OtlpHttpClient {
2525
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
2626
.collect::<Vec<LogData>>();
2727

28-
let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
28+
let (body, content_type) = { self.build_logs_export_body(owned_batch)? };
2929
let mut request = http::Request::builder()
3030
.method(Method::POST)
3131
.uri(&self.collector_endpoint)

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use crate::{
99
use http::{HeaderName, HeaderValue, Uri};
1010
use opentelemetry_http::HttpClient;
1111
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
12-
12+
#[cfg(feature = "logs")]
13+
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
14+
#[cfg(feature = "trace")]
15+
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
1316
#[cfg(feature = "logs")]
1417
use opentelemetry_sdk::export::logs::LogData;
1518
#[cfg(feature = "trace")]
@@ -307,16 +310,9 @@ impl OtlpHttpClient {
307310
fn build_trace_export_body(
308311
&self,
309312
spans: Vec<SpanData>,
310-
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
311313
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
312-
use opentelemetry_proto::tonic::{
313-
collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans,
314-
};
315-
316-
let resource_spans = spans
317-
.into_iter()
318-
.map(|span| ResourceSpans::new(span, resource))
319-
.collect::<Vec<_>>();
314+
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
315+
let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);
320316

321317
let req = ExportTraceServiceRequest { resource_spans };
322318
match self.protocol {
@@ -333,13 +329,9 @@ impl OtlpHttpClient {
333329
fn build_logs_export_body(
334330
&self,
335331
logs: Vec<LogData>,
336-
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
337332
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
338333
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
339-
let resource_logs = logs
340-
.into_iter()
341-
.map(|log_event| (log_event, resource).into())
342-
.collect::<Vec<_>>();
334+
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
343335
let req = ExportLogsServiceRequest { resource_logs };
344336

345337
match self.protocol {

opentelemetry-otlp/src/exporter/http/trace.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient {
2121
Err(err) => return Box::pin(std::future::ready(Err(err))),
2222
};
2323

24-
let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {
24+
let (body, content_type) = match self.build_trace_export_body(batch) {
2525
Ok(body) => body,
2626
Err(e) => return Box::pin(std::future::ready(Err(e))),
2727
};

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
77
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
88
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
99

10+
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
11+
1012
use super::BoxInterceptor;
1113

1214
pub(crate) struct TonicLogsClient {
@@ -65,15 +67,13 @@ impl LogExporter for TonicLogsClient {
6567
None => return Err(LogError::Other("exporter is already shut down".into())),
6668
};
6769

68-
// TODO: Avoid cloning here.
69-
let resource_logs = {
70-
batch
71-
.into_iter()
72-
.map(|log_data_cow| (log_data_cow.into_owned()))
73-
.map(|log_data| (log_data, &self.resource))
74-
.map(Into::into)
75-
.collect()
76-
};
70+
//TODO: avoid cloning here.
71+
let owned_batch = batch
72+
.into_iter()
73+
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
74+
.collect::<Vec<LogData>>();
75+
76+
let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource);
7777

7878
client
7979
.export(Request::from_parts(

opentelemetry-otlp/src/exporter/tonic/trace.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ use opentelemetry::trace::TraceError;
55
use opentelemetry_proto::tonic::collector::trace::v1::{
66
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
77
};
8-
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
98
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
109
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
1110

11+
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
12+
1213
use super::BoxInterceptor;
1314

1415
pub(crate) struct TonicTracesClient {
@@ -71,13 +72,7 @@ impl SpanExporter for TonicTracesClient {
7172
}
7273
};
7374

74-
// TODO: Avoid cloning here.
75-
let resource_spans = {
76-
batch
77-
.into_iter()
78-
.map(|log_data| ResourceSpans::new(log_data, &self.resource))
79-
.collect()
80-
};
75+
let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
8176

8277
Box::pin(async move {
8378
client

opentelemetry-proto/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
## vNext
44

55
- Bump MSRV to 1.70 [1864](https://github.com/open-telemetry/opentelemetry-rust/pull/1874)
6+
- Group log and Span batch by their resource and instrumentation scope before exporting [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).
7+
- Introduced `group_logs_by_resource_and_scope()` and `group_spans_by_resource_and_scope()` methods to group logs and spans by the resource and scope respectively.
68

79
## v0.6.0
810

opentelemetry-proto/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ path = "tests/json_deserialize.rs"
2929

3030

3131
[features]
32-
default = []
32+
default = ["full"]
3333

3434
full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"]
3535

@@ -42,6 +42,7 @@ trace = ["opentelemetry/trace", "opentelemetry_sdk/trace"]
4242
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"]
4343
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs"]
4444
zpages = ["trace"]
45+
testing = ["opentelemetry/testing"]
4546

4647
# add ons
4748
with-schemars = ["schemars"]
@@ -57,7 +58,8 @@ serde = { workspace = true, optional = true, features = ["serde_derive"] }
5758
hex = { version = "0.4.3", optional = true }
5859

5960
[dev-dependencies]
61+
opentelemetry = { version = "0.23", features = ["testing"], path = "../opentelemetry" }
6062
tonic-build = { workspace = true }
6163
prost-build = { workspace = true }
6264
tempfile = "3.3.0"
63-
serde_json = { workspace = true }
65+
serde_json = { workspace = true }

opentelemetry-proto/src/transform/logs.rs

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22
pub mod tonic {
33
use crate::{
44
tonic::{
5-
common::v1::{any_value::Value, AnyValue, ArrayValue, KeyValue, KeyValueList},
5+
common::v1::{
6+
any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
7+
KeyValueList,
8+
},
69
logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
710
resource::v1::Resource,
811
Attributes,
912
},
1013
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
1114
};
1215
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
16+
use std::borrow::Cow;
17+
use std::collections::HashMap;
1318

1419
impl From<LogsAnyValue> for AnyValue {
1520
fn from(value: LogsAnyValue) -> Self {
@@ -143,4 +148,123 @@ pub mod tonic {
143148
}
144149
}
145150
}
151+
152+
pub fn group_logs_by_resource_and_scope(
153+
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
154+
resource: &ResourceAttributesWithSchema,
155+
) -> Vec<ResourceLogs> {
156+
// Group logs by target or instrumentation name
157+
let scope_map = logs.iter().fold(
158+
HashMap::new(),
159+
|mut scope_map: HashMap<
160+
Cow<'static, str>,
161+
Vec<&opentelemetry_sdk::export::logs::LogData>,
162+
>,
163+
log| {
164+
let key = log
165+
.record
166+
.target
167+
.clone()
168+
.unwrap_or_else(|| log.instrumentation.name.clone());
169+
scope_map.entry(key).or_default().push(log);
170+
scope_map
171+
},
172+
);
173+
174+
let scope_logs = scope_map
175+
.into_iter()
176+
.map(|(key, log_data)| ScopeLogs {
177+
scope: Some(InstrumentationScope::from((
178+
&log_data.first().unwrap().instrumentation,
179+
Some(key),
180+
))),
181+
schema_url: resource.schema_url.clone().unwrap_or_default(),
182+
log_records: log_data
183+
.into_iter()
184+
.map(|log_data| log_data.record.clone().into())
185+
.collect(),
186+
})
187+
.collect();
188+
189+
vec![ResourceLogs {
190+
resource: Some(Resource {
191+
attributes: resource.attributes.0.clone(),
192+
dropped_attributes_count: 0,
193+
}),
194+
scope_logs,
195+
schema_url: resource.schema_url.clone().unwrap_or_default(),
196+
}]
197+
}
198+
}
199+
200+
#[cfg(test)]
201+
mod tests {
202+
use crate::transform::common::tonic::ResourceAttributesWithSchema;
203+
use opentelemetry::logs::LogRecord as _;
204+
use opentelemetry_sdk::export::logs::LogData;
205+
use opentelemetry_sdk::{logs::LogRecord, Resource};
206+
use std::time::SystemTime;
207+
208+
fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
209+
let mut logrecord = LogRecord::default();
210+
logrecord.set_timestamp(SystemTime::now());
211+
logrecord.set_observed_timestamp(SystemTime::now());
212+
LogData {
213+
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
214+
instrumentation_name.to_string(),
215+
)
216+
.build(),
217+
record: logrecord,
218+
}
219+
}
220+
221+
#[test]
222+
fn test_group_logs_by_resource_and_scope_single_scope() {
223+
let resource = Resource::default();
224+
let log1 = create_test_log_data("test-lib", "Log 1");
225+
let log2 = create_test_log_data("test-lib", "Log 2");
226+
227+
let logs = vec![log1, log2];
228+
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
229+
230+
let grouped_logs =
231+
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);
232+
233+
assert_eq!(grouped_logs.len(), 1);
234+
let resource_logs = &grouped_logs[0];
235+
assert_eq!(resource_logs.scope_logs.len(), 1);
236+
237+
let scope_logs = &resource_logs.scope_logs[0];
238+
assert_eq!(scope_logs.log_records.len(), 2);
239+
}
240+
241+
#[test]
242+
fn test_group_logs_by_resource_and_scope_multiple_scopes() {
243+
let resource = Resource::default();
244+
let log1 = create_test_log_data("lib1", "Log 1");
245+
let log2 = create_test_log_data("lib2", "Log 2");
246+
247+
let logs = vec![log1, log2];
248+
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
249+
let grouped_logs =
250+
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);
251+
252+
assert_eq!(grouped_logs.len(), 1);
253+
let resource_logs = &grouped_logs[0];
254+
assert_eq!(resource_logs.scope_logs.len(), 2);
255+
256+
let scope_logs_1 = &resource_logs
257+
.scope_logs
258+
.iter()
259+
.find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
260+
.unwrap();
261+
let scope_logs_2 = &resource_logs
262+
.scope_logs
263+
.iter()
264+
.find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
265+
.unwrap();
266+
267+
assert_eq!(scope_logs_1.log_records.len(), 1);
268+
assert_eq!(scope_logs_2.log_records.len(), 1);
269+
}
146270
}

0 commit comments

Comments
 (0)