Skip to content
Merged
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/admin/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use axum::http::StatusCode;
use axum::routing::get;
use axum::{Json, Router};
use chrono::Utc;
use otap_df_state::PipelineKey;
use otap_df_config::PipelineKey;
use otap_df_state::conditions::{Condition, ConditionKind, ConditionReason, ConditionStatus};
use otap_df_state::pipeline_status::PipelineStatus;
use serde::Serialize;
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/admin/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{Json, Router};
use otap_df_state::PipelineKey;
use otap_df_config::PipelineKey;
use otap_df_state::pipeline_status::PipelineStatus;

/// All the routes for pipelines.
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/admin/src/pipeline_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::Utc;
use otap_df_state::PipelineKey;
use otap_df_config::PipelineKey;
use otap_df_state::pipeline_status::PipelineStatus;
use otap_df_telemetry::otel_info;
use serde::{Deserialize, Serialize};
Expand Down
66 changes: 65 additions & 1 deletion rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
//! A data plane is a collection of pipeline groups, where each group can have multiple pipelines.
//! A pipeline is a collection of nodes interconnected in a directed acyclic graph (DAG).

use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, ser::Serializer};
use std::borrow::Cow;
use std::hash::Hash;

pub mod byte_units;
pub mod engine;
Expand Down Expand Up @@ -66,3 +67,66 @@ pub type PortName = Cow<'static, str>;

/// The description of a pipeline or a node.
pub type Description = Cow<'static, str>;

/// Type alias for CPU core identifier.
/// Note: Not using core_affinity::CoreId directly to avoid dependency leakage in this public API
pub type CoreId = usize;

/// Unique key for identifying a pipeline within a pipeline group.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PipelineKey {
pipeline_group_id: PipelineGroupId,
pipeline_id: PipelineId,
}

impl PipelineKey {
/// Construct a new PipelineKey from group and pipeline ids.
#[must_use]
pub fn new(pipeline_group_id: PipelineGroupId, pipeline_id: PipelineId) -> Self {
Self {
pipeline_group_id,
pipeline_id,
}
}

/// Returns the pipeline group identifier.
#[must_use]
pub fn pipeline_group_id(&self) -> &PipelineGroupId {
&self.pipeline_group_id
}

/// Returns the pipeline identifier.
#[must_use]
pub fn pipeline_id(&self) -> &PipelineId {
&self.pipeline_id
}

/// Returns a `group_id:pipeline_id` string representation.
#[must_use]
pub fn as_string(&self) -> String {
format!("{}:{}", self.pipeline_group_id, self.pipeline_id)
}
}

impl Serialize for PipelineKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let s = self.as_string();
serializer.serialize_str(&s)
}
}

/// Unique key for identifying a pipeline running on a specific core.
#[derive(Debug, Clone, Serialize)]
pub struct DeployedPipelineKey {
/// The unique ID of the pipeline group the pipeline belongs to.
pub pipeline_group_id: PipelineGroupId,

/// The unique ID of the pipeline within its group.
pub pipeline_id: PipelineId,

/// The CPU core ID the pipeline is pinned to.
pub core_id: CoreId,
}
4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use crate::error::Error;
use crate::thread_task::spawn_thread_local_task;
use core_affinity::CoreId;
use otap_df_config::DeployedPipelineKey;
use otap_df_config::engine::HttpAdminSettings;
use otap_df_config::{
PipelineGroupId, PipelineId,
Expand All @@ -32,10 +33,9 @@ use otap_df_engine::control::{
PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel,
};
use otap_df_engine::error::{Error as EngineError, error_summary_from};
use otap_df_state::DeployedPipelineKey;
use otap_df_state::event::{ErrorSummary, ObservedEvent};
use otap_df_state::reporter::ObservedEventReporter;
use otap_df_state::store::ObservedStateStore;
use otap_df_telemetry::event::{ErrorSummary, ObservedEvent};
use otap_df_telemetry::reporter::MetricsReporter;
use otap_df_telemetry::{InternalTelemetrySystem, otel_info, otel_info_span, otel_warn};
use std::thread;
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::node::{NodeId, NodeName};
use otap_df_channel::error::SendError;
use otap_df_config::node::NodeKind;
use otap_df_config::{NodeUrn, PortName};
use otap_df_state::event::ErrorSummary;
use otap_df_telemetry::event::ErrorSummary;
use std::borrow::Cow;
use std::fmt;

Expand Down
4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use crate::context::PipelineContext;
use crate::control::{ControlSenders, NodeControlMsg, PipelineControlMsg, PipelineCtrlMsgReceiver};
use crate::error::Error;
use crate::pipeline_metrics::PipelineMetricsMonitor;
use otap_df_config::DeployedPipelineKey;
use otap_df_config::pipeline::TelemetrySettings;
use otap_df_state::DeployedPipelineKey;
use otap_df_state::event::{ErrorSummary, ObservedEvent};
use otap_df_state::reporter::ObservedEventReporter;
use otap_df_telemetry::event::{ErrorSummary, ObservedEvent};
use otap_df_telemetry::otel_warn;
use otap_df_telemetry::reporter::MetricsReporter;
use std::cmp::Reverse;
Expand Down
7 changes: 3 additions & 4 deletions rust/otap-dataflow/crates/engine/src/runtime_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Set of runtime pipeline configuration structures used by the engine and derived from the pipeline configuration.

use crate::channel_metrics::ChannelMetricsHandle;
use crate::context::PipelineContext;
use crate::control::{
ControlSenders, Controllable, NodeControlMsg, PipelineCtrlMsgReceiver, PipelineCtrlMsgSender,
};
Expand All @@ -12,12 +13,10 @@ use crate::node::{Node, NodeDefs, NodeId, NodeType, NodeWithPDataReceiver, NodeW
use crate::pipeline_ctrl::PipelineCtrlMsgManager;
use crate::terminal_state::TerminalState;
use crate::{exporter::ExporterWrapper, processor::ProcessorWrapper, receiver::ReceiverWrapper};
use otap_df_config::DeployedPipelineKey;
use otap_df_config::pipeline::PipelineConfig;
use otap_df_telemetry::reporter::MetricsReporter;

use crate::context::PipelineContext;
use otap_df_state::DeployedPipelineKey;
use otap_df_state::reporter::ObservedEventReporter;
use otap_df_telemetry::reporter::MetricsReporter;
use std::fmt::Debug;
use tokio::runtime::Builder;
use tokio::task::LocalSet;
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true

[dependencies]
otap-df-config = { path = "../config" }
otap-df-telemetry = { path = "../telemetry" }

serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/state/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

//! Errors for the state crate.

use crate::event::{EventType, ObservedEvent};
use crate::phase::PipelinePhase;
use otap_df_telemetry::event::{EventType, ObservedEvent};

/// All errors that can occur in the state crate.
#[derive(thiserror::Error, Debug)]
Expand Down
89 changes: 39 additions & 50 deletions rust/otap-dataflow/crates/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,65 @@

//! State stores

use otap_df_config::{PipelineGroupId, PipelineId};
use serde::{Serialize, Serializer};
use serde::ser::{SerializeSeq, Serializer};
use std::collections::VecDeque;

pub mod conditions;
pub mod error;
pub mod event;
pub mod phase;
mod pipeline_rt_status;
pub mod pipeline_status;
pub mod reporter;
pub mod store;

type CoreId = usize;
use otap_df_telemetry::event::ObservedEvent;
use serde::Serialize;

/// Unique key for identifying a pipeline within a pipeline group.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PipelineKey {
pipeline_group_id: PipelineGroupId,
pipeline_id: PipelineId,
/// A ring buffer for storing recent observed events.
///
/// When the buffer reaches capacity, the oldest event is dropped to make room
/// for new events. Events are serialized in reverse order (newest first).
#[derive(Debug, Clone)]
pub struct ObservedEventRingBuffer {
buf: VecDeque<ObservedEvent>,
cap: usize,
}

impl PipelineKey {
/// Construct a new PipelineKey from group and pipeline ids.
#[must_use]
pub fn new(pipeline_group_id: PipelineGroupId, pipeline_id: PipelineId) -> Self {
Self {
pipeline_group_id,
pipeline_id,
impl Serialize for ObservedEventRingBuffer {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(self.buf.len()))?;
for ev in self.buf.iter().rev() {
// <— reverse iteration
seq.serialize_element(ev)?;
}
seq.end()
}
}

/// Returns the pipeline group identifier.
impl ObservedEventRingBuffer {
/// Create a new ring buffer with the given capacity.
#[must_use]
pub fn pipeline_group_id(&self) -> &PipelineGroupId {
&self.pipeline_group_id
pub fn new(cap: usize) -> Self {
Self {
buf: VecDeque::with_capacity(cap),
cap,
}
}

/// Returns the pipeline identifier.
#[must_use]
pub fn pipeline_id(&self) -> &PipelineId {
&self.pipeline_id
/// Push an event into the ring buffer, dropping the oldest if full.
pub fn push(&mut self, event: ObservedEvent) {
if self.buf.len() == self.cap {
_ = self.buf.pop_front(); // drop oldest
}
self.buf.push_back(event);
}

/// Returns a `group_id:pipeline_id` string representation.
/// Returns true if the buffer is empty.
#[must_use]
pub fn as_string(&self) -> String {
format!("{}:{}", self.pipeline_group_id, self.pipeline_id)
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
}

impl Serialize for PipelineKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let s = format!("{}:{}", self.pipeline_group_id, self.pipeline_id);
serializer.serialize_str(&s)
}
}

/// Unique key for identifying a pipeline running on a specific core.
#[derive(Debug, Clone, Serialize)]
pub struct DeployedPipelineKey {
/// The unique ID of the pipeline group the pipeline belongs to.
pub pipeline_group_id: PipelineGroupId,

/// The unique ID of the pipeline within its group.
pub pipeline_id: PipelineId,

/// The CPU core ID the pipeline is pinned to.
/// Note: Not using core_affinity::CoreId directly to avoid dependency leakage in this public API
pub core_id: usize,
}
11 changes: 7 additions & 4 deletions rust/otap-dataflow/crates/state/src/pipeline_rt_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@

//! Observed per-core pipeline runtime status and phase transition logic.

use crate::ObservedEventRingBuffer;
use crate::conditions::{
Condition, ConditionKind, ConditionReason, ConditionState, ConditionStatus,
};
use crate::error::Error;
use crate::error::Error::InvalidTransition;
use crate::event::{ErrorEvent as ErrEv, RequestEvent as Req, SuccessEvent as OkEv};
use crate::event::{ErrorSummary, EventType, ObservedEvent, ObservedEventRingBuffer};
use crate::phase::{DeletionMode, FailReason, PipelinePhase};
use chrono::{DateTime, Utc};
use otap_df_telemetry::event::{
ErrorEvent as ErrEv, ErrorSummary, EventType, ObservedEvent, RequestEvent as Req,
SuccessEvent as OkEv,
};
use serde::Serialize;
use serde::ser::SerializeStruct;
use std::time::SystemTime;
Expand Down Expand Up @@ -558,13 +561,13 @@ impl Serialize for PipelineRuntimeStatus {
}

fn event_message(event: &ObservedEvent) -> Option<String> {
event.message().map(|s| s.to_string())
event.message.formatted()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::event::{
use otap_df_telemetry::event::{
ErrorEvent as ErrEv, ErrorSummary, RequestEvent as Req, SuccessEvent as OkEv,
};

Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/state/src/pipeline_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

//! Observed pipeline status and aggregation logic per core.

use crate::CoreId;
use crate::conditions::{
Condition, ConditionKind, ConditionReason, ConditionState, ConditionStatus,
};
use crate::phase::PipelinePhase;
use crate::pipeline_rt_status::PipelineRuntimeStatus;
use otap_df_config::CoreId;
use otap_df_config::health::{HealthPolicy, PhaseKind, Quorum};
use serde::Serialize;
use serde::ser::SerializeStruct;
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/state/src/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! A reporter of observed events.

use crate::event::ObservedEvent;
use otap_df_telemetry::event::ObservedEvent;
use std::time::Duration;

/// A sharable/clonable observed event reporter sending events to an `ObservedStore`.
Expand Down
Loading
Loading