Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ pub struct LoggingProviders {
pub internal: ProviderMode,
}

impl LoggingProviders {
/// Returns true if this requires a LogsReporter channel for
/// asynchronous logging due to its global or engine setting.
/// (Internal providers are forbidden from this case in validate.)
#[must_use]
pub const fn needs_reporter(&self) -> bool {
self.global.needs_reporter() || self.engine.needs_reporter()
}
}

/// Logs producer: how log events are captured and routed.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(rename_all = "lowercase")]
Expand Down
6 changes: 3 additions & 3 deletions rust/otap-dataflow/crates/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use otap_df_state::DeployedPipelineKey;
use otap_df_state::event::{ErrorSummary, ObservedEvent};
use otap_df_state::reporter::ObservedEventReporter;
use otap_df_state::store::ObservedStateStore;
use otap_df_telemetry::opentelemetry_client::OpentelemetryClient;
use otap_df_telemetry::telemetry_runtime::TelemetryRuntime;
use otap_df_telemetry::reporter::MetricsReporter;
use otap_df_telemetry::{InternalTelemetrySystem, otel_info, otel_info_span, otel_warn};
use std::thread;
Expand Down Expand Up @@ -83,7 +83,7 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {
node_ctrl_msg_channel_size = settings.default_node_ctrl_msg_channel_size,
pipeline_ctrl_msg_channel_size = settings.default_pipeline_ctrl_msg_channel_size
);
let opentelemetry_client = OpentelemetryClient::new(telemetry_config)?;
let telemetry_runtime = TelemetryRuntime::new(telemetry_config)?;
let metrics_system = InternalTelemetrySystem::new(telemetry_config);
let metrics_dispatcher = metrics_system.dispatcher();
let metrics_reporter = metrics_system.reporter();
Expand Down Expand Up @@ -257,7 +257,7 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {
handle.shutdown_and_join()?;
}
obs_state_join_handle.shutdown_and_join()?;
opentelemetry_client.shutdown()?;
telemetry_runtime.shutdown()?;

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ macro_rules! raw_error {
}};
}

#[cfg(test)]
mod tests {
#[test]
fn test_raw_error() {
Expand Down
39 changes: 38 additions & 1 deletion rust/otap-dataflow/crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ use std::sync::Arc;

use crate::error::Error;
use crate::registry::TelemetryRegistryHandle;
use crate::logs::LogsReceiver;
use otap_df_config::pipeline::service::telemetry::TelemetryConfig;
use tokio_util::sync::CancellationToken;
use otap_df_config::pipeline::service::telemetry::logs::LogLevel;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

pub mod attributes;
pub mod collector;
Expand All @@ -39,8 +43,10 @@ pub mod error;
pub mod instrument;
/// Internal logs/events module for engine.
pub mod internal_events;
pub mod logs;
pub mod metrics;
pub mod opentelemetry_client;
pub mod resource;
pub mod telemetry_runtime;
pub mod registry;
pub mod reporter;
pub mod self_tracing;
Expand Down Expand Up @@ -153,3 +159,34 @@ impl Default for InternalTelemetrySystem {
Self::new(&TelemetryConfig::default())
}
}

/// Runtime settings for internal telemetry injection into a receiver.
///
/// This struct bundles the logs receiver channel and pre-encoded resource bytes
/// that should be injected into the Internal Telemetry Receiver node.
#[derive(Clone)]
pub struct InternalTelemetrySettings {
/// The logs receiver channel.
pub logs_receiver: LogsReceiver,
/// Pre-encoded resource bytes for OTLP log encoding.
pub resource_bytes: bytes::Bytes,
}

/// Creates an `EnvFilter` for the given log level.
///
/// If `RUST_LOG` is set in the environment, it takes precedence for fine-grained control.
/// Otherwise, falls back to the config level with known noisy dependencies (h2, hyper) silenced.
#[must_use]
pub fn get_env_filter(level: LogLevel) -> EnvFilter {
let level = match level {
LogLevel::Off => LevelFilter::OFF,
LogLevel::Debug => LevelFilter::DEBUG,
LogLevel::Info => LevelFilter::INFO,
LogLevel::Warn => LevelFilter::WARN,
LogLevel::Error => LevelFilter::ERROR,
};
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
// Default filter: use config level, but silence known noisy HTTP dependencies
EnvFilter::new(format!("{level},h2=off,hyper=off"))
})
}
191 changes: 191 additions & 0 deletions rust/otap-dataflow/crates/telemetry/src/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Internal logs collection for OTAP-Dataflow.

use crate::self_tracing::{ConsoleWriter, LogRecord, RawLoggingLayer, SavedCallsite};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use otap_df_config::pipeline::service::telemetry::logs::LogLevel;
use tracing::{Event, Subscriber};
use tracing_subscriber::Registry;
use tracing_subscriber::layer::{Context, Layer as TracingLayer, SubscriberExt};
use tracing_subscriber::registry::LookupSpan;

/// A payload of log data
/// TODO: merge with Event in crates/state
pub enum LogPayload {
/// A single record.
Singleton(LogRecord),
}

/// Reporter for sending log batches through a channel.
pub type LogsReporter = flume::Sender<LogPayload>;

/// Type alias for the log payload receiver channel.
pub type LogsReceiver = flume::Receiver<LogPayload>;

/// Create a reporter and receiver pair without the collector.
///
/// Use this when the receiver will be consumed elsewhere (e.g., by the
/// Internal Telemetry Receiver node).
#[must_use]
pub fn channel(channel_size: usize) -> (LogsReporter, LogsReceiver) {
flume::bounded(channel_size)
}

/// Direct logs collector
pub struct DirectCollector {
writer: ConsoleWriter,
receiver: LogsReceiver,
}

impl DirectCollector {
/// New collector with writer.
pub fn new(writer: ConsoleWriter, receiver: LogsReceiver) -> Self {
Self { writer, receiver }
}

/// Run the collection loop until the channel is closed.
pub async fn run(self) -> Result<(), crate::Error> {
loop {
match self.receiver.recv_async().await {
Ok(payload) => {
self.write_batch(payload);
}
Err(err) => {
crate::raw_error!("log collector error:", err = err.to_string());
return Ok(());
}
}
}
}

/// Write a batch of log records to console.
fn write_batch(&self, payload: LogPayload) {
match payload {
LogPayload::Singleton(record) => self.write_record(record),
}
}

/// Write one record.
fn write_record(&self, record: LogRecord) {
// Identifier.0 is the &'static dyn Callsite
let metadata = record.callsite_id.0.metadata();
let saved = SavedCallsite::new(metadata);
self.writer.print_log_record(&record, &saved);
}
}

/// A tracing Layer that sends each record immediately.
pub struct ImmediateLayer {
/// Reporter for sending to the channel.
reporter: LogsReporter,
}

impl ImmediateLayer {
/// Create a new unbuffered layer.
#[must_use]
pub fn new(reporter: LogsReporter) -> Self {
Self { reporter }
}
}

impl<S> TracingLayer<S> for ImmediateLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let record = LogRecord::new(event);

match self.reporter.try_send(LogPayload::Singleton(record)) {
Ok(()) => {}
Err(err) => {
crate::raw_error!("failed to send log", err = %err);
}
};
}
}

/// Telemetry setup for pipeline threads, carrying the data needed for each mode.
#[derive(Clone)]
pub enum TelemetrySetup {
/// Logs are silently dropped.
Noop,
/// Synchronous raw logging to console.
Raw,
/// Channeled: both ITS and ConsoleAsync cases.
Channeled {
/// Reporter to send.
reporter: LogsReporter,
},
/// OpenTelemetry SDK: logs go through the OpenTelemetry logging pipeline.
OpenTelemetry {
/// The OpenTelemetry SDK logger provider.
logger_provider: SdkLoggerProvider,
},
}

impl TelemetrySetup {
/// Initialize this setup as the global tracing subscriber.
///
/// This is used during startup to set the global subscriber. Returns an error
/// if a global subscriber has already been set.
pub fn try_init_global(
&self,
log_level: LogLevel,
) -> Result<(), tracing_subscriber::util::TryInitError> {
use tracing_subscriber::util::SubscriberInitExt;

let filter = crate::get_env_filter(log_level);

match self {
TelemetrySetup::Noop => tracing::subscriber::NoSubscriber::new().try_init(),
TelemetrySetup::Raw => Registry::default()
.with(filter)
.with(RawLoggingLayer::new(ConsoleWriter::no_color()))
.try_init(),
TelemetrySetup::Channeled { reporter } => {
let layer = ImmediateLayer::new(reporter.clone());
Registry::default().with(filter).with(layer).try_init()
}
TelemetrySetup::OpenTelemetry { logger_provider } => {
let sdk_layer = OpenTelemetryTracingBridge::new(logger_provider);
Registry::default().with(filter).with(sdk_layer).try_init()
}
}
}

/// Run a closure with the appropriate tracing subscriber for this setup.
///
/// The closure runs with the configured logging layer active.
pub fn with_subscriber<F, R>(&self, log_level: LogLevel, f: F) -> R
where
F: FnOnce() -> R,
{
let filter = crate::get_env_filter(log_level);

match self {
TelemetrySetup::Noop => {
let subscriber = tracing::subscriber::NoSubscriber::new();
tracing::subscriber::with_default(subscriber, f)
}
TelemetrySetup::Raw => {
let subscriber = Registry::default()
.with(filter)
.with(RawLoggingLayer::new(ConsoleWriter::no_color()));
tracing::subscriber::with_default(subscriber, f)
}
TelemetrySetup::Channeled { reporter } => {
let layer = ImmediateLayer::new(reporter.clone());
let subscriber = Registry::default().with(filter).with(layer);
tracing::subscriber::with_default(subscriber, f)
}
TelemetrySetup::OpenTelemetry { logger_provider } => {
let sdk_layer = OpenTelemetryTracingBridge::new(logger_provider);
let subscriber = Registry::default().with(filter).with(sdk_layer);
tracing::subscriber::with_default(subscriber, f)
}
}
}
}
Loading
Loading