Skip to content

Commit f70afa2

Browse files
authored
chore(otlp): Handle partial success (#3206)
the CI security failure is safe - coming from test containers - which are used for integration test.
1 parent 9bbe272 commit f70afa2

File tree

9 files changed

+396
-28
lines changed

9 files changed

+396
-28
lines changed

opentelemetry-otlp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## vNext
44

5+
- Add partial success response handling for OTLP exporters (traces, metrics, logs) per OTLP spec. Exporters now log warnings when the server returns partial success responses with rejected items and error messages. [#865](https://github.com/open-telemetry/opentelemetry-rust/issues/865)
56
- Refactor `internal-logs` feature in `opentelemetry-otlp` to reduce unnecessary dependencies[3191](https://github.com/open-telemetry/opentelemetry-rust/pull/3192)
67

78
## 0.31.0

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
use super::OtlpHttpClient;
2+
use crate::Protocol;
3+
use opentelemetry::{otel_debug, otel_warn};
24
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
35
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
6+
use prost::Message;
47
use std::time;
58

69
impl LogExporter for OtlpHttpClient {
710
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
8-
self.export_http_with_retry(
9-
batch,
10-
OtlpHttpClient::build_logs_export_body,
11-
"HttpLogsClient.Export",
12-
)
13-
.await
11+
let response_body = self
12+
.export_http_with_retry(
13+
batch,
14+
OtlpHttpClient::build_logs_export_body,
15+
"HttpLogsClient.Export",
16+
)
17+
.await?;
18+
19+
handle_partial_success(&response_body, self.protocol);
20+
Ok(())
1421
}
1522

1623
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
@@ -29,3 +36,68 @@ impl LogExporter for OtlpHttpClient {
2936
self.resource = resource.into();
3037
}
3138
}
39+
40+
/// Handles partial success returned by OTLP endpoints. We log the rejected log records,
41+
/// as well as the error message returned.
42+
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
43+
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceResponse;
44+
45+
let response: ExportLogsServiceResponse = match protocol {
46+
#[cfg(feature = "http-json")]
47+
Protocol::HttpJson => match serde_json::from_slice(response_body) {
48+
Ok(r) => r,
49+
Err(e) => {
50+
otel_debug!(name: "HttpLogsClient.ResponseParseError", error = e.to_string());
51+
return;
52+
}
53+
},
54+
_ => match Message::decode(response_body) {
55+
Ok(r) => r,
56+
Err(e) => {
57+
otel_debug!(name: "HttpLogsClient.ResponseParseError", error = e.to_string());
58+
return;
59+
}
60+
},
61+
};
62+
63+
if let Some(partial_success) = response.partial_success {
64+
if partial_success.rejected_log_records > 0 || !partial_success.error_message.is_empty() {
65+
otel_warn!(
66+
name: "HttpLogsClient.PartialSuccess",
67+
rejected_log_records = partial_success.rejected_log_records,
68+
error_message = partial_success.error_message.as_str(),
69+
);
70+
}
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::*;
77+
78+
#[test]
79+
fn test_handle_invalid_protobuf() {
80+
// Corrupted/invalid protobuf data
81+
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];
82+
83+
// Should not panic - logs debug and returns early
84+
handle_partial_success(&invalid, Protocol::HttpBinary);
85+
}
86+
87+
#[test]
88+
fn test_handle_empty_response() {
89+
let empty = vec![];
90+
91+
// Should not panic
92+
handle_partial_success(&empty, Protocol::HttpBinary);
93+
}
94+
95+
#[cfg(feature = "http-json")]
96+
#[test]
97+
fn test_handle_invalid_json() {
98+
let invalid_json = b"{not valid json}";
99+
100+
// Should not panic - logs debug and returns
101+
handle_partial_success(invalid_json, Protocol::HttpJson);
102+
}
103+
}

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use crate::metric::MetricsClient;
2+
use crate::Protocol;
3+
use opentelemetry::{otel_debug, otel_warn};
24
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
35
use opentelemetry_sdk::metrics::data::ResourceMetrics;
6+
use prost::Message;
47

58
use super::OtlpHttpClient;
69

@@ -12,8 +15,12 @@ impl MetricsClient for OtlpHttpClient {
1215
.ok_or_else(|| "Failed to serialize metrics".to_string())
1316
};
1417

15-
self.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export")
16-
.await
18+
let response_body = self
19+
.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export")
20+
.await?;
21+
22+
handle_partial_success(&response_body, self.protocol);
23+
Ok(())
1724
}
1825

1926
fn shutdown(&self) -> OTelSdkResult {
@@ -25,3 +32,68 @@ impl MetricsClient for OtlpHttpClient {
2532
Ok(())
2633
}
2734
}
35+
36+
/// Handles partial success returned by OTLP endpoints. We log the rejected data points,
37+
/// as well as the error message returned.
38+
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
39+
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse;
40+
41+
let response: ExportMetricsServiceResponse = match protocol {
42+
#[cfg(feature = "http-json")]
43+
Protocol::HttpJson => match serde_json::from_slice(response_body) {
44+
Ok(r) => r,
45+
Err(e) => {
46+
otel_debug!(name: "HttpMetricsClient.ResponseParseError", error = e.to_string());
47+
return;
48+
}
49+
},
50+
_ => match Message::decode(response_body) {
51+
Ok(r) => r,
52+
Err(e) => {
53+
otel_debug!(name: "HttpMetricsClient.ResponseParseError", error = e.to_string());
54+
return;
55+
}
56+
},
57+
};
58+
59+
if let Some(partial_success) = response.partial_success {
60+
if partial_success.rejected_data_points > 0 || !partial_success.error_message.is_empty() {
61+
otel_warn!(
62+
name: "HttpMetricsClient.PartialSuccess",
63+
rejected_data_points = partial_success.rejected_data_points,
64+
error_message = partial_success.error_message.as_str(),
65+
);
66+
}
67+
}
68+
}
69+
70+
#[cfg(test)]
71+
mod tests {
72+
use super::*;
73+
74+
#[test]
75+
fn test_handle_invalid_protobuf() {
76+
// Corrupted/invalid protobuf data
77+
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];
78+
79+
// Should not panic - logs debug and returns early
80+
handle_partial_success(&invalid, Protocol::HttpBinary);
81+
}
82+
83+
#[test]
84+
fn test_handle_empty_response() {
85+
let empty = vec![];
86+
87+
// Should not panic
88+
handle_partial_success(&empty, Protocol::HttpBinary);
89+
}
90+
91+
#[cfg(feature = "http-json")]
92+
#[test]
93+
fn test_handle_invalid_json() {
94+
let invalid_json = b"{not valid json}";
95+
96+
// Should not panic - logs debug and returns
97+
handle_partial_success(invalid_json, Protocol::HttpJson);
98+
}
99+
}

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use super::{
55
use crate::{ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
66
use http::{HeaderName, HeaderValue, Uri};
77
use opentelemetry::otel_debug;
8-
use opentelemetry_http::HttpClient;
8+
use opentelemetry_http::{Bytes, HttpClient};
99
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
1010
#[cfg(feature = "logs")]
1111
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
@@ -381,7 +381,7 @@ impl OtlpHttpClient {
381381
data: T,
382382
build_body_fn: F,
383383
operation_name: &'static str,
384-
) -> opentelemetry_sdk::error::OTelSdkResult
384+
) -> Result<Bytes, opentelemetry_sdk::error::OTelSdkError>
385385
where
386386
F: Fn(&Self, T) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String>,
387387
{
@@ -408,7 +408,7 @@ impl OtlpHttpClient {
408408
#[cfg(not(feature = "reqwest-blocking-client"))]
409409
let runtime = opentelemetry_sdk::runtime::Tokio;
410410

411-
retry_with_backoff(
411+
let response_body = retry_with_backoff(
412412
runtime,
413413
self.retry_policy.clone(),
414414
classify_http_export_error,
@@ -424,7 +424,9 @@ impl OtlpHttpClient {
424424
},
425425
)
426426
.await
427-
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))
427+
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))?;
428+
429+
Ok(response_body)
428430
}
429431

430432
#[cfg(not(feature = "experimental-http-retry"))]
@@ -438,9 +440,12 @@ impl OtlpHttpClient {
438440
endpoint: self.collector_endpoint.to_string(),
439441
};
440442

441-
self.export_http_once(&retry_data, content_type, content_encoding, operation_name)
443+
let response_body = self
444+
.export_http_once(&retry_data, content_type, content_encoding, operation_name)
442445
.await
443-
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))
446+
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))?;
447+
448+
Ok(response_body)
444449
}
445450
}
446451

@@ -451,7 +456,7 @@ impl OtlpHttpClient {
451456
content_type: &'static str,
452457
content_encoding: Option<&'static str>,
453458
_operation_name: &'static str,
454-
) -> Result<(), HttpExportError> {
459+
) -> Result<Bytes, HttpExportError> {
455460
// Get client
456461
let client = self
457462
.client
@@ -510,7 +515,9 @@ impl OtlpHttpClient {
510515
}
511516

512517
otel_debug!(name: "HttpClient.ExportSucceeded");
513-
Ok(())
518+
519+
// Return the response, consuming the body to save a copy
520+
Ok(response.into_body())
514521
}
515522

516523
/// Compress data using gzip or zstd if the user has requested it and the relevant feature

opentelemetry-otlp/src/exporter/http/trace.rs

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
use super::OtlpHttpClient;
2+
use crate::Protocol;
3+
use opentelemetry::{otel_debug, otel_warn};
24
use opentelemetry_sdk::{
35
error::{OTelSdkError, OTelSdkResult},
46
trace::{SpanData, SpanExporter},
57
};
8+
use prost::Message;
69

710
impl SpanExporter for OtlpHttpClient {
811
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
9-
self.export_http_with_retry(
10-
batch,
11-
OtlpHttpClient::build_trace_export_body,
12-
"HttpTracesClient.Export",
13-
)
14-
.await
12+
let response_body = self
13+
.export_http_with_retry(
14+
batch,
15+
OtlpHttpClient::build_trace_export_body,
16+
"HttpTracesClient.Export",
17+
)
18+
.await?;
19+
20+
handle_partial_success(&response_body, self.protocol);
21+
Ok(())
1522
}
1623

1724
fn shutdown(&mut self) -> OTelSdkResult {
@@ -30,3 +37,68 @@ impl SpanExporter for OtlpHttpClient {
3037
self.resource = resource.into();
3138
}
3239
}
40+
41+
/// Handles partial success returned by OTLP endpoints. We log the rejected spans,
42+
/// as well as the error message returned.
43+
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
44+
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceResponse;
45+
46+
let response: ExportTraceServiceResponse = match protocol {
47+
#[cfg(feature = "http-json")]
48+
Protocol::HttpJson => match serde_json::from_slice(response_body) {
49+
Ok(r) => r,
50+
Err(e) => {
51+
otel_debug!(name: "HttpTraceClient.ResponseParseError", error = e.to_string());
52+
return;
53+
}
54+
},
55+
_ => match Message::decode(response_body) {
56+
Ok(r) => r,
57+
Err(e) => {
58+
otel_debug!(name: "HttpTraceClient.ResponseParseError", error = e.to_string());
59+
return;
60+
}
61+
},
62+
};
63+
64+
if let Some(partial_success) = response.partial_success {
65+
if partial_success.rejected_spans > 0 || !partial_success.error_message.is_empty() {
66+
otel_warn!(
67+
name: "HttpTraceClient.PartialSuccess",
68+
rejected_spans = partial_success.rejected_spans,
69+
error_message = partial_success.error_message.as_str(),
70+
);
71+
}
72+
}
73+
}
74+
75+
#[cfg(test)]
76+
mod tests {
77+
use super::*;
78+
79+
#[test]
80+
fn test_handle_invalid_protobuf() {
81+
// Corrupted/invalid protobuf data
82+
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];
83+
84+
// Should not panic - logs debug and returns early
85+
handle_partial_success(&invalid, Protocol::HttpBinary);
86+
}
87+
88+
#[test]
89+
fn test_handle_empty_response() {
90+
let empty = vec![];
91+
92+
// Should not panic
93+
handle_partial_success(&empty, Protocol::HttpBinary);
94+
}
95+
96+
#[cfg(feature = "http-json")]
97+
#[test]
98+
fn test_handle_invalid_json() {
99+
let invalid_json = b"{not valid json}";
100+
101+
// Should not panic - logs debug and returns
102+
handle_partial_success(invalid_json, Protocol::HttpJson);
103+
}
104+
}

0 commit comments

Comments
 (0)