Merged
47 changes: 24 additions & 23 deletions rust/otap-dataflow/benchmarks/benches/self_tracing/main.rs
@@ -11,33 +11,36 @@
//! Example: `encode/3_attrs/1000_events` = 300 µs → 300 ns per event

use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use otap_df_pdata::otlp::ProtoBuffer;
use otap_df_telemetry::self_tracing::{ConsoleWriter, DirectLogRecordEncoder, LogRecord};
use std::time::SystemTime;
use tracing::{Event, Subscriber};
use tracing_subscriber::layer::Layer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;

use otap_df_pdata::otlp::ProtoBuffer;
use otap_df_telemetry::self_tracing::{
ConsoleWriter, DirectLogRecordEncoder, LogRecord, SavedCallsite,
};

#[cfg(not(windows))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(windows))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

/// The operation to perform on each event within the layer.
/// The operation to perform on each event within the layer. The cost
/// of generating the timestamp is not included in the measurement.
#[derive(Clone, Copy)]
enum BenchOp {
/// Encode the event into a LogRecord only.
Encode,
/// Encode once, then format N times.
/// Encode the event body into a new LogRecord. This includes
/// encoding the body and attributes, but not the callsite details
/// or the timestamp.
NewRecord,
/// Encode once, then format the standard representation (with
/// timestamp) N times.
Format,
/// Encode and format together N times.
/// Encode and format the standard representation (with timestamp)
/// N times.
EncodeAndFormat,
/// Encode to protobuf N times.
/// Encode to a complete protobuf record (with timestamp) N times.
EncodeProto,
}

@@ -58,8 +61,9 @@ where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
let now = SystemTime::now();
match self.op {
BenchOp::Encode => {
BenchOp::NewRecord => {
for _ in 0..self.iterations {
let record = LogRecord::new(event);
let _ = std::hint::black_box(record);
@@ -69,10 +73,9 @@
// Encode once, format N times
let record = LogRecord::new(event);
let writer = ConsoleWriter::no_color();
let callsite = SavedCallsite::new(event.metadata());

for _ in 0..self.iterations {
let line = writer.format_log_record(&record, &callsite);
let line = writer.format_log_record(Some(now), &record);
let _ = std::hint::black_box(line);
}
}
@@ -81,19 +84,17 @@

for _ in 0..self.iterations {
let record = LogRecord::new(event);
let callsite = SavedCallsite::new(event.metadata());
let line = writer.format_log_record(&record, &callsite);
let line = writer.format_log_record(Some(now), &record);
let _ = std::hint::black_box(line);
}
}
BenchOp::EncodeProto => {
let mut buf = ProtoBuffer::new();
let mut encoder = DirectLogRecordEncoder::new(&mut buf);
let callsite = SavedCallsite::new(event.metadata());

for _ in 0..self.iterations {
encoder.clear();
let size = encoder.encode_log_record(LogRecord::new(event), &callsite);
let size = encoder.encode_log_record(now, &LogRecord::new(event));
let _ = std::hint::black_box(size);
}
}
@@ -169,16 +170,16 @@ fn bench_op(c: &mut Criterion, group_name: &str, op: BenchOp) {
group.finish();
}

fn bench_encode(c: &mut Criterion) {
bench_op(c, "encode", BenchOp::Encode);
fn bench_new_record(c: &mut Criterion) {
bench_op(c, "new_record", BenchOp::NewRecord);
}

fn bench_format(c: &mut Criterion) {
bench_op(c, "format", BenchOp::Format);
}

fn bench_encode_and_format(c: &mut Criterion) {
bench_op(c, "encode_and_format", BenchOp::EncodeAndFormat);
fn bench_format_new_record(c: &mut Criterion) {
bench_op(c, "format_new_record", BenchOp::EncodeAndFormat);
}

fn bench_encode_proto(c: &mut Criterion) {
@@ -192,7 +193,7 @@ mod bench_entry {
criterion_group!(
name = benches;
config = Criterion::default();
targets = bench_encode, bench_format, bench_encode_and_format, bench_encode_proto
targets = bench_new_record, bench_format, bench_format_new_record, bench_encode_proto
);
}

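For orientation, here is a minimal sketch of how a layer like the one above is typically driven: it is installed as a temporary subscriber, and a single `tracing` event then fans out into N measured iterations inside `on_event`. The `BenchLayer` name and the `drive_layer` helper are assumptions for illustration; the actual struct definition and the body of `bench_op` are not shown in this diff.

```rust
use tracing_subscriber::prelude::*;

// Hypothetical stand-in for the elided benchmark layer struct; the
// `Layer` impl shown in the diff above is what performs the measured work.
struct BenchLayer {
    op: BenchOp,
    iterations: usize,
}

fn drive_layer(op: BenchOp, iterations: usize) {
    // Build a one-off subscriber containing only the benchmark layer.
    let subscriber = tracing_subscriber::registry().with(BenchLayer { op, iterations });

    // Every event emitted inside the closure reaches `on_event`,
    // which repeats the selected operation `iterations` times.
    tracing::subscriber::with_default(subscriber, || {
        tracing::info!(attr1 = 1, attr2 = "two", attr3 = true, "benchmark event");
    });
}
```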
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/admin/src/health.rs
@@ -13,7 +13,7 @@ use axum::http::StatusCode;
use axum::routing::get;
use axum::{Json, Router};
use chrono::Utc;
use otap_df_state::PipelineKey;
use otap_df_config::PipelineKey;
use otap_df_state::conditions::{Condition, ConditionKind, ConditionReason, ConditionStatus};
use otap_df_state::pipeline_status::PipelineStatus;
use serde::Serialize;
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/admin/src/pipeline.rs
@@ -22,7 +22,7 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{Json, Router};
use otap_df_state::PipelineKey;
use otap_df_config::PipelineKey;
use otap_df_state::pipeline_status::PipelineStatus;

/// All the routes for pipelines.
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/admin/src/pipeline_group.rs
@@ -25,7 +25,7 @@ use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::Utc;
use otap_df_state::PipelineKey;
use otap_df_config::PipelineKey;
use otap_df_state::pipeline_status::PipelineStatus;
use otap_df_telemetry::otel_info;
use serde::{Deserialize, Serialize};
66 changes: 65 additions & 1 deletion rust/otap-dataflow/crates/config/src/lib.rs
@@ -12,8 +12,9 @@
//! A data plane is a collection of pipeline groups, where each group can have multiple pipelines.
//! A pipeline is a collection of nodes interconnected in a directed acyclic graph (DAG).

use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, ser::Serializer};
use std::borrow::Cow;
use std::hash::Hash;

pub mod byte_units;
pub mod engine;
@@ -66,3 +67,66 @@ pub type PortName = Cow<'static, str>;

/// The description of a pipeline or a node.
pub type Description = Cow<'static, str>;

/// Type alias for CPU core identifier.
/// Note: Not using core_affinity::CoreId directly to avoid dependency leakage in this public API
pub type CoreId = usize;

/// Unique key for identifying a pipeline within a pipeline group.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct PipelineKey {
pipeline_group_id: PipelineGroupId,
pipeline_id: PipelineId,
}

impl PipelineKey {
/// Construct a new PipelineKey from group and pipeline ids.
#[must_use]
pub fn new(pipeline_group_id: PipelineGroupId, pipeline_id: PipelineId) -> Self {
Self {
pipeline_group_id,
pipeline_id,
}
}

/// Returns the pipeline group identifier.
#[must_use]
pub fn pipeline_group_id(&self) -> &PipelineGroupId {
&self.pipeline_group_id
}

/// Returns the pipeline identifier.
#[must_use]
pub fn pipeline_id(&self) -> &PipelineId {
&self.pipeline_id
}

/// Returns a `group_id:pipeline_id` string representation.
#[must_use]
pub fn as_string(&self) -> String {
format!("{}:{}", self.pipeline_group_id, self.pipeline_id)
}
}

impl Serialize for PipelineKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let s = self.as_string();
serializer.serialize_str(&s)
}
}

/// Unique key for identifying a pipeline running on a specific core.
#[derive(Debug, Clone, Serialize)]
pub struct DeployedPipelineKey {
/// The unique ID of the pipeline group the pipeline belongs to.
pub pipeline_group_id: PipelineGroupId,

/// The unique ID of the pipeline within its group.
pub pipeline_id: PipelineId,

/// The CPU core ID the pipeline is pinned to.
pub core_id: CoreId,
}
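A quick sketch of how the new key types compose, assuming `PipelineGroupId` and `PipelineId` are the crate's usual `Cow<'static, str>` aliases (like `PortName` above) and that `serde_json` is available to the caller: `PipelineKey` serializes as the compact `group:pipeline` string produced by `as_string`, while `DeployedPipelineKey` uses the derived struct form.

```rust
use otap_df_config::{DeployedPipelineKey, PipelineKey};

fn keys_example() {
    // Assumes the id aliases accept `&'static str` via `.into()`.
    let key = PipelineKey::new("default-group".into(), "traces".into());
    assert_eq!(key.as_string(), "default-group:traces");

    // The manual Serialize impl emits the same compact string.
    assert_eq!(
        serde_json::to_string(&key).unwrap(),
        r#""default-group:traces""#
    );

    // DeployedPipelineKey derives Serialize, so it serializes field by field.
    let deployed = DeployedPipelineKey {
        pipeline_group_id: "default-group".into(),
        pipeline_id: "traces".into(),
        core_id: 0,
    };
    println!("{}", serde_json::to_string(&deployed).unwrap());
}
```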
4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/controller/src/lib.rs
@@ -20,6 +20,7 @@
use crate::error::Error;
use crate::thread_task::spawn_thread_local_task;
use core_affinity::CoreId;
use otap_df_config::DeployedPipelineKey;
use otap_df_config::engine::HttpAdminSettings;
use otap_df_config::{
PipelineGroupId, PipelineId,
@@ -33,10 +34,9 @@ use otap_df_engine::control::{
};
use otap_df_engine::entity_context::set_pipeline_entity_key;
use otap_df_engine::error::{Error as EngineError, error_summary_from};
use otap_df_state::DeployedPipelineKey;
use otap_df_state::event::{ErrorSummary, ObservedEvent};
use otap_df_state::reporter::ObservedEventReporter;
use otap_df_state::store::ObservedStateStore;
use otap_df_telemetry::event::{ErrorSummary, ObservedEvent};
use otap_df_telemetry::reporter::MetricsReporter;
use otap_df_telemetry::{InternalTelemetrySystem, otel_info, otel_info_span, otel_warn};
use std::thread;
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/engine/src/error.rs
@@ -10,7 +10,7 @@ use crate::node::{NodeId, NodeName};
use otap_df_channel::error::SendError;
use otap_df_config::node::NodeKind;
use otap_df_config::{NodeUrn, PortName};
use otap_df_state::event::ErrorSummary;
use otap_df_telemetry::event::ErrorSummary;
use std::borrow::Cow;
use std::fmt;

4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs
@@ -15,10 +15,10 @@ use crate::context::PipelineContext;
use crate::control::{ControlSenders, NodeControlMsg, PipelineControlMsg, PipelineCtrlMsgReceiver};
use crate::error::Error;
use crate::pipeline_metrics::PipelineMetricsMonitor;
use otap_df_config::DeployedPipelineKey;
use otap_df_config::pipeline::TelemetrySettings;
use otap_df_state::DeployedPipelineKey;
use otap_df_state::event::{ErrorSummary, ObservedEvent};
use otap_df_state::reporter::ObservedEventReporter;
use otap_df_telemetry::event::{ErrorSummary, ObservedEvent};
use otap_df_telemetry::otel_warn;
use otap_df_telemetry::reporter::MetricsReporter;
use std::cmp::Reverse;
7 changes: 3 additions & 4 deletions rust/otap-dataflow/crates/engine/src/runtime_pipeline.rs
@@ -4,6 +4,7 @@
//! Set of runtime pipeline configuration structures used by the engine and derived from the pipeline configuration.

use crate::channel_metrics::ChannelMetricsHandle;
use crate::context::PipelineContext;
use crate::control::{
ControlSenders, Controllable, NodeControlMsg, PipelineCtrlMsgReceiver, PipelineCtrlMsgSender,
};
@@ -13,12 +14,10 @@ use crate::node::{Node, NodeDefs, NodeId, NodeType, NodeWithPDataReceiver, NodeW
use crate::pipeline_ctrl::PipelineCtrlMsgManager;
use crate::terminal_state::TerminalState;
use crate::{exporter::ExporterWrapper, processor::ProcessorWrapper, receiver::ReceiverWrapper};
use otap_df_config::DeployedPipelineKey;
use otap_df_config::pipeline::PipelineConfig;
use otap_df_telemetry::reporter::MetricsReporter;

use crate::context::PipelineContext;
use otap_df_state::DeployedPipelineKey;
use otap_df_state::reporter::ObservedEventReporter;
use otap_df_telemetry::reporter::MetricsReporter;
use std::fmt::Debug;
use tokio::runtime::Builder;
use tokio::task::LocalSet;
3 changes: 1 addition & 2 deletions rust/otap-dataflow/crates/otap/tests/pipeline_tests.rs
@@ -9,14 +9,13 @@
//! and metric sets are unregistered to avoid registry leaks.

use otap_df_config::pipeline::{PipelineConfig, PipelineConfigBuilder, PipelineType};
use otap_df_config::{PipelineGroupId, PipelineId};
use otap_df_config::{DeployedPipelineKey, PipelineGroupId, PipelineId};
use otap_df_engine::context::ControllerContext;
use otap_df_engine::control::{PipelineControlMsg, pipeline_ctrl_msg_channel};
use otap_df_engine::entity_context::set_pipeline_entity_key;
use otap_df_otap::OTAP_PIPELINE_FACTORY;
use otap_df_otap::fake_data_generator::OTAP_FAKE_DATA_GENERATOR_URN;
use otap_df_otap::fake_data_generator::config::{Config as FakeDataGeneratorConfig, TrafficConfig};
use otap_df_state::DeployedPipelineKey;
use otap_df_state::store::ObservedStateStore;
use otap_df_telemetry::InternalTelemetrySystem;
use serde_json::to_value;
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/state/Cargo.toml
@@ -14,6 +14,7 @@ workspace = true

[dependencies]
otap-df-config = { path = "../config" }
otap-df-telemetry = { path = "../telemetry" }

serde = { workspace = true }
serde_json = { workspace = true }
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/state/src/error.rs
@@ -3,8 +3,8 @@

//! Errors for the state crate.

use crate::event::{EventType, ObservedEvent};
use crate::phase::PipelinePhase;
use otap_df_telemetry::event::{EventType, ObservedEvent};

/// All errors that can occur in the state crate.
#[derive(thiserror::Error, Debug)]