diff --git a/rust/lit-actions/Cargo.lock b/rust/lit-actions/Cargo.lock index a0d341fe..e33ddf37 100644 --- a/rust/lit-actions/Cargo.lock +++ b/rust/lit-actions/Cargo.lock @@ -1596,6 +1596,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.9.0" @@ -2469,7 +2483,7 @@ dependencies = [ "async-trait", "base32", "boxed_error", - "dashmap", + "dashmap 5.5.3", "deno_cache_dir", "deno_config", "deno_error", @@ -5296,6 +5310,7 @@ dependencies = [ name = "lit-observability" version = "0.1.0" dependencies = [ + "dashmap 6.1.0", "derive_more", "flume", "hyper-util", @@ -5304,7 +5319,6 @@ dependencies = [ "lit-logging", "nu-ansi-term", "opentelemetry 0.24.0", - "opentelemetry-appender-tracing", "opentelemetry-otlp 0.17.0", "opentelemetry-semantic-conventions 0.15.0", "opentelemetry_sdk 0.24.1", @@ -5707,7 +5721,7 @@ dependencies = [ "anyhow", "async-trait", "boxed_error", - "dashmap", + "dashmap 5.5.3", "deno_config", "deno_error", "deno_media_type", @@ -5981,18 +5995,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-appender-tracing" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6" -dependencies = [ - "opentelemetry 0.24.0", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "opentelemetry-http" version = "0.27.0" @@ -8393,7 +8395,7 @@ checksum = "83406221c501860fce9c27444f44125eafe9e598b8b81be7563d7036784cd05c" dependencies = [ "ahash", "anyhow", - "dashmap", + "dashmap 5.5.3", "once_cell", "regex", "serde", @@ -8613,7 +8615,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "76c76d8b9792ce51401d38da0fa62158d61f6d80d16d68fe5b03ce4bf5fba383" dependencies = [ "base64 0.21.7", - "dashmap", + "dashmap 5.5.3", "indexmap 2.11.0", "once_cell", "serde", diff --git a/rust/lit-core/Cargo.lock b/rust/lit-core/Cargo.lock index 1af36fa9..7b72d645 100644 --- a/rust/lit-core/Cargo.lock +++ b/rust/lit-core/Cargo.lock @@ -5996,6 +5996,7 @@ dependencies = [ name = "lit-observability" version = "0.1.0" dependencies = [ + "dashmap", "derive_more 2.0.1", "flume", "hyper-util", @@ -6004,7 +6005,6 @@ dependencies = [ "lit-logging", "nu-ansi-term", "opentelemetry", - "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", @@ -6914,18 +6914,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "opentelemetry-appender-tracing" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6" -dependencies = [ - "opentelemetry", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "opentelemetry-otlp" version = "0.17.0" diff --git a/rust/lit-core/lit-api-core/src/context/mod.rs b/rust/lit-core/lit-api-core/src/context/mod.rs index 77a79683..e1a368d5 100644 --- a/rust/lit-core/lit-api-core/src/context/mod.rs +++ b/rust/lit-core/lit-api-core/src/context/mod.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::fmt; use std::future::Future; -use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator}; -use opentelemetry_sdk::propagation::TraceContextPropagator; +use lit_observability::logging::set_request_context; +use opentelemetry::propagation::Injector; use rocket::Request; use rocket::request::{FromRequest, Outcome}; use semver::Version; @@ -11,9 +11,6 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use tokio::task::futures::TaskLocalFuture; use tokio::task_local; 
-use tracing::{Span, info_span}; -use tracing_opentelemetry::OpenTelemetrySpanExt; -use uuid::Uuid; use crate::error::{EC, Error, Result, conversion_err_code, validation_err_code}; @@ -27,77 +24,6 @@ task_local! { pub static TRACING: Box; } -/// The TracingSpan request guard creates a new tracing span for the request. If the request -/// contains a parent span ID, it will be used as the parent of this new span. Otherwise, a new -/// root span will be created. -#[allow(dead_code)] -#[derive(Clone, Debug)] -pub struct TracingSpan { - span: Span, -} - -impl TracingSpan { - pub fn new(span: Span) -> Self { - Self { span } - } - - pub fn span(&self) -> &Span { - &self.span - } -} - -#[rocket::async_trait] -impl<'r> FromRequest<'r> for TracingSpan { - type Error = crate::error::Error; - - async fn from_request( - req: &'r rocket::Request<'_>, - ) -> rocket::request::Outcome { - // Extract the propagated context - let propagator = TraceContextPropagator::new(); - // Initialize some container to hold the header information. - let mut carrier = HashMap::new(); - // Transfer header information from request to carrier. 
- for header in req.headers().iter() { - carrier.insert(header.name().to_string(), header.value().to_string()); - } - // Extract the context from the carrier - let context = propagator.extract(&HeaderExtractor::from(&carrier)); - - // Initialize a new span with the propagated context as the parent - let req_method = req.method(); - let req_path = req.uri().path(); - let new_span = info_span!( - "handle_request", - method = req_method.as_str(), - path = req_path.to_string(), - ); - new_span.set_parent(context); - - Outcome::Success(TracingSpan { span: new_span }) - } -} - -pub struct HeaderExtractor<'a> { - headers: &'a HashMap, -} - -impl<'a> From<&'a HashMap> for HeaderExtractor<'a> { - fn from(headers: &'a HashMap) -> Self { - HeaderExtractor { headers } - } -} - -impl<'a> Extractor for HeaderExtractor<'a> { - fn get(&self, key: &str) -> Option<&'a str> { - self.headers.get(key).map(|v| v.as_str()) - } - - fn keys(&self) -> Vec<&str> { - self.headers.keys().map(|v| v.as_str()).collect() - } -} - pub struct HeaderInjector<'a> { headers: &'a mut HashMap, } @@ -167,10 +93,13 @@ impl<'r> FromRequest<'r> for Tracing { type Error = crate::error::Error; async fn from_request(req: &'r Request<'_>) -> Outcome { - let correlation_id = - extract_correlation_id(req).unwrap_or_else(|| format!("LD-{}", Uuid::new_v4())); + let (request_id, correlation_id) = extract_request_and_correlation_ids(req); + + // Set request context for log injection; no fallback IDs. + set_request_context(request_id, correlation_id.clone()); - let mut tracing = Self::new(correlation_id); + // For the Tracing struct, use empty string if no correlation_id was provided. 
+ let mut tracing = Self::new(correlation_id.unwrap_or_default()); apply_req_tracing_fields(req, &mut tracing); Outcome::Success(tracing) @@ -227,7 +156,16 @@ impl<'r> FromRequest<'r> for TracingRequired { type Error = crate::error::Error; async fn from_request(req: &'r Request<'_>) -> Outcome { - if let Some(correlation_id) = extract_correlation_id(req) { + let (request_id, correlation_id) = extract_request_and_correlation_ids(req); + + // TracingRequired requires at least one header + if let Some(correlation_id) = correlation_id { + // Preserve distinct values when both headers are present + let request_id = request_id.unwrap_or_else(|| correlation_id.clone()); + + // Set request context (span extensions + OTel attributes) for consistency + set_request_context(Some(request_id), Some(correlation_id.clone())); + let mut tracing = Self::new(correlation_id); apply_req_tracing_fields(req, &mut tracing); @@ -257,12 +195,23 @@ where TRACING.scope(Box::new(tracing), f) } -pub(crate) fn extract_correlation_id(req: &Request<'_>) -> Option { - req.headers() - .get(HEADER_KEY_X_CORRELATION_ID) - .next() - .or_else(|| req.headers().get(HEADER_KEY_X_REQUEST_ID).next()) - .map(|val| val.to_string()) +/// Extracts both request_id and correlation_id from headers, preserving distinct values. +/// Returns (request_id, correlation_id) tuple. 
+/// - request_id: X-Request-Id header, falls back to X-Correlation-Id +/// - correlation_id: X-Correlation-Id header, falls back to X-Request-Id +pub(crate) fn extract_request_and_correlation_ids( + req: &Request<'_>, +) -> (Option, Option) { + let x_request_id = req.headers().get(HEADER_KEY_X_REQUEST_ID).next().map(|v| v.to_string()); + let x_correlation_id = + req.headers().get(HEADER_KEY_X_CORRELATION_ID).next().map(|v| v.to_string()); + + // request_id: prefer X-Request-Id, fall back to X-Correlation-Id + let request_id = x_request_id.clone().or_else(|| x_correlation_id.clone()); + // correlation_id: prefer X-Correlation-Id, fall back to X-Request-Id + let correlation_id = x_correlation_id.or(x_request_id); + + (request_id, correlation_id) } pub(crate) fn apply_req_tracing_fields(req: &Request<'_>, tracing: &mut (impl Tracer + 'static)) { diff --git a/rust/lit-core/lit-api-core/src/server/hyper/handler/router.rs b/rust/lit-core/lit-api-core/src/server/hyper/handler/router.rs index c687a2a5..7569a639 100644 --- a/rust/lit-core/lit-api-core/src/server/hyper/handler/router.rs +++ b/rust/lit-core/lit-api-core/src/server/hyper/handler/router.rs @@ -10,7 +10,6 @@ use hyper::body::Bytes; use hyper::http::HeaderValue; use hyper::{HeaderMap, Method, Request as HyperRequest, Response as HyperResponse}; use tracing::debug; -use uuid::Uuid; use crate::context::{HEADER_KEY_X_CORRELATION_ID, HEADER_KEY_X_REQUEST_ID, Tracing, with_context}; @@ -129,13 +128,10 @@ impl Default for Router { } } -// Get Tracing from request headers. +// Get Tracing from request headers; no fallback ID generation. 
fn get_tracing_from_request_header(headers: HeaderMap) -> Tracing { - if let Some(correlation_id) = extract_correlation_id(headers) { - Tracing::new(correlation_id) - } else { - Tracing::new(Uuid::new_v4().simple().to_string()) - } + let correlation_id = extract_correlation_id(headers).unwrap_or_default(); + Tracing::new(correlation_id) } fn extract_correlation_id(headers: HeaderMap) -> Option { diff --git a/rust/lit-core/lit-observability/Cargo.toml b/rust/lit-core/lit-observability/Cargo.toml index 09489b17..428a4929 100644 --- a/rust/lit-core/lit-observability/Cargo.toml +++ b/rust/lit-core/lit-observability/Cargo.toml @@ -14,12 +14,12 @@ proxy-collector = [] channels = ["dep:flume"] [dependencies] +dashmap = "6" derive_more.workspace = true flume = { version = "0.11", optional = true } hyper-util.workspace = true nu-ansi-term = { version = "0.50.1" } opentelemetry.workspace = true -opentelemetry-appender-tracing = { version = "0.5.0", default-features = false } opentelemetry-otlp = { workspace = true, features = ["logs"] } opentelemetry-semantic-conventions.workspace = true opentelemetry_sdk = { workspace = true, features = ["logs"] } diff --git a/rust/lit-core/lit-observability/src/lib.rs b/rust/lit-core/lit-observability/src/lib.rs index 77e87833..3b816d6d 100644 --- a/rust/lit-core/lit-observability/src/lib.rs +++ b/rust/lit-core/lit-observability/src/lib.rs @@ -3,11 +3,10 @@ use std::str::FromStr; pub use config::LitObservabilityConfig; use error::unexpected_err; use lit_core::config::LitConfig; -use logging::init_logger_provider; +use logging::{ContextAwareOtelLogLayer, CustomEventFormatter, init_logger_provider}; use metrics::init_metrics_provider; use net::init_tonic_exporter_builder; use opentelemetry::trace::TracerProvider; -use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_sdk::logs::LoggerProvider; use opentelemetry_sdk::metrics::SdkMeterProvider; @@ -80,8 +79,7 @@ pub async fn create_providers( }; let 
logger_provider = init_logger_provider(tonic_exporter_builder, resource.clone())?; - // Create a new OpenTelemetryTracingBridge using the above LoggerProvider. - let tracing_bridge_layer = OpenTelemetryTracingBridge::new(&logger_provider); + let context_aware_log_layer = ContextAwareOtelLogLayer::new(&logger_provider); // Add a tracing filter to filter events from crates used by opentelemetry-otlp. // The filter levels are set as follows: @@ -101,10 +99,12 @@ pub async fn create_providers( .add_directive("h2=error".parse().unwrap()) .add_directive("reqwest=error".parse().unwrap()); + let custom_formatter = CustomEventFormatter::default(); + let sub = tracing_subscriber::registry() .with(level_filter) - .with(fmt::layer()) - .with(tracing_bridge_layer) + .with(fmt::layer().event_format(custom_formatter)) + .with(context_aware_log_layer) .with(MetricsLayer::new(meter_provider.clone())) .with(OpenTelemetryLayer::new(tracer)); diff --git a/rust/lit-core/lit-observability/src/logging/context_layer.rs b/rust/lit-core/lit-observability/src/logging/context_layer.rs new file mode 100644 index 00000000..f0987397 --- /dev/null +++ b/rust/lit-core/lit-observability/src/logging/context_layer.rs @@ -0,0 +1,540 @@ +//! Context-aware OpenTelemetry log layer. +//! Injects request_id/correlation_id into OTLP logs. +//! Resolution order: span extensions, then task-local context keyed by tokio task ID. 
+ +use std::any::TypeId; +use std::borrow::Cow; +use std::marker::PhantomData; +use std::sync::LazyLock; + +use dashmap::DashMap; + +use opentelemetry::Key; +use opentelemetry::logs::{AnyValue, LogRecord as _, Logger, LoggerProvider as _, Severity}; +use opentelemetry::trace::TraceContextExt; +use opentelemetry_sdk::logs::LoggerProvider; +use tracing::span::{Attributes, Id, Record}; +use tracing::{Dispatch, Event, Span, Subscriber}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_subscriber::Layer; +use tracing_subscriber::layer::Context; +use tracing_subscriber::registry::LookupSpan; + +const INSTRUMENTATION_LIBRARY_NAME: &str = "lit-observability"; + +// Task-local fallback keyed by tokio task ID; cleared at request boundaries. +// Uses DashMap for sharded locking to reduce contention under high concurrency. +static TASK_CONTEXTS: LazyLock> = + LazyLock::new(DashMap::new); + +/// Request context propagated to all log events within a span hierarchy. +#[derive(Clone, Debug, Default)] +pub struct RequestContext { + pub request_id: Option, + pub correlation_id: Option, +} + +impl RequestContext { + pub fn new(request_id: Option, correlation_id: Option) -> Self { + Self { request_id, correlation_id } + } + + pub fn has_context(&self) -> bool { + self.request_id.is_some() || self.correlation_id.is_some() + } +} + +/// Helper for setting request context via `downcast_raw`. +pub(crate) struct WithRequestContext(fn(dispatch: &Dispatch, id: &Id, ctx: &RequestContext)); + +impl WithRequestContext { + pub(crate) fn set_context(&self, dispatch: &Dispatch, id: &Id, ctx: &RequestContext) { + (self.0)(dispatch, id, ctx) + } +} + +/// Tracing layer that converts events to OpenTelemetry LogRecords with request context injection. 
+pub struct ContextAwareOtelLogLayer { + logger: opentelemetry_sdk::logs::Logger, + with_context: WithRequestContext, + get_context: GetRequestContext, + _subscriber: PhantomData, +} + +impl ContextAwareOtelLogLayer +where + S: Subscriber + for<'lookup> LookupSpan<'lookup>, +{ + pub fn new(provider: &LoggerProvider) -> Self { + Self { + logger: provider + .logger_builder(INSTRUMENTATION_LIBRARY_NAME) + .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION"))) + .build(), + with_context: WithRequestContext(Self::set_context_impl), + get_context: GetRequestContext(Self::get_context_impl), + _subscriber: PhantomData, + } + } + + fn set_context_impl(dispatch: &Dispatch, id: &Id, ctx: &RequestContext) { + if let Some(subscriber) = dispatch.downcast_ref::() { + if let Some(span) = subscriber.span(id) { + span.extensions_mut().insert(ctx.clone()); + } + } + } + + fn get_context_impl(dispatch: &Dispatch, id: &Id) -> Option { + let subscriber = dispatch.downcast_ref::()?; + let span = subscriber.span(id)?; + + // Walk the span hierarchy (scope() includes current span first, then ancestors) + // This allows child spans to find context set on parent spans + for ancestor in span.scope() { + if let Some(ctx) = ancestor.extensions().get::() { + if ctx.has_context() { + return Some(ctx.clone()); + } + } + } + None + } + + fn resolve_request_context( + &self, ctx: &Context<'_, S>, event: &Event<'_>, + ) -> Option { + // Priority 1: Walk span ancestry to find request context in extensions. + if let Some(scope) = ctx.event_scope(event) { + for span in scope { + if let Some(request_ctx) = span.extensions().get::() { + if request_ctx.has_context() { + return Some(request_ctx.clone()); + } + } + } + } + + // Priority 2: Fall back to task-local context (async-safe). 
+ get_task_request_context() + } +} + +impl Layer for ContextAwareOtelLogLayer +where + S: Subscriber + for<'lookup> LookupSpan<'lookup>, +{ + fn on_new_span(&self, _attrs: &Attributes<'_>, _id: &Id, _ctx: Context<'_, S>) {} + + fn on_record(&self, _span: &Id, _values: &Record<'_>, _ctx: Context<'_, S>) {} + + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + let mut log_record = self.logger.create_log_record(); + + // Inject trace context from current OTel span for log-trace correlation. + // Note: Uses Context::current(), so logs with explicit `parent:` spans may correlate incorrectly. + let otel_ctx = opentelemetry::Context::current(); + if otel_ctx.has_active_span() { + let otel_span = otel_ctx.span(); + let span_context = otel_span.span_context(); + if span_context.is_valid() { + log_record.trace_context = Some(span_context.into()); + } + } + + let severity = match *event.metadata().level() { + tracing::Level::TRACE => Severity::Trace, + tracing::Level::DEBUG => Severity::Debug, + tracing::Level::INFO => Severity::Info, + tracing::Level::WARN => Severity::Warn, + tracing::Level::ERROR => Severity::Error, + }; + log_record.set_severity_number(severity); + log_record.set_severity_text(event.metadata().level().to_string().into()); + log_record.set_target(event.metadata().target().to_string()); + log_record.set_event_name(event.metadata().name()); + + let mut visitor = EventVisitor::new(&mut log_record); + event.record(&mut visitor); + let context_fields = visitor.into_recorded_context_fields(); + + if !context_fields.has_request_id || !context_fields.has_correlation_id { + if let Some(request_ctx) = self.resolve_request_context(&ctx, event) { + // Only add attributes not already present from event fields + if !context_fields.has_request_id { + if let Some(ref request_id) = request_ctx.request_id { + log_record.add_attribute( + Key::new("request_id"), + AnyValue::from(request_id.clone()), + ); + } + } + if !context_fields.has_correlation_id { + if let 
Some(ref correlation_id) = request_ctx.correlation_id { + log_record.add_attribute( + Key::new("correlation_id"), + AnyValue::from(correlation_id.clone()), + ); + } + } + } + } + + self.logger.emit(log_record); + } + + /// # Safety + /// + /// This implements the `downcast_raw` method required by `tracing_subscriber::Layer` + /// to enable type-safe access to this layer's helper types via `Dispatch::downcast_ref`. + /// + /// Safety invariants upheld: + /// - All returned pointers point to fields owned by `self` (`with_context`, `get_context`) + /// - Pointers remain valid for the `&self` lifetime (layer lifetime matches subscriber) + /// - The tracing-subscriber machinery guarantees pointers aren't stored beyond the call + /// - This follows the standard pattern from `tracing-subscriber` and `tracing-opentelemetry` + unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> { + match id { + id if id == TypeId::of::() => Some(self as *const _ as *const ()), + id if id == TypeId::of::() => { + Some(&self.with_context as *const _ as *const ()) + } + id if id == TypeId::of::() => { + Some(&self.get_context as *const _ as *const ()) + } + _ => None, + } + } +} + +#[derive(Default)] +struct RecordedContextFields { + has_request_id: bool, + has_correlation_id: bool, +} + +/// Extracts tracing event fields into a LogRecord, preserving native types. 
+struct EventVisitor<'a, LR: opentelemetry::logs::LogRecord> { + log_record: &'a mut LR, + context_fields: RecordedContextFields, +} + +impl<'a, LR: opentelemetry::logs::LogRecord> EventVisitor<'a, LR> { + fn new(log_record: &'a mut LR) -> Self { + Self { log_record, context_fields: RecordedContextFields::default() } + } + + fn into_recorded_context_fields(self) -> RecordedContextFields { + self.context_fields + } + + #[inline] + fn track_context_field(&mut self, field_name: &str) { + match field_name { + "request_id" => self.context_fields.has_request_id = true, + "correlation_id" => self.context_fields.has_correlation_id = true, + _ => {} + } + } +} + +impl tracing::field::Visit for EventVisitor<'_, LR> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.track_context_field(field.name()); + if field.name() == "message" { + self.log_record.set_body(AnyValue::from(format!("{:?}", value))); + } else { + self.log_record + .add_attribute(Key::new(field.name()), AnyValue::from(format!("{:?}", value))); + } + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.track_context_field(field.name()); + if field.name() == "message" { + self.log_record.set_body(AnyValue::from(value.to_owned())); + } else { + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value.to_owned())); + } + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.track_context_field(field.name()); + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value)); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.track_context_field(field.name()); + // OTel AnyValue lacks u64; use i64 if in range, else string + if value <= i64::MAX as u64 { + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value as i64)); + } else { + self.log_record + .add_attribute(Key::new(field.name()), AnyValue::from(value.to_string())); + } 
+ } + + fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { + self.track_context_field(field.name()); + if value >= i64::MIN as i128 && value <= i64::MAX as i128 { + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value as i64)); + } else { + self.log_record + .add_attribute(Key::new(field.name()), AnyValue::from(value.to_string())); + } + } + + fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { + self.track_context_field(field.name()); + if value <= i64::MAX as u128 { + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value as i64)); + } else { + self.log_record + .add_attribute(Key::new(field.name()), AnyValue::from(value.to_string())); + } + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.track_context_field(field.name()); + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value)); + } + + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.track_context_field(field.name()); + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value)); + } + + fn record_error( + &mut self, field: &tracing::field::Field, value: &(dyn std::error::Error + 'static), + ) { + self.track_context_field(field.name()); + self.log_record.add_attribute(Key::new(field.name()), AnyValue::from(value.to_string())); + } +} + +/// Stores request context on the current span and task-local fallback; sets OTel attributes. +/// No-op when both IDs are `None`. 
+pub fn set_request_context(request_id: Option, correlation_id: Option) { + let request_ctx = RequestContext::new(request_id.clone(), correlation_id.clone()); + if !request_ctx.has_context() { + return; + } + + // Task-local fallback for when spans aren't connected (async-safe) + set_task_request_context(request_ctx.clone()); + + let span = Span::current(); + + // OTel span attributes for trace correlation + if let Some(ref req_id) = request_id { + span.set_attribute("request_id", req_id.clone()); + } + if let Some(ref corr_id) = correlation_id { + span.set_attribute("correlation_id", corr_id.clone()); + } + + // Span extensions for log injection + span.with_subscriber(|(id, dispatch)| { + if let Some(with_ctx) = dispatch.downcast_ref::() { + with_ctx.set_context(dispatch, id, &request_ctx); + } + }); +} + +/// Sets request context in task-local storage (async-safe fallback). +fn set_task_request_context(ctx: RequestContext) { + if let Some(task_id) = current_task_id() { + TASK_CONTEXTS.insert(task_id, ctx); + } +} + +/// Gets request context from task-local storage. +pub(crate) fn get_task_request_context() -> Option { + let task_id = current_task_id()?; + TASK_CONTEXTS.get(&task_id).map(|entry| entry.value().clone()).filter(|ctx| ctx.has_context()) +} + +/// Clears task-local request context at request boundaries. +pub fn clear_task_request_context() { + if let Some(task_id) = current_task_id() { + TASK_CONTEXTS.remove(&task_id); + } +} + +/// Returns the current tokio task ID, or None if not in a tokio runtime. +#[inline] +fn current_task_id() -> Option { + tokio::task::try_id() +} + +/// Helper for getting request context via `downcast_raw`. +pub(crate) struct GetRequestContext(fn(dispatch: &Dispatch, id: &Id) -> Option); + +impl GetRequestContext { + pub(crate) fn get_context(&self, dispatch: &Dispatch, id: &Id) -> Option { + (self.0)(dispatch, id) + } +} + +/// Retrieves request context from span hierarchy, then task-local fallback. 
+pub fn get_request_context() -> Option { + // Try span hierarchy first + let mut result = None; + Span::current().with_subscriber(|(id, dispatch)| { + if let Some(get_ctx) = dispatch.downcast_ref::() { + result = get_ctx.get_context(dispatch, id); + } + }); + + if result.as_ref().is_some_and(|ctx| ctx.has_context()) { + return result; + } + + // Fall back to task-local storage + get_task_request_context() +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry_sdk::Resource; + use opentelemetry_sdk::logs::LoggerProvider; + use tracing_subscriber::Registry; + use tracing_subscriber::layer::SubscriberExt; + + fn with_test_subscriber(f: F) + where + F: FnOnce(), + { + let provider = LoggerProvider::builder().with_resource(Resource::empty()).build(); + let layer = ContextAwareOtelLogLayer::new(&provider); + let subscriber = Registry::default().with(layer); + + tracing::subscriber::with_default(subscriber, f); + } + + #[test] + fn test_set_request_context_noop_when_empty() { + with_test_subscriber(|| { + let span = tracing::info_span!("test_span"); + let _guard = span.enter(); + set_request_context(None, None); + assert!(get_request_context().is_none()); + }); + } + + #[test] + fn test_get_request_context_returns_stored_values() { + with_test_subscriber(|| { + let span = tracing::info_span!("test_span"); + let _guard = span.enter(); + + let initial = get_request_context(); + assert!(initial.is_none() || !initial.as_ref().map_or(false, |c| c.has_context())); + + let expected_req_id = "test-req-id-12345".to_string(); + let expected_corr_id = "test-corr-id-67890".to_string(); + set_request_context(Some(expected_req_id.clone()), Some(expected_corr_id.clone())); + + let retrieved = get_request_context(); + assert!(retrieved.is_some()); + let ctx = retrieved.expect("context should exist"); + assert_eq!(ctx.request_id, Some(expected_req_id)); + assert_eq!(ctx.correlation_id, Some(expected_corr_id)); + }); + } + + #[test] + fn 
test_get_request_context_inherits_parent_context() { + with_test_subscriber(|| { + let parent_span = tracing::info_span!("parent"); + let _parent_guard = parent_span.enter(); + set_request_context( + Some("parent-req-id".to_string()), + Some("parent-corr-id".to_string()), + ); + + let child_span = tracing::info_span!("child"); + let _child_guard = child_span.enter(); + + let child_ctx = get_request_context(); + assert!(child_ctx.is_some()); + assert_eq!( + child_ctx.as_ref().and_then(|c| c.request_id.as_ref()), + Some(&"parent-req-id".to_string()) + ); + assert_eq!( + child_ctx.as_ref().and_then(|c| c.correlation_id.as_ref()), + Some(&"parent-corr-id".to_string()) + ); + }); + } + + #[test] + fn test_context_available_in_sibling_span_via_span_extensions() { + // Context should not cross sibling spans. + with_test_subscriber(|| { + let span_a = tracing::info_span!("span_a"); + { + let _guard = span_a.enter(); + set_request_context( + Some("span-a-req-id".to_string()), + Some("span-a-corr-id".to_string()), + ); + + let ctx = get_request_context(); + assert!(ctx.is_some()); + assert_eq!(ctx.as_ref().unwrap().request_id, Some("span-a-req-id".to_string())); + } + + let span_b = tracing::info_span!("span_b"); + let _guard = span_b.enter(); + let ctx = get_request_context(); + assert!(ctx.is_none() || !ctx.as_ref().unwrap().has_context()); + }); + } + + #[test] + fn test_context_cleaned_up_with_span() { + // Context should not leak after leaving a span. 
+ with_test_subscriber(|| { + { + let span = tracing::info_span!("scoped_span"); + let _guard = span.enter(); + set_request_context( + Some("scoped-req-id".to_string()), + Some("scoped-corr-id".to_string()), + ); + + let ctx = get_request_context(); + assert!(ctx.is_some()); + } + + let new_span = tracing::info_span!("new_span"); + let _guard = new_span.enter(); + let ctx = get_request_context(); + assert!(ctx.is_none() || !ctx.as_ref().unwrap().has_context()); + }); + } + + #[test] + fn test_log_emission_with_context_does_not_panic() { + with_test_subscriber(|| { + let span = tracing::info_span!("test_span"); + let _guard = span.enter(); + + set_request_context( + Some("test-req-id-12345".to_string()), + Some("test-corr-id-67890".to_string()), + ); + + tracing::trace!("Trace level log"); + tracing::debug!("Debug level log"); + tracing::info!("Info level log"); + tracing::warn!("Warn level log"); + tracing::error!("Error level log"); + + tracing::info!(custom_field = "custom_value", numeric_field = 42, "Log with fields"); + }); + } +} diff --git a/rust/lit-core/lit-observability/src/logging/event_format.rs b/rust/lit-core/lit-observability/src/logging/event_format.rs index e79adb3c..0120dd83 100644 --- a/rust/lit-core/lit-observability/src/logging/event_format.rs +++ b/rust/lit-core/lit-observability/src/logging/event_format.rs @@ -3,6 +3,7 @@ //! The `CustomEventFormatter` adds additional logic to customize the formatting of log messages, such as: //! - Optionally omitting the event / span scopes //! - Optionally adding a prefix string to each log message +//! - Displaying request_id and correlation_id from span extensions for request tracing //! //! This mod should mostly be used in development and testing environments. 
@@ -27,9 +28,12 @@ use tracing_subscriber::{ registry::LookupSpan, }; +use super::context_layer::RequestContext; + /// A variant of the default formatter `tracing_subscriber::fmt::format::Format` that adds additional logic to customize the formatting of log messages, such as: /// - Optionally omitting the event / span scopes /// - Optionally adding a prefix string to each log message +/// - Displaying request_id and correlation_id from span extensions /// /// This struct is mostly used in development and testing environments. #[derive(Debug, Clone)] @@ -44,6 +48,7 @@ pub struct CustomEventFormatter { pub(crate) display_filename: bool, pub(crate) display_line_number: bool, pub(crate) display_event_scope: bool, + pub(crate) display_request_context: bool, pub(crate) prefix_string: Option<String>, } @@ -60,6 +65,7 @@ impl Default for CustomEventFormatter { display_filename: false, display_line_number: false, display_event_scope: true, + display_request_context: true, prefix_string: None, } } @@ -92,6 +98,7 @@ impl CustomEventFormatter { display_filename: self.display_filename, display_line_number: self.display_line_number, display_event_scope: self.display_event_scope, + display_request_context: self.display_request_context, prefix_string: self.prefix_string, } } @@ -109,10 +116,19 @@ impl CustomEventFormatter { display_filename: self.display_filename, display_line_number: self.display_line_number, display_event_scope: self.display_event_scope, + display_request_context: self.display_request_context, prefix_string: self.prefix_string, } } + /// Sets whether or not request context (request_id, correlation_id) is displayed. + /// + /// When enabled, the formatter will look for `RequestContext` in span extensions + /// and display the request_id and correlation_id if present.
+ pub fn with_request_context(self, display_request_context: bool) -> CustomEventFormatter { + CustomEventFormatter { display_request_context, ..self } + } + /// Enable ANSI terminal colors for formatted output. pub fn with_ansi(self, ansi: bool) -> CustomEventFormatter { Self { ansi: Some(ansi), ..self } @@ -262,6 +278,66 @@ where write!(writer, "{} ", fmt_level)?; } + // Display request context (request_id, correlation_id) from span extensions or task-local + if self.display_request_context { + // Try span extensions first + let mut request_ctx = None; + if let Some(scope) = ctx.event_scope() { + for span in scope.from_root() { + let ext = span.extensions(); + if let Some(ctx) = ext.get::<RequestContext>() { + if ctx.has_context() { + request_ctx = Some(ctx.clone()); + break; + } + } + } + } + // Fall back to task-local storage + if request_ctx.is_none() { + request_ctx = super::context_layer::get_task_request_context(); + } + + if let Some(request_ctx) = request_ctx { + let bracket_style = Style::new().dimmed(); + + write!(writer, "{}", bracket_style.paint("["))?; + let mut first = true; + if let Some(ref req_id) = request_ctx.request_id { + #[cfg(feature = "ansi")] + { + if writer.has_ansi_escapes() { + write!(writer, "req:{}", Color::Cyan.paint(req_id))?; + } else { + write!(writer, "req:{}", req_id)?; + } + } + #[cfg(not(feature = "ansi"))] + write!(writer, "req:{}", req_id)?; + first = false; + } + if let Some(ref corr_id) = request_ctx.correlation_id { + // Only show correlation_id if different from request_id + if request_ctx.request_id.as_ref() != Some(corr_id) { + if !first { + writer.write_char(' ')?; + } + #[cfg(feature = "ansi")] + { + if writer.has_ansi_escapes() { + write!(writer, "corr:{}", Color::Cyan.paint(corr_id))?; + } else { + write!(writer, "corr:{}", corr_id)?; + } + } + #[cfg(not(feature = "ansi"))] + write!(writer, "corr:{}", corr_id)?; + } + } + write!(writer, "{} ", bracket_style.paint("]"))?; + } + } + if self.display_thread_name { let current_thread
= std::thread::current(); match current_thread.name() { diff --git a/rust/lit-core/lit-observability/src/logging/mod.rs b/rust/lit-core/lit-observability/src/logging/mod.rs index 96bfba34..2855e0b4 100644 --- a/rust/lit-core/lit-observability/src/logging/mod.rs +++ b/rust/lit-core/lit-observability/src/logging/mod.rs @@ -1,7 +1,6 @@ use std::str::FromStr; use crate::config::LitObservabilityConfig; -use event_format::CustomEventFormatter; use lit_core::{config::LitConfig, error::Result}; use opentelemetry_otlp::TonicExporterBuilder; use opentelemetry_sdk::{Resource, runtime}; @@ -10,8 +9,16 @@ use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt}; use crate::error::unexpected_err; +mod context_layer; mod event_format; +// Re-export context layer components for use by lit-node +pub use context_layer::{ + ContextAwareOtelLogLayer, RequestContext, clear_task_request_context, get_request_context, + set_request_context, +}; +pub use event_format::CustomEventFormatter; + /// Initialize a simple `tracing` subscriber that logs to stdout. 
pub fn simple_logging_subscriber( cfg: &LitConfig, prefix_string: Option, @@ -22,7 +29,7 @@ pub fn simple_logging_subscriber( .map_err(|e| unexpected_err(e.to_string(), Some("Could not create filter".to_string())))?; println!("Using level filter: {}", level_filter); - let custom_formatter = CustomEventFormatter::default() + let custom_formatter = event_format::CustomEventFormatter::default() .with_target(true) .with_source_location(true) .with_event_scope(false) @@ -50,7 +57,7 @@ pub fn simple_file_logging_subscriber( cfg.get_string("node.staker_address")?.to_lowercase(), ); - let custom_formatter = CustomEventFormatter::default() + let custom_formatter = event_format::CustomEventFormatter::default() .with_target(true) .with_source_location(true) .with_event_scope(false) diff --git a/rust/lit-node/Cargo.lock b/rust/lit-node/Cargo.lock index edcef4a5..40d26371 100644 --- a/rust/lit-node/Cargo.lock +++ b/rust/lit-node/Cargo.lock @@ -10055,6 +10055,7 @@ dependencies = [ name = "lit-observability" version = "0.1.0" dependencies = [ + "dashmap 6.1.0", "derive_more 2.0.1", "flume", "hyper-util", @@ -10063,7 +10064,6 @@ dependencies = [ "lit-logging", "nu-ansi-term", "opentelemetry 0.24.0", - "opentelemetry-appender-tracing", "opentelemetry-otlp 0.17.0", "opentelemetry-semantic-conventions 0.15.0", "opentelemetry_sdk 0.24.1", @@ -11494,18 +11494,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-appender-tracing" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6" -dependencies = [ - "opentelemetry 0.24.0", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "opentelemetry-http" version = "0.27.0" diff --git a/rust/lit-node/lit-node/src/endpoints/versions/v2.rs b/rust/lit-node/lit-node/src/endpoints/versions/v2.rs index 6a37634d..96bfcfe1 100644 --- a/rust/lit-node/lit-node/src/endpoints/versions/v2.rs +++ 
b/rust/lit-node/lit-node/src/endpoints/versions/v2.rs @@ -266,13 +266,14 @@ pub(crate) async fn execute_function( #[cfg(feature = "lit-actions")] #[post("/web/job_status/v2", format = "json", data = "<job_status_request>")] -#[instrument(level = "debug", name = "POST /web/job_status/v2", skip_all, ret)] +#[instrument(level = "debug", name = "POST /web/job_status/v2", skip_all, fields(correlation_id = tracing.correlation_id()), ret)] pub(crate) async fn get_job_status( job_status_request: Json>, action_store: &State, tss_state: &State>, cfg: &State, client_state: &State>, + tracing: Tracing, ) -> status::Custom { let (job_status_request, client_session) = match client_state.json_decrypt_to_session(&job_status_request) { diff --git a/rust/lit-node/lit-node/src/git_info.rs b/rust/lit-node/lit-node/src/git_info.rs index 416710dc..a70b78f2 100644 --- a/rust/lit-node/lit-node/src/git_info.rs +++ b/rust/lit-node/lit-node/src/git_info.rs @@ -1 +1 @@ -pub const GIT_COMMIT_HASH: &str = "50083ccf8061c4bfe17761be8b496bdab82851a6"; +pub const GIT_COMMIT_HASH: &str = "c6f85f5cb69315899476b07f92a430c7aeecd59a"; diff --git a/rust/lit-node/lit-node/src/main.rs b/rust/lit-node/lit-node/src/main.rs index 1fa2b63f..5d97a56f 100644 --- a/rust/lit-node/lit-node/src/main.rs +++ b/rust/lit-node/lit-node/src/main.rs @@ -55,6 +55,7 @@ use tracing::error; use tracing_subscriber::util::SubscriberInitExt; use crate::peers::grpc_client_pool::GrpcClientPool; +use crate::utils::rocket::fairings::RequestContextCleanupFairing; use crate::utils::web::default_http_client; use rocket::Request; use rocket::serde::json::Value; @@ -411,6 +412,7 @@ pub fn main() { .mount("/", endpoints::versions::v1::routes()) // include the v2 routes .mount("/", endpoints::versions::v2::routes()) + .attach(RequestContextCleanupFairing) .attach(cors) .attach(AdHoc::on_response("Version Header", |_, resp| { Box::pin(async move { diff --git a/rust/lit-node/lit-node/src/utils/rocket/fairings.rs
b/rust/lit-node/lit-node/src/utils/rocket/fairings.rs new file mode 100644 index 00000000..a8bef9d0 --- /dev/null +++ b/rust/lit-node/lit-node/src/utils/rocket/fairings.rs @@ -0,0 +1,27 @@ +//! Rocket fairings for request lifecycle utilities. + +use lit_observability::logging::clear_task_request_context; +use rocket::Data; +use rocket::fairing::{Fairing, Info, Kind}; +use rocket::{Request, Response}; + +/// Clears task-local request context at request boundaries to avoid stale IDs. +pub struct RequestContextCleanupFairing; + +#[rocket::async_trait] +impl Fairing for RequestContextCleanupFairing { + fn info(&self) -> Info { + Info { + name: "Request Context Cleanup", + kind: Kind::Request | Kind::Response, + } + } + + async fn on_request(&self, _req: &mut Request<'_>, _data: &mut Data<'_>) { + clear_task_request_context(); + } + + async fn on_response<'r>(&self, _req: &'r Request<'_>, _res: &mut Response<'r>) { + clear_task_request_context(); + } +} diff --git a/rust/lit-node/lit-node/src/utils/rocket/guards.rs b/rust/lit-node/lit-node/src/utils/rocket/guards.rs index 905fd4b9..a8bdf36c 100644 --- a/rust/lit-node/lit-node/src/utils/rocket/guards.rs +++ b/rust/lit-node/lit-node/src/utils/rocket/guards.rs @@ -2,6 +2,7 @@ use rocket::http::HeaderMap; use rocket::request::{FromRequest, Outcome, Request}; use rocket::serde::json::Value; +/// Rocket request guard that extracts HTTP headers from the incoming request. 
pub struct RequestHeaders<'r> { pub headers: HeaderMap<'r>, } diff --git a/rust/lit-node/lit-node/src/utils/rocket/mod.rs b/rust/lit-node/lit-node/src/utils/rocket/mod.rs index 873678bc..4c224586 100644 --- a/rust/lit-node/lit-node/src/utils/rocket/mod.rs +++ b/rust/lit-node/lit-node/src/utils/rocket/mod.rs @@ -1 +1,2 @@ +pub mod fairings; pub mod guards; diff --git a/rust/lit-os/Cargo.lock b/rust/lit-os/Cargo.lock index 20313822..0a7c8550 100644 --- a/rust/lit-os/Cargo.lock +++ b/rust/lit-os/Cargo.lock @@ -10650,6 +10650,7 @@ dependencies = [ name = "lit-observability" version = "0.1.0" dependencies = [ + "dashmap 6.1.0", "derive_more 2.0.1", "flume", "hyper-util", @@ -10658,7 +10659,6 @@ dependencies = [ "lit-logging", "nu-ansi-term 0.50.1", "opentelemetry 0.24.0 (registry+https://github.com/rust-lang/crates.io-index)", - "opentelemetry-appender-tracing", "opentelemetry-otlp 0.17.0", "opentelemetry-semantic-conventions 0.15.0", "opentelemetry_sdk 0.24.1", @@ -12392,18 +12392,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-appender-tracing" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6" -dependencies = [ - "opentelemetry 0.24.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "opentelemetry-http" version = "0.27.0"