From 0fbc5ba6fa1056d140ec0e86da74f52f3f29af55 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 15 Jan 2026 11:13:41 -0800 Subject: [PATCH 1/3] Internal telemetry pipeline nodes config --- .../crates/config/src/pipeline.rs | 322 +++++++++++++----- .../config/tests/fixtures/test_pipeline.yaml | 25 ++ 2 files changed, 253 insertions(+), 94 deletions(-) diff --git a/rust/otap-dataflow/crates/config/src/pipeline.rs b/rust/otap-dataflow/crates/config/src/pipeline.rs index fcc49e482e..3f2d45534c 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline.rs @@ -46,7 +46,14 @@ pub struct PipelineConfig { /// /// Note: We use `Arc` to allow sharing the same pipeline configuration /// across multiple cores/threads without cloning the entire configuration. - nodes: HashMap>, + #[serde(default)] + nodes: PipelineNodes, + + /// Internal telemetry pipeline nodes. These have the same structure + /// as `nodes` but are independent and isolated to a separate internal + /// telemetry runtime. + #[serde(default, skip_serializing_if = "PipelineNodes::is_empty")] + internal: PipelineNodes, /// Service-level telemetry configuration. #[serde(default)] @@ -68,6 +75,183 @@ pub enum PipelineType { /// OpenTelemetry with Apache Arrow Protocol (OTAP) pipeline. Otap, } + +/// A collection of nodes forming a pipeline graph. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] +#[serde(transparent)] +pub struct PipelineNodes(HashMap>); + +impl PipelineNodes { + /// Returns true if the node collection is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Returns the number of nodes. + #[must_use] + pub fn len(&self) -> usize { + self.0.len() + } + + /// Returns a reference to the node with the given ID, if it exists. + #[must_use] + pub fn get(&self, id: &str) -> Option<&Arc> { + self.0.get(id) + } + + /// Returns true if a node with the given ID exists. + #[must_use] + pub fn contains_key(&self, id: &str) -> bool { + self.0.contains_key(id) + } + + /// Returns an iterator visiting all nodes. + pub fn iter(&self) -> impl Iterator)> { + self.0.iter() + } + + /// Returns an iterator over node IDs. + pub fn keys(&self) -> impl Iterator { + self.0.keys() + } + + /// Validate the node graph structure. + /// + /// Checks for: + /// - Invalid hyper-edges (missing target nodes) + /// - Cycles in the DAG + pub fn validate( + &self, + pipeline_group_id: &PipelineGroupId, + pipeline_id: &PipelineId, + errors: &mut Vec, + ) { + self.validate_hyper_edges(pipeline_group_id, pipeline_id, errors); + + // Only check for cycles if no hyper-edge errors + if errors.is_empty() { + for cycle in self.detect_cycles() { + errors.push(Error::CycleDetected { + context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), + nodes: cycle, + }); + } + } + } + + /// Validate hyper-edges (check that all destination nodes exist). + fn validate_hyper_edges( + &self, + pipeline_group_id: &PipelineGroupId, + pipeline_id: &PipelineId, + errors: &mut Vec, + ) { + for (node_id, node) in self.0.iter() { + for edge in node.out_ports.values() { + let missing_targets: Vec<_> = edge + .destinations + .iter() + .filter(|target| !self.0.contains_key(*target)) + .cloned() + .collect(); + + if !missing_targets.is_empty() { + errors.push(Error::InvalidHyperEdgeSpec { + context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), + source_node: node_id.clone(), + missing_source: false, + details: Box::new(HyperEdgeSpecDetails { + target_nodes: edge.destinations.iter().cloned().collect(), + dispatch_strategy: edge.dispatch_strategy.clone(), + missing_targets, + }), + }); + } + } + } + } + + /// Detect cycles in the node graph. + fn detect_cycles(&self) -> Vec> { + fn visit( + node: &NodeId, + nodes: &HashMap>, + visiting: &mut HashSet, + visited: &mut HashSet, + current_path: &mut Vec, + cycles: &mut Vec>, + ) { + if visited.contains(node) { + return; + } + if visiting.contains(node) { + if let Some(pos) = current_path.iter().position(|n| n == node) { + cycles.push(current_path[pos..].to_vec()); + } + return; + } + _ = visiting.insert(node.clone()); + current_path.push(node.clone()); + + if let Some(n) = nodes.get(node) { + for edge in n.out_ports.values() { + for tgt in &edge.destinations { + visit(tgt, nodes, visiting, visited, current_path, cycles); + } + } + } + + _ = visiting.remove(node); + _ = visited.insert(node.clone()); + _ = current_path.pop(); + } + + let mut visiting = HashSet::new(); + let mut current_path = Vec::new(); + let mut visited = HashSet::new(); + let mut cycles = Vec::new(); + + for node in self.0.keys() { + if !visited.contains(node) { + visit( + node, + &self.0, + &mut visiting, + &mut visited, + &mut current_path, + &mut cycles, + ); + } + } + + cycles + } +} + +impl std::ops::Index<&str> for PipelineNodes { + type Output = Arc; + + fn index(&self, id: &str) -> &Self::Output { + &self.0[id] + } +} + +impl IntoIterator for PipelineNodes { + type Item = (NodeId, Arc); + type IntoIter = std::collections::hash_map::IntoIter>; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl FromIterator<(NodeId, Arc)> for PipelineNodes { + fn from_iter)>>(iter: T) -> Self { + Self(iter.into_iter().collect()) + } +} + /// A configuration for a pipeline. #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct PipelineSettings { @@ -262,6 +446,12 @@ impl PipelineConfig { &self.settings } + /// Returns a reference to the main pipeline nodes. + #[must_use] + pub fn nodes(&self) -> &PipelineNodes { + &self.nodes + } + /// Returns an iterator visiting all nodes in the pipeline. pub fn node_iter(&self) -> impl Iterator)> { self.nodes.iter() @@ -278,13 +468,29 @@ impl PipelineConfig { &self.service } + /// Returns true if the internal telemetry pipeline is configured. + #[must_use] + pub fn has_internal_pipeline(&self) -> bool { + !self.internal.is_empty() + } + + /// Returns a reference to the internal pipeline nodes. + #[must_use] + pub fn internal_nodes(&self) -> &PipelineNodes { + &self.internal + } + + /// Returns an iterator visiting all nodes in the internal telemetry pipeline. + pub fn internal_node_iter(&self) -> impl Iterator)> { + self.internal.iter() + } + /// Validate the pipeline specification. /// /// This method checks for: /// - Duplicate node IDs /// - Duplicate out-ports (same source node + port name) /// - Invalid hyper-edges (missing source or target nodes) - /// - Cycles in the DAG pub fn validate( &self, pipeline_group_id: &PipelineGroupId, @@ -292,41 +498,14 @@ impl PipelineConfig { ) -> Result<(), Error> { let mut errors = Vec::new(); - // Check for invalid hyper-edges (references to non-existent nodes) - for (node_id, node) in self.nodes.iter() { - for edge in node.out_ports.values() { - let mut missing_targets = Vec::new(); + // Validate main pipeline + self.nodes + .validate(pipeline_group_id, pipeline_id, &mut errors); - for target in &edge.destinations { - if !self.nodes.contains_key(target) { - missing_targets.push(target.clone()); - } - } - - if !missing_targets.is_empty() { - errors.push(Error::InvalidHyperEdgeSpec { - context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), - source_node: node_id.clone(), - missing_source: false, // source exists since we're iterating over nodes - details: Box::new(HyperEdgeSpecDetails { - target_nodes: edge.destinations.iter().cloned().collect(), - dispatch_strategy: edge.dispatch_strategy.clone(), - missing_targets, - }), - }); - } - } - } - - // Check for cycles if no errors so far - if errors.is_empty() { - let cycles = self.detect_cycles(); - for cycle in cycles { - errors.push(Error::CycleDetected { - context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), - nodes: cycle, - }); - } + // Validate internal pipeline if present + if !self.internal.is_empty() { + self.internal + .validate(pipeline_group_id, pipeline_id, &mut errors); } if !errors.is_empty() { @@ -335,65 +514,13 @@ impl PipelineConfig { Ok(()) } } - - fn detect_cycles(&self) -> Vec> { - fn visit( - node: &NodeId, - nodes: &HashMap>, - visiting: &mut HashSet, - visited: &mut HashSet, - current_path: &mut Vec, - cycles: &mut Vec>, - ) { - if visited.contains(node) { - return; - } - if visiting.contains(node) { - // Cycle found - if let Some(pos) = current_path.iter().position(|n| n == node) { - cycles.push(current_path[pos..].to_vec()); - } - return; - } - _ = visiting.insert(node.clone()); - current_path.push(node.clone()); - - if let Some(n) = nodes.get(node) { - for edge in n.out_ports.values() { - for tgt in &edge.destinations { - visit(tgt, nodes, visiting, visited, current_path, cycles); - } - } - } - - _ = visiting.remove(node); - _ = visited.insert(node.clone()); - _ = current_path.pop(); - } - - let mut visiting = HashSet::new(); - let mut current_path = Vec::new(); - let mut visited = HashSet::new(); - let mut cycles = Vec::new(); - - for node in self.nodes.keys() { - if !visited.contains(node) { - visit( - node, - &self.nodes, - &mut visiting, - &mut visited, - &mut current_path, - &mut cycles, - ); - } - } - - cycles - } } -/// A builder for constructing a [`PipelineConfig`]. +/// A builder for constructing a [`PipelineConfig`]. This type is used +/// for easy testing of the PipelineNodes logic. +/// +/// Note: does not support testing the internal pipeline build, +/// because it is identical. pub struct PipelineConfigBuilder { description: Option, nodes: HashMap, @@ -656,6 +783,7 @@ impl PipelineConfigBuilder { .into_iter() .map(|(id, node)| (id, Arc::new(node))) .collect(), + internal: PipelineNodes(HashMap::new()), settings: PipelineSettings::default(), r#type: pipeline_type, service: ServiceConfig::default(), @@ -1052,13 +1180,19 @@ mod tests { file_path, ); - assert!(result.is_ok()); + assert!(result.is_ok(), "failed parsing {}", result.unwrap_err()); let config = result.unwrap(); assert_eq!(config.nodes.len(), 3); assert!(config.nodes.contains_key("receiver1")); assert!(config.nodes.contains_key("processor1")); assert!(config.nodes.contains_key("exporter1")); + assert_eq!(config.internal.len(), 4); + assert!(config.internal.contains_key("receiver1")); + assert!(config.internal.contains_key("processor1")); + assert!(config.internal.contains_key("processor2")); + assert!(config.internal.contains_key("exporter1")); + let telemetry_config = &config.service().telemetry; let reporting_interval = telemetry_config.reporting_interval; assert_eq!(reporting_interval.as_secs(), 5); diff --git a/rust/otap-dataflow/crates/config/tests/fixtures/test_pipeline.yaml b/rust/otap-dataflow/crates/config/tests/fixtures/test_pipeline.yaml index e9af26e99b..500687e334 100644 --- a/rust/otap-dataflow/crates/config/tests/fixtures/test_pipeline.yaml +++ b/rust/otap-dataflow/crates/config/tests/fixtures/test_pipeline.yaml @@ -18,6 +18,31 @@ nodes: plugin_urn: "urn:test:exporter" config: null out_ports: {} + +# Note the internal and nodes graphs are separate. +# They do not share a namespace. +internal: + receiver1: + kind: receiver + plugin_urn: "urn:test:receiver" + config: null + out_ports: {} + processor1: + kind: processor + plugin_urn: "urn:test:processor" + config: null + out_ports: {} + processor2: + kind: processor + plugin_urn: "urn:test:processor" + config: null + out_ports: {} + exporter1: + kind: exporter + plugin_urn: "urn:test:exporter" + config: null + out_ports: {} + service: telemetry: reporting_interval: "5s" From ca3b523c3bd46b4e98f4e2a7e366ccd7b2d25766 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 15 Jan 2026 11:17:58 -0800 Subject: [PATCH 2/3] doc --- rust/otap-dataflow/docs/self_tracing_architecture.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rust/otap-dataflow/docs/self_tracing_architecture.md b/rust/otap-dataflow/docs/self_tracing_architecture.md index bf9ed25fee..eb7b19903b 100644 --- a/rust/otap-dataflow/docs/self_tracing_architecture.md +++ b/rust/otap-dataflow/docs/self_tracing_architecture.md @@ -28,6 +28,11 @@ telemetry pipeline consists of one (global) or more (NUMA-regional) ITR components and any of the connected processor and exporter components reachable from ITR source nodes. +Nodes of an internal telemetry pipeline have identical structure with +ordinary pipeline nodes, however they are separate and isolated. The +main dataflow engine knows nothing about the internal telemetry +pipeline engine. + ## Logs instrumentation The OTAP Dataflow engine has dedicated macros, and every component is @@ -137,6 +142,12 @@ In this configuration, a dedicated `LogsCollector` thread consumes from the channel and prints to console. ```yaml +nodes: + # pipeline nodes + +internal: + # internal telemetry pipeline nodes + service: telemetry: logs: From e852279a2e5a0b025d687f76e9d0bb1c3f3b3c08 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 15 Jan 2026 16:52:10 -0800 Subject: [PATCH 3/3] add prefix; move comment --- rust/otap-dataflow/crates/config/src/pipeline.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/rust/otap-dataflow/crates/config/src/pipeline.rs b/rust/otap-dataflow/crates/config/src/pipeline.rs index 3f2d45534c..078f49e7ef 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline.rs @@ -43,9 +43,6 @@ pub struct PipelineConfig { settings: PipelineSettings, /// All nodes in this pipeline, keyed by node ID. - /// - /// Note: We use `Arc` to allow sharing the same pipeline configuration - /// across multiple cores/threads without cloning the entire configuration. #[serde(default)] nodes: PipelineNodes, @@ -77,6 +74,9 @@ pub enum PipelineType { } /// A collection of nodes forming a pipeline graph. +/// +/// Note: We use `Arc` to allow sharing the same pipeline configuration +/// across multiple cores/threads without cloning the entire configuration. #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] #[serde(transparent)] pub struct PipelineNodes(HashMap>); @@ -504,8 +504,14 @@ impl PipelineConfig { // Validate internal pipeline if present if !self.internal.is_empty() { + // TODO: the location of the internal telemetry pipeline + // nodes is subject to change. Temporarily, we append + // ("_internal") to the pipeline_id. We need a way to + // refer to the set of node defining the internal + // pipeline. + let internal_id: PipelineId = format!("{}_internal", &pipeline_id).into(); self.internal - .validate(pipeline_group_id, pipeline_id, &mut errors); + .validate(pipeline_group_id, &internal_id, &mut errors); } if !errors.is_empty() {