Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## vNext

- Add partial success response handling for OTLP exporters (traces, metrics, logs) per OTLP spec. Exporters now log warnings when the server returns partial success responses with rejected items and error messages. [#865](https://github.com/open-telemetry/opentelemetry-rust/issues/865)
- Refactor `internal-logs` feature in `opentelemetry-otlp` to reduce unnecessary dependencies[3191](https://github.com/open-telemetry/opentelemetry-rust/pull/3192)

## 0.31.0
Expand Down
84 changes: 78 additions & 6 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use super::OtlpHttpClient;
use crate::Protocol;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use prost::Message;
use std::time;

impl LogExporter for OtlpHttpClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
self.export_http_with_retry(
batch,
OtlpHttpClient::build_logs_export_body,
"HttpLogsClient.Export",
)
.await
let response_body = self
.export_http_with_retry(
batch,
OtlpHttpClient::build_logs_export_body,
"HttpLogsClient.Export",
)
.await?;

handle_partial_success(&response_body, self.protocol);
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
Expand All @@ -29,3 +36,68 @@ impl LogExporter for OtlpHttpClient {
self.resource = resource.into();
}
}

/// Handles partial success returned by OTLP endpoints. We log the rejected log records,
/// as well as the error message returned.
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceResponse;

let response: ExportLogsServiceResponse = match protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::from_slice(response_body) {
Ok(r) => r,
Err(e) => {
otel_debug!(name: "HttpLogsClient.ResponseParseError", error = e.to_string());
return;
}
},
_ => match Message::decode(response_body) {
Ok(r) => r,
Err(e) => {
otel_debug!(name: "HttpLogsClient.ResponseParseError", error = e.to_string());
return;
}
},
};

if let Some(partial_success) = response.partial_success {
if partial_success.rejected_log_records > 0 || !partial_success.error_message.is_empty() {
otel_warn!(
name: "HttpLogsClient.PartialSuccess",
rejected_log_records = partial_success.rejected_log_records,
error_message = partial_success.error_message.as_str(),
);
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_handle_invalid_protobuf() {
// Corrupted/invalid protobuf data
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];

// Should not panic - logs debug and returns early
handle_partial_success(&invalid, Protocol::HttpBinary);
}

#[test]
fn test_handle_empty_response() {
let empty = vec![];

// Should not panic
handle_partial_success(&empty, Protocol::HttpBinary);
}

#[cfg(feature = "http-json")]
#[test]
fn test_handle_invalid_json() {
let invalid_json = b"{not valid json}";

// Should not panic - logs debug and returns
handle_partial_success(invalid_json, Protocol::HttpJson);
}
}
76 changes: 74 additions & 2 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::metric::MetricsClient;
use crate::Protocol;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use prost::Message;

use super::OtlpHttpClient;

Expand All @@ -12,8 +15,12 @@ impl MetricsClient for OtlpHttpClient {
.ok_or_else(|| "Failed to serialize metrics".to_string())
};

self.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export")
.await
let response_body = self
.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export")
.await?;

handle_partial_success(&response_body, self.protocol);
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
Expand All @@ -25,3 +32,68 @@ impl MetricsClient for OtlpHttpClient {
Ok(())
}
}

/// Handles partial success returned by OTLP endpoints. We log the rejected data points,
/// as well as the error message returned.
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse;

let response: ExportMetricsServiceResponse = match protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::from_slice(response_body) {
Ok(r) => r,
Err(e) => {
otel_debug!(name: "HttpMetricsClient.ResponseParseError", error = e.to_string());
return;
}
},
_ => match Message::decode(response_body) {
Ok(r) => r,
Err(e) => {
otel_debug!(name: "HttpMetricsClient.ResponseParseError", error = e.to_string());
return;
}
},
};

if let Some(partial_success) = response.partial_success {
if partial_success.rejected_data_points > 0 || !partial_success.error_message.is_empty() {
otel_warn!(
name: "HttpMetricsClient.PartialSuccess",
rejected_data_points = partial_success.rejected_data_points,
error_message = partial_success.error_message.as_str(),
);
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_handle_invalid_protobuf() {
// Corrupted/invalid protobuf data
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];

// Should not panic - logs debug and returns early
handle_partial_success(&invalid, Protocol::HttpBinary);
}

#[test]
fn test_handle_empty_response() {
let empty = vec![];

// Should not panic
handle_partial_success(&empty, Protocol::HttpBinary);
}

#[cfg(feature = "http-json")]
#[test]
fn test_handle_invalid_json() {
let invalid_json = b"{not valid json}";

// Should not panic - logs debug and returns
handle_partial_success(invalid_json, Protocol::HttpJson);
}
}
23 changes: 15 additions & 8 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
use crate::{ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry::otel_debug;
use opentelemetry_http::HttpClient;
use opentelemetry_http::{Bytes, HttpClient};
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
Expand Down Expand Up @@ -381,7 +381,7 @@ impl OtlpHttpClient {
data: T,
build_body_fn: F,
operation_name: &'static str,
) -> opentelemetry_sdk::error::OTelSdkResult
) -> Result<Bytes, opentelemetry_sdk::error::OTelSdkError>
where
F: Fn(&Self, T) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String>,
{
Expand All @@ -408,7 +408,7 @@ impl OtlpHttpClient {
#[cfg(not(feature = "reqwest-blocking-client"))]
let runtime = opentelemetry_sdk::runtime::Tokio;

retry_with_backoff(
let response_body = retry_with_backoff(
runtime,
self.retry_policy.clone(),
classify_http_export_error,
Expand All @@ -424,7 +424,9 @@ impl OtlpHttpClient {
},
)
.await
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))?;

Ok(response_body)
}

#[cfg(not(feature = "experimental-http-retry"))]
Expand All @@ -438,9 +440,12 @@ impl OtlpHttpClient {
endpoint: self.collector_endpoint.to_string(),
};

self.export_http_once(&retry_data, content_type, content_encoding, operation_name)
let response_body = self
.export_http_once(&retry_data, content_type, content_encoding, operation_name)
.await
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))
.map_err(|e| opentelemetry_sdk::error::OTelSdkError::InternalFailure(e.message))?;

Ok(response_body)
}
}

Expand All @@ -451,7 +456,7 @@ impl OtlpHttpClient {
content_type: &'static str,
content_encoding: Option<&'static str>,
_operation_name: &'static str,
) -> Result<(), HttpExportError> {
) -> Result<Bytes, HttpExportError> {
// Get client
let client = self
.client
Expand Down Expand Up @@ -510,7 +515,9 @@ impl OtlpHttpClient {
}

otel_debug!(name: "HttpClient.ExportSucceeded");
Ok(())

// Return the response, consuming the body to save a copy
Ok(response.into_body())
}

/// Compress data using gzip or zstd if the user has requested it and the relevant feature
Expand Down
84 changes: 78 additions & 6 deletions opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use super::OtlpHttpClient;
use crate::Protocol;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_sdk::{
error::{OTelSdkError, OTelSdkResult},
trace::{SpanData, SpanExporter},
};
use prost::Message;

impl SpanExporter for OtlpHttpClient {
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
self.export_http_with_retry(
batch,
OtlpHttpClient::build_trace_export_body,
"HttpTracesClient.Export",
)
.await
let response_body = self
.export_http_with_retry(
batch,
OtlpHttpClient::build_trace_export_body,
"HttpTracesClient.Export",
)
.await?;

handle_partial_success(&response_body, self.protocol);
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
Expand All @@ -30,3 +37,68 @@ impl SpanExporter for OtlpHttpClient {
self.resource = resource.into();
}
}

/// Handles partial success returned by OTLP endpoints. We log the rejected spans,
/// as well as the error message returned.
fn handle_partial_success(response_body: &[u8], protocol: Protocol) {
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceResponse;

let response: ExportTraceServiceResponse = match protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::from_slice(response_body) {
Ok(r) => r,
Err(e) => {
otel_debug!(name: "HttpTraceClient.ResponseParseError", error = e.to_string());
return;
}
},
_ => match Message::decode(response_body) {
Ok(r) => r,
Err(e) => {
otel_debug!(name: "HttpTraceClient.ResponseParseError", error = e.to_string());
return;
}
},
};

if let Some(partial_success) = response.partial_success {
if partial_success.rejected_spans > 0 || !partial_success.error_message.is_empty() {
otel_warn!(
name: "HttpTraceClient.PartialSuccess",
rejected_spans = partial_success.rejected_spans,
error_message = partial_success.error_message.as_str(),
);
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_handle_invalid_protobuf() {
// Corrupted/invalid protobuf data
let invalid = vec![0xFF, 0xFF, 0xFF, 0xFF];

// Should not panic - logs debug and returns early
handle_partial_success(&invalid, Protocol::HttpBinary);
}

#[test]
fn test_handle_empty_response() {
let empty = vec![];

// Should not panic
handle_partial_success(&empty, Protocol::HttpBinary);
}

#[cfg(feature = "http-json")]
#[test]
fn test_handle_invalid_json() {
let invalid_json = b"{not valid json}";

// Should not panic - logs debug and returns
handle_partial_success(invalid_json, Protocol::HttpJson);
}
}
Loading