2 changes: 2 additions & 0 deletions rust/otap-dataflow/Cargo.toml
@@ -108,12 +108,14 @@ opentelemetry-proto = { version = "0.31", default-features = false, features = [
opentelemetry_sdk = "0.31.0"
opentelemetry-stdout = "0.31.0"
opentelemetry-otlp = "0.31.0"
opentelemetry-prometheus = "0.31.0"
parking_lot = "0.12.5"
paste = "1"
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"]}
portpicker = "0.1.1"
pretty_assertions = "1.4.1"
proc-macro2 = "1.0"
prometheus = "0.14.0"
prost = "0.14"
quote = "1.0"
rand = "0.9.2"
@@ -0,0 +1,54 @@
settings:
  default_pipeline_ctrl_msg_channel_size: 100
  default_node_ctrl_msg_channel_size: 100
  default_pdata_channel_size: 100

nodes:
  receiver:
    kind: receiver
    plugin_urn: "urn:otel:otap:fake_data_generator:receiver"
    out_ports:
      out_port:
        destinations:
          - debug
        dispatch_strategy: round_robin
    config:
      traffic_config:
        max_signal_count: 1000
        max_batch_size: 1000
        signals_per_second: 1000
        log_weight: 100
      registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
  debug:
    kind: processor
    plugin_urn: "urn:otel:debug:processor"
    out_ports:
      out_port:
        destinations:
          - noop
        dispatch_strategy: round_robin
    config:
      verbosity: basic
  noop:
    kind: exporter
    plugin_urn: "urn:otel:noop:exporter"
    config:

service:
  telemetry:
    metrics:
      readers:
        - pull:
            exporter:
              prometheus:
                host: "0.0.0.0"
                port: 9090
                path: "/metrics"
      views:
        - selector:
            instrument_name: "logs.produced"
          stream:
            name: "otlp.logs.produced.count"
            description: "Count of logs produced"
    resource:
      service.name: "fake-debug-noop-service"
23 changes: 22 additions & 1 deletion rust/otap-dataflow/crates/config/README.md
@@ -64,7 +64,9 @@ The telemetry configuration includes:

- **Metrics**: OpenTelemetry metrics for pipeline observability
- **Readers**: Periodic or pull-based metric readers
- **Exporters**: Console, OTLP (gRPC/HTTP), or custom exporters
- **Periodic Readers**: Export metrics at regular intervals
- **Pull Readers**: Expose metrics via HTTP endpoint for scraping (e.g., Prometheus)
- **Exporters**: Console, OTLP (gRPC/HTTP), or Prometheus
- **Views**: Metric aggregation and transformation rules
- **Temporality**: Delta or cumulative aggregation
- **Logs**: Internal logging configuration
@@ -89,11 +91,20 @@ service:
      deployment.environment: "production"
    metrics:
      readers:
        # Periodic reader - pushes metrics to OTLP endpoint
        - periodic:
            exporter:
              otlp:
                endpoint: "http://localhost:4318"
                protocol: "grpc/protobuf"
            interval: "60s"
        # Pull reader - exposes metrics for Prometheus scraping
        - pull:
            exporter:
              prometheus:
                host: "0.0.0.0"
                port: 9090
                path: "/metrics"
      views:
        - selector:
            instrument_name: "logs.produced"
@@ -114,12 +125,22 @@ service:

#### Metric Exporters

##### Periodic Exporters

- **Console**: Prints metrics to stdout (useful for debugging)
- **OTLP**: OpenTelemetry Protocol exporters
  - **grpc/protobuf**: Binary protocol over gRPC
  - **http/protobuf**: Binary protobuf over HTTP
  - **http/json**: JSON over HTTP

##### Pull Exporters

- **Prometheus**: Exposes metrics via HTTP endpoint for Prometheus scraping
  - Configurable host, port, and path
  - Exposes metrics in Prometheus text format
  - Example: `http://0.0.0.0:9090/metrics`
  - Compatible with Prometheus, Grafana, and other scraping systems
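
To have Prometheus scrape the exposed endpoint, point a scrape job at the configured host, port, and path. The following is a minimal sketch of such a scrape configuration, assuming the default `0.0.0.0:9090/metrics` endpoint shown above; the job name and target address are illustrative and not part of this crate's configuration:

```yaml
# Illustrative Prometheus server configuration (prometheus.yml), not part of this crate.
scrape_configs:
  - job_name: "otap-dataflow"        # hypothetical job name
    metrics_path: "/metrics"         # matches the exporter's default path
    static_configs:
      - targets: ["localhost:9090"]  # host:port where the pull exporter listens
```

Grafana or any other Prometheus-compatible system can then query the scraped metrics.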

#### Log Exporters

- **Console**: Prints logs to stdout with structured formatting
@@ -4,13 +4,16 @@
//! Readers level configurations.

pub mod periodic;
pub mod pull;

use std::time::Duration;

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::pipeline::service::telemetry::metrics::readers::periodic::MetricsPeriodicExporterConfig;
use crate::pipeline::service::telemetry::metrics::readers::{
    periodic::MetricsPeriodicExporterConfig, pull::MetricsPullExporterConfig,
};

/// OpenTelemetry Metrics Reader configuration.
#[derive(Debug, Clone, Serialize, JsonSchema)]
@@ -19,9 +22,7 @@ pub enum MetricsReaderConfig {
    /// Periodic reader that exports metrics at regular intervals.
    Periodic(MetricsReaderPeriodicConfig),
    /// Pull reader that allows on-demand metric collection.
    Pull {
        //TODO: Add specific configuration for supported pull readers.
    },
    Pull(MetricsReaderPullConfig),
}

/// OpenTelemetry Metrics Periodic Reader configuration.
@@ -50,16 +51,16 @@ impl<'de> Deserialize<'de> for MetricsReaderConfig {
#[serde(rename = "periodic")]
periodic: Option<MetricsReaderPeriodicConfig>,
#[serde(rename = "pull")]
pull: Option<()>,
pull: Option<MetricsReaderPullConfig>,
}

let reader_options_result = ReaderOptions::deserialize(deserializer);
match reader_options_result {
Ok(options) => {
if let Some(config) = options.periodic {
Ok(MetricsReaderConfig::Periodic(config))
} else if options.pull.is_some() {
Ok(MetricsReaderConfig::Pull {})
} else if let Some(config) = options.pull {
Ok(MetricsReaderConfig::Pull(config))
} else {
Err(serde::de::Error::custom(
"Expected either 'periodic' or 'pull' reader",
@@ -74,6 +75,13 @@
    }
}

/// OpenTelemetry Metrics Pull Reader configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MetricsReaderPullConfig {
    /// The metrics exporter to use.
    pub exporter: MetricsPullExporterConfig,
}

/// The temporality of the metrics to be exported.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
@@ -109,6 +117,26 @@ mod tests {
}
}

    #[test]
    fn test_metrics_reader_config_deserialize_pull() {
        let yaml_str = r#"
        pull:
          exporter:
            prometheus:
              host: "0.0.0.0"
              port: 9090
        "#;
        let config: MetricsReaderConfig = serde_yaml::from_str(yaml_str).unwrap();

        if let MetricsReaderConfig::Pull(pull_config) = config {
            let MetricsPullExporterConfig::Prometheus(prometheus_config) = pull_config.exporter;
            assert_eq!(prometheus_config.host, "0.0.0.0");
            assert_eq!(prometheus_config.port, 9090);
        } else {
            panic!("Expected pull reader");
        }
    }

#[test]
fn test_temporality_deserialize() {
let yaml_str_cumulative = r#"
@@ -0,0 +1,172 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Pull reader level configurations.

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// OpenTelemetry Metrics Pull Exporter configuration.
#[derive(Debug, Clone, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum MetricsPullExporterConfig {
    /// Prometheus exporter that exposes metrics for scraping.
    Prometheus(PrometheusExporterConfig),
}

impl<'de> Deserialize<'de> for MetricsPullExporterConfig {
    /// Custom deserialization to handle different exporter types.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        use serde::de::{MapAccess, Visitor};
        use std::fmt;
        struct MetricsPullExporterConfigVisitor;

        impl<'de> Visitor<'de> for MetricsPullExporterConfigVisitor {
            type Value = MetricsPullExporterConfig;

            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
                formatter.write_str("a map with 'prometheus' key")
            }

            fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
            where
                M: MapAccess<'de>,
            {
                if let Some(key) = map.next_key::<String>()? {
                    match key.as_str() {
                        "prometheus" => {
                            let prometheus_config: PrometheusExporterConfig = map.next_value()?;
                            Ok(MetricsPullExporterConfig::Prometheus(prometheus_config))
                        }
                        _ => Err(serde::de::Error::unknown_field(&key, &["prometheus"])),
                    }
                } else {
                    Err(serde::de::Error::custom("Expected 'prometheus' exporter"))
                }
            }
        }

        deserializer.deserialize_map(MetricsPullExporterConfigVisitor)
    }
}

/// Prometheus Exporter configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
    /// The host address where the Prometheus exporter will expose metrics.
    #[serde(default = "default_host")]
    pub host: String,

    /// The port on which the Prometheus exporter will listen for scrape requests.
    #[serde(default = "default_port")]
    pub port: u16,

    /// The HTTP path where metrics will be exposed.
    #[serde(default = "default_metrics_path")]
    pub path: String,
}

fn default_host() -> String {
    "0.0.0.0".to_string()
}

fn default_port() -> u16 {
    9090
}

fn default_metrics_path() -> String {
    "/metrics".to_string()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_pull_exporter_config_deserialize() {
        let yaml_str = r#"
        prometheus:
          host: "127.0.0.1"
          port: 9090
          path: "/"
        "#;
        let config: MetricsPullExporterConfig = serde_yaml::from_str(yaml_str).unwrap();

        let MetricsPullExporterConfig::Prometheus(prometheus_config) = config;
        assert_eq!(prometheus_config.host, "127.0.0.1");
        assert_eq!(prometheus_config.port, 9090);
        assert_eq!(prometheus_config.path, "/");
    }

    #[test]
    fn test_metrics_pull_exporter_invalid_config_deserialize() {
        let yaml_str = r#"
        unknown_exporter:
          some_field: "value"
        "#;
        let result: Result<MetricsPullExporterConfig, _> = serde_yaml::from_str(yaml_str);
        match result {
            Ok(_) => panic!("Deserialization should have failed for unknown exporter"),
            Err(err) => {
                let err_msg = err.to_string();
                assert!(err_msg.contains("unknown field"));
                assert!(err_msg.contains("prometheus"));
            }
        }
    }

    #[test]
    fn test_prometheus_exporter_config_deserialize() {
        let yaml_str = r#"
        host: "127.0.0.1"
        port: 9090
        path: "/custom_metrics"
        "#;
        let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap();
        assert_eq!(config.host, "127.0.0.1");
        assert_eq!(config.port, 9090);
        assert_eq!(config.path, "/custom_metrics");
    }

    #[test]
    fn test_prometheus_exporter_config_default_path_deserialize() {
        let yaml_str = r#"
        host: "127.0.0.1"
        port: 9090
        "#;
        let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap();
        assert_eq!(config.host, "127.0.0.1");
        assert_eq!(config.port, 9090);
        assert_eq!(config.path, "/metrics");
    }

    #[test]
    fn test_prometheus_exporter_unknown_field_config_deserialize() {
        let yaml_str = r#"
        host: "0.0.0.0"
        port: 8080
        extra_field: "unexpected"
        "#;
        let result: Result<PrometheusExporterConfig, _> = serde_yaml::from_str(yaml_str);
        match result {
            Ok(_) => panic!("Deserialization should have failed for unknown field"),
            Err(err) => {
                let err_msg = err.to_string();
                assert!(err_msg.contains("unknown field `extra_field`"));
            }
        }
    }

    #[test]
    fn test_prometheus_exporter_config_defaults() {
        let yaml_str = r#""#;
        let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap();
        assert_eq!(config.host, "0.0.0.0");
        assert_eq!(config.port, 9090);
        assert_eq!(config.path, "/metrics");
    }
}