From 25486ed82072d3920b4fc74bf4d605b9d037a048 Mon Sep 17 00:00:00 2001 From: lquerel Date: Fri, 16 Jan 2026 07:28:21 -0800 Subject: [PATCH 01/13] Add engine config file with support for 1 pipeline group and 1 pipeline --- .../otap-dataflow/crates/config/src/engine.rs | 61 +++++++- rust/otap-dataflow/src/main.rs | 146 +++++++++++++----- 2 files changed, 169 insertions(+), 38 deletions(-) diff --git a/rust/otap-dataflow/crates/config/src/engine.rs b/rust/otap-dataflow/crates/config/src/engine.rs index 7c00454f40..14bfe1fd83 100644 --- a/rust/otap-dataflow/crates/config/src/engine.rs +++ b/rust/otap-dataflow/crates/config/src/engine.rs @@ -4,11 +4,12 @@ //! The configuration for the dataflow engine. use crate::PipelineGroupId; -use crate::error::Error; +use crate::error::{Context, Error}; use crate::pipeline_group::PipelineGroupConfig; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::path::Path; /// Root configuration for the pipeline engine. /// Contains engine-level settings and all pipeline groups. @@ -87,6 +88,64 @@ impl EngineConfig { Ok(config) } + /// Creates a new `EngineConfig` with the given YAML string. + pub fn from_yaml(yaml: &str) -> Result { + let config: EngineConfig = + serde_yaml::from_str(yaml).map_err(|e| Error::DeserializationError { + context: Context::default(), + format: "YAML".to_string(), + details: e.to_string(), + })?; + config.validate()?; + Ok(config) + } + + /// Load an [`EngineConfig`] from a JSON file. + pub fn from_json_file>(path: P) -> Result { + let contents = std::fs::read_to_string(path).map_err(|e| Error::FileReadError { + context: Context::default(), + details: e.to_string(), + })?; + Self::from_json(&contents) + } + + /// Load an [`EngineConfig`] from a YAML file. 
+ pub fn from_yaml_file>(path: P) -> Result { + let contents = std::fs::read_to_string(path).map_err(|e| Error::FileReadError { + context: Context::default(), + details: e.to_string(), + })?; + Self::from_yaml(&contents) + } + + /// Load an [`EngineConfig`] from a file, automatically detecting the format based on file extension. + /// + /// Supports: + /// - JSON files: `.json` + /// - YAML files: `.yaml`, `.yml` + pub fn from_file>(path: P) -> Result { + let path = path.as_ref(); + let extension = path + .extension() + .and_then(|ext| ext.to_str()) + .map(|ext| ext.to_lowercase()); + + match extension.as_deref() { + Some("json") => Self::from_json_file(path), + Some("yaml") | Some("yml") => Self::from_yaml_file(path), + _ => { + let details = format!( + "Unsupported file extension: {}. Supported extensions are: .json, .yaml, .yml", + extension.unwrap_or_else(|| "".to_string()) + ); + Err(Error::FileReadError { + context: Context::default(), + details, + }) + } + } + } + /// Validates the engine configuration and returns a [`Error::InvalidConfiguration`] error /// containing all validation errors found in the pipeline groups. pub fn validate(&self) -> Result<(), Error> { diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs index e242b472cf..3b075853f5 100644 --- a/rust/otap-dataflow/src/main.rs +++ b/rust/otap-dataflow/src/main.rs @@ -3,7 +3,8 @@ //! 
Create and run a multi-core pipeline -use clap::Parser; +use clap::{ArgGroup, Parser}; +use otap_df_config::engine::{EngineConfig, HttpAdminSettings}; use otap_df_config::pipeline::PipelineConfig; use otap_df_config::pipeline_group::{CoreAllocation, CoreRange, Quota}; use otap_df_config::{PipelineGroupId, PipelineId}; @@ -44,24 +45,33 @@ static GLOBAL: Jemalloc = Jemalloc; version, about, long_about = None, - after_help = system_info() + after_help = system_info(), + group = ArgGroup::new("config_source") + .required(true) + .multiple(false) + .args(["pipeline", "config"]) )] struct Args { /// Path to the pipeline configuration file (.json, .yaml, or .yml) - #[arg(short, long)] - pipeline: PathBuf, + #[arg(short, long, value_name = "FILE", group = "config_source")] + pipeline: Option, + + /// Path to the engine configuration file (.json, .yaml, or .yml) + #[arg(short = 'c', long, value_name = "FILE", group = "config_source")] + config: Option, /// Number of cores to use (0 for default) - #[arg(long, default_value = "0", conflicts_with = "core_id_range")] - num_cores: usize, + #[arg(long, conflicts_with = "core_id_range")] + num_cores: Option, /// Inclusive range of CPU core IDs to pin threads to (e.g. "0-3", "0..3,5", "0..=3,6-7"). #[arg(long, value_name = "START..END", value_parser = parse_core_id_allocation, conflicts_with = "num_cores")] core_id_range: Option, - /// Address to bind the HTTP admin server to (e.g., "127.0.0.1:8080", "0.0.0.0:8080") - #[arg(long, default_value = "127.0.0.1:8080")] - http_admin_bind: String, + /// Address to bind the HTTP admin server to (e.g., "127.0.0.1:8080", "0.0.0.0:8080"). + /// Defaults to 127.0.0.1:8080 when unset. 
+ #[arg(long)] + http_admin_bind: Option, } fn parse_core_id_allocation(s: &str) -> Result { @@ -114,38 +124,103 @@ fn main() -> Result<(), Box> { .install_default() .map_err(|e| format!("Failed to install rustls crypto provider: {e:?}"))?; - let args = Args::parse(); - - // For now, we predefine pipeline group and pipeline IDs. - // That will be replaced with a more dynamic approach in the future. - let pipeline_group_id: PipelineGroupId = "default_pipeline_group".into(); - let pipeline_id: PipelineId = "default_pipeline".into(); + let Args { + pipeline, + config, + num_cores, + core_id_range, + http_admin_bind, + } = Args::parse(); println!("{}", system_info()); - // Load pipeline configuration from file - let pipeline_cfg = PipelineConfig::from_file( - pipeline_group_id.clone(), - pipeline_id.clone(), - &args.pipeline, - )?; + let default_admin_bind = HttpAdminSettings::default().bind_address; + let core_allocation_override = match (core_id_range, num_cores) { + (Some(range), _) => Some(range), + (None, Some(num_cores)) => Some(if num_cores == 0 { + CoreAllocation::AllCores + } else { + CoreAllocation::CoreCount { count: num_cores } + }), + (None, None) => None, + }; + let core_allocation_override_for_engine = core_allocation_override.clone(); + let core_allocation_override_for_pipeline = core_allocation_override; + let http_admin_bind_for_engine = http_admin_bind.clone(); + let http_admin_bind_for_pipeline = http_admin_bind; + + let (pipeline_group_id, pipeline_id, pipeline_cfg, quota, admin_settings) = + if let Some(config_path) = config { + let engine_cfg = EngineConfig::from_file(config_path)?; + if engine_cfg.pipeline_groups.len() != 1 { + return Err(format!( + "Engine config must define exactly one pipeline group, found {}", + engine_cfg.pipeline_groups.len() + ) + .into()); + } + let EngineConfig { + settings, + pipeline_groups, + } = engine_cfg; + let (pipeline_group_id, pipeline_group_cfg) = + pipeline_groups.into_iter().next().expect("pipeline group 
missing"); + if pipeline_group_cfg.pipelines.len() != 1 { + return Err(format!( + "Engine config must define exactly one pipeline, found {}", + pipeline_group_cfg.pipelines.len() + ) + .into()); + } + let otap_df_config::pipeline_group::PipelineGroupConfig { pipelines, quota } = + pipeline_group_cfg; + let (pipeline_id, pipeline_cfg) = + pipelines.into_iter().next().expect("pipeline missing"); + let admin_settings = if let Some(bind_address) = http_admin_bind_for_engine { + HttpAdminSettings { bind_address } + } else if let Some(config_admin) = settings.http_admin { + config_admin + } else { + HttpAdminSettings { + bind_address: default_admin_bind.clone(), + } + }; + let quota = match core_allocation_override_for_engine { + Some(core_allocation) => Quota { core_allocation }, + None => quota, + }; + (pipeline_group_id, pipeline_id, pipeline_cfg, quota, admin_settings) + } else { + // For now, we predefine pipeline group and pipeline IDs. + // That will be replaced with a more dynamic approach in the future. 
+ let pipeline_group_id: PipelineGroupId = "default_pipeline_group".into(); + let pipeline_id: PipelineId = "default_pipeline".into(); + let pipeline_path = match pipeline { + Some(path) => path, + None => { + return Err("Missing --pipeline argument".into()); + } + }; + + // Load pipeline configuration from file + let pipeline_cfg = PipelineConfig::from_file( + pipeline_group_id.clone(), + pipeline_id.clone(), + &pipeline_path, + )?; + let core_allocation = + core_allocation_override_for_pipeline.unwrap_or(CoreAllocation::AllCores); + let quota = Quota { core_allocation }; + let admin_settings = HttpAdminSettings { + bind_address: http_admin_bind_for_pipeline + .unwrap_or_else(|| default_admin_bind.clone()), + }; + (pipeline_group_id, pipeline_id, pipeline_cfg, quota, admin_settings) + }; // Create controller and start pipeline with multi-core support let controller = Controller::new(&OTAP_PIPELINE_FACTORY); - // Map CLI arguments to the core allocation enum - let core_allocation = if let Some(range) = args.core_id_range { - range - } else if args.num_cores == 0 { - CoreAllocation::AllCores - } else { - CoreAllocation::CoreCount { - count: args.num_cores, - } - }; - - let quota = Quota { core_allocation }; - // Print the requested core configuration match "a.core_allocation { CoreAllocation::AllCores => println!("Requested core allocation: all available cores"), @@ -155,9 +230,6 @@ fn main() -> Result<(), Box> { } } - let admin_settings = otap_df_config::engine::HttpAdminSettings { - bind_address: args.http_admin_bind, - }; let result = controller.run_forever( pipeline_group_id, pipeline_id, From 422bf78050c3b562f90e4b07ef0dcbc506ee56f1 Mon Sep 17 00:00:00 2001 From: lquerel Date: Fri, 16 Jan 2026 15:44:44 -0800 Subject: [PATCH 02/13] Add engine config file with support for 1 pipeline group and 1 pipeline --- .../engine-conf/continuous_benchmark.yaml | 160 ++++++++++++++++++ .../otap-dataflow/crates/config/src/engine.rs | 8 + .../crates/config/src/pipeline.rs | 120 
++++++++++++- .../crates/config/src/pipeline_group.rs | 117 ------------- .../crates/controller/src/error.rs | 2 +- .../crates/controller/src/lib.rs | 10 +- rust/otap-dataflow/src/main.rs | 42 ++--- 7 files changed, 310 insertions(+), 149 deletions(-) create mode 100644 rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml diff --git a/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml b/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml new file mode 100644 index 0000000000..68c15c3491 --- /dev/null +++ b/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml @@ -0,0 +1,160 @@ +settings: + http_admin: + bind_address: 127.0.0.1:8085 + +pipeline_groups: + # ======================================================================== + # All the pipelines required for continuous benchmarking in a single group + # ======================================================================== + continuous_benchmark: + pipelines: + # ====================================================================== + # Pipeline generating traffic + # ====================================================================== + traffic_gen: + quota: + core_allocation: + type: core_set + set: + - type: CoreRange + start: 20 + end: 45 + + nodes: + receiver: + kind: receiver + plugin_urn: "urn:otel:otap:fake_data_generator:receiver" + out_ports: + out_port: + destinations: + - exporter + dispatch_strategy: round_robin + config: + traffic_config: + signals_per_second: 100000 + max_signal_count: null + metric_weight: 0 + trace_weight: 0 + log_weight: 30 + registry_path: https://github.com/open-telemetry/semantic-conventions.git[model] + exporter: + kind: exporter + plugin_urn: "urn:otel:otlp:exporter" + config: + grpc_endpoint: "http://127.0.0.1:4327" + + # ====================================================================== + # System Under Test pipeline + # ====================================================================== + sut: + quota: + 
core_allocation: + type: core_set + set: + - type: CoreRange + start: 11 + end: 11 + nodes: + otlp_recv: + kind: receiver + plugin_urn: "urn:otel:otlp:receiver" + out_ports: + out_port: + destinations: + - router + dispatch_strategy: round_robin + config: + listening_addr: "127.0.0.1:4327" + wait_for_result: true + + otap_recv: + kind: receiver + plugin_urn: "urn:otel:otap:receiver" + out_ports: + out_port: + destinations: + - otap_exporter + dispatch_strategy: round_robin + config: + listening_addr: "127.0.0.1:4329" + response_stream_channel_size: 256 + + router: + kind: processor + plugin_urn: "urn:otap:processor:signal_type_router" + out_ports: + logs: + destinations: + - retry + dispatch_strategy: round_robin + metrics: + destinations: + - metrics_exporter + dispatch_strategy: round_robin + traces: + destinations: + - spans_exporter + dispatch_strategy: round_robin + config: {} + + retry: + kind: processor + plugin_urn: "urn:otel:retry:processor" + out_ports: + out_port: + destinations: + - logs_exporter + dispatch_strategy: round_robin + config: + multiplier: 1.5 + + logs_exporter: + kind: exporter + plugin_urn: "urn:otel:otlp:exporter" + config: + grpc_endpoint: "http://127.0.0.1:4328" + max_in_flight: 6 + + metrics_exporter: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + + spans_exporter: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + + otap_exporter: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + + # ====================================================================== + # Performance No-Op pipeline to measure receiver performance + # ====================================================================== + backend: + quota: + core_allocation: + type: core_set + set: + - type: CoreRange + start: 12 + end: 12 + + nodes: + receiver: + kind: receiver + plugin_urn: urn:otel:otlp:receiver + out_ports: + out_port: + destinations: + - perf_noop + dispatch_strategy: round_robin + config: + 
listening_addr: 127.0.0.1:4328 + + perf_noop: + kind: exporter + plugin_urn: urn:otel:noop:exporter + config: null diff --git a/rust/otap-dataflow/crates/config/src/engine.rs b/rust/otap-dataflow/crates/config/src/engine.rs index 14bfe1fd83..6b8a9548b4 100644 --- a/rust/otap-dataflow/crates/config/src/engine.rs +++ b/rust/otap-dataflow/crates/config/src/engine.rs @@ -31,6 +31,7 @@ pub struct EngineSettings { pub http_admin: Option, /// Telemetry settings. + #[serde(default = "default_telemetry_settings")] pub telemetry: TelemetrySettings, } @@ -63,6 +64,13 @@ impl Default for HttpAdminSettings { } } +const fn default_telemetry_settings() -> TelemetrySettings { + TelemetrySettings { + reporting_channel_size: default_reporting_channel_size(), + flush_interval: default_reporting_interval() + } +} + const fn default_reporting_channel_size() -> usize { 100 } diff --git a/rust/otap-dataflow/crates/config/src/pipeline.rs b/rust/otap-dataflow/crates/config/src/pipeline.rs index 078f49e7ef..c8d54697a6 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline.rs @@ -15,6 +15,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::{HashMap, HashSet}; +use std::fmt::Display; use std::path::Path; use std::sync::Arc; @@ -42,6 +43,10 @@ pub struct PipelineConfig { #[serde(default)] settings: PipelineSettings, + /// Quota for this pipeline. + #[serde(default)] + quota: Quota, + /// All nodes in this pipeline, keyed by node ID. #[serde(default)] nodes: PipelineNodes, @@ -73,6 +78,75 @@ pub enum PipelineType { Otap, } +/// Pipeline quota configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] +#[serde(deny_unknown_fields)] +pub struct Quota { + /// CPU core allocation strategy for this pipeline. + #[serde(default)] + pub core_allocation: CoreAllocation, +} + +/// Defines how CPU cores should be allocated for pipeline execution. 
+#[derive(Debug, Default, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum CoreAllocation { + /// Use all available CPU cores. + #[default] + AllCores, + /// Use a specific number of CPU cores (starting from core 0). + /// If the requested number exceeds available cores, use all available cores. + CoreCount { + /// Number of cores to use. If 0, uses all available cores. + count: usize, + }, + /// Defines a set of CPU cores should be allocated for pipeline execution. + CoreSet { + /// Core set defined as a set of ranges. + set: Vec, + }, +} + +impl Display for CoreAllocation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CoreAllocation::AllCores => write!(f, "*"), + CoreAllocation::CoreCount { count } => write!(f, "[{count} cores]"), + CoreAllocation::CoreSet { set } => { + let mut first = true; + for item in set { + if !first { + write!(f, ",")? + } + write!(f, "{item}")?; + first = false + } + Ok(()) + } + } + } +} + +/// Defines a range of CPU cores should be allocated for pipeline execution. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub struct CoreRange { + /// Start core ID (inclusive). + pub start: usize, + /// End core ID (inclusive). + pub end: usize, +} + +impl Display for CoreRange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.start == self.end { + write!(f, "{}", self.start) + } else { + write!(f, "{}-{}", self.start, self.end) + } + } +} + /// A collection of nodes forming a pipeline graph. /// /// Note: We use `Arc` to allow sharing the same pipeline configuration @@ -446,6 +520,17 @@ impl PipelineConfig { &self.settings } + /// Returns the quota configuration for this pipeline. + #[must_use] + pub fn quota(&self) -> &Quota { + &self.quota + } + + /// Sets the quota configuration for this pipeline. 
+ pub fn set_quota(&mut self, quota: Quota) { + self.quota = quota; + } + /// Returns a reference to the main pipeline nodes. #[must_use] pub fn nodes(&self) -> &PipelineNodes { @@ -791,6 +876,7 @@ impl PipelineConfigBuilder { .collect(), internal: PipelineNodes(HashMap::new()), settings: PipelineSettings::default(), + quota: Quota::default(), r#type: pipeline_type, service: ServiceConfig::default(), }; @@ -817,9 +903,41 @@ mod tests { MetricsReaderConfig, MetricsReaderPeriodicConfig, }; use crate::pipeline::service::telemetry::{AttributeValue, TelemetryConfig}; - use crate::pipeline::{PipelineConfigBuilder, PipelineType}; + use crate::pipeline::{CoreAllocation, CoreRange, PipelineConfigBuilder, PipelineType}; use serde_json::json; + #[test] + fn test_core_allocation_display_all_cores() { + let allocation = CoreAllocation::AllCores; + assert_eq!(allocation.to_string(), "*"); + } + + #[test] + fn test_core_allocation_display_core_count() { + let allocation = CoreAllocation::CoreCount { count: 4 }; + assert_eq!(allocation.to_string(), "[4 cores]"); + } + + #[test] + fn test_core_allocation_display_core_set_single_range() { + let allocation = CoreAllocation::CoreSet { + set: vec![CoreRange { start: 0, end: 3 }], + }; + assert_eq!(allocation.to_string(), "0-3"); + } + + #[test] + fn test_core_allocation_display_core_set_multiple_ranges() { + let allocation = CoreAllocation::CoreSet { + set: vec![ + CoreRange { start: 0, end: 3 }, + CoreRange { start: 8, end: 11 }, + CoreRange { start: 16, end: 16 }, + ], + }; + assert_eq!(allocation.to_string(), "0-3,8-11,16"); + } + #[test] fn test_duplicate_node_errors() { let result = PipelineConfigBuilder::new() diff --git a/rust/otap-dataflow/crates/config/src/pipeline_group.rs b/rust/otap-dataflow/crates/config/src/pipeline_group.rs index f6b41e35dc..6a8d451819 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline_group.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline_group.rs @@ -9,7 +9,6 @@ use 
crate::{PipelineGroupId, PipelineId}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::fmt::Display; /// Configuration for a single pipeline group. /// Contains group-specific settings and all its pipelines. @@ -18,10 +17,6 @@ use std::fmt::Display; pub struct PipelineGroupConfig { /// All pipelines belonging to this pipeline group, keyed by pipeline ID. pub pipelines: HashMap, - - /// Quota for the pipeline group. - #[serde(default)] - pub quota: Quota, } impl PipelineGroupConfig { @@ -30,15 +25,9 @@ impl PipelineGroupConfig { pub fn new() -> Self { Self { pipelines: HashMap::new(), - quota: Default::default(), } } - /// Sets the quota for the pipeline group. - pub fn set_quota(&mut self, quota: Quota) { - self.quota = quota; - } - /// Adds a pipeline to the pipeline group. pub fn add_pipeline( &mut self, @@ -74,109 +63,3 @@ impl Default for PipelineGroupConfig { Self::new() } } - -/// Pipeline group quota configuration. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] -#[serde(deny_unknown_fields)] -pub struct Quota { - /// CPU core allocation strategy for this pipeline group. - #[serde(default)] - pub core_allocation: CoreAllocation, -} - -/// Defines how CPU cores should be allocated for pipeline execution. -#[derive(Debug, Default, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum CoreAllocation { - /// Use all available CPU cores. - #[default] - AllCores, - /// Use a specific number of CPU cores (starting from core 0). - /// If the requested number exceeds available cores, use all available cores. - CoreCount { - /// Number of cores to use. If 0, uses all available cores. - count: usize, - }, - /// Defines a set of CPU cores should be allocated for pipeline execution. - CoreSet { - /// Core set defined as a set of ranges. 
- set: Vec, - }, -} - -impl Display for CoreAllocation { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - CoreAllocation::AllCores => write!(f, "*"), - CoreAllocation::CoreCount { count } => write!(f, "[{count} cores]"), - CoreAllocation::CoreSet { set } => { - let mut first = true; - for item in set { - if !first { - write!(f, ",")? - } - write!(f, "{item}")?; - first = false - } - Ok(()) - } - } - } -} - -/// Defines a range of CPU cores should be allocated for pipeline execution. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] -#[serde(tag = "type", rename_all = "snake_case")] -pub struct CoreRange { - /// Start core ID (inclusive). - pub start: usize, - /// End core ID (inclusive). - pub end: usize, -} - -impl Display for CoreRange { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.start == self.end { - write!(f, "{}", self.start) - } else { - write!(f, "{}-{}", self.start, self.end) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_core_allocation_display_all_cores() { - let allocation = CoreAllocation::AllCores; - assert_eq!(allocation.to_string(), "*"); - } - - #[test] - fn test_core_allocation_display_core_count() { - let allocation = CoreAllocation::CoreCount { count: 4 }; - assert_eq!(allocation.to_string(), "[4 cores]"); - } - - #[test] - fn test_core_allocation_display_core_set_single_range() { - let allocation = CoreAllocation::CoreSet { - set: vec![CoreRange { start: 0, end: 3 }], - }; - assert_eq!(allocation.to_string(), "0-3"); - } - - #[test] - fn test_core_allocation_display_core_set_multiple_ranges() { - let allocation = CoreAllocation::CoreSet { - set: vec![ - CoreRange { start: 0, end: 3 }, - CoreRange { start: 8, end: 11 }, - CoreRange { start: 16, end: 16 }, - ], - }; - assert_eq!(allocation.to_string(), "0-3,8-11,16"); - } -} diff --git a/rust/otap-dataflow/crates/controller/src/error.rs 
b/rust/otap-dataflow/crates/controller/src/error.rs index 53ca9aa299..e9d9ef898b 100644 --- a/rust/otap-dataflow/crates/controller/src/error.rs +++ b/rust/otap-dataflow/crates/controller/src/error.rs @@ -4,7 +4,7 @@ //! Errors for the controller crate. use miette::Diagnostic; -use otap_df_config::pipeline_group::CoreAllocation; +use otap_df_config::pipeline::CoreAllocation; /// Errors that can occur in the controller crate. #[derive(thiserror::Error, Debug, Diagnostic)] diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 0d6129cda7..45100ec34a 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -23,8 +23,7 @@ use core_affinity::CoreId; use otap_df_config::engine::HttpAdminSettings; use otap_df_config::{ PipelineGroupId, PipelineId, - pipeline::PipelineConfig, - pipeline_group::{CoreAllocation, Quota}, + pipeline::{CoreAllocation, PipelineConfig, Quota}, }; use otap_df_engine::PipelineFactory; use otap_df_engine::context::{ControllerContext, PipelineContext}; @@ -63,13 +62,12 @@ impl Controller { Self { pipeline_factory } } - /// Starts the controller with the given pipeline configuration and quota. + /// Starts the controller with the given pipeline configuration. pub fn run_forever( &self, pipeline_group_id: PipelineGroupId, pipeline_id: PipelineId, pipeline: PipelineConfig, - quota: Quota, admin_settings: HttpAdminSettings, ) -> Result<(), Error> { // Initialize metrics system and observed event store. 
@@ -115,6 +113,8 @@ impl Controller { obs_state_store.run(cancellation_token) })?; + let quota = pipeline.quota().clone(); + // Start one thread per requested core // Get available CPU cores for pinning let requested_cores = Self::select_cores_for_quota( @@ -454,7 +454,7 @@ fn error_summary_from_gen(error: &Error) -> ErrorSummary { #[cfg(test)] mod tests { use super::*; - use otap_df_config::pipeline_group::CoreRange; + use otap_df_config::pipeline::CoreRange; fn available_core_ids() -> Vec { vec![ diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs index 3b075853f5..e407934767 100644 --- a/rust/otap-dataflow/src/main.rs +++ b/rust/otap-dataflow/src/main.rs @@ -5,8 +5,7 @@ use clap::{ArgGroup, Parser}; use otap_df_config::engine::{EngineConfig, HttpAdminSettings}; -use otap_df_config::pipeline::PipelineConfig; -use otap_df_config::pipeline_group::{CoreAllocation, CoreRange, Quota}; +use otap_df_config::pipeline::{CoreAllocation, CoreRange, PipelineConfig, Quota}; use otap_df_config::{PipelineGroupId, PipelineId}; use otap_df_controller::Controller; use otap_df_otap::OTAP_PIPELINE_FACTORY; @@ -144,12 +143,7 @@ fn main() -> Result<(), Box> { }), (None, None) => None, }; - let core_allocation_override_for_engine = core_allocation_override.clone(); - let core_allocation_override_for_pipeline = core_allocation_override; - let http_admin_bind_for_engine = http_admin_bind.clone(); - let http_admin_bind_for_pipeline = http_admin_bind; - - let (pipeline_group_id, pipeline_id, pipeline_cfg, quota, admin_settings) = + let (pipeline_group_id, pipeline_id, mut pipeline_cfg, admin_settings) = if let Some(config_path) = config { let engine_cfg = EngineConfig::from_file(config_path)?; if engine_cfg.pipeline_groups.len() != 1 { @@ -172,11 +166,12 @@ fn main() -> Result<(), Box> { ) .into()); } - let otap_df_config::pipeline_group::PipelineGroupConfig { pipelines, quota } = - pipeline_group_cfg; - let (pipeline_id, pipeline_cfg) = - 
pipelines.into_iter().next().expect("pipeline missing"); - let admin_settings = if let Some(bind_address) = http_admin_bind_for_engine { + let (pipeline_id, pipeline_cfg) = pipeline_group_cfg + .pipelines + .into_iter() + .next() + .expect("pipeline missing"); + let admin_settings = if let Some(bind_address) = http_admin_bind { HttpAdminSettings { bind_address } } else if let Some(config_admin) = settings.http_admin { config_admin @@ -185,11 +180,7 @@ fn main() -> Result<(), Box> { bind_address: default_admin_bind.clone(), } }; - let quota = match core_allocation_override_for_engine { - Some(core_allocation) => Quota { core_allocation }, - None => quota, - }; - (pipeline_group_id, pipeline_id, pipeline_cfg, quota, admin_settings) + (pipeline_group_id, pipeline_id, pipeline_cfg, admin_settings) } else { // For now, we predefine pipeline group and pipeline IDs. // That will be replaced with a more dynamic approach in the future. @@ -208,16 +199,18 @@ fn main() -> Result<(), Box> { pipeline_id.clone(), &pipeline_path, )?; - let core_allocation = - core_allocation_override_for_pipeline.unwrap_or(CoreAllocation::AllCores); - let quota = Quota { core_allocation }; let admin_settings = HttpAdminSettings { - bind_address: http_admin_bind_for_pipeline - .unwrap_or_else(|| default_admin_bind.clone()), + bind_address: http_admin_bind.unwrap_or_else(|| default_admin_bind.clone()), }; - (pipeline_group_id, pipeline_id, pipeline_cfg, quota, admin_settings) + (pipeline_group_id, pipeline_id, pipeline_cfg, admin_settings) }; + if let Some(core_allocation) = core_allocation_override { + pipeline_cfg.set_quota(Quota { core_allocation }); + } + + let quota = pipeline_cfg.quota().clone(); + // Create controller and start pipeline with multi-core support let controller = Controller::new(&OTAP_PIPELINE_FACTORY); @@ -234,7 +227,6 @@ fn main() -> Result<(), Box> { pipeline_group_id, pipeline_id, pipeline_cfg, - quota, admin_settings, ); match result { From 
f19b994a145c6739dc869645e36237480914097d Mon Sep 17 00:00:00 2001 From: lquerel Date: Fri, 16 Jan 2026 22:30:30 -0800 Subject: [PATCH 03/13] Support multiple groups and nested pipelines --- .../engine-conf/continuous_benchmark.yaml | 41 +++- .../otap-dataflow/crates/config/src/engine.rs | 39 +--- .../crates/config/src/pipeline.rs | 6 - .../crates/controller/src/lib.rs | 213 ++++++++++++------ .../crates/engine/src/pipeline_ctrl.rs | 5 +- rust/otap-dataflow/crates/state/src/store.rs | 46 +++- rust/otap-dataflow/src/main.rs | 164 +++++++------- 7 files changed, 312 insertions(+), 202 deletions(-) diff --git a/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml b/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml index 68c15c3491..55cb03cfb1 100644 --- a/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml +++ b/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml @@ -17,8 +17,8 @@ pipeline_groups: type: core_set set: - type: CoreRange - start: 20 - end: 45 + start: 10 + end: 35 nodes: receiver: @@ -52,8 +52,8 @@ pipeline_groups: type: core_set set: - type: CoreRange - start: 11 - end: 11 + start: 0 + end: 0 nodes: otlp_recv: kind: receiver @@ -139,8 +139,8 @@ pipeline_groups: type: core_set set: - type: CoreRange - start: 12 - end: 12 + start: 1 + end: 1 nodes: receiver: @@ -158,3 +158,32 @@ pipeline_groups: kind: exporter plugin_urn: urn:otel:noop:exporter config: null + + other_group: + pipelines: + + p1: + quota: + core_allocation: + type: core_set + set: + - type: CoreRange + start: 50 + end: 50 + + nodes: + receiver: + kind: receiver + plugin_urn: urn:otel:otlp:receiver + out_ports: + out_port: + destinations: + - perf_noop + dispatch_strategy: round_robin + config: + listening_addr: 127.0.0.1:4329 + + perf_noop: + kind: exporter + plugin_urn: urn:otel:noop:exporter + config: null diff --git a/rust/otap-dataflow/crates/config/src/engine.rs b/rust/otap-dataflow/crates/config/src/engine.rs index 
6b8a9548b4..4c343e893e 100644 --- a/rust/otap-dataflow/crates/config/src/engine.rs +++ b/rust/otap-dataflow/crates/config/src/engine.rs @@ -5,6 +5,8 @@ use crate::PipelineGroupId; use crate::error::{Context, Error}; +use crate::observed_state::ObservedStateSettings; +use crate::pipeline::service::telemetry::TelemetryConfig; use crate::pipeline_group::PipelineGroupConfig; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -24,27 +26,19 @@ pub struct EngineConfig { } /// Global settings for the engine. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] #[serde(deny_unknown_fields)] pub struct EngineSettings { /// Optional HTTP admin server configuration. pub http_admin: Option, - /// Telemetry settings. - #[serde(default = "default_telemetry_settings")] - pub telemetry: TelemetrySettings, -} + /// Telemetry backend configuration shared across pipelines. + #[serde(default)] + pub telemetry: TelemetryConfig, -/// Configuration for the telemetry metrics system. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] -#[serde(deny_unknown_fields)] -pub struct TelemetrySettings { - /// The size of the reporting channel. - #[serde(default = "default_reporting_channel_size")] - pub reporting_channel_size: usize, - /// The interval at which metrics are flushed and aggregated by the collector. - #[serde(default = "default_reporting_interval")] - pub flush_interval: std::time::Duration, + /// Observed state store settings shared across pipelines. + #[serde(default)] + pub observed_state: ObservedStateSettings, } /// Configuration for the HTTP admin endpoints. 
@@ -64,21 +58,6 @@ impl Default for HttpAdminSettings { } } -const fn default_telemetry_settings() -> TelemetrySettings { - TelemetrySettings { - reporting_channel_size: default_reporting_channel_size(), - flush_interval: default_reporting_interval() - } -} - -const fn default_reporting_channel_size() -> usize { - 100 -} - -const fn default_reporting_interval() -> std::time::Duration { - std::time::Duration::from_secs(1) -} - fn default_bind_address() -> String { "127.0.0.1:8080".into() } diff --git a/rust/otap-dataflow/crates/config/src/pipeline.rs b/rust/otap-dataflow/crates/config/src/pipeline.rs index c8d54697a6..d61fd729ea 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline.rs @@ -8,7 +8,6 @@ pub mod service; use crate::error::{Context, Error, HyperEdgeSpecDetails}; use crate::health::HealthPolicy; use crate::node::{DispatchStrategy, HyperEdgeConfig, NodeKind, NodeUserConfig}; -use crate::observed_state::ObservedStateSettings; use crate::pipeline::service::ServiceConfig; use crate::{Description, NodeId, NodeUrn, PipelineGroupId, PipelineId, PortName}; use schemars::JsonSchema; @@ -343,10 +342,6 @@ pub struct PipelineSettings { #[serde(default = "default_pdata_channel_size")] pub default_pdata_channel_size: usize, - /// Observed state settings. - #[serde(default)] - pub observed_state: ObservedStateSettings, - /// Health policy. 
#[serde(default)] pub health_policy: HealthPolicy, @@ -414,7 +409,6 @@ impl Default for PipelineSettings { default_node_ctrl_msg_channel_size: default_node_ctrl_msg_channel_size(), default_pipeline_ctrl_msg_channel_size: default_pipeline_ctrl_msg_channel_size(), default_pdata_channel_size: default_pdata_channel_size(), - observed_state: ObservedStateSettings::default(), health_policy: HealthPolicy::default(), telemetry: TelemetrySettings::default(), } diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 45100ec34a..33fdcc8cb0 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -15,15 +15,16 @@ //! - TODO: Live pipeline updates //! - TODO: Better resource control //! - TODO: Monitoring -//! - TODO: Support pipeline groups +//! - TODO: Support multiple pipeline groups use crate::error::Error; use crate::thread_task::spawn_thread_local_task; use core_affinity::CoreId; -use otap_df_config::engine::HttpAdminSettings; +use otap_df_config::engine::{EngineSettings, HttpAdminSettings}; use otap_df_config::{ PipelineGroupId, PipelineId, pipeline::{CoreAllocation, PipelineConfig, Quota}, + pipeline_group::PipelineGroupConfig, }; use otap_df_engine::PipelineFactory; use otap_df_engine::context::{ControllerContext, PipelineContext}; @@ -31,13 +32,14 @@ use otap_df_engine::control::{ PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel, }; use otap_df_engine::error::{Error as EngineError, error_summary_from}; -use otap_df_state::DeployedPipelineKey; +use otap_df_state::{DeployedPipelineKey, PipelineKey}; use otap_df_state::event::{ErrorSummary, ObservedEvent}; use otap_df_state::reporter::ObservedEventReporter; use otap_df_state::store::ObservedStateStore; use otap_df_telemetry::opentelemetry_client::OpentelemetryClient; use otap_df_telemetry::reporter::MetricsReporter; use otap_df_telemetry::{InternalTelemetrySystem, otel_info, 
otel_info_span, otel_warn}; +use std::collections::HashMap; use std::thread; /// Error types and helpers for the controller module. @@ -69,27 +71,70 @@ impl Controller { pipeline_id: PipelineId, pipeline: PipelineConfig, admin_settings: HttpAdminSettings, + ) -> Result<(), Error> { + let engine_settings = EngineSettings { + http_admin: None, + telemetry: pipeline.service().telemetry.clone(), + observed_state: Default::default(), + }; + let mut pipeline_group = PipelineGroupConfig::new(); + pipeline_group + .add_pipeline(pipeline_id.clone(), pipeline) + .map_err(|e| Error::InvalidConfiguration { errors: vec![e] })?; + self.run_group_forever(pipeline_group_id, pipeline_group, engine_settings, admin_settings) + } + + /// Starts the controller with the given pipeline group configuration. + pub fn run_group_forever( + &self, + pipeline_group_id: PipelineGroupId, + pipeline_group: PipelineGroupConfig, + engine_settings: EngineSettings, + admin_settings: HttpAdminSettings, + ) -> Result<(), Error> { + let mut pipeline_groups = HashMap::new(); + let _ = pipeline_groups.insert(pipeline_group_id, pipeline_group); + self.run_engine_forever(pipeline_groups, engine_settings, admin_settings) + } + + /// Starts the controller with the given pipeline group configurations. + pub fn run_engine_forever( + &self, + pipeline_groups: HashMap, + engine_settings: EngineSettings, + admin_settings: HttpAdminSettings, ) -> Result<(), Error> { // Initialize metrics system and observed event store. // ToDo A hierarchical metrics system will be implemented to better support hardware with multiple NUMA nodes. 
- let telemetry_config = &pipeline.service().telemetry; - let settings = pipeline.pipeline_settings(); + let telemetry_config = &engine_settings.telemetry; otel_info!( "Controller.Start", - num_nodes = pipeline.node_iter().count(), - pdata_channel_size = settings.default_pdata_channel_size, - node_ctrl_msg_channel_size = settings.default_node_ctrl_msg_channel_size, - pipeline_ctrl_msg_channel_size = settings.default_pipeline_ctrl_msg_channel_size + num_pipeline_groups = pipeline_groups.len(), + num_pipelines = pipeline_groups + .values() + .map(|group| group.pipelines.len()) + .sum::() ); let opentelemetry_client = OpentelemetryClient::new(telemetry_config)?; let metrics_system = InternalTelemetrySystem::new(telemetry_config); let metrics_dispatcher = metrics_system.dispatcher(); let metrics_reporter = metrics_system.reporter(); let controller_ctx = ControllerContext::new(metrics_system.registry()); - let obs_state_store = ObservedStateStore::new(pipeline.pipeline_settings()); + let obs_state_store = ObservedStateStore::new(&engine_settings.observed_state); let obs_evt_reporter = obs_state_store.reporter(); // Only the reporting API let obs_state_handle = obs_state_store.handle(); // Only the querying API + for (pipeline_group_id, pipeline_group) in pipeline_groups.iter() { + for (pipeline_id, pipeline) in pipeline_group.pipelines.iter() { + let pipeline_key = + PipelineKey::new(pipeline_group_id.clone(), pipeline_id.clone()); + obs_state_store.register_pipeline_health_policy( + pipeline_key, + pipeline.pipeline_settings().health_policy.clone(), + ); + } + } + // Start the metrics aggregation let telemetry_registry = metrics_system.registry(); let metrics_agg_handle = @@ -113,65 +158,86 @@ impl Controller { obs_state_store.run(cancellation_token) })?; - let quota = pipeline.quota().clone(); - - // Start one thread per requested core - // Get available CPU cores for pinning - let requested_cores = Self::select_cores_for_quota( - 
core_affinity::get_core_ids().ok_or_else(|| Error::CoreDetectionUnavailable)?, - quota, - )?; - let mut threads = Vec::with_capacity(requested_cores.len()); - let mut ctrl_msg_senders = Vec::with_capacity(requested_cores.len()); - - // ToDo [LQ] Support multiple pipeline groups in the future. - - for (thread_id, core_id) in requested_cores.into_iter().enumerate() { - let pipeline_key = DeployedPipelineKey { - pipeline_group_id: pipeline_group_id.clone(), - pipeline_id: pipeline_id.clone(), - core_id: core_id.id, - }; - let (pipeline_ctrl_msg_tx, pipeline_ctrl_msg_rx) = pipeline_ctrl_msg_channel( - pipeline - .pipeline_settings() - .default_pipeline_ctrl_msg_channel_size, - ); - ctrl_msg_senders.push(pipeline_ctrl_msg_tx.clone()); - - let pipeline_config = pipeline.clone(); - let pipeline_factory = self.pipeline_factory; - let pipeline_handle = controller_ctx.pipeline_context_with( - pipeline_group_id.clone(), - pipeline_id.clone(), - core_id.id, - thread_id, - ); - let metrics_reporter = metrics_reporter.clone(); - - let thread_name = format!("pipeline-core-{}", core_id.id); - let obs_evt_reporter = obs_evt_reporter.clone(); - let handle = thread::Builder::new() - .name(thread_name.clone()) - .spawn(move || { - Self::run_pipeline_thread( - pipeline_key, - core_id, - pipeline_config, - pipeline_factory, - pipeline_handle, - obs_evt_reporter, - metrics_reporter, - pipeline_ctrl_msg_tx, - pipeline_ctrl_msg_rx, - ) - }) - .map_err(|e| Error::ThreadSpawnError { - thread_name: thread_name.clone(), - source: e, - })?; - - threads.push((thread_name, thread_id, core_id.id, handle)); + let pipeline_count: usize = pipeline_groups + .values() + .map(|group| group.pipelines.len()) + .sum(); + let available_core_ids = if pipeline_count == 0 { + Vec::new() + } else { + core_affinity::get_core_ids().ok_or_else(|| Error::CoreDetectionUnavailable)? 
+ }; + let mut threads = Vec::new(); + let mut ctrl_msg_senders = Vec::new(); + + for (pipeline_group_id, pipeline_group) in pipeline_groups { + for (pipeline_id, pipeline) in pipeline_group.pipelines { + let quota = pipeline.quota().clone(); + let requested_cores = + Self::select_cores_for_quota(available_core_ids.clone(), quota)?; + + for (thread_id, core_id) in requested_cores.into_iter().enumerate() { + let pipeline_key = DeployedPipelineKey { + pipeline_group_id: pipeline_group_id.clone(), + pipeline_id: pipeline_id.clone(), + core_id: core_id.id, + }; + let (pipeline_ctrl_msg_tx, pipeline_ctrl_msg_rx) = pipeline_ctrl_msg_channel( + pipeline + .pipeline_settings() + .default_pipeline_ctrl_msg_channel_size, + ); + ctrl_msg_senders.push(pipeline_ctrl_msg_tx.clone()); + + let pipeline_config = pipeline.clone(); + let pipeline_factory = self.pipeline_factory; + let pipeline_handle = controller_ctx.pipeline_context_with( + pipeline_group_id.clone(), + pipeline_id.clone(), + core_id.id, + thread_id, + ); + let metrics_reporter = metrics_reporter.clone(); + + let thread_name = format!( + "pipeline-{}-{}-core-{}", + pipeline_group_id.as_ref(), + pipeline_id.as_ref(), + core_id.id + ); + let obs_evt_reporter = obs_evt_reporter.clone(); + let group_id = pipeline_group_id.clone(); + let pipeline_id = pipeline_id.clone(); + let handle = thread::Builder::new() + .name(thread_name.clone()) + .spawn(move || { + Self::run_pipeline_thread( + pipeline_key, + core_id, + pipeline_config, + pipeline_factory, + pipeline_handle, + obs_evt_reporter, + metrics_reporter, + pipeline_ctrl_msg_tx, + pipeline_ctrl_msg_rx, + ) + }) + .map_err(|e| Error::ThreadSpawnError { + thread_name: thread_name.clone(), + source: e, + })?; + + threads.push(( + thread_name, + thread_id, + core_id.id, + group_id, + pipeline_id, + handle, + )); + } + } } // Drop the original metrics sender so only pipeline threads hold references @@ -202,9 +268,9 @@ impl Controller { // Wait for all pipeline threads to 
finish and collect their results let mut results: Vec> = Vec::with_capacity(threads.len()); - for (thread_name, thread_id, core_id, handle) in threads { + for (thread_name, thread_id, core_id, group_id, pipeline_id, handle) in threads { let pipeline_key = DeployedPipelineKey { - pipeline_group_id: pipeline_group_id.clone(), + pipeline_group_id: group_id.clone(), pipeline_id: pipeline_id.clone(), core_id, }; @@ -243,6 +309,11 @@ impl Controller { } } + // Check if any pipeline threads returned an error + if let Some(err) = results.into_iter().find_map(Result::err) { + return Err(err); + } + // ToDo Add CTRL-C handler to initiate graceful shutdown of pipelines and admin server. // In this project phase (alpha), we park the main thread indefinitely. This is useful for diff --git a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs index b0f8ea849b..545284b124 100644 --- a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs +++ b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs @@ -420,6 +420,7 @@ mod tests { use crate::node::{NodeId, NodeType}; use crate::shared::message::{SharedReceiver, SharedSender}; use crate::testing::test_nodes; + use otap_df_config::observed_state::ObservedStateSettings; use otap_df_config::pipeline::PipelineSettings; use otap_df_config::{PipelineGroupId, PipelineId}; use otap_df_state::store::ObservedStateStore; @@ -461,7 +462,7 @@ mod tests { let metrics_system = otap_df_telemetry::InternalTelemetrySystem::default(); let metrics_reporter = metrics_system.reporter(); let pipeline_settings = PipelineSettings::default(); - let observed_state_store = ObservedStateStore::new(&pipeline_settings); + let observed_state_store = ObservedStateStore::new(&ObservedStateSettings::default()); let pipeline_group_id: PipelineGroupId = Default::default(); let pipeline_id: PipelineId = Default::default(); let core_id = 0; @@ -873,7 +874,7 @@ mod tests { let metrics_system = 
otap_df_telemetry::InternalTelemetrySystem::default(); let metrics_reporter = metrics_system.reporter(); let pipeline_settings = PipelineSettings::default(); - let observed_state_store = ObservedStateStore::new(&pipeline_settings); + let observed_state_store = ObservedStateStore::new(&ObservedStateSettings::default()); let pipeline_key = DeployedPipelineKey { pipeline_group_id: Default::default(), pipeline_id: Default::default(), diff --git a/rust/otap-dataflow/crates/state/src/store.rs b/rust/otap-dataflow/crates/state/src/store.rs index c7ad071f8e..dba66e2379 100644 --- a/rust/otap-dataflow/crates/state/src/store.rs +++ b/rust/otap-dataflow/crates/state/src/store.rs @@ -10,7 +10,8 @@ use crate::phase::PipelinePhase; use crate::pipeline_rt_status::{ApplyOutcome, PipelineRuntimeStatus}; use crate::pipeline_status::PipelineStatus; use crate::reporter::ObservedEventReporter; -use otap_df_config::pipeline::PipelineSettings; +use otap_df_config::health::HealthPolicy; +use otap_df_config::observed_state::ObservedStateSettings; use serde::{Serialize, Serializer}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -27,7 +28,13 @@ const RECENT_EVENTS_CAPACITY: usize = 10; #[derive(Debug, Clone, Serialize)] pub struct ObservedStateStore { #[serde(skip)] - config: PipelineSettings, + config: ObservedStateSettings, + + #[serde(skip)] + default_health_policy: HealthPolicy, + + #[serde(skip)] + health_policies: Arc>>, #[serde(skip)] sender: flume::Sender, @@ -71,12 +78,13 @@ where impl ObservedStateStore { /// Creates a new `ObservedStateStore` with the given configuration. 
#[must_use] - pub fn new(config: &PipelineSettings) -> Self { - let (sender, receiver) = - flume::bounded::(config.observed_state.reporting_channel_size); + pub fn new(config: &ObservedStateSettings) -> Self { + let (sender, receiver) = flume::bounded::(config.reporting_channel_size); Self { config: config.clone(), + default_health_policy: HealthPolicy::default(), + health_policies: Arc::new(Mutex::new(HashMap::new())), sender, receiver, pipelines: Arc::new(Mutex::new(HashMap::new())), @@ -86,10 +94,22 @@ impl ObservedStateStore { /// Returns a reporter that can be used to send observed events to this store. #[must_use] pub fn reporter(&self) -> ObservedEventReporter { - ObservedEventReporter::new( - self.config.observed_state.reporting_timeout, - self.sender.clone(), - ) + ObservedEventReporter::new(self.config.reporting_timeout, self.sender.clone()) + } + + /// Registers or updates the health policy for a specific pipeline. + pub fn register_pipeline_health_policy( + &self, + pipeline_key: PipelineKey, + health_policy: HealthPolicy, + ) { + let mut policies = self.health_policies.lock().unwrap_or_else(|poisoned| { + log::warn!( + "ObservedStateStore health policy mutex was poisoned; continuing with possibly inconsistent state" + ); + poisoned.into_inner() + }); + _ = policies.insert(pipeline_key, health_policy); } /// Returns a handle that can be used to read the current observed state. 
@@ -127,9 +147,15 @@ impl ObservedStateStore { pipeline_id: observed_event.key.pipeline_id.clone(), }; + let health_policy = self + .health_policies + .lock() + .ok() + .and_then(|policies| policies.get(&pipeline_key).cloned()) + .unwrap_or_else(|| self.default_health_policy.clone()); let ps = pipelines .entry(pipeline_key) - .or_insert_with(|| PipelineStatus::new(self.config.health_policy.clone())); + .or_insert_with(|| PipelineStatus::new(health_policy)); // Upsert the core record and its condition snapshot let cs = ps diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs index e407934767..bdfe7ab806 100644 --- a/rust/otap-dataflow/src/main.rs +++ b/rust/otap-dataflow/src/main.rs @@ -143,92 +143,102 @@ fn main() -> Result<(), Box> { }), (None, None) => None, }; - let (pipeline_group_id, pipeline_id, mut pipeline_cfg, admin_settings) = - if let Some(config_path) = config { - let engine_cfg = EngineConfig::from_file(config_path)?; - if engine_cfg.pipeline_groups.len() != 1 { - return Err(format!( - "Engine config must define exactly one pipeline group, found {}", - engine_cfg.pipeline_groups.len() - ) - .into()); - } - let EngineConfig { - settings, - pipeline_groups, - } = engine_cfg; - let (pipeline_group_id, pipeline_group_cfg) = - pipeline_groups.into_iter().next().expect("pipeline group missing"); - if pipeline_group_cfg.pipelines.len() != 1 { - return Err(format!( - "Engine config must define exactly one pipeline, found {}", - pipeline_group_cfg.pipelines.len() - ) - .into()); - } - let (pipeline_id, pipeline_cfg) = pipeline_group_cfg - .pipelines - .into_iter() - .next() - .expect("pipeline missing"); - let admin_settings = if let Some(bind_address) = http_admin_bind { - HttpAdminSettings { bind_address } - } else if let Some(config_admin) = settings.http_admin { - config_admin - } else { - HttpAdminSettings { - bind_address: default_admin_bind.clone(), + // Create controller and start pipeline with multi-core support + let controller 
= Controller::new(&OTAP_PIPELINE_FACTORY); + let core_allocation_override_engine = core_allocation_override.clone(); + let core_allocation_override_pipeline = core_allocation_override; + let http_admin_bind_engine = http_admin_bind.clone(); + let http_admin_bind_pipeline = http_admin_bind; + + let result = if let Some(config_path) = config { + let engine_cfg = EngineConfig::from_file(config_path)?; + let EngineConfig { + settings: engine_settings, + mut pipeline_groups, + } = engine_cfg; + if let Some(core_allocation) = core_allocation_override_engine { + for pipeline_group in pipeline_groups.values_mut() { + for pipeline_cfg in pipeline_group.pipelines.values_mut() { + pipeline_cfg.set_quota(Quota { + core_allocation: core_allocation.clone(), + }); } - }; - (pipeline_group_id, pipeline_id, pipeline_cfg, admin_settings) + } + } + let admin_settings = if let Some(bind_address) = http_admin_bind_engine { + HttpAdminSettings { bind_address } + } else if let Some(config_admin) = engine_settings.http_admin.clone() { + config_admin } else { - // For now, we predefine pipeline group and pipeline IDs. - // That will be replaced with a more dynamic approach in the future. 
- let pipeline_group_id: PipelineGroupId = "default_pipeline_group".into(); - let pipeline_id: PipelineId = "default_pipeline".into(); - let pipeline_path = match pipeline { - Some(path) => path, - None => { - return Err("Missing --pipeline argument".into()); - } - }; - - // Load pipeline configuration from file - let pipeline_cfg = PipelineConfig::from_file( - pipeline_group_id.clone(), - pipeline_id.clone(), - &pipeline_path, - )?; - let admin_settings = HttpAdminSettings { - bind_address: http_admin_bind.unwrap_or_else(|| default_admin_bind.clone()), - }; - (pipeline_group_id, pipeline_id, pipeline_cfg, admin_settings) + HttpAdminSettings { + bind_address: default_admin_bind.clone(), + } }; - if let Some(core_allocation) = core_allocation_override { - pipeline_cfg.set_quota(Quota { core_allocation }); - } + for (pipeline_group_id, pipeline_group_cfg) in pipeline_groups.iter() { + for (pipeline_id, pipeline_cfg) in pipeline_group_cfg.pipelines.iter() { + let quota = pipeline_cfg.quota(); + match &quota.core_allocation { + CoreAllocation::AllCores => println!( + "Requested core allocation for {}:{}: all available cores", + pipeline_group_id.as_ref(), + pipeline_id.as_ref() + ), + CoreAllocation::CoreCount { count } => println!( + "Requested core allocation for {}:{}: {count} cores", + pipeline_group_id.as_ref(), + pipeline_id.as_ref() + ), + CoreAllocation::CoreSet { .. } => println!( + "Requested core allocation for {}:{}: {}", + pipeline_group_id.as_ref(), + pipeline_id.as_ref(), + quota.core_allocation + ), + } + } + } - let quota = pipeline_cfg.quota().clone(); + controller.run_engine_forever(pipeline_groups, engine_settings, admin_settings) + } else { + // For now, we predefine pipeline group and pipeline IDs. + // That will be replaced with a more dynamic approach in the future. 
+ let pipeline_group_id: PipelineGroupId = "default_pipeline_group".into(); + let pipeline_id: PipelineId = "default_pipeline".into(); + let pipeline_path = match pipeline { + Some(path) => path, + None => { + return Err("Missing --pipeline argument".into()); + } + }; - // Create controller and start pipeline with multi-core support - let controller = Controller::new(&OTAP_PIPELINE_FACTORY); + // Load pipeline configuration from file + let mut pipeline_cfg = PipelineConfig::from_file( + pipeline_group_id.clone(), + pipeline_id.clone(), + &pipeline_path, + )?; + if let Some(core_allocation) = core_allocation_override_pipeline { + pipeline_cfg.set_quota(Quota { core_allocation }); + } + let quota = pipeline_cfg.quota().clone(); + let admin_settings = HttpAdminSettings { + bind_address: http_admin_bind_pipeline.unwrap_or_else(|| default_admin_bind.clone()), + }; - // Print the requested core configuration - match &quota.core_allocation { - CoreAllocation::AllCores => println!("Requested core allocation: all available cores"), - CoreAllocation::CoreCount { count } => println!("Requested core allocation: {count} cores"), - CoreAllocation::CoreSet { .. } => { - println!("Requested core allocation: {}", quota.core_allocation); + // Print the requested core configuration + match &quota.core_allocation { + CoreAllocation::AllCores => println!("Requested core allocation: all available cores"), + CoreAllocation::CoreCount { count } => { + println!("Requested core allocation: {count} cores") + } + CoreAllocation::CoreSet { .. 
} => { + println!("Requested core allocation: {}", quota.core_allocation); + } } - } - let result = controller.run_forever( - pipeline_group_id, - pipeline_id, - pipeline_cfg, - admin_settings, - ); + controller.run_forever(pipeline_group_id, pipeline_id, pipeline_cfg, admin_settings) + }; match result { Ok(_) => { println!("Pipeline run successfully"); From 5b931d03d9c2239c9474192049fb631244b36fe4 Mon Sep 17 00:00:00 2001 From: lquerel Date: Sun, 18 Jan 2026 22:45:27 -0800 Subject: [PATCH 04/13] Fix merge --- rust/otap-dataflow/crates/controller/src/lib.rs | 2 +- rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index ad719d5d79..306d6865b0 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -119,7 +119,7 @@ impl Controller { let metrics_dispatcher = telemetry_system.dispatcher(); let metrics_reporter = telemetry_system.reporter(); let controller_ctx = ControllerContext::new(telemetry_system.registry()); - let obs_state_store = ObservedStateStore::new(pipeline.pipeline_settings()); + let obs_state_store = ObservedStateStore::new(&engine_settings.observed_state); let obs_evt_reporter = obs_state_store.reporter(); // Only the reporting API let obs_state_handle = obs_state_store.handle(); // Only the querying API diff --git a/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs b/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs index ba3d89660b..40f2a35170 100644 --- a/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs +++ b/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs @@ -9,6 +9,7 @@ //! and metric sets are unregistered to avoid registry leaks. 
use otap_df_config::pipeline::{PipelineConfig, PipelineConfigBuilder, PipelineType}; +use otap_df_config::observed_state::ObservedStateSettings; use otap_df_config::{PipelineGroupId, PipelineId}; use otap_df_engine::context::ControllerContext; use otap_df_engine::control::{PipelineControlMsg, pipeline_ctrl_msg_channel}; @@ -65,7 +66,7 @@ fn test_telemetry_registries_cleanup() { let (pipeline_ctrl_tx, pipeline_ctrl_rx) = pipeline_ctrl_msg_channel(pipeline_settings.default_pipeline_ctrl_msg_channel_size); let pipeline_ctrl_tx_for_shutdown = pipeline_ctrl_tx.clone(); - let observed_state_store = ObservedStateStore::new(&pipeline_settings); + let observed_state_store = ObservedStateStore::new(&ObservedStateSettings::default()); let pipeline_key = DeployedPipelineKey { pipeline_group_id, From 677295090ce4875a332f23ad7ceae705b17e0a8f Mon Sep 17 00:00:00 2001 From: lquerel Date: Sun, 18 Jan 2026 22:55:29 -0800 Subject: [PATCH 05/13] Document engine config --- .../engine-conf/continuous_benchmark.yaml | 43 +++++-------------- 1 file changed, 11 insertions(+), 32 deletions(-) diff --git a/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml b/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml index 55cb03cfb1..b1cb7ec24c 100644 --- a/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml +++ b/rust/otap-dataflow/configs/engine-conf/continuous_benchmark.yaml @@ -1,11 +1,19 @@ +# This configuration file reproduces the continuous benchmarking setup used +# in our CI pipelines. However, the systems: traffic generator, system under +# test, and backend are all included in a single configuration for easier +# local testing and debugging. + +# Engine-wide settings settings: http_admin: bind_address: 127.0.0.1:8085 +# Pipeline groups are used to logically separate different sets of +# pipelines. 
We will introduce progressively more features around +# pipeline groups in the future such as resource quotas, scheduling +# policies, named channels, group life-cycle management, ... pipeline_groups: - # ======================================================================== - # All the pipelines required for continuous benchmarking in a single group - # ======================================================================== + # This group contains all the pipelines required for continuous benchmarking continuous_benchmark: pipelines: # ====================================================================== @@ -158,32 +166,3 @@ pipeline_groups: kind: exporter plugin_urn: urn:otel:noop:exporter config: null - - other_group: - pipelines: - - p1: - quota: - core_allocation: - type: core_set - set: - - type: CoreRange - start: 50 - end: 50 - - nodes: - receiver: - kind: receiver - plugin_urn: urn:otel:otlp:receiver - out_ports: - out_port: - destinations: - - perf_noop - dispatch_strategy: round_robin - config: - listening_addr: 127.0.0.1:4329 - - perf_noop: - kind: exporter - plugin_urn: urn:otel:noop:exporter - config: null From e97c4239fb72c34f2d11261eef585aac92bda8e9 Mon Sep 17 00:00:00 2001 From: lquerel Date: Sun, 18 Jan 2026 23:53:02 -0800 Subject: [PATCH 06/13] A bit of refactoring in the main and the controller to make things nicer --- .../otap-dataflow/crates/config/src/engine.rs | 22 ++- .../crates/controller/src/lib.rs | 54 ++------ .../crates/engine/src/pipeline_ctrl.rs | 4 +- .../crates/otap/tests/pipeline_tests.rs | 2 +- rust/otap-dataflow/src/main.rs | 125 +++++++----------- 5 files changed, 80 insertions(+), 127 deletions(-) diff --git a/rust/otap-dataflow/crates/config/src/engine.rs b/rust/otap-dataflow/crates/config/src/engine.rs index 4c343e893e..ce1fbe6261 100644 --- a/rust/otap-dataflow/crates/config/src/engine.rs +++ b/rust/otap-dataflow/crates/config/src/engine.rs @@ -3,11 +3,12 @@ //! The configuration for the dataflow engine. 
-use crate::PipelineGroupId; use crate::error::{Context, Error}; use crate::observed_state::ObservedStateSettings; +use crate::pipeline::PipelineConfig; use crate::pipeline::service::telemetry::TelemetryConfig; use crate::pipeline_group::PipelineGroupConfig; +use crate::{PipelineGroupId, PipelineId}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -133,6 +134,25 @@ impl EngineConfig { } } + /// Creates a new `EngineConfig` from a single pipeline definition. + pub fn from_pipeline( + pipeline_group_id: PipelineGroupId, + pipeline_id: PipelineId, + pipeline: PipelineConfig, + settings: EngineSettings, + ) -> Result { + let mut pipeline_group = PipelineGroupConfig::new(); + pipeline_group.add_pipeline(pipeline_id, pipeline)?; + let mut pipeline_groups = HashMap::new(); + let _ = pipeline_groups.insert(pipeline_group_id, pipeline_group); + let config = EngineConfig { + settings, + pipeline_groups, + }; + config.validate()?; + Ok(config) + } + /// Validates the engine configuration and returns a [`Error::InvalidConfiguration`] error /// containing all validation errors found in the pipeline groups. 
pub fn validate(&self) -> Result<(), Error> { diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 306d6865b0..88abbd2e44 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -20,7 +20,7 @@ use crate::error::Error; use crate::thread_task::spawn_thread_local_task; use core_affinity::CoreId; -use otap_df_config::engine::{EngineSettings, HttpAdminSettings}; +use otap_df_config::engine::{EngineConfig, EngineSettings, HttpAdminSettings}; use otap_df_config::{ PipelineGroupId, PipelineId, pipeline::{CoreAllocation, PipelineConfig, Quota}, @@ -33,10 +33,10 @@ use otap_df_engine::control::{ }; use otap_df_engine::entity_context::set_pipeline_entity_key; use otap_df_engine::error::{Error as EngineError, error_summary_from}; -use otap_df_state::{DeployedPipelineKey, PipelineKey}; use otap_df_state::event::{ErrorSummary, ObservedEvent}; use otap_df_state::reporter::ObservedEventReporter; use otap_df_state::store::ObservedStateStore; +use otap_df_state::{DeployedPipelineKey, PipelineKey}; use otap_df_telemetry::reporter::MetricsReporter; use otap_df_telemetry::{InternalTelemetrySystem, otel_info, otel_info_span, otel_warn}; use std::collections::HashMap; @@ -64,46 +64,13 @@ impl Controller { Self { pipeline_factory } } - /// Starts the controller with the given pipeline configuration. 
- pub fn run_forever( - &self, - pipeline_group_id: PipelineGroupId, - pipeline_id: PipelineId, - pipeline: PipelineConfig, - admin_settings: HttpAdminSettings, - ) -> Result<(), Error> { - let engine_settings = EngineSettings { - http_admin: None, - telemetry: pipeline.service().telemetry.clone(), - observed_state: Default::default(), - }; - let mut pipeline_group = PipelineGroupConfig::new(); - pipeline_group - .add_pipeline(pipeline_id.clone(), pipeline) - .map_err(|e| Error::InvalidConfiguration { errors: vec![e] })?; - self.run_group_forever(pipeline_group_id, pipeline_group, engine_settings, admin_settings) - } - - /// Starts the controller with the given pipeline group configuration. - pub fn run_group_forever( - &self, - pipeline_group_id: PipelineGroupId, - pipeline_group: PipelineGroupConfig, - engine_settings: EngineSettings, - admin_settings: HttpAdminSettings, - ) -> Result<(), Error> { - let mut pipeline_groups = HashMap::new(); - let _ = pipeline_groups.insert(pipeline_group_id, pipeline_group); - self.run_engine_forever(pipeline_groups, engine_settings, admin_settings) - } - - /// Starts the controller with the given pipeline group configurations. - pub fn run_engine_forever( - &self, - pipeline_groups: HashMap, - engine_settings: EngineSettings, - admin_settings: HttpAdminSettings, - ) -> Result<(), Error> { + /// Starts the controller with the given engine configurations. + pub fn run_forever(&self, engine_config: EngineConfig) -> Result<(), Error> { + let EngineConfig { + settings: engine_settings, + pipeline_groups, + } = engine_config; + let admin_settings = engine_settings.http_admin.clone().unwrap_or_default(); // Initialize metrics system and observed event store. // ToDo A hierarchical metrics system will be implemented to better support hardware with multiple NUMA nodes. 
let telemetry_config = &engine_settings.telemetry; @@ -125,8 +92,7 @@ impl Controller { for (pipeline_group_id, pipeline_group) in pipeline_groups.iter() { for (pipeline_id, pipeline) in pipeline_group.pipelines.iter() { - let pipeline_key = - PipelineKey::new(pipeline_group_id.clone(), pipeline_id.clone()); + let pipeline_key = PipelineKey::new(pipeline_group_id.clone(), pipeline_id.clone()); obs_state_store.register_pipeline_health_policy( pipeline_key, pipeline.pipeline_settings().health_policy.clone(), diff --git a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs index 005049cead..6d5722f292 100644 --- a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs +++ b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs @@ -889,8 +889,8 @@ mod tests { // Create a dummy MetricsReporter for testing let metrics_system = otap_df_telemetry::InternalTelemetrySystem::default(); let metrics_reporter = metrics_system.reporter(); - let pipeline_settings = PipelineSettings::default(); - let observed_state_store = ObservedStateStore::new(&ObservedStateSettings::default()); + let observed_state_store = + ObservedStateStore::new(&ObservedStateSettings::default()); let pipeline_key = DeployedPipelineKey { pipeline_group_id: Default::default(), pipeline_id: Default::default(), diff --git a/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs b/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs index 40f2a35170..794b8cc717 100644 --- a/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs +++ b/rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs @@ -8,8 +8,8 @@ //! pipeline until a graceful shutdown, and then confirms that all related entities //! and metric sets are unregistered to avoid registry leaks. 
-use otap_df_config::pipeline::{PipelineConfig, PipelineConfigBuilder, PipelineType}; use otap_df_config::observed_state::ObservedStateSettings; +use otap_df_config::pipeline::{PipelineConfig, PipelineConfigBuilder, PipelineType}; use otap_df_config::{PipelineGroupId, PipelineId}; use otap_df_engine::context::ControllerContext; use otap_df_engine::control::{PipelineControlMsg, pipeline_ctrl_msg_channel}; diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs index bdfe7ab806..c7a3a05e56 100644 --- a/rust/otap-dataflow/src/main.rs +++ b/rust/otap-dataflow/src/main.rs @@ -4,7 +4,7 @@ //! Create and run a multi-core pipeline use clap::{ArgGroup, Parser}; -use otap_df_config::engine::{EngineConfig, HttpAdminSettings}; +use otap_df_config::engine::{EngineConfig, EngineSettings, HttpAdminSettings}; use otap_df_config::pipeline::{CoreAllocation, CoreRange, PipelineConfig, Quota}; use otap_df_config::{PipelineGroupId, PipelineId}; use otap_df_controller::Controller; @@ -133,74 +133,41 @@ fn main() -> Result<(), Box> { println!("{}", system_info()); - let default_admin_bind = HttpAdminSettings::default().bind_address; - let core_allocation_override = match (core_id_range, num_cores) { - (Some(range), _) => Some(range), - (None, Some(num_cores)) => Some(if num_cores == 0 { - CoreAllocation::AllCores - } else { - CoreAllocation::CoreCount { count: num_cores } - }), - (None, None) => None, - }; - // Create controller and start pipeline with multi-core support + // For now, we ignore command line core settings when using --config + // and warn the user about it. We need to decide how to handle this properly. + // This may change in the future. 
+ let mut ignored_flags = Vec::new();
+ if num_cores.is_some() {
+ ignored_flags.push("--num-cores");
+ }
+ if core_id_range.is_some() {
+ ignored_flags.push("--core-id-range");
+ }
+ if http_admin_bind.is_some() {
+ ignored_flags.push("--http-admin-bind");
+ }
+ if config.is_some() && !ignored_flags.is_empty() {
+ eprintln!(
+ "Warning: {} ignored when using --config (for now).",
+ ignored_flags.join(", ")
+ );
+ }
+
 let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
- let core_allocation_override_engine = core_allocation_override.clone();
- let core_allocation_override_pipeline = core_allocation_override;
- let http_admin_bind_engine = http_admin_bind.clone();
- let http_admin_bind_pipeline = http_admin_bind;
 let result = if let Some(config_path) = config {
 let engine_cfg = EngineConfig::from_file(config_path)?;
- let EngineConfig {
- settings: engine_settings,
- mut pipeline_groups,
- } = engine_cfg;
- if let Some(core_allocation) = core_allocation_override_engine {
- for pipeline_group in pipeline_groups.values_mut() {
- for pipeline_cfg in pipeline_group.pipelines.values_mut() {
- pipeline_cfg.set_quota(Quota {
- core_allocation: core_allocation.clone(),
- });
- }
- }
- }
- let admin_settings = if let Some(bind_address) = http_admin_bind_engine {
- HttpAdminSettings { bind_address }
- } else if let Some(config_admin) = engine_settings.http_admin.clone() {
- config_admin
- } else {
- HttpAdminSettings {
- bind_address: default_admin_bind.clone(),
- }
- };
-
- for (pipeline_group_id, pipeline_group_cfg) in pipeline_groups.iter() {
- for (pipeline_id, pipeline_cfg) in pipeline_group_cfg.pipelines.iter() {
- let quota = pipeline_cfg.quota();
- match &quota.core_allocation {
- CoreAllocation::AllCores => println!(
- "Requested core allocation for {}:{}: all available cores",
- pipeline_group_id.as_ref(),
- pipeline_id.as_ref()
- ),
- CoreAllocation::CoreCount { count } => println!(
- "Requested core allocation for {}:{}: {count} cores",
- 
pipeline_group_id.as_ref(),
- pipeline_id.as_ref()
- ),
- CoreAllocation::CoreSet { .. } => println!(
- "Requested core allocation for {}:{}: {}",
- pipeline_group_id.as_ref(),
- pipeline_id.as_ref(),
- quota.core_allocation
- ),
- }
- }
- }
-
- controller.run_engine_forever(pipeline_groups, engine_settings, admin_settings)
+ controller.run_forever(engine_cfg)
 } else {
+ let core_allocation_override = match (core_id_range, num_cores) {
+ (Some(range), _) => Some(range),
+ (None, Some(num_cores)) => Some(if num_cores == 0 {
+ CoreAllocation::AllCores
+ } else {
+ CoreAllocation::CoreCount { count: num_cores }
+ }),
+ (None, None) => None,
+ };
 // For now, we predefine pipeline group and pipeline IDs.
 // That will be replaced with a more dynamic approach in the future.
 let pipeline_group_id: PipelineGroupId = "default_pipeline_group".into();
@@ -218,26 +185,26 @@ fn main() -> Result<(), Box> {
 pipeline_id.clone(),
 &pipeline_path,
 )?;
- if let Some(core_allocation) = core_allocation_override_pipeline {
+ if let Some(core_allocation) = core_allocation_override {
 pipeline_cfg.set_quota(Quota { core_allocation });
 }
- let quota = pipeline_cfg.quota().clone();
 let admin_settings = HttpAdminSettings {
- bind_address: http_admin_bind_pipeline.unwrap_or_else(|| default_admin_bind.clone()),
+ bind_address: http_admin_bind
+ .unwrap_or_else(|| HttpAdminSettings::default().bind_address),
+ };
+ let engine_settings = EngineSettings {
+ http_admin: Some(admin_settings),
+ telemetry: pipeline_cfg.service().telemetry.clone(),
+ observed_state: Default::default(),
 };
- // Print the requested core configuration
- match &quota.core_allocation {
- CoreAllocation::AllCores => println!("Requested core allocation: all available cores"),
- CoreAllocation::CoreCount { count } => {
- println!("Requested core allocation: {count} cores")
- }
- CoreAllocation::CoreSet { ..
} => { - println!("Requested core allocation: {}", quota.core_allocation); - } - } - - controller.run_forever(pipeline_group_id, pipeline_id, pipeline_cfg, admin_settings) + let engine_cfg = EngineConfig::from_pipeline( + pipeline_group_id, + pipeline_id, + pipeline_cfg, + engine_settings, + )?; + controller.run_forever(engine_cfg) }; match result { Ok(_) => { From c7beeb02a580dcc029e73ce82e0928b8642af968 Mon Sep 17 00:00:00 2001 From: lquerel Date: Mon, 19 Jan 2026 13:18:03 -0800 Subject: [PATCH 07/13] Improve the controller README.md --- .../otap-dataflow/crates/controller/README.md | 151 +++++++++++++++--- .../crates/controller/src/lib.rs | 40 ++++- 2 files changed, 163 insertions(+), 28 deletions(-) diff --git a/rust/otap-dataflow/crates/controller/README.md b/rust/otap-dataflow/crates/controller/README.md index fe600acffb..51122f7524 100644 --- a/rust/otap-dataflow/crates/controller/README.md +++ b/rust/otap-dataflow/crates/controller/README.md @@ -1,22 +1,133 @@ # OTAP Dataflow Engine Controller -A controller takes a pipeline configuration and initiates one dataflow engine -per core (or less if the number of CPUs or the percentage of CPUs is -specified). - -Each engine is started on a dedicated CPU core (via thread pinning). 
- -## Roadmap - -- [ ] Basic controller that can start/stop engines (stop is not implemented yet) -- [X] Support for metrics collection and aggregation -- [ ] HTTP admin interface to - - [X] view telemetry - - [ ] manager pipelines -- [ ] NUMA awareness and support for pinning engines to specific NUMA nodes -- [ ] Support for multiple pipeline groups -- [ ] Support for dynamic reconfiguration of pipelines -- [ ] Support for high availability and failover -- [ ] Support for logging and tracing -- [ ] Support for custom plugins and extensions -- [ ] Support for configuration validation and schema management +The OTAP Dataflow Engine Controller is responsible for deploying, managing, and +monitoring pipeline groups within a single OTAP Dataflow Engine process. + +It acts as the local control plane for pipeline execution, CPU resource +allocation, lifecycle management, and inter-pipeline coordination, while +preserving the engine's core design principles: thread-per-core execution, +share-nothing hot paths, and predictable performance. + +## Execution Model + +Each pipeline configuration declares its CPU requirements through quota +settings. Based on these settings, the controller allocates CPU cores and spawns +one dedicated worker thread per assigned core. + +Threads are pinned to distinct CPU cores, following a strict **thread-per-core** +model. A pipeline deployed on `n` cores results in `n` worker threads, each +bound to a specific core for deterministic scheduling and efficient cache +locality. + +Hot data paths are fully contained within each thread. Inter-thread +communication is restricted to control messages and internal telemetry only, +minimizing contention and avoiding shared mutable state on the data plane. + +## CPU Isolation and Core Sharing + +By default, pipelines are expected to run on dedicated CPU cores. This mode +provides the strongest isolation and the most predictable performance. 
It is +also possible to deploy multiple pipeline configurations on the same CPU +cores, primarily for consolidation, testing, or transitional deployments. This +configuration comes at the cost of reduced efficiency, especially in terms of +CPU cache locality. Even in this mode, pipeline instances run in independent +threads and do not share mutable data structures. + +The controller does not perform implicit work stealing, dynamic scheduling, or +automatic load balancing across threads. Any form of cross-pipeline or +cross-thread data exchange must be explicitly modeled. + +## Control Plane and Lifecycle Management + +Each pipeline is associated with a control channel managed by the controller. +Control messages are used to: + +- start and stop pipelines, +- trigger graceful shutdowns, +- propagate configuration or lifecycle events. + +Graceful shutdown ensures that in-flight work can be completed or drained +according to pipeline semantics before threads are terminated. + +## Load Balancing Today + +At the ingress level, the engine already supports OS-level load balancing via +`SO_REUSEPORT` on platforms that provide it. This mechanism allows incoming +connections or traffic to be distributed across multiple receiver threads +efficiently and with minimal overhead. + +This form of load balancing is intentionally limited to the receiver stage and +relies on operating system support. + +## Future Evolutions + +The controller is designed to evolve alongside the engine. Several major +extensions are planned or under exploration. + +### Named Channels + +Controller-managed **named channels** will be introduced as an explicit +mechanism for connecting pipelines and modeling data exchange between them. +Named channels will be in-memory MPMC channels with well-defined ownership, +capacity, and backpressure semantics. 
They will be the recommended way to +implement explicit routing and load balancing schemes within the engine, +complementing `SO_REUSEPORT` rather than replacing it. + +Crucially, named channels preserve the design principle that there is no +implicit cross-thread or cross-pipeline load balancing: all such behavior must +be explicitly declared in the pipeline topology. + +### NUMA-Aware Controller + +Future versions of the controller are expected to become fully NUMA-aware. +This includes: + +- allocating pipelines on specific NUMA nodes, +- pinning threads accordingly, +- favoring local memory allocation, +- minimizing cross-socket traffic. + +NUMA awareness is a key step toward scaling efficiently on high core-count, +multi-socket systems. + +### Morsel-Driven Parallelism + +In the OTAP Dataflow Engine, **batches** are the natural equivalent of +“morsels”. Future designs may introduce batching at the ingress stage, before +data traverses load-balancing boundaries such as per-core queues or MPMC named +channels. This enables a form of **morsel-driven parallelism**, where: + +- worker threads remain fixed and pinned to cores, +- batches (morsels) are dynamically distributed, +- preemption happens only at batch boundaries, +- backpressure and bounded memory are preserved. + +This approach is inspired by the work described in: + +> *Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the +Many-Core Age* +> Thomas Neumann, Tobias Mühlbauer, Alfons Kemper +> https://db.in.tum.de/~leis/papers/morsels.pdf + +The goal is to enable flexible and explicit load balancing for compute-heavy or +skewed workloads, while remaining fully compatible with the engine's +thread-per-core and share-nothing principles. 
+ +## Design Principles (Summary) + +- Thread-per-core execution with CPU pinning +- Share-nothing hot data paths +- Explicit, bounded communication only +- No implicit scheduling or load balancing +- Predictable performance and backpressure by design + +The controller is the foundation that enforces these principles while allowing +controlled and explicit evolution of the execution model. + +## Todo list + +- [ ] Improve configuration validation and error messages +- [ ] Expose a controller API (OpAMP ?) +- [ ] Add support for NUMA awareness +- [ ] Add support for dynamic reconfiguration +- [ ] Add support for custom plugins/extensions diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 88abbd2e44..729314b53e 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -3,19 +3,43 @@ //! OTAP Dataflow Engine Controller //! -//! This controller manages and monitors the execution of pipelines within the current process. -//! It uses a thread-per-core model, where each thread is pinned to a specific CPU core. -//! This approach maximizes multi-core efficiency and reduces contention between threads, -//! ensuring that each pipeline runs on a dedicated core for predictable and optimal CPU usage. +//! This controller is responsible for deploying, managing, and monitoring pipeline groups +//! within the current process. +//! +//! Each pipeline configuration declares its CPU requirements through quota settings. +//! Based on these settings, the controller allocates CPU cores and spawns one dedicated +//! thread per assigned core. Threads are pinned to distinct CPU cores, following a +//! strict thread-per-core model. +//! +//! A pipeline deployed on `n` cores results in `n` worker threads. Hot data paths are +//! fully contained within each thread to maximize CPU cache locality and minimize +//! cross-thread contention. 
Inter-thread communication is restricted to control +//! messages and internal telemetry only. +//! +//! By default, pipelines are expected to run on dedicated CPU cores. It is possible +//! to deploy multiple pipeline configurations on the same cores, primarily for +//! consolidation, testing, or transitional deployments. This comes at the cost of +//! reduced efficiency, especially cache locality. Even in this mode, pipeline +//! instances run in independent threads and do not share mutable data structures. +//! +//! Pipelines do not perform implicit work stealing, dynamic scheduling, or automatic +//! load balancing across threads. Any form of cross-pipeline or cross-thread data +//! exchange must be explicitly modeled. +//! +//! In the future, controller-managed named channels will be introduced as the +//! recommended mechanism to implement explicit load balancing and routing schemes +//! within the engine. These channels will complement the existing SO_REUSEPORT-based +//! load balancing mechanism already supported at the receiver level on operating +//! systems that provide it. +//! +//! Pipelines can be gracefully shut down by sending control messages through their +//! control channels. //! //! Future work includes: -//! - TODO: Status and health checks for pipelines -//! - TODO: Graceful shutdown of pipelines +//! - TODO: Complete status and health checks for pipelines //! - TODO: Auto-restart threads in case of panic //! - TODO: Live pipeline updates //! - TODO: Better resource control -//! - TODO: Monitoring -//! 
- TODO: Support multiple pipeline groups use crate::error::Error; use crate::thread_task::spawn_thread_local_task; From 45f8d0491252b8a0b2d671a348c4632b1a6763d4 Mon Sep 17 00:00:00 2001 From: lquerel Date: Mon, 19 Jan 2026 13:33:06 -0800 Subject: [PATCH 08/13] Add architecture diagram and update main README.md --- rust/otap-dataflow/README.md | 29 +++++++++++++++---- .../docs/images/otap-df-engine.svg | 1 + 2 files changed, 24 insertions(+), 6 deletions(-) create mode 100644 rust/otap-dataflow/docs/images/otap-df-engine.svg diff --git a/rust/otap-dataflow/README.md b/rust/otap-dataflow/README.md index 80dcf4f7f8..ba37d73b80 100644 --- a/rust/otap-dataflow/README.md +++ b/rust/otap-dataflow/README.md @@ -22,6 +22,20 @@ data. > provided as a means to test and validate OTAP pipelines built using > the dataflow engine. +## Architecture + +![OTAP Dataflow Engine architecture](docs/images/otap-df-engine.svg) + +The controller is the local control plane for pipeline groups. It allocates CPU +cores, spawns one worker thread per core, and owns lifecycle, coordination, and +runtime observability. Each pipeline runs a single-threaded engine instance per +assigned core, hot data paths stay within that thread, while cross-thread +coordination is handled through control messages and internal telemetry. + +The admin HTTP server and observed-state store are driven by the controller for +runtime visibility and control. For details, see the controller and engine crate +READMEs. + ## Features The OTAP Dataflow engine consists of a number of major pieces. Here @@ -71,9 +85,12 @@ the many N-to-1 relationships expressed within an OTAP request. ## Major components -### Engine +### Controller and Engine + +See the controller and engine crate READMEs: -[See crate README.](./crates/engine/README.md) +- [controller](./crates/controller/README.md). 
+- [engine](./crates/engine/README.md), The `otap_df_engine` crate is located in `crates/engine`, here we find the engine's overall architecture expressed: @@ -105,7 +122,7 @@ crates/engine/lib.rs: Effect handler extensions, pipeline factory |-- runtime_pipeline.rs: Builds the graph of component channels ``` -### OTAP: OTel-Arrow Protocol pipline data +### OTAP: OTel-Arrow Protocol pipeline data [See crate README.](./crates/otap/README.md) @@ -234,10 +251,10 @@ establish the performance of the OTAP Dataflow system. [See crate README.](./crates/controller/README.md) -The `otap_df_controller` crate is located in `crates/controller` is +The `otap_df_controller` crate is located in `crates/controller` and is the main entry point to construct an OTAP Dataflow pipeline instance. The -controller type, `otap_df_controller::Controller`, manages building -and running one or more pipelines. +controller type, `otap_df_controller::Controller`, manages building, +running, and supervising one or more pipelines. This component is responsible for making the assignment between OTAP dataflow pipeline and individually-numbered CPU instances. 
The diff --git a/rust/otap-dataflow/docs/images/otap-df-engine.svg b/rust/otap-dataflow/docs/images/otap-df-engine.svg new file mode 100644 index 0000000000..c89cba3f69 --- /dev/null +++ b/rust/otap-dataflow/docs/images/otap-df-engine.svg @@ -0,0 +1 @@ + \ No newline at end of file From e0e643c6f7c40e48e0e01932a8ccbbe9fbafd157 Mon Sep 17 00:00:00 2001 From: lquerel Date: Mon, 19 Jan 2026 16:00:02 -0800 Subject: [PATCH 09/13] Update diagrams and documentation --- rust/otap-dataflow/README.md | 2 +- rust/otap-dataflow/crates/controller/README.md | 2 ++ .../controller/assets/controller-architecture.svg} | 0 rust/otap-dataflow/docs/images/architecture-high-level.svg | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) rename rust/otap-dataflow/{docs/images/otap-df-engine.svg => crates/controller/assets/controller-architecture.svg} (100%) create mode 100644 rust/otap-dataflow/docs/images/architecture-high-level.svg diff --git a/rust/otap-dataflow/README.md b/rust/otap-dataflow/README.md index ba37d73b80..f2fe74ab78 100644 --- a/rust/otap-dataflow/README.md +++ b/rust/otap-dataflow/README.md @@ -24,7 +24,7 @@ data. ## Architecture -![OTAP Dataflow Engine architecture](docs/images/otap-df-engine.svg) +![OTAP Dataflow Engine architecture](docs/images/architecture-high-level.svg) The controller is the local control plane for pipeline groups. It allocates CPU cores, spawns one worker thread per core, and owns lifecycle, coordination, and diff --git a/rust/otap-dataflow/crates/controller/README.md b/rust/otap-dataflow/crates/controller/README.md index 51122f7524..cdee737008 100644 --- a/rust/otap-dataflow/crates/controller/README.md +++ b/rust/otap-dataflow/crates/controller/README.md @@ -8,6 +8,8 @@ allocation, lifecycle management, and inter-pipeline coordination, while preserving the engine's core design principles: thread-per-core execution, share-nothing hot paths, and predictable performance. 
+![Controller Architecture](./assets/controller-architecture.svg) + ## Execution Model Each pipeline configuration declares its CPU requirements through quota diff --git a/rust/otap-dataflow/docs/images/otap-df-engine.svg b/rust/otap-dataflow/crates/controller/assets/controller-architecture.svg similarity index 100% rename from rust/otap-dataflow/docs/images/otap-df-engine.svg rename to rust/otap-dataflow/crates/controller/assets/controller-architecture.svg diff --git a/rust/otap-dataflow/docs/images/architecture-high-level.svg b/rust/otap-dataflow/docs/images/architecture-high-level.svg new file mode 100644 index 0000000000..fccc4b5667 --- /dev/null +++ b/rust/otap-dataflow/docs/images/architecture-high-level.svg @@ -0,0 +1 @@ + \ No newline at end of file From 14ea19e01a4ccafbe91eea92dfa259354d38818b Mon Sep 17 00:00:00 2001 From: lquerel Date: Mon, 19 Jan 2026 16:13:37 -0800 Subject: [PATCH 10/13] A little bit of refactoring in the controller --- .../crates/controller/src/lib.rs | 160 +++++++++--------- 1 file changed, 76 insertions(+), 84 deletions(-) diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 729314b53e..941ad529b2 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -44,12 +44,8 @@ use crate::error::Error; use crate::thread_task::spawn_thread_local_task; use core_affinity::CoreId; -use otap_df_config::engine::{EngineConfig, EngineSettings, HttpAdminSettings}; -use otap_df_config::{ - PipelineGroupId, PipelineId, - pipeline::{CoreAllocation, PipelineConfig, Quota}, - pipeline_group::PipelineGroupConfig, -}; +use otap_df_config::engine::EngineConfig; +use otap_df_config::pipeline::{CoreAllocation, PipelineConfig}; use otap_df_engine::PipelineFactory; use otap_df_engine::context::{ControllerContext, PipelineContext}; use otap_df_engine::control::{ @@ -63,7 +59,6 @@ use otap_df_state::store::ObservedStateStore; use 
otap_df_state::{DeployedPipelineKey, PipelineKey};
 use otap_df_telemetry::reporter::MetricsReporter;
 use otap_df_telemetry::{InternalTelemetrySystem, otel_info, otel_info_span, otel_warn};
-use std::collections::HashMap;
 use std::thread;
 
 /// Error types and helpers for the controller module.
@@ -163,8 +158,10 @@ impl Controller {
 for (pipeline_group_id, pipeline_group) in pipeline_groups {
 for (pipeline_id, pipeline) in pipeline_group.pipelines {
 let quota = pipeline.quota().clone();
- let requested_cores =
- Self::select_cores_for_quota(available_core_ids.clone(), quota)?;
+ let requested_cores = Self::select_cores_for_allocation(
+ available_core_ids.clone(),
+ &quota.core_allocation,
+ )?;
 
 for (thread_id, core_id) in requested_cores.into_iter().enumerate() {
 let pipeline_key = DeployedPipelineKey {
@@ -323,24 +320,24 @@
 Ok(())
 }
 
- /// Selects which CPU cores to use based on the given quota configuration.
- fn select_cores_for_quota(
+ /// Selects which CPU cores to use based on the given allocation. 
+ fn select_cores_for_allocation( mut available_core_ids: Vec, - quota: Quota, + core_allocation: &CoreAllocation, ) -> Result, Error> { available_core_ids.sort_by_key(|c| c.id); let max_core_id = available_core_ids.iter().map(|c| c.id).max().unwrap_or(0); let num_cores = available_core_ids.len(); - match quota.core_allocation { + match core_allocation { CoreAllocation::AllCores => Ok(available_core_ids), CoreAllocation::CoreCount { count } => { - if count == 0 { + if *count == 0 { Ok(available_core_ids) - } else if count > num_cores { + } else if *count > num_cores { Err(Error::InvalidCoreAllocation { - alloc: quota.core_allocation.clone(), + alloc: core_allocation.clone(), message: format!( "Requested {} cores but only {} cores available on this system", count, num_cores @@ -348,15 +345,15 @@ impl Controller { available: available_core_ids.iter().map(|c| c.id).collect(), }) } else { - Ok(available_core_ids.into_iter().take(count).collect()) + Ok(available_core_ids.into_iter().take(*count).collect()) } } - CoreAllocation::CoreSet { ref set } => { + CoreAllocation::CoreSet { set } => { // Validate all ranges first for r in set.iter() { if r.start > r.end { return Err(Error::InvalidCoreAllocation { - alloc: quota.core_allocation.clone(), + alloc: core_allocation.clone(), message: format!( "Invalid core range: start ({}) is greater than end ({})", r.start, r.end @@ -366,7 +363,7 @@ impl Controller { } if r.start > max_core_id { return Err(Error::InvalidCoreAllocation { - alloc: quota.core_allocation.clone(), + alloc: core_allocation.clone(), message: format!( "Core ID {} exceeds available cores (system has cores 0-{})", r.start, max_core_id @@ -376,7 +373,7 @@ impl Controller { } if r.end > max_core_id { return Err(Error::InvalidCoreAllocation { - alloc: quota.core_allocation.clone(), + alloc: core_allocation.clone(), message: format!( "Core ID {} exceeds available cores (system has cores 0-{})", r.end, max_core_id @@ -394,7 +391,7 @@ impl Controller { let 
overlap_start = r1.start.max(r2.start); let overlap_end = r1.end.min(r2.end); return Err(Error::InvalidCoreAllocation { - alloc: quota.core_allocation.clone(), + alloc: core_allocation.clone(), message: format!( "Core ranges overlap: {}-{} and {}-{} share cores {}-{}", r1.start, r1.end, r2.start, r2.end, overlap_start, overlap_end @@ -416,7 +413,7 @@ impl Controller { if selected.is_empty() { return Err(Error::InvalidCoreAllocation { - alloc: quota.core_allocation.clone(), + alloc: core_allocation.clone(), message: "No available cores in the specified ranges".to_owned(), available: core_affinity::get_core_ids() .unwrap_or_default() @@ -543,23 +540,24 @@ mod tests { #[test] fn select_all_cores_by_default() { - let quota = Quota { - core_allocation: CoreAllocation::AllCores, - }; + let core_allocation = CoreAllocation::AllCores; let available_core_ids = available_core_ids(); let expected_core_ids = available_core_ids.clone(); - let result = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap(); + let result = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap(); assert_eq!(to_ids(&result), to_ids(&expected_core_ids)); } #[test] fn select_limited_by_num_cores() { - let quota = Quota { - core_allocation: CoreAllocation::CoreCount { count: 4 }, - }; + let core_allocation = CoreAllocation::CoreCount { count: 4 }; let available_core_ids = available_core_ids(); - let result = - Controller::<()>::select_cores_for_quota(available_core_ids.clone(), quota).unwrap(); + let result = Controller::<()>::select_cores_for_allocation( + available_core_ids.clone(), + &core_allocation, + ) + .unwrap(); assert_eq!(result.len(), 4); let expected_ids: Vec = available_core_ids .into_iter() @@ -573,30 +571,30 @@ mod tests { fn select_with_valid_single_core_range() { let available_core_ids = available_core_ids(); let first_id = available_core_ids[0].id; - let quota = Quota { - core_allocation: CoreAllocation::CoreSet { - set: 
vec![CoreRange { - start: first_id, - end: first_id, - }], - }, + let core_allocation = CoreAllocation::CoreSet { + set: vec![CoreRange { + start: first_id, + end: first_id, + }], }; - let result = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap(); + let result = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap(); assert_eq!(to_ids(&result), vec![first_id]); } #[test] fn select_with_valid_multi_core_range() { - let quota = Quota { - core_allocation: CoreAllocation::CoreSet { - set: vec![ - CoreRange { start: 2, end: 5 }, - CoreRange { start: 6, end: 6 }, - ], - }, + let core_allocation = CoreAllocation::CoreSet { + set: vec![ + CoreRange { start: 2, end: 5 }, + CoreRange { start: 6, end: 6 }, + ], }; let available_core_ids = available_core_ids(); - let result = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap(); + let result = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap(); assert_eq!(to_ids(&result), vec![2, 3, 4, 5, 6]); } @@ -605,11 +603,10 @@ mod tests { let core_allocation = CoreAllocation::CoreSet { set: vec![CoreRange { start: 2, end: 1 }], }; - let quota = Quota { - core_allocation: core_allocation.clone(), - }; let available_core_ids = available_core_ids(); - let err = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap_err(); + let err = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap_err(); match err { Error::InvalidCoreAllocation { alloc, .. 
} => { assert_eq!(alloc, core_allocation); @@ -625,11 +622,10 @@ mod tests { let core_allocation = CoreAllocation::CoreSet { set: vec![CoreRange { start, end }], }; - let quota = Quota { - core_allocation: core_allocation.clone(), - }; let available_core_ids = available_core_ids(); - let err = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap_err(); + let err = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap_err(); match err { Error::InvalidCoreAllocation { alloc, .. } => { assert_eq!(alloc, core_allocation); @@ -640,12 +636,12 @@ mod tests { #[test] fn select_with_zero_count_uses_all_cores() { - let quota = Quota { - core_allocation: CoreAllocation::CoreCount { count: 0 }, - }; + let core_allocation = CoreAllocation::CoreCount { count: 0 }; let available_core_ids = available_core_ids(); let expected_core_ids = available_core_ids.clone(); - let result = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap(); + let result = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap(); assert_eq!(to_ids(&result), to_ids(&expected_core_ids)); } @@ -657,11 +653,10 @@ mod tests { CoreRange { start: 4, end: 7 }, ], }; - let quota = Quota { - core_allocation: core_allocation.clone(), - }; let available_core_ids = available_core_ids(); - let err = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap_err(); + let err = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap_err(); match err { Error::InvalidCoreAllocation { alloc, message, .. 
} => { assert_eq!(alloc, core_allocation); @@ -683,11 +678,10 @@ mod tests { CoreRange { start: 3, end: 5 }, ], }; - let quota = Quota { - core_allocation: core_allocation.clone(), - }; let available_core_ids = available_core_ids(); - let err = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap_err(); + let err = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap_err(); match err { Error::InvalidCoreAllocation { alloc, message, .. } => { assert_eq!(alloc, core_allocation); @@ -709,11 +703,10 @@ mod tests { CoreRange { start: 3, end: 5 }, ], }; - let quota = Quota { - core_allocation: core_allocation.clone(), - }; let available_core_ids = available_core_ids(); - let err = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap_err(); + let err = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap_err(); match err { Error::InvalidCoreAllocation { alloc, message, .. 
} => { assert_eq!(alloc, core_allocation); @@ -730,16 +723,16 @@ mod tests { #[test] fn select_with_adjacent_ranges_succeeds() { // Adjacent but non-overlapping ranges should work - let quota = Quota { - core_allocation: CoreAllocation::CoreSet { - set: vec![ - CoreRange { start: 2, end: 3 }, - CoreRange { start: 4, end: 5 }, - ], - }, + let core_allocation = CoreAllocation::CoreSet { + set: vec![ + CoreRange { start: 2, end: 3 }, + CoreRange { start: 4, end: 5 }, + ], }; let available_core_ids = available_core_ids(); - let result = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap(); + let result = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap(); assert_eq!(to_ids(&result), vec![2, 3, 4, 5]); } @@ -752,11 +745,10 @@ mod tests { CoreRange { start: 5, end: 6 }, ], }; - let quota = Quota { - core_allocation: core_allocation.clone(), - }; let available_core_ids = available_core_ids(); - let err = Controller::<()>::select_cores_for_quota(available_core_ids, quota).unwrap_err(); + let err = + Controller::<()>::select_cores_for_allocation(available_core_ids, &core_allocation) + .unwrap_err(); match err { Error::InvalidCoreAllocation { alloc, message, .. 
} => { assert_eq!(alloc, core_allocation); From 1ec9d6cddd775c544d257ddb5c51da4bf7843fb5 Mon Sep 17 00:00:00 2001 From: lquerel Date: Tue, 20 Jan 2026 10:55:20 -0800 Subject: [PATCH 11/13] Update configuration files --- rust/otap-dataflow/configs/fake-otap.yaml | 11 ++- rust/otap-dataflow/configs/fake-otlp.yaml | 2 +- rust/otap-dataflow/configs/otap-noop.yaml | 22 +++++ .../configs/otap-route-otap.yaml | 98 +++++++++++++++++++ 4 files changed, 127 insertions(+), 6 deletions(-) create mode 100644 rust/otap-dataflow/configs/otap-noop.yaml create mode 100644 rust/otap-dataflow/configs/otap-route-otap.yaml diff --git a/rust/otap-dataflow/configs/fake-otap.yaml b/rust/otap-dataflow/configs/fake-otap.yaml index 2c504ece03..2ddf3c2125 100644 --- a/rust/otap-dataflow/configs/fake-otap.yaml +++ b/rust/otap-dataflow/configs/fake-otap.yaml @@ -14,13 +14,14 @@ nodes: dispatch_strategy: round_robin config: traffic_config: - max_signal_count: 1000 - max_batch_size: 1000 - signals_per_second: 1000 - log_weight: 100 + signals_per_second: 100000 + max_signal_count: null + metric_weight: 0 + trace_weight: 0 + log_weight: 30 registry_path: https://github.com/open-telemetry/semantic-conventions.git[model] exporter: kind: exporter plugin_urn: "urn:otel:otap:exporter" config: - grpc_endpoint: "http://127.0.0.1:4318" + grpc_endpoint: "http://127.0.0.1:4327" diff --git a/rust/otap-dataflow/configs/fake-otlp.yaml b/rust/otap-dataflow/configs/fake-otlp.yaml index 7c358fcc55..c8215ca86d 100644 --- a/rust/otap-dataflow/configs/fake-otlp.yaml +++ b/rust/otap-dataflow/configs/fake-otlp.yaml @@ -24,4 +24,4 @@ nodes: kind: exporter plugin_urn: "urn:otel:otlp:exporter" config: - grpc_endpoint: "http://127.0.0.1:4317" + grpc_endpoint: "http://127.0.0.1:4327" diff --git a/rust/otap-dataflow/configs/otap-noop.yaml b/rust/otap-dataflow/configs/otap-noop.yaml new file mode 100644 index 0000000000..94f6abe879 --- /dev/null +++ b/rust/otap-dataflow/configs/otap-noop.yaml @@ -0,0 +1,22 @@ +settings: + 
default_pipeline_ctrl_msg_channel_size: 100 + default_node_ctrl_msg_channel_size: 100 + default_pdata_channel_size: 100 + +nodes: + receiver: + kind: receiver + plugin_urn: "urn:otel:otap:receiver" + out_ports: + out_port: + destinations: + - perf_noop + dispatch_strategy: round_robin + config: + listening_addr: "127.0.0.1:4328" + response_stream_channel_size: 256 + + perf_noop: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: diff --git a/rust/otap-dataflow/configs/otap-route-otap.yaml b/rust/otap-dataflow/configs/otap-route-otap.yaml new file mode 100644 index 0000000000..b623fce2ff --- /dev/null +++ b/rust/otap-dataflow/configs/otap-route-otap.yaml @@ -0,0 +1,98 @@ +settings: + default_pipeline_ctrl_msg_channel_size: 100 + default_node_ctrl_msg_channel_size: 100 + default_pdata_channel_size: 100 + +nodes: + otlp_recv: + kind: receiver + plugin_urn: "urn:otel:otap:receiver" + out_ports: + out_port: + destinations: + - router + dispatch_strategy: round_robin + config: + listening_addr: "127.0.0.1:4327" + wait_for_result: true + response_stream_channel_size: 256 # Required: channel buffer capacity (number of messages) + + otap_recv: + kind: receiver + plugin_urn: "urn:otel:otap:receiver" + out_ports: + out_port: + destinations: + - otap_exporter + dispatch_strategy: round_robin + config: + listening_addr: "127.0.0.1:4329" + response_stream_channel_size: 256 + + router: + kind: processor + plugin_urn: "urn:otap:processor:signal_type_router" + out_ports: + logs: + destinations: + - retry + dispatch_strategy: round_robin + metrics: + destinations: + - metrics_exporter + dispatch_strategy: round_robin + traces: + destinations: + - spans_exporter + dispatch_strategy: round_robin + config: {} + +# logs_filter: +# kind: processor +# plugin_urn: "urn:otel:filter:processor" +# out_ports: +# out_port: +# destinations: +# - logs_exporter +# dispatch_strategy: round_robin +# config: +# logs: +# include: +# match_type: strict +# record_attributes: +# - key: 
gen_ai.system +# value: openai +# - key: ios.app.state +# value: active + + retry: + kind: processor + plugin_urn: "urn:otel:retry:processor" + out_ports: + out_port: + destinations: + - logs_exporter + dispatch_strategy: round_robin + config: + multiplier: 1.5 + + logs_exporter: + kind: exporter + plugin_urn: "urn:otel:otap:exporter" + config: + grpc_endpoint: "http://127.0.0.1:4328" + + metrics_exporter: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + + spans_exporter: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + + otap_exporter: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: From e89820accb5736675d1ba106179401568131e400 Mon Sep 17 00:00:00 2001 From: lquerel Date: Tue, 20 Jan 2026 11:12:30 -0800 Subject: [PATCH 12/13] Merge with main upstream --- rust/otap-dataflow/crates/controller/src/lib.rs | 9 ++++----- rust/otap-dataflow/crates/state/src/store.rs | 4 +++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 0343c49c6d..b637833cc6 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -44,11 +44,9 @@ use crate::error::Error; use crate::thread_task::spawn_thread_local_task; use core_affinity::CoreId; -use otap_df_config::engine::HttpAdminSettings; -use otap_df_config::{ - PipelineGroupId, PipelineId, - pipeline::PipelineConfig, -}; +use otap_df_config::engine::EngineConfig; +use otap_df_config::pipeline::CoreAllocation; +use otap_df_config::{DeployedPipelineKey, PipelineKey, pipeline::PipelineConfig}; use otap_df_engine::PipelineFactory; use otap_df_engine::context::{ControllerContext, PipelineContext}; use otap_df_engine::control::{ @@ -58,6 +56,7 @@ use otap_df_engine::entity_context::set_pipeline_entity_key; use otap_df_engine::error::{Error as EngineError, error_summary_from}; use 
otap_df_state::reporter::ObservedEventReporter; use otap_df_state::store::ObservedStateStore; +use otap_df_telemetry::event::{ErrorSummary, ObservedEvent}; use otap_df_telemetry::reporter::MetricsReporter; use otap_df_telemetry::{InternalTelemetrySystem, otel_info, otel_info_span, otel_warn}; use std::thread; diff --git a/rust/otap-dataflow/crates/state/src/store.rs b/rust/otap-dataflow/crates/state/src/store.rs index d51b09c1eb..3d6bd13895 100644 --- a/rust/otap-dataflow/crates/state/src/store.rs +++ b/rust/otap-dataflow/crates/state/src/store.rs @@ -9,7 +9,9 @@ use crate::phase::PipelinePhase; use crate::pipeline_rt_status::{ApplyOutcome, PipelineRuntimeStatus}; use crate::pipeline_status::PipelineStatus; use crate::reporter::ObservedEventReporter; -use otap_df_config::{PipelineKey, pipeline::PipelineSettings}; +use otap_df_config::PipelineKey; +use otap_df_config::health::HealthPolicy; +use otap_df_config::observed_state::ObservedStateSettings; use otap_df_telemetry::event::{EventType, ObservedEvent}; use serde::Serialize; use std::collections::HashMap; From 29ed40e5d8ae2ae337fb4ae2dee15564a8dffca6 Mon Sep 17 00:00:00 2001 From: lquerel Date: Tue, 20 Jan 2026 11:23:07 -0800 Subject: [PATCH 13/13] Fix markdown lint issues --- rust/otap-dataflow/README.md | 2 +- rust/otap-dataflow/crates/controller/README.md | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/otap-dataflow/README.md b/rust/otap-dataflow/README.md index f2fe74ab78..ef322b1f4f 100644 --- a/rust/otap-dataflow/README.md +++ b/rust/otap-dataflow/README.md @@ -87,7 +87,7 @@ the many N-to-1 relationships expressed within an OTAP request. ### Controller and Engine -See the controller and engine crate READMEs: +See the controller and engine crate READMEs: - [controller](./crates/controller/README.md). 
- [engine](./crates/engine/README.md), diff --git a/rust/otap-dataflow/crates/controller/README.md b/rust/otap-dataflow/crates/controller/README.md index cdee737008..1a0f461841 100644 --- a/rust/otap-dataflow/crates/controller/README.md +++ b/rust/otap-dataflow/crates/controller/README.md @@ -82,7 +82,7 @@ be explicitly declared in the pipeline topology. ### NUMA-Aware Controller Future versions of the controller are expected to become fully NUMA-aware. -This includes: +This includes: - allocating pipelines on specific NUMA nodes, - pinning threads accordingly, @@ -95,7 +95,7 @@ multi-socket systems. ### Morsel-Driven Parallelism In the OTAP Dataflow Engine, **batches** are the natural equivalent of -“morsels”. Future designs may introduce batching at the ingress stage, before +"morsels". Future designs may introduce batching at the ingress stage, before data traverses load-balancing boundaries such as per-core queues or MPMC named channels. This enables a form of **morsel-driven parallelism**, where: @@ -107,9 +107,9 @@ channels. This enables a form of **morsel-driven parallelism**, where: This approach is inspired by the work described in: > *Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the -Many-Core Age* -> Thomas Neumann, Tobias Mühlbauer, Alfons Kemper -> https://db.in.tum.de/~leis/papers/morsels.pdf +Many-Core Age* +> Thomas Neumann, Tobias Muhlbauer, Alfons Kemper +> [https://db.in.tum.de/~leis/papers/morsels.pdf](https://db.in.tum.de/~leis/papers/morsels.pdf) The goal is to enable flexible and explicit load balancing for compute-heavy or skewed workloads, while remaining fully compatible with the engine's