diff --git a/changelog.d/influx_serializer.enhancement.md b/changelog.d/influx_serializer.enhancement.md new file mode 100644 index 0000000000000..92fc6c4ad8dde --- /dev/null +++ b/changelog.d/influx_serializer.enhancement.md @@ -0,0 +1,3 @@ +Add an `influxdb` encoding codec so generic HTTP and WebSocket sinks can emit metrics using Influx line protocol without the dedicated InfluxDB sink. + +authors: elohmeier diff --git a/lib/codecs/src/encoding/format/influxdb.rs b/lib/codecs/src/encoding/format/influxdb.rs new file mode 100644 index 0000000000000..592ddcb04fa7d --- /dev/null +++ b/lib/codecs/src/encoding/format/influxdb.rs @@ -0,0 +1,583 @@ +use std::{collections::HashMap, sync::Arc}; + +use bytes::{BufMut, BytesMut}; +use chrono::{DateTime, Utc}; +use derivative::Derivative; +use snafu::Snafu; +use vector_config::configurable_component; +use vector_core::{ + config::DataType, + event::{ + Event, KeyString, MetricTags, + metric::{Metric, MetricSketch, MetricValue, Quantile, Sample, StatisticKind}, + }, + schema, +}; + +/// Representation of a field value in the Influx line protocol. +#[derive(Clone, Debug)] +pub enum Field { + /// String field. + String(String), + /// Floating point field. + Float(f64), + /// Unsigned integer field. + UnsignedInt(u64), + /// Signed integer field. + Int(i64), + /// Boolean field. + Bool(bool), +} + +/// Influx line protocol version. +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)] +#[derivative(Default)] +#[serde(rename_all = "snake_case")] +pub enum ProtocolVersion { + /// Line protocol for InfluxDB v1.x. + V1, + /// Line protocol for InfluxDB v2.x. + #[derivative(Default)] + V2, +} + +/// Serializer configuration for encoding metrics as Influx line protocol. +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct InfluxLineProtocolSerializerConfig { + /// The protocol version to encode fields with. + #[serde(default)] + pub protocol_version: ProtocolVersion, + + /// A default namespace applied when metrics do not specify one. + #[serde(default)] + pub default_namespace: Option, + + /// Additional tags that are appended to every encoded metric. + #[configurable(metadata( + docs::additional_props_description = "A tag key/value pair that is appended to each measurement." + ))] + #[serde(default)] + pub tags: Option>, + + /// Quantiles to calculate when encoding distribution metrics. + #[serde(default = "default_summary_quantiles")] + pub quantiles: Vec, +} + +impl InfluxLineProtocolSerializerConfig { + /// Build the serializer from this configuration. + pub fn build(&self) -> InfluxLineProtocolSerializer { + InfluxLineProtocolSerializer::new(self.clone()) + } + + /// The data type of events accepted by this serializer. + pub fn input_type(&self) -> DataType { + DataType::Metric + } + + /// The schema requirement for events encoded by this serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + schema::Requirement::empty() + } +} + +/// Default quantiles that align with the existing InfluxDB sink behaviour. +pub fn default_summary_quantiles() -> Vec { + vec![0.5, 0.75, 0.9, 0.95, 0.99] +} + +/// Serializer that emits metrics encoded with the Influx line protocol. +#[derive(Clone, Debug)] +pub struct InfluxLineProtocolSerializer { + protocol_version: ProtocolVersion, + default_namespace: Option, + tags: Option>>, + quantiles: Arc>, +} + +impl InfluxLineProtocolSerializer { + /// Creates a new serializer from the provided configuration. + pub fn new(config: InfluxLineProtocolSerializerConfig) -> Self { + Self { + protocol_version: config.protocol_version, + default_namespace: config.default_namespace, + tags: config.tags.map(Arc::new), + quantiles: Arc::new(config.quantiles), + } + } + + fn encode_metric( + &self, + metric: Metric, + output: &mut BytesMut, + ) -> Result<(), InfluxLineProtocolSerializerError> { + let namespace = metric.namespace().or(self.default_namespace.as_deref()); + let measurement = encode_namespace(namespace, '.', metric.name()); + let timestamp = encode_timestamp(metric.timestamp()); + let tags = merge_tags(&metric, self.tags.as_deref()); + let (metric_type, fields) = get_type_and_fields(metric.value(), &self.quantiles); + + let mut merged_tags = tags.unwrap_or_default(); + merged_tags.replace("metric_type".to_owned(), metric_type.to_owned()); + + influx_line_protocol( + self.protocol_version, + &measurement, + Some(merged_tags), + fields, + timestamp, + output, + ) + .map_err(|message| InfluxLineProtocolSerializerError::LineProtocol { message }) + } +} + +impl tokio_util::codec::Encoder for InfluxLineProtocolSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, item: Event, dst: &mut BytesMut) -> Result<(), Self::Error> { + match item { + Event::Metric(metric) => self.encode_metric(metric, dst).map_err(Into::into), + Event::Log(_) => { + Err(InfluxLineProtocolSerializerError::InvalidEventType { actual: "log" }.into()) + } + Event::Trace(_) => { + Err(InfluxLineProtocolSerializerError::InvalidEventType { actual: "trace" }.into()) + } + } + } +} + +/// Errors returned by the Influx line protocol serializer. +#[derive(Debug, Snafu)] +pub enum InfluxLineProtocolSerializerError { + /// The encoder received an event that was not a metric. + #[snafu(display("Influx line protocol encoding expects metric events, got {actual}"))] + InvalidEventType { + /// The kind of non-metric event encountered. + actual: &'static str, + }, + /// The encoder could not construct a valid line protocol frame. + #[snafu(display("Influx line protocol error: {message}"))] + LineProtocol { + /// The message describing why line protocol generation failed. + message: &'static str, + }, +} + +fn merge_tags(metric: &Metric, extra: Option<&HashMap>) -> Option { + match (metric.tags().cloned(), extra) { + (Some(mut metric_tags), Some(extra)) => { + metric_tags.extend( + extra + .iter() + .map(|(key, value)| (key.clone(), value.clone())), + ); + Some(metric_tags) + } + (Some(metric_tags), None) => Some(metric_tags), + (None, Some(extra)) => Some( + extra + .iter() + .map(|(key, value)| (key.clone(), value.clone())) + .collect(), + ), + (None, None) => None, + } +} + +fn get_type_and_fields( + value: &MetricValue, + quantiles: &[f64], +) -> (&'static str, Option>) { + match value { + MetricValue::Counter { value } => ("counter", Some(to_fields(*value))), + MetricValue::Gauge { value } => ("gauge", Some(to_fields(*value))), + MetricValue::Set { values } => ("set", Some(to_fields(values.len() as f64))), + MetricValue::AggregatedHistogram { + buckets, + count, + sum, + } => { + let mut fields: HashMap = buckets + .iter() + .map(|sample| { + ( + format!("bucket_{}", sample.upper_limit).into(), + Field::UnsignedInt(sample.count), + ) + }) + .collect(); + fields.insert("count".into(), Field::UnsignedInt(*count)); + fields.insert("sum".into(), Field::Float(*sum)); + + ("histogram", Some(fields)) + } + MetricValue::AggregatedSummary { + quantiles, + count, + sum, + } => { + let mut fields: HashMap = quantiles + .iter() + .map(|quantile| { + ( + format!("quantile_{}", quantile.quantile).into(), + Field::Float(quantile.value), + ) + }) + .collect(); + fields.insert("count".into(), Field::UnsignedInt(*count)); + fields.insert("sum".into(), Field::Float(*sum)); + + ("summary", Some(fields)) + } + MetricValue::Distribution { samples, statistic } => { + let quantiles = match statistic { + StatisticKind::Histogram => &[0.95] as &[f64], + StatisticKind::Summary => quantiles, + }; + let fields = encode_distribution(samples, quantiles); + ("distribution", fields) + } + MetricValue::Sketch { sketch } => match sketch { + MetricSketch::AgentDDSketch(ddsketch) => { + let mut fields = [0.5, 0.75, 0.9, 0.99] + .iter() + .map(|q| { + let quantile = Quantile { + quantile: *q, + value: ddsketch.quantile(*q).unwrap_or(0.0), + }; + ( + quantile.to_percentile_string().into(), + Field::Float(quantile.value), + ) + }) + .collect::>(); + fields.insert( + "count".into(), + Field::UnsignedInt(u64::from(ddsketch.count())), + ); + fields.insert( + "min".into(), + Field::Float(ddsketch.min().unwrap_or(f64::MAX)), + ); + fields.insert( + "max".into(), + Field::Float(ddsketch.max().unwrap_or(f64::MIN)), + ); + fields.insert("sum".into(), Field::Float(ddsketch.sum().unwrap_or(0.0))); + fields.insert("avg".into(), Field::Float(ddsketch.avg().unwrap_or(0.0))); + + ("sketch", Some(fields)) + } + }, + } +} + +fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option> { + let statistic = DistributionStatistic::from_samples(samples, quantiles)?; + + Some( + [ + ("min".into(), Field::Float(statistic.min)), + ("max".into(), Field::Float(statistic.max)), + ("median".into(), Field::Float(statistic.median)), + ("avg".into(), Field::Float(statistic.avg)), + ("sum".into(), Field::Float(statistic.sum)), + ("count".into(), Field::Float(statistic.count as f64)), + ] + .into_iter() + .chain( + statistic + .quantiles + .iter() + .map(|&(p, val)| (format!("quantile_{p:.2}").into(), Field::Float(val))), + ) + .collect(), + ) +} + +fn to_fields(value: f64) -> HashMap { + [("value".into(), Field::Float(value))] + .into_iter() + .collect() +} + +fn encode_namespace(namespace: Option<&str>, delimiter: char, name: &str) -> String { + namespace + .map(|namespace| format!("{namespace}{delimiter}{name}")) + .unwrap_or_else(|| name.to_owned()) +} + +/// Encode a full InfluxDB line protocol entry into the provided buffer. +pub fn influx_line_protocol( + protocol_version: ProtocolVersion, + measurement: &str, + tags: Option, + fields: Option>, + timestamp: i64, + buffer: &mut BytesMut, +) -> Result<(), &'static str> { + let fields = fields.unwrap_or_default(); + if fields.is_empty() { + return Err("fields must not be empty"); + } + + encode_string(measurement, buffer); + + if let Some(tags) = tags { + let mut tags_buffer = BytesMut::new(); + encode_tags(tags, &mut tags_buffer); + if !tags_buffer.is_empty() { + buffer.put_u8(b','); + buffer.extend_from_slice(&tags_buffer); + } + } + + buffer.put_u8(b' '); + encode_fields(protocol_version, fields, buffer); + buffer.put_u8(b' '); + buffer.put_slice(×tamp.to_string().into_bytes()); + buffer.put_u8(b'\n'); + Ok(()) +} + +fn encode_tags(tags: MetricTags, output: &mut BytesMut) { + let original_len = output.len(); + for (key, value) in tags.iter_single() { + if key.is_empty() || value.is_empty() { + continue; + } + encode_string(key, output); + output.put_u8(b'='); + encode_string(value, output); + output.put_u8(b','); + } + + if output.len() > original_len { + output.truncate(output.len() - 1); + } +} + +fn encode_fields( + protocol_version: ProtocolVersion, + fields: HashMap, + output: &mut BytesMut, +) { + let original_len = output.len(); + + for (key, value) in fields.into_iter() { + encode_string(&key, output); + output.put_u8(b'='); + + match value { + Field::String(s) => { + output.put_u8(b'"'); + for ch in s.chars() { + if "\\\"".contains(ch) { + output.put_u8(b'\\'); + } + let mut buffer: [u8; 4] = [0; 4]; + output.put_slice(ch.encode_utf8(&mut buffer).as_bytes()); + } + output.put_u8(b'"'); + } + Field::Float(f) => output.put_slice(&f.to_string().into_bytes()), + Field::UnsignedInt(i) => { + output.put_slice(&i.to_string().into_bytes()); + let suffix = match protocol_version { + ProtocolVersion::V1 => 'i', + ProtocolVersion::V2 => 'u', + }; + let mut buffer: [u8; 4] = [0; 4]; + output.put_slice(suffix.encode_utf8(&mut buffer).as_bytes()); + } + Field::Int(i) => { + output.put_slice(&i.to_string().into_bytes()); + output.put_u8(b'i'); + } + Field::Bool(b) => { + output.put_slice(&b.to_string().into_bytes()); + } + } + + output.put_u8(b','); + } + + if output.len() > original_len { + output.truncate(output.len() - 1); + } +} + +fn encode_string(value: &str, output: &mut BytesMut) { + for ch in value.chars() { + if "\\, =".contains(ch) { + output.put_u8(b'\\'); + } + let mut buffer: [u8; 4] = [0; 4]; + output.put_slice(ch.encode_utf8(&mut buffer).as_bytes()); + } +} + +/// Converts an optional timestamp into the nanoseconds expected by the protocol. +pub fn encode_timestamp(timestamp: Option>) -> i64 { + match timestamp { + Some(timestamp) => timestamp.timestamp_nanos_opt().unwrap(), + None => encode_timestamp(Some(Utc::now())), + } +} + +#[derive(Debug, Clone)] +struct DistributionStatistic { + min: f64, + max: f64, + median: f64, + avg: f64, + sum: f64, + count: u64, + quantiles: Vec<(f64, f64)>, +} + +impl DistributionStatistic { + fn from_samples(source: &[Sample], quantiles: &[f64]) -> Option { + let mut bins = source + .iter() + .filter(|sample| sample.rate > 0) + .copied() + .collect::>(); + + match bins.len() { + 0 => None, + 1 => { + let val = bins[0].value; + let count = bins[0].rate; + Some(Self { + min: val, + max: val, + median: val, + avg: val, + sum: val * count as f64, + count: count as u64, + quantiles: quantiles.iter().map(|&p| (p, val)).collect(), + }) + } + _ => { + bins.sort_unstable_by(|a, b| a.value.total_cmp(&b.value)); + + let min = bins.first().unwrap().value; + let max = bins.last().unwrap().value; + let sum = bins + .iter() + .map(|sample| sample.value * sample.rate as f64) + .sum::(); + + for index in 1..bins.len() { + bins[index].rate += bins[index - 1].rate; + } + + let count = bins.last().unwrap().rate; + let avg = sum / count as f64; + + let median = find_quantile(&bins, 0.5); + let quantiles = quantiles + .iter() + .map(|&p| (p, find_quantile(&bins, p))) + .collect(); + + Some(Self { + min, + max, + median, + avg, + sum, + count: count as u64, + quantiles, + }) + } + } + } +} + +fn find_quantile(bins: &[Sample], p: f64) -> f64 { + let count = bins.last().expect("bins is empty").rate; + find_sample(bins, (p * count as f64).round() as u32) +} + +fn find_sample(bins: &[Sample], i: u32) -> f64 { + let index = match bins.binary_search_by_key(&i, |sample| sample.rate) { + Ok(index) => index, + Err(index) => index, + }; + bins[index].value +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + use vector_core::{ + config::DataType, + event::{ + MetricTags, + metric::{Metric, MetricKind, MetricValue}, + }, + }; + + #[test] + fn encode_counter_metric() { + let mut serializer = + InfluxLineProtocolSerializer::new(InfluxLineProtocolSerializerConfig { + default_namespace: Some("ns".to_string()), + ..Default::default() + }); + let metric = Metric::new( + "total", + MetricKind::Incremental, + MetricValue::Counter { value: 1.5 }, + ) + .with_timestamp(Some(Utc::now())); + + let mut buffer = BytesMut::new(); + serializer + .encode(Event::Metric(metric), &mut buffer) + .unwrap(); + assert!( + String::from_utf8(buffer.to_vec()) + .unwrap() + .starts_with("ns.total,metric_type=counter value=1.5 ") + ); + } + + #[test] + fn encode_tags_are_escaped() { + let measurement = "cpu"; + let mut buffer = BytesMut::new(); + let mut tags = MetricTags::default(); + tags.insert("host name".to_owned(), "a=b".to_owned()); + influx_line_protocol( + ProtocolVersion::V2, + measurement, + Some(tags), + Some(HashMap::from([("v".into(), Field::Float(1.0))])), + 42, + &mut buffer, + ) + .unwrap(); + + assert_eq!( + String::from_utf8(buffer.to_vec()).unwrap(), + "cpu,host\\ name=a\\=b v=1 42\n" + ); + } + + #[test] + fn serializer_schema() { + let config = InfluxLineProtocolSerializerConfig::default(); + assert_eq!(config.input_type(), DataType::Metric); + } +} diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index 9377cdca5d906..ee984131ab5e2 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -8,6 +8,7 @@ mod cef; mod common; mod csv; mod gelf; +mod influxdb; mod json; mod logfmt; mod native; @@ -24,6 +25,11 @@ pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions}; pub use cef::{CefSerializer, CefSerializerConfig}; use dyn_clone::DynClone; pub use gelf::{GelfSerializer, GelfSerializerConfig}; +pub use influxdb::{ + Field, InfluxLineProtocolSerializer, InfluxLineProtocolSerializerConfig, + InfluxLineProtocolSerializerError, ProtocolVersion, default_summary_quantiles, + encode_timestamp, influx_line_protocol, +}; pub use json::{JsonSerializer, JsonSerializerConfig, JsonSerializerOptions}; pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig}; pub use native::{NativeSerializer, NativeSerializerConfig}; diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 8dd2c4ddc79a5..314cb1e62eb4c 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -9,9 +9,10 @@ pub use chunking::{Chunker, Chunking, GelfChunker}; pub use format::{ AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer, CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, - JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer, - LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, + InfluxLineProtocolSerializer, InfluxLineProtocolSerializerConfig, + InfluxLineProtocolSerializerError, JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, + LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, + NativeSerializer, NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs index eef088fca4b72..144e913fe1d1e 100644 --- a/lib/codecs/src/encoding/serializer.rs +++ b/lib/codecs/src/encoding/serializer.rs @@ -8,10 +8,11 @@ use super::chunking::Chunker; use super::format::{ AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer, CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, - JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, - NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, - ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig, - TextSerializer, TextSerializerConfig, + InfluxLineProtocolSerializer, InfluxLineProtocolSerializerConfig, JsonSerializer, + JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, + NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, ProtobufSerializer, + ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, + TextSerializerConfig, }; #[cfg(feature = "opentelemetry")] use super::format::{OtlpSerializer, OtlpSerializerConfig}; @@ -124,6 +125,9 @@ pub enum SerializerConfig { /// transform) and removing the message field while doing additional parsing on it, as this /// could lead to the encoding emitting empty strings for the given event. Text(TextSerializerConfig), + + /// Encodes metric events using the InfluxDB line protocol. + Influxdb(InfluxLineProtocolSerializerConfig), } impl Default for SerializerConfig { @@ -199,6 +203,12 @@ impl From for SerializerConfig { } } +impl From for SerializerConfig { + fn from(config: InfluxLineProtocolSerializerConfig) -> Self { + Self::Influxdb(config) + } +} + impl From for SerializerConfig { fn from(config: TextSerializerConfig) -> Self { Self::Text(config) @@ -230,6 +240,7 @@ impl SerializerConfig { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())), + SerializerConfig::Influxdb(config) => Ok(Serializer::Influxdb(config.build())), } } @@ -261,7 +272,8 @@ impl SerializerConfig { | SerializerConfig::Logfmt | SerializerConfig::NativeJson | SerializerConfig::RawMessage - | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited, + | SerializerConfig::Text(_) + | SerializerConfig::Influxdb(_) => FramingConfig::NewlineDelimited, SerializerConfig::Gelf(_) => { FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0)) } @@ -286,6 +298,7 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), + SerializerConfig::Influxdb(config) => config.input_type(), } } @@ -307,6 +320,7 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), + SerializerConfig::Influxdb(config) => config.schema_requirement(), } } } @@ -335,6 +349,8 @@ pub enum Serializer { Otlp(OtlpSerializer), /// Uses a `ProtobufSerializer` for serialization. Protobuf(ProtobufSerializer), + /// Uses an `InfluxLineProtocolSerializer` for serialization. + Influxdb(InfluxLineProtocolSerializer), /// Uses a `RawMessageSerializer` for serialization. RawMessage(RawMessageSerializer), /// Uses a `TextSerializer` for serialization. @@ -352,6 +368,7 @@ impl Serializer { | Serializer::Logfmt(_) | Serializer::Text(_) | Serializer::Native(_) + | Serializer::Influxdb(_) | Serializer::Protobuf(_) | Serializer::RawMessage(_) => false, #[cfg(feature = "opentelemetry")] @@ -376,6 +393,7 @@ impl Serializer { | Serializer::Logfmt(_) | Serializer::Text(_) | Serializer::Native(_) + | Serializer::Influxdb(_) | Serializer::Protobuf(_) | Serializer::RawMessage(_) => { panic!("Serializer does not support JSON") @@ -413,7 +431,8 @@ impl Serializer { | Serializer::Gelf(_) | Serializer::Json(_) | Serializer::Text(_) - | Serializer::NativeJson(_) => false, + | Serializer::NativeJson(_) + | Serializer::Influxdb(_) => false, } } } @@ -479,6 +498,12 @@ impl From for Serializer { } } +impl From for Serializer { + fn from(serializer: InfluxLineProtocolSerializer) -> Self { + Self::Influxdb(serializer) + } +} + impl From for Serializer { fn from(serializer: RawMessageSerializer) -> Self { Self::RawMessage(serializer) @@ -507,6 +532,7 @@ impl tokio_util::codec::Encoder for Serializer { #[cfg(feature = "opentelemetry")] Serializer::Otlp(serializer) => serializer.encode(event, buffer), Serializer::Protobuf(serializer) => serializer.encode(event, buffer), + Serializer::Influxdb(serializer) => serializer.encode(event, buffer), Serializer::RawMessage(serializer) => serializer.encode(event, buffer), Serializer::Text(serializer) => serializer.encode(event, buffer), } diff --git a/lib/codecs/src/lib.rs b/lib/codecs/src/lib.rs index e386bb30f378a..95fd49aabe5e0 100644 --- a/lib/codecs/src/lib.rs +++ b/lib/codecs/src/lib.rs @@ -22,11 +22,13 @@ pub use decoding::{ pub use decoding::{SyslogDeserializer, SyslogDeserializerConfig}; pub use encoding::{ BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder, CharacterDelimitedEncoderConfig, - CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, - JsonSerializerConfig, LengthDelimitedEncoder, LengthDelimitedEncoderConfig, LogfmtSerializer, - LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, - RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, + CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, + InfluxLineProtocolSerializer, InfluxLineProtocolSerializerConfig, + InfluxLineProtocolSerializerError, JsonSerializer, JsonSerializerConfig, + LengthDelimitedEncoder, LengthDelimitedEncoderConfig, LogfmtSerializer, LogfmtSerializerConfig, + NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, + NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, RawMessageSerializer, + RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; pub use gelf::{VALID_FIELD_REGEX, gelf_fields}; use vector_config_macros::configurable_component; diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index a04f44315047a..64cfa65b15598 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -130,7 +130,8 @@ impl EncodingConfigWithFraming { | Serializer::Logfmt(_) | Serializer::NativeJson(_) | Serializer::RawMessage(_) - | Serializer::Text(_), + | Serializer::Text(_) + | Serializer::Influxdb(_), ) => NewlineDelimitedEncoder::default().into(), #[cfg(feature = "codecs-opentelemetry")] (None, Serializer::Otlp(_)) => BytesEncoder.into(), diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index f1d0741bb669c..0300780e13ec7 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -125,7 +125,8 @@ impl Encoder { | Serializer::Logfmt(_) | Serializer::NativeJson(_) | Serializer::RawMessage(_) - | Serializer::Text(_), + | Serializer::Text(_) + | Serializer::Influxdb(_), _, ) => "text/plain", #[cfg(feature = "codecs-opentelemetry")] diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 67c22910e07b0..79d0e1fee0b41 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -236,6 +236,7 @@ fn serializer_config_to_deserializer( }, }) } + SerializerConfig::Influxdb(_) => DeserializerConfig::Influxdb(Default::default()), SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, #[cfg(feature = "codecs-opentelemetry")] SerializerConfig::Otlp => todo!(), diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 2c9f257fae5e1..130da381735b5 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -265,7 +265,7 @@ impl SinkConfig for HttpSinkConfig { use Framer::*; use Serializer::*; match (encoder.serializer(), encoder.framer()) { - (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()), + (RawMessage(_) | Text(_) | Influxdb(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()), (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()), (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => { Some(CONTENT_TYPE_JSON.to_owned()) diff --git a/src/sinks/http/tests.rs b/src/sinks/http/tests.rs index 1b545c4c01257..0107c11d5c776 100644 --- a/src/sinks/http/tests.rs +++ b/src/sinks/http/tests.rs @@ -13,10 +13,14 @@ use hyper::{Body, Method, Response, StatusCode}; use serde::{Deserialize, de}; use vector_lib::{ codecs::{ - JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig, + InfluxLineProtocolSerializerConfig, JsonSerializerConfig, NewlineDelimitedEncoderConfig, + TextSerializerConfig, encoding::{Framer, FramingConfig}, }, - event::{BatchNotifier, BatchStatus, Event, LogEvent}, + event::{ + BatchNotifier, BatchStatus, Event, LogEvent, + metric::{Metric, MetricKind, MetricValue}, + }, finalization::AddBatchNotifier, }; @@ -119,6 +123,37 @@ fn http_encode_event_ndjson() { assert_eq!(output.message, "hello world".to_string()); } +#[test] +fn http_encode_event_influxdb() { + let metric = Metric::new( + "cpu_usage", + MetricKind::Absolute, + MetricValue::Gauge { value: 42.0 }, + ) + .into(); + + let cfg = default_cfg( + ( + None::, + InfluxLineProtocolSerializerConfig::default(), + ) + .into(), + ); + let encoder = cfg.build_encoder().unwrap(); + let transformer = cfg.encoding.transformer(); + + let encoder = HttpEncoder::new(encoder, transformer, "".to_owned(), "".to_owned()); + + let mut encoded = vec![]; + encoder.encode_input(vec![metric], &mut encoded).unwrap(); + + let payload = String::from_utf8(encoded).unwrap(); + assert!(payload.ends_with('\n')); + let line = payload.trim_end(); + assert!(line.starts_with("cpu_usage,metric_type=gauge")); + assert!(line.contains("value=42")); +} + #[test] fn http_validates_normal_headers() { let config = r#" diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index d386dfae8fa99..a562f38457000 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -1,43 +1,18 @@ pub mod logs; pub mod metrics; -use std::collections::HashMap; - -use bytes::{BufMut, BytesMut}; -use chrono::{DateTime, Utc}; use futures::FutureExt; use http::{StatusCode, Uri}; use snafu::{ResultExt, Snafu}; use tower::Service; -use vector_lib::{ - configurable::configurable_component, - event::{KeyString, MetricTags}, - sensitive_string::SensitiveString, +use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString}; + +pub use vector_lib::codecs::encoding::format::{ + Field, ProtocolVersion, encode_timestamp, influx_line_protocol, }; use crate::http::HttpClient; -pub(in crate::sinks) enum Field { - /// string - String(String), - /// float - Float(f64), - /// unsigned integer - /// Influx can support 64 bit integers if compiled with a flag, see: - /// - UnsignedInt(u64), - /// integer - Int(i64), - /// boolean - Bool(bool), -} - -#[derive(Clone, Copy, Debug)] -pub(in crate::sinks) enum ProtocolVersion { - V1, - V2, -} - #[derive(Debug, Snafu)] enum ConfigError { #[snafu(display("InfluxDB v1 or v2 should be configured as endpoint."))] @@ -229,127 +204,6 @@ fn healthcheck( .boxed()) } -// https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/ -pub(in crate::sinks) fn influx_line_protocol( - protocol_version: ProtocolVersion, - measurement: &str, - tags: Option, - fields: Option>, - timestamp: i64, - line_protocol: &mut BytesMut, -) -> Result<(), &'static str> { - // Fields - let unwrapped_fields = fields.unwrap_or_default(); - // LineProtocol should have a field - if unwrapped_fields.is_empty() { - return Err("fields must not be empty"); - } - - encode_string(measurement, line_protocol); - - // Tags are optional - let unwrapped_tags = tags.unwrap_or_default(); - if !unwrapped_tags.is_empty() { - line_protocol.put_u8(b','); - encode_tags(unwrapped_tags, line_protocol); - } - line_protocol.put_u8(b' '); - - // Fields - encode_fields(protocol_version, unwrapped_fields, line_protocol); - line_protocol.put_u8(b' '); - - // Timestamp - line_protocol.put_slice(×tamp.to_string().into_bytes()); - line_protocol.put_u8(b'\n'); - Ok(()) -} - -fn encode_tags(tags: MetricTags, output: &mut BytesMut) { - let original_len = output.len(); - // `tags` is already sorted - for (key, value) in tags.iter_single() { - if key.is_empty() || value.is_empty() { - continue; - } - encode_string(key, output); - output.put_u8(b'='); - encode_string(value, output); - output.put_u8(b','); - } - - // remove last ',' - if output.len() > original_len { - output.truncate(output.len() - 1); - } -} - -fn encode_fields( - protocol_version: ProtocolVersion, - fields: HashMap, - output: &mut BytesMut, -) { - let original_len = output.len(); - for (key, value) in fields.into_iter() { - encode_string(&key, output); - output.put_u8(b'='); - match value { - Field::String(s) => { - output.put_u8(b'"'); - for c in s.chars() { - if "\\\"".contains(c) { - output.put_u8(b'\\'); - } - let mut c_buffer: [u8; 4] = [0; 4]; - output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes()); - } - output.put_u8(b'"'); - } - Field::Float(f) => output.put_slice(&f.to_string().into_bytes()), - Field::UnsignedInt(i) => { - output.put_slice(&i.to_string().into_bytes()); - let c = match protocol_version { - ProtocolVersion::V1 => 'i', - ProtocolVersion::V2 => 'u', - }; - let mut c_buffer: [u8; 4] = [0; 4]; - output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes()); - } - Field::Int(i) => { - output.put_slice(&i.to_string().into_bytes()); - output.put_u8(b'i'); - } - Field::Bool(b) => { - output.put_slice(&b.to_string().into_bytes()); - } - }; - output.put_u8(b','); - } - - // remove last ',' - if output.len() > original_len { - output.truncate(output.len() - 1); - } -} - -fn encode_string(key: &str, output: &mut BytesMut) { - for c in key.chars() { - if "\\, =".contains(c) { - output.put_u8(b'\\'); - } - let mut c_buffer: [u8; 4] = [0; 4]; - output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes()); - } -} - -pub(in crate::sinks) fn encode_timestamp(timestamp: Option>) -> i64 { - if let Some(ts) = timestamp { - ts.timestamp_nanos_opt().unwrap() - } else { - encode_timestamp(Some(Utc::now())) - } -} - pub(in crate::sinks) fn encode_uri( endpoint: &str, path: &str, @@ -382,6 +236,7 @@ pub mod test_util { use std::{fs::File, io::Read}; use chrono::{DateTime, SecondsFormat, Timelike, Utc, offset::TimeZone}; + use vector_lib::event::MetricTags; use vector_lib::metric_tags; use super::*; @@ -567,8 +422,6 @@ mod tests { use serde::{Deserialize, Serialize}; use super::*; - use crate::sinks::influxdb::test_util::{assert_fields, tags, ts}; - #[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(deny_unknown_fields)] pub struct InfluxDbTestConfig { @@ -692,151 +545,6 @@ mod tests { assert_eq!("http://localhost:9999/ping", uri.to_string()) } - #[test] - fn test_encode_tags() { - let mut value = BytesMut::new(); - encode_tags(tags(), &mut value); - - assert_eq!(value, "normal_tag=value,true_tag=true"); - - let tags_to_escape = vec![ - ("tag".to_owned(), "val=ue".to_owned()), - ("name escape".to_owned(), "true".to_owned()), - ("value_escape".to_owned(), "value escape".to_owned()), - ("a_first_place".to_owned(), "10".to_owned()), - ] - .into_iter() - .collect(); - - let mut value = BytesMut::new(); - encode_tags(tags_to_escape, &mut value); - assert_eq!( - value, - "a_first_place=10,name\\ escape=true,tag=val\\=ue,value_escape=value\\ escape" - ); - } - - #[test] - fn tags_order() { - let mut value = BytesMut::new(); - encode_tags( - vec![ - ("a", "value"), - ("b", "value"), - ("c", "value"), - ("d", "value"), - ("e", "value"), - ] - .into_iter() - .map(|(k, v)| (k.to_owned(), v.to_owned())) - .collect(), - &mut value, - ); - assert_eq!(value, "a=value,b=value,c=value,d=value,e=value"); - } - - #[test] - fn test_encode_fields_v1() { - let fields = vec![ - ("field_string".into(), Field::String("string value".into())), - ( - "field_string_escape".into(), - Field::String("string\\val\"ue".into()), - ), - ("field_float".into(), Field::Float(123.45)), - ("field_unsigned_int".into(), Field::UnsignedInt(657)), - ("field_int".into(), Field::Int(657646)), - ("field_bool_true".into(), Field::Bool(true)), - ("field_bool_false".into(), Field::Bool(false)), - ("escape key".into(), Field::Float(10.0)), - ] - .into_iter() - .collect(); - - let mut value = BytesMut::new(); - encode_fields(ProtocolVersion::V1, fields, &mut value); - let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap(); - assert_fields( - value, - [ - "escape\\ key=10", - "field_float=123.45", - "field_string=\"string value\"", - "field_string_escape=\"string\\\\val\\\"ue\"", - "field_unsigned_int=657i", - "field_int=657646i", - "field_bool_true=true", - "field_bool_false=false", - ] - .to_vec(), - ) - } - - #[test] - fn test_encode_fields() { - let fields = vec![ - ("field_string".into(), Field::String("string value".into())), - ( - "field_string_escape".into(), - Field::String("string\\val\"ue".into()), - ), - ("field_float".into(), Field::Float(123.45)), - ("field_unsigned_int".into(), Field::UnsignedInt(657)), - ("field_int".into(), Field::Int(657646)), - ("field_bool_true".into(), Field::Bool(true)), - ("field_bool_false".into(), Field::Bool(false)), - ("escape key".into(), Field::Float(10.0)), - ] - .into_iter() - .collect(); - - let mut value = BytesMut::new(); - encode_fields(ProtocolVersion::V2, fields, &mut value); - let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap(); - assert_fields( - value, - [ - "escape\\ key=10", - "field_float=123.45", - "field_string=\"string value\"", - "field_string_escape=\"string\\\\val\\\"ue\"", - "field_unsigned_int=657u", - "field_int=657646i", - "field_bool_true=true", - "field_bool_false=false", - ] - .to_vec(), - ) - } - - #[test] - fn test_encode_string() { - let mut value = BytesMut::new(); - encode_string("measurement_name", &mut value); - assert_eq!(value, "measurement_name"); - - let mut value = BytesMut::new(); - encode_string("measurement name", &mut value); - assert_eq!(value, "measurement\\ name"); - - let mut value = BytesMut::new(); - encode_string("measurement=name", &mut value); - assert_eq!(value, "measurement\\=name"); - - let mut value = BytesMut::new(); - encode_string("measurement,name", &mut value); - assert_eq!(value, "measurement\\,name"); - } - - #[test] - fn test_encode_timestamp() { - let start = Utc::now() - .timestamp_nanos_opt() - .expect("Timestamp out of range"); - assert_eq!(encode_timestamp(Some(ts())), 1542182950000000011); - assert!(encode_timestamp(None) >= start) - } - #[test] fn test_encode_uri_valid() { let uri = encode_uri( diff --git a/src/sinks/websocket/config.rs b/src/sinks/websocket/config.rs index 7707e650c9946..9b24dc204361f 100644 --- a/src/sinks/websocket/config.rs +++ b/src/sinks/websocket/config.rs @@ -77,9 +77,21 @@ impl WebSocketSinkConfig { #[cfg(test)] mod test { use super::*; + use vector_lib::codecs::InfluxLineProtocolSerializerConfig; #[test] fn generate_config() { crate::test_util::test_generate_config::(); } + + #[test] + fn influxdb_encoding_requires_metrics() { + let config = WebSocketSinkConfig { + common: WebSocketCommonConfig::default(), + encoding: InfluxLineProtocolSerializerConfig::default().into(), + acknowledgements: Default::default(), + }; + + assert_eq!(config.input(), Input::metric()); + } } diff --git a/website/cue/reference/components/sinks/generated/amqp.cue b/website/cue/reference/components/sinks/generated/amqp.cue index 66fb1312c5695..eb12eabef4829 100644 --- a/website/cue/reference/components/sinks/generated/amqp.cue +++ b/website/cue/reference/components/sinks/generated/amqp.cue @@ -178,6 +178,7 @@ generated: components: sinks: amqp: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -321,6 +322,12 @@ generated: components: sinks: amqp: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -412,6 +419,37 @@ generated: components: sinks: amqp: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/generated/aws_cloudwatch_logs.cue index 12686c9b27b65..ff8514df53706 100644 --- a/website/cue/reference/components/sinks/generated/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/generated/aws_cloudwatch_logs.cue @@ -374,6 +374,7 @@ generated: components: sinks: aws_cloudwatch_logs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -517,6 +518,12 @@ generated: components: sinks: aws_cloudwatch_logs: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -608,6 +615,37 @@ generated: components: sinks: aws_cloudwatch_logs: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/generated/aws_kinesis_firehose.cue index da311f458462e..d3e0653a9a0b9 100644 --- a/website/cue/reference/components/sinks/generated/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/generated/aws_kinesis_firehose.cue @@ -353,6 +353,7 @@ generated: components: sinks: aws_kinesis_firehose: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -496,6 +497,12 @@ generated: components: sinks: aws_kinesis_firehose: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -587,6 +594,37 @@ generated: components: sinks: aws_kinesis_firehose: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/generated/aws_kinesis_streams.cue index 4a800fa6e35da..30d7e3c1f6086 100644 --- a/website/cue/reference/components/sinks/generated/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/generated/aws_kinesis_streams.cue @@ -353,6 +353,7 @@ generated: components: sinks: aws_kinesis_streams: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -496,6 +497,12 @@ generated: components: sinks: aws_kinesis_streams: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -587,6 +594,37 @@ generated: components: sinks: aws_kinesis_streams: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/aws_s3.cue b/website/cue/reference/components/sinks/generated/aws_s3.cue index f17b2abf1e74f..97eb38ff8a0d4 100644 --- a/website/cue/reference/components/sinks/generated/aws_s3.cue +++ b/website/cue/reference/components/sinks/generated/aws_s3.cue @@ -462,6 +462,7 @@ generated: components: sinks: aws_s3: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -605,6 +606,12 @@ generated: components: sinks: aws_s3: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -696,6 +703,37 @@ generated: components: sinks: aws_s3: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/aws_sns.cue b/website/cue/reference/components/sinks/generated/aws_sns.cue index 1d2413066b071..ca65712b8c779 100644 --- a/website/cue/reference/components/sinks/generated/aws_sns.cue +++ b/website/cue/reference/components/sinks/generated/aws_sns.cue @@ -284,6 +284,7 @@ generated: components: sinks: aws_sns: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -427,6 +428,12 @@ generated: components: sinks: aws_sns: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -518,6 +525,37 @@ generated: components: sinks: aws_sns: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/aws_sqs.cue b/website/cue/reference/components/sinks/generated/aws_sqs.cue index 912e1f2c43ea6..8c0dde2ee7c09 100644 --- a/website/cue/reference/components/sinks/generated/aws_sqs.cue +++ b/website/cue/reference/components/sinks/generated/aws_sqs.cue @@ -284,6 +284,7 @@ generated: components: sinks: aws_sqs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -427,6 +428,12 @@ generated: components: sinks: aws_sqs: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -518,6 +525,37 @@ generated: components: sinks: aws_sqs: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/azure_blob.cue b/website/cue/reference/components/sinks/generated/azure_blob.cue index 69bdd368f1338..62cfa620e6b67 100644 --- a/website/cue/reference/components/sinks/generated/azure_blob.cue +++ b/website/cue/reference/components/sinks/generated/azure_blob.cue @@ -308,6 +308,7 @@ generated: components: sinks: azure_blob: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -451,6 +452,12 @@ generated: components: sinks: azure_blob: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -542,6 +549,37 @@ generated: components: sinks: azure_blob: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/console.cue b/website/cue/reference/components/sinks/generated/console.cue index 16545964f6950..334535174d47d 100644 --- a/website/cue/reference/components/sinks/generated/console.cue +++ b/website/cue/reference/components/sinks/generated/console.cue @@ -162,6 +162,7 @@ generated: components: sinks: console: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -305,6 +306,12 @@ generated: components: sinks: console: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -396,6 +403,37 @@ generated: components: sinks: console: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/file.cue b/website/cue/reference/components/sinks/generated/file.cue index 3135f4b70db5f..bf497e95680e7 100644 --- a/website/cue/reference/components/sinks/generated/file.cue +++ b/website/cue/reference/components/sinks/generated/file.cue @@ -182,6 +182,7 @@ generated: components: sinks: file: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -325,6 +326,12 @@ generated: components: sinks: file: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -416,6 +423,37 @@ generated: components: sinks: file: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/gcp_chronicle_unstructured.cue b/website/cue/reference/components/sinks/generated/gcp_chronicle_unstructured.cue index 0a027dfa45a74..1ce983bb684c6 100644 --- a/website/cue/reference/components/sinks/generated/gcp_chronicle_unstructured.cue +++ b/website/cue/reference/components/sinks/generated/gcp_chronicle_unstructured.cue @@ -250,6 +250,7 @@ generated: components: sinks: gcp_chronicle_unstructured: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -393,6 +394,12 @@ generated: components: sinks: gcp_chronicle_unstructured: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -484,6 +491,37 @@ generated: components: sinks: gcp_chronicle_unstructured: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue index 22f565c34665d..936d5a26cfcac 100644 --- a/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue @@ -323,6 +323,7 @@ generated: components: sinks: gcp_cloud_storage: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -466,6 +467,12 @@ generated: components: sinks: gcp_cloud_storage: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -557,6 +564,37 @@ generated: components: sinks: gcp_cloud_storage: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/gcp_pubsub.cue b/website/cue/reference/components/sinks/generated/gcp_pubsub.cue index 385bf9dd13793..0efafa297859e 100644 --- a/website/cue/reference/components/sinks/generated/gcp_pubsub.cue +++ b/website/cue/reference/components/sinks/generated/gcp_pubsub.cue @@ -229,6 +229,7 @@ generated: components: sinks: gcp_pubsub: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -372,6 +373,12 @@ generated: components: sinks: gcp_pubsub: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -463,6 +470,37 @@ generated: components: sinks: gcp_pubsub: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/http.cue b/website/cue/reference/components/sinks/generated/http.cue index 68406cb2c5037..3087c2da2508c 100644 --- a/website/cue/reference/components/sinks/generated/http.cue +++ b/website/cue/reference/components/sinks/generated/http.cue @@ -405,6 +405,7 @@ generated: components: sinks: http: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -548,6 +549,12 @@ generated: components: sinks: http: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -639,6 +646,37 @@ generated: components: sinks: http: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/humio_logs.cue b/website/cue/reference/components/sinks/generated/humio_logs.cue index b9e46513d8b9c..fc39c7ebfbed4 100644 --- a/website/cue/reference/components/sinks/generated/humio_logs.cue +++ b/website/cue/reference/components/sinks/generated/humio_logs.cue @@ -228,6 +228,7 @@ generated: components: sinks: humio_logs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -371,6 +372,12 @@ generated: components: sinks: humio_logs: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -462,6 +469,37 @@ generated: components: sinks: humio_logs: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/kafka.cue b/website/cue/reference/components/sinks/generated/kafka.cue index 9b3ef74b7dd76..35c117d4f5ce8 100644 --- a/website/cue/reference/components/sinks/generated/kafka.cue +++ b/website/cue/reference/components/sinks/generated/kafka.cue @@ -217,6 +217,7 @@ generated: components: sinks: kafka: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -360,6 +361,12 @@ generated: components: sinks: kafka: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -451,6 +458,37 @@ generated: components: sinks: kafka: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/loki.cue b/website/cue/reference/components/sinks/generated/loki.cue index 3a8754491faa8..3bb218c6f7c3e 100644 --- a/website/cue/reference/components/sinks/generated/loki.cue +++ b/website/cue/reference/components/sinks/generated/loki.cue @@ -407,6 +407,7 @@ generated: components: sinks: loki: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -550,6 +551,12 @@ generated: components: sinks: loki: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -641,6 +648,37 @@ generated: components: sinks: loki: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/mqtt.cue b/website/cue/reference/components/sinks/generated/mqtt.cue index 980996cff164d..a9b4a44a67807 100644 --- a/website/cue/reference/components/sinks/generated/mqtt.cue +++ b/website/cue/reference/components/sinks/generated/mqtt.cue @@ -172,6 +172,7 @@ generated: components: sinks: mqtt: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -315,6 +316,12 @@ generated: components: sinks: mqtt: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -406,6 +413,37 @@ generated: components: sinks: mqtt: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/nats.cue b/website/cue/reference/components/sinks/generated/nats.cue index 0979492903e9c..da37cdc0322d7 100644 --- a/website/cue/reference/components/sinks/generated/nats.cue +++ b/website/cue/reference/components/sinks/generated/nats.cue @@ -262,6 +262,7 @@ generated: components: sinks: nats: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -405,6 +406,12 @@ generated: components: sinks: nats: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -496,6 +503,37 @@ generated: components: sinks: nats: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/opentelemetry.cue b/website/cue/reference/components/sinks/generated/opentelemetry.cue index 308f48f0cee6d..ff8ce2adc8b58 100644 --- a/website/cue/reference/components/sinks/generated/opentelemetry.cue +++ b/website/cue/reference/components/sinks/generated/opentelemetry.cue @@ -408,6 +408,7 @@ generated: components: sinks: opentelemetry: configuration: protocol: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -551,6 +552,12 @@ generated: components: sinks: opentelemetry: configuration: protocol: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -642,6 +649,37 @@ generated: components: sinks: opentelemetry: configuration: protocol: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/papertrail.cue b/website/cue/reference/components/sinks/generated/papertrail.cue index b69042f48b2ff..9359d8ef36f46 100644 --- a/website/cue/reference/components/sinks/generated/papertrail.cue +++ b/website/cue/reference/components/sinks/generated/papertrail.cue @@ -162,6 +162,7 @@ generated: components: sinks: papertrail: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -305,6 +306,12 @@ generated: components: sinks: papertrail: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -396,6 +403,37 @@ generated: components: sinks: papertrail: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/pulsar.cue b/website/cue/reference/components/sinks/generated/pulsar.cue index cc2cb6c90cb3a..da6984b09a05b 100644 --- a/website/cue/reference/components/sinks/generated/pulsar.cue +++ b/website/cue/reference/components/sinks/generated/pulsar.cue @@ -296,6 +296,7 @@ generated: components: sinks: pulsar: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -439,6 +440,12 @@ generated: components: sinks: pulsar: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -530,6 +537,37 @@ generated: components: sinks: pulsar: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/redis.cue b/website/cue/reference/components/sinks/generated/redis.cue index fdd5686420da8..a3b7a0e5660a5 100644 --- a/website/cue/reference/components/sinks/generated/redis.cue +++ b/website/cue/reference/components/sinks/generated/redis.cue @@ -221,6 +221,7 @@ generated: components: sinks: redis: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -364,6 +365,12 @@ generated: components: sinks: redis: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -455,6 +462,37 @@ generated: components: sinks: redis: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/socket.cue b/website/cue/reference/components/sinks/generated/socket.cue index 17d6e8cf7d4d0..0379f712a228f 100644 --- a/website/cue/reference/components/sinks/generated/socket.cue +++ b/website/cue/reference/components/sinks/generated/socket.cue @@ -174,6 +174,7 @@ generated: components: sinks: socket: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -317,6 +318,12 @@ generated: components: sinks: socket: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -408,6 +415,37 @@ generated: components: sinks: socket: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/splunk_hec_logs.cue b/website/cue/reference/components/sinks/generated/splunk_hec_logs.cue index ce87f34e9ff83..d0438fd38fe2e 100644 --- a/website/cue/reference/components/sinks/generated/splunk_hec_logs.cue +++ b/website/cue/reference/components/sinks/generated/splunk_hec_logs.cue @@ -278,6 +278,7 @@ generated: components: sinks: splunk_hec_logs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -421,6 +422,12 @@ generated: components: sinks: splunk_hec_logs: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -512,6 +519,37 @@ generated: components: sinks: splunk_hec_logs: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/webhdfs.cue b/website/cue/reference/components/sinks/generated/webhdfs.cue index 236bd9f491f33..fbcc89c265586 100644 --- a/website/cue/reference/components/sinks/generated/webhdfs.cue +++ b/website/cue/reference/components/sinks/generated/webhdfs.cue @@ -228,6 +228,7 @@ generated: components: sinks: webhdfs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -371,6 +372,12 @@ generated: components: sinks: webhdfs: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -462,6 +469,37 @@ generated: components: sinks: webhdfs: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/websocket.cue b/website/cue/reference/components/sinks/generated/websocket.cue index 3e901d5ba0276..cb204328ef63e 100644 --- a/website/cue/reference/components/sinks/generated/websocket.cue +++ b/website/cue/reference/components/sinks/generated/websocket.cue @@ -334,6 +334,7 @@ generated: components: sinks: websocket: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -477,6 +478,12 @@ generated: components: sinks: websocket: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -568,6 +575,37 @@ generated: components: sinks: websocket: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/generated/websocket_server.cue b/website/cue/reference/components/sinks/generated/websocket_server.cue index 0827d1b9850b8..bc7ee01bfed56 100644 --- a/website/cue/reference/components/sinks/generated/websocket_server.cue +++ b/website/cue/reference/components/sinks/generated/websocket_server.cue @@ -218,6 +218,7 @@ generated: components: sinks: websocket_server: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: "Encodes metric events using the InfluxDB line protocol." json: """ Encodes an event as [JSON][json]. @@ -361,6 +362,12 @@ generated: components: sinks: websocket_server: configuration: { } } } + default_namespace: { + description: "A default namespace applied when metrics do not specify one." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: {} + } except_fields: { description: "List of fields that are excluded from the encoded event." required: false @@ -452,6 +459,37 @@ generated: components: sinks: websocket_server: configuration: { } } } + protocol_version: { + description: "The protocol version to encode fields with." + relevant_when: "codec = \"influxdb\"" + required: false + type: string: { + default: "v2" + enum: { + v1: "Line protocol for InfluxDB v1.x." + v2: "Line protocol for InfluxDB v2.x." + } + } + } + quantiles: { + description: "Quantiles to calculate when encoding distribution metrics." + relevant_when: "codec = \"influxdb\"" + required: false + type: array: { + default: [0.5, 0.75, 0.9, 0.95, 0.99] + items: type: float: {} + } + } + tags: { + description: "Additional tags that are appended to every encoded metric." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: "*": { + description: "A tag key/value pair that is appended to each measurement." + required: true + type: string: {} + } + } timestamp_format: { description: "Format used for timestamp fields." required: false