Merged
Changes from 6 commits
Original file line number Diff line number Diff line change
@@ -5,74 +5,298 @@

pub mod processors;

use crate::error::Error;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Internal logs configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct LogsConfig {
/// The log level for internal engine logs.
#[serde(default)]
pub level: LogLevel,

/// The list of log processors to configure.
/// Logging provider configuration.
#[serde(default = "default_providers")]
pub providers: LoggingProviders,

/// OpenTelemetry SDK is configured via processors.
#[serde(default)]
pub processors: Vec<processors::LogProcessorConfig>,
}

/// Log level for internal engine logs.
///
/// TODO: Change default to `Info` once per-thread subscriber is implemented
/// to avoid contention from the global tracing subscriber.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
/// Log level for dataflow engine logs.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
/// Logging is completely disabled.
#[default]
Off,
/// Debug level logging.
Debug,
/// Info level logging.
#[default]
Info,
/// Warn level logging.
Warn,
/// Error level logging.
Error,
}

/// Logging providers for different execution contexts.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct LoggingProviders {
/// Provider mode for non-engine threads. This defines the global Tokio
/// `tracing` subscriber. Default is Unbuffered. Note that Buffered
Contributor
Update to newer terminology instead of mentioning "Buffered" and "Unbuffered".

/// requires opt-in thread-local setup.
#[serde(default = "default_global_provider")]
pub global: ProviderMode,

/// Provider mode for engine/pipeline threads. This defines how the
/// engine thread / core sets the Tokio `tracing`
/// subscriber. Default is Buffered. Internal logs will be flushed
/// by either the Internal Telemetry Receiver or the main pipeline
/// controller.
#[serde(default = "default_engine_provider")]
pub engine: ProviderMode,

/// Provider mode for nodes downstream of Internal Telemetry receiver.
/// This defaults to Noop to avoid internal feedback.
#[serde(default = "default_internal_provider")]
pub internal: ProviderMode,
}

/// Logs producer: how log events are captured and routed.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ProviderMode {
/// Log events are silently ignored.
Noop,

/// Delivery using the internal telemetry system.
ITS,

/// Use OTel-Rust as the provider.
OpenTelemetry,

/// Asynchronous console logging.
/// The caller writes to a channel the same as ITS deliver, but bypasses
Contributor
Suggested change
/// The caller writes to a channel the same as ITS deliver, but bypasses
/// The caller writes to a channel the same as ITS delivery, but bypasses

Contributor Author
Fixed.

/// the internal pipeline with console logging.
ConsoleAsync,

/// Synchronous console logging. Note! This can block the producing thread.
/// The caller writes directly to the console.
ConsoleDirect,
}
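
A note on the YAML spellings: the enum above uses `#[serde(rename_all = "lowercase")]`, while the YAML in this PR spells the variants `console_async`, `console_direct` and `its`. The std-only sketch below re-implements the `lowercase` and `snake_case` renaming rules as I understand them from serde's documentation, to show what each would produce for these variant names. The two helper functions are hypothetical stand-ins; the sketch does not call serde itself and has not been checked against the merged code.

```rust
/// Hypothetical helper approximating serde's `rename_all = "lowercase"`:
/// the whole variant name is ASCII-lowercased, with no separators added.
fn lowercase(variant: &str) -> String {
    variant.to_ascii_lowercase()
}

/// Hypothetical helper approximating serde's `rename_all = "snake_case"`:
/// an underscore is inserted before every uppercase character except the first.
fn snake_case(variant: &str) -> String {
    let mut out = String::new();
    for (i, ch) in variant.char_indices() {
        if i > 0 && ch.is_uppercase() {
            out.push('_');
        }
        out.push(ch.to_ascii_lowercase());
    }
    out
}

fn main() {
    let variants = ["Noop", "ITS", "OpenTelemetry", "ConsoleAsync", "ConsoleDirect"];
    for name in variants {
        println!(
            "{name}: lowercase={} snake_case={}",
            lowercase(name),
            snake_case(name)
        );
    }
}
```

If these rules match serde's behavior, no single `rename_all` value yields both `its` and `console_async`, so a per-variant `#[serde(rename = "...")]` on one of them may be needed. Treat that as something to verify, not as a statement about the final code.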

impl ProviderMode {
/// Returns true if this requires a LogsReporter channel for
/// asynchronous logging.
#[must_use]
pub fn needs_reporter(&self) -> bool {
Contributor
Suggested change
pub fn needs_reporter(&self) -> bool {
pub const fn needs_reporter(&self) -> bool {

Contributor Author
Done. I think I don't fully understand why I should use const keywords.

Contributor
It's mainly to help the compiler apply optimizations. However, marking a method const doesn't guarantee a performance improvement in all cases. The most evident benefit of using const on the method is that it allows your method to be called from const expressions.

On thinking some more, I think we also have to keep in mind that for public methods removing const later would be a breaking change.

matches!(self, Self::ITS | Self::ConsoleAsync)
}
}
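
To make the `const` discussion in this thread concrete, here is a minimal std-only sketch of what `const fn` allows: the method can be called while initializing a `const` item or inside a compile-time assertion. The enum is a trimmed copy of the one in this diff with the serde derives left out, and the constant name `ITS_NEEDS_REPORTER` is made up for illustration.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ProviderMode {
    Noop,
    ITS,
    OpenTelemetry,
    ConsoleAsync,
    ConsoleDirect,
}

impl ProviderMode {
    /// Same body as in the diff; `matches!` expands to a plain `match`,
    /// which is permitted inside a `const fn`.
    #[must_use]
    pub const fn needs_reporter(&self) -> bool {
        matches!(self, Self::ITS | Self::ConsoleAsync)
    }
}

// Because the method is `const`, it can initialize a constant.
// (Hypothetical constant name, used only in this sketch.)
const ITS_NEEDS_REPORTER: bool = ProviderMode::ITS.needs_reporter();

// It can also back a compile-time check: the build fails if this is false.
const _: () = assert!(!ProviderMode::Noop.needs_reporter());

fn main() {
    println!("ITS needs a reporter: {ITS_NEEDS_REPORTER}");
    // The method remains callable at runtime like any other function.
    let modes = [
        ProviderMode::Noop,
        ProviderMode::ITS,
        ProviderMode::OpenTelemetry,
        ProviderMode::ConsoleAsync,
        ProviderMode::ConsoleDirect,
    ];
    for mode in modes {
        println!("{mode:?}: {}", mode.needs_reporter());
    }
}
```

Dropping `const` from the method later would break the two `const` items above, which is the compatibility concern raised in the thread for public methods.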

fn default_global_provider() -> ProviderMode {
Contributor
nit: These methods can be marked const.

Contributor Author
👍

ProviderMode::ITS
}

fn default_engine_provider() -> ProviderMode {
ProviderMode::ITS
}
Contributor
These are not consistent with the default config yaml shared in the self_tracing_architecture.md which has ConsoleAsync as the default.

service:
  telemetry:
    logs:
      level: info
      providers:
        global: console_async
        engine: console_async
        internal: noop

Contributor Author
Done. In the long term I hope ITS becomes the default, but for now ConsoleAsync is safest.


fn default_internal_provider() -> ProviderMode {
ProviderMode::Noop
}

fn default_providers() -> LoggingProviders {
LoggingProviders {
global: default_global_provider(),
engine: default_engine_provider(),
internal: default_internal_provider(),
}
}

impl Default for LogsConfig {
fn default() -> Self {
Self {
level: LogLevel::default(),
providers: default_providers(),
processors: Vec::new(),
}
}
}
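
The manual `Default` impl is what lets call sites set only the fields they care about and fill the rest, including the new `providers` field, with `..Default::default()`. A minimal std-only sketch of that pattern follows. The types are trimmed copies of the ones in this diff with serde derives and `processors` omitted, and the provider defaults mirror the values shown at this revision.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Default)]
enum LogLevel {
    Off,
    #[default]
    Info,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum ProviderMode {
    Noop,
    ITS,
}

#[derive(Debug, Clone, PartialEq)]
struct LoggingProviders {
    global: ProviderMode,
    engine: ProviderMode,
    internal: ProviderMode,
}

#[derive(Debug, Clone, PartialEq)]
struct LogsConfig {
    level: LogLevel,
    providers: LoggingProviders,
}

/// Single source of truth for the provider defaults, as in the diff.
fn default_providers() -> LoggingProviders {
    LoggingProviders {
        global: ProviderMode::ITS,
        engine: ProviderMode::ITS,
        internal: ProviderMode::Noop,
    }
}

impl Default for LogsConfig {
    fn default() -> Self {
        Self {
            level: LogLevel::default(),
            providers: default_providers(),
        }
    }
}

fn main() {
    // Only `level` is spelled out; `providers` comes from `Default`.
    let config = LogsConfig {
        level: LogLevel::Off,
        ..Default::default()
    };
    println!(
        "level={:?} global={:?} engine={:?} internal={:?}",
        config.level, config.providers.global, config.providers.engine, config.providers.internal
    );
}
```

Struct literals written this way keep compiling when another field is added to `LogsConfig`, provided that `Default` is updated alongside it.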

impl LogsConfig {
/// Validate the logs configuration.
///
/// Returns an error if:
/// - `internal` is set to `ITS` or `ConsoleAsync` (any mode where `needs_reporter()` is true)
/// - `engine` is `OpenTelemetry` but `global` is not
/// (current implementation restriction).
pub fn validate(&self) -> Result<(), Error> {
if self.providers.internal.needs_reporter() {
return Err(Error::InvalidUserConfig {
error: format!(
"internal provider is invalid: {:?}",
self.providers.internal
),
});
}
// Current implementation restriction: engine OpenTelemetry requires global OpenTelemetry.
// The SDK logger provider is only created when the global provider is OpenTelemetry.
// This could be lifted in the future by creating the logger provider independently.
if self.providers.engine == ProviderMode::OpenTelemetry
&& self.providers.global != ProviderMode::OpenTelemetry
{
return Err(Error::InvalidUserConfig {
error: "engine provider 'opentelemetry' requires global provider to also be \
'opentelemetry' (current implementation restriction)"
.into(),
});
}

Ok(())
}
}
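
The two validation rules can be exercised in isolation with a std-only sketch. The free function `validate` and its `String` error are hypothetical stand-ins for `LogsConfig::validate` and `Error::InvalidUserConfig`; the conditions themselves are copied from the diff.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ProviderMode {
    Noop,
    ITS,
    OpenTelemetry,
    ConsoleAsync,
    ConsoleDirect,
}

impl ProviderMode {
    const fn needs_reporter(&self) -> bool {
        matches!(self, Self::ITS | Self::ConsoleAsync)
    }
}

/// Hypothetical stand-in for `LogsConfig::validate`, returning a plain
/// `String` instead of the crate's `Error::InvalidUserConfig`.
fn validate(
    global: ProviderMode,
    engine: ProviderMode,
    internal: ProviderMode,
) -> Result<(), String> {
    // Rule 1: the internal provider must not need a reporter channel,
    // which would feed internal logs back into the internal pipeline.
    if internal.needs_reporter() {
        return Err(format!("internal provider is invalid: {internal:?}"));
    }
    // Rule 2: engine OpenTelemetry requires global OpenTelemetry.
    if engine == ProviderMode::OpenTelemetry && global != ProviderMode::OpenTelemetry {
        return Err(
            "engine provider 'opentelemetry' requires global provider to also be 'opentelemetry'"
                .to_string(),
        );
    }
    Ok(())
}

fn main() {
    use ProviderMode::*;
    let cases = [
        (ITS, ITS, Noop),
        (Noop, Noop, ITS),
        (ConsoleAsync, OpenTelemetry, Noop),
        (OpenTelemetry, OpenTelemetry, Noop),
        (ConsoleDirect, ConsoleDirect, ConsoleDirect),
    ];
    for (global, engine, internal) in cases {
        let result = validate(global, engine, internal);
        println!("global={global:?} engine={engine:?} internal={internal:?} -> {result:?}");
    }
}
```

Note that rule 1 is checked first, so a configuration that violates both rules reports only the internal-provider error.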

#[cfg(test)]
mod tests {
use super::*;

/// Helper to parse YAML into LogsConfig.
fn parse(yaml: &str) -> LogsConfig {
serde_yaml::from_str(yaml).unwrap()
}

/// Helper to create LoggingProviders with specified modes.
fn providers(
global: ProviderMode,
engine: ProviderMode,
internal: ProviderMode,
) -> LoggingProviders {
LoggingProviders {
global,
engine,
internal,
}
}

/// Helper to create a config with custom providers.
fn config_with(
global: ProviderMode,
engine: ProviderMode,
internal: ProviderMode,
) -> LogsConfig {
LogsConfig {
providers: providers(global, engine, internal),
..Default::default()
}
}

/// Asserts validation fails with expected substring in error message.
fn assert_invalid(config: &LogsConfig, expected_msg: &str) {
let err = config.validate().unwrap_err();
assert!(matches!(err, Error::InvalidUserConfig { .. }));
assert!(
err.to_string().contains(expected_msg),
"Expected '{}' in: {}",
expected_msg,
err
);
}

// ==================== Defaults & Parsing ====================

#[test]
fn test_logs_config_deserialize() {
let yaml_str = r#"
level: "info"
processors:
- batch:
exporter:
console:
"#;
let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap();
fn test_defaults() {
// Manual Default impl matches serde defaults
let config = LogsConfig::default();
assert_eq!(config.level, LogLevel::Info);
assert_eq!(config.processors.len(), 1);
assert_eq!(config.providers.global, ProviderMode::ITS);
assert_eq!(config.providers.engine, ProviderMode::ITS);
assert_eq!(config.providers.internal, ProviderMode::Noop);
assert!(config.processors.is_empty());

// Serde defaults should match Rust Default
let parsed = parse("{}");
assert_eq!(parsed.level, config.level);
assert_eq!(parsed.providers.global, config.providers.global);
assert_eq!(parsed.providers.engine, config.providers.engine);
assert_eq!(parsed.providers.internal, config.providers.internal);
}

#[test]
fn test_log_level_deserialize() {
let yaml_str = r#"
level: "info"
"#;
let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap();
assert_eq!(config.level, LogLevel::Info);
fn test_log_level_parsing() {
let cases = [
("off", LogLevel::Off),
("debug", LogLevel::Debug),
("info", LogLevel::Info),
("warn", LogLevel::Warn),
("error", LogLevel::Error),
];
for (name, expected) in cases {
assert_eq!(parse(&format!("level: {name}")).level, expected);
}
}

#[test]
fn test_logs_config_default_deserialize() -> Result<(), serde_yaml::Error> {
let yaml_str = r#""#;
let config: LogsConfig = serde_yaml::from_str(yaml_str)?;
assert_eq!(config.level, LogLevel::Off);
assert!(config.processors.is_empty());
Ok(())
fn test_provider_mode_parsing() {
let config = parse("providers: { global: noop, engine: its, internal: console_direct }");
assert_eq!(config.providers.global, ProviderMode::Noop);
assert_eq!(config.providers.engine, ProviderMode::ITS);
assert_eq!(config.providers.internal, ProviderMode::ConsoleDirect);

let config = parse("providers: { global: opentelemetry, engine: opentelemetry }");
assert_eq!(config.providers.global, ProviderMode::OpenTelemetry);
assert_eq!(config.providers.engine, ProviderMode::OpenTelemetry);
}

// ==================== ProviderMode::needs_reporter ====================

#[test]
fn test_needs_reporter() {
use ProviderMode::*;
let cases = [
(Noop, false),
(ITS, true),
(OpenTelemetry, false),
(ConsoleDirect, false),
(ConsoleAsync, true),
];
for (mode, expected) in cases {
assert_eq!(mode.needs_reporter(), expected, "{mode:?}");
}
}

// ==================== Validation ====================

#[test]
fn test_validate_default_succeeds() {
assert!(LogsConfig::default().validate().is_ok());
}

#[test]
fn test_validate_internal_cannot_use_reporter() {
use ProviderMode::*;
let config = config_with(Noop, Noop, ITS);
assert_invalid(&config, "internal provider is invalid");

let config = config_with(Noop, Noop, ConsoleAsync);
assert_invalid(&config, "internal provider is invalid");
}

#[test]
fn test_validate_engine_otel_requires_global_otel() {
use ProviderMode::*;
// Engine OpenTelemetry without global OpenTelemetry fails
for global in [Noop, ITS, ConsoleDirect] {
Contributor
Suggested change
for global in [Noop, ITS, ConsoleDirect] {
for global in [Noop, ITS, ConsoleAsync, ConsoleDirect] {

Contributor Author
Done.

let config = config_with(global, OpenTelemetry, Noop);
assert_invalid(&config, "opentelemetry");
}

// Both OpenTelemetry succeeds
let config = config_with(OpenTelemetry, OpenTelemetry, Noop);
assert!(config.validate().is_ok());
}
}
Original file line number Diff line number Diff line change
@@ -264,6 +264,7 @@ mod tests {
},
),
],
..Default::default()
};
let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?;
let (sdk_logger_provider, _) = logger_provider.into_parts();
@@ -292,6 +293,7 @@ mod tests {
},
),
],
..Default::default()
};
let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?;
let (sdk_logger_provider, runtime_option) = logger_provider.into_parts();
@@ -309,8 +311,7 @@ mod tests {
fn test_logger_provider_configure_default() -> Result<(), Error> {
let resource = Resource::builder().build();
let logger_config = LogsConfig {
level: LogLevel::default(),
processors: vec![],
..Default::default()
};
let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?;
let (sdk_logger_provider, _) = logger_provider.into_parts();