diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index 7520702b17776..19ad9922cceca 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -1,7 +1,8 @@ -use lookup::lookup_v2::OptionalValuePath; +use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath}; use lookup::{OwnedTargetPath, OwnedValuePath}; use once_cell::sync::{Lazy, OnceCell}; use vector_config::configurable_component; +use vrl::path::PathPrefix; static LOG_SCHEMA: OnceCell = OnceCell::new(); static LOG_SCHEMA_DEFAULT: Lazy = Lazy::new(LogSchema::default); @@ -49,24 +50,24 @@ pub struct LogSchema { /// /// This would be the field that holds the raw message, such as a raw log line. #[serde(default = "LogSchema::default_message_key")] - message_key: OptionalValuePath, + message_key: OptionalTargetPath, /// The name of the event field to treat as the event timestamp. #[serde(default = "LogSchema::default_timestamp_key")] - timestamp_key: OptionalValuePath, + timestamp_key: OptionalTargetPath, /// The name of the event field to treat as the host which sent the message. /// /// This field will generally represent a real host, or container, that generated the message, /// but is somewhat source-dependent. #[serde(default = "LogSchema::default_host_key")] - host_key: OptionalValuePath, + host_key: OptionalTargetPath, /// The name of the event field to set the source identifier in. /// /// This field will be set by the Vector source that the event was created in. #[serde(default = "LogSchema::default_source_type_key")] - source_type_key: OptionalValuePath, + source_type_key: OptionalTargetPath, /// The name of the event field to set the event metadata in. 
/// @@ -89,20 +90,20 @@ impl Default for LogSchema { } impl LogSchema { - fn default_message_key() -> OptionalValuePath { - OptionalValuePath::new(MESSAGE) + fn default_message_key() -> OptionalTargetPath { + OptionalTargetPath::event(MESSAGE) } - fn default_timestamp_key() -> OptionalValuePath { - OptionalValuePath::new(TIMESTAMP) + fn default_timestamp_key() -> OptionalTargetPath { + OptionalTargetPath::event(TIMESTAMP) } - fn default_host_key() -> OptionalValuePath { - OptionalValuePath::new(HOST) + fn default_host_key() -> OptionalTargetPath { + OptionalTargetPath::event(HOST) } - fn default_source_type_key() -> OptionalValuePath { - OptionalValuePath::new(SOURCE_TYPE) + fn default_source_type_key() -> OptionalTargetPath { + OptionalTargetPath::event(SOURCE_TYPE) } fn default_metadata_key() -> OptionalValuePath { @@ -110,48 +111,68 @@ impl LogSchema { } pub fn message_key(&self) -> Option<&OwnedValuePath> { - self.message_key.path.as_ref() + self.message_key.path.as_ref().map(|key| &key.path) } /// Returns an `OwnedTargetPath` of the message key. /// This parses the path and will panic if it is invalid. /// /// This should only be used where the result will either be cached, - /// or performance isn't critical, since this requires parsing / memory allocation. + /// or performance isn't critical, since this requires memory allocation. 
pub fn owned_message_path(&self) -> OwnedTargetPath { - OwnedTargetPath::event(self.message_key.clone().path.expect("valid message key")) + self.message_key + .path + .as_ref() + .expect("valid message key") + .clone() } pub fn timestamp_key(&self) -> Option<&OwnedValuePath> { - self.timestamp_key.path.as_ref() + self.timestamp_key.as_ref().map(|key| &key.path) } pub fn host_key(&self) -> Option<&OwnedValuePath> { - self.host_key.path.as_ref() + self.host_key.as_ref().map(|key| &key.path) } pub fn source_type_key(&self) -> Option<&OwnedValuePath> { - self.source_type_key.path.as_ref() + self.source_type_key.as_ref().map(|key| &key.path) } pub fn metadata_key(&self) -> Option<&OwnedValuePath> { self.metadata_key.path.as_ref() } + pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> { + self.message_key.as_ref() + } + + pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> { + self.timestamp_key.as_ref() + } + + pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> { + self.host_key.as_ref() + } + + pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> { + self.source_type_key.as_ref() + } + pub fn set_message_key(&mut self, path: Option) { - self.message_key = OptionalValuePath { path }; + self.message_key = OptionalTargetPath::from(PathPrefix::Event, path); } - pub fn set_timestamp_key(&mut self, v: Option) { - self.timestamp_key = OptionalValuePath { path: v }; + pub fn set_timestamp_key(&mut self, path: Option) { + self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path); } pub fn set_host_key(&mut self, path: Option) { - self.host_key = OptionalValuePath { path }; + self.host_key = OptionalTargetPath::from(PathPrefix::Event, path); } pub fn set_source_type_key(&mut self, path: Option) { - self.source_type_key = OptionalValuePath { path }; + self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path); } pub fn set_metadata_key(&mut self, path: Option) { diff --git 
a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 703546404b985..5e26e2542957e 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -20,7 +20,7 @@ use vector_common::{ request_metadata::GetEventCountTags, EventDataEq, }; -use vrl::path::OwnedValuePath; +use vrl::path::{OwnedTargetPath, OwnedValuePath}; use super::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, @@ -32,6 +32,15 @@ use crate::config::LogNamespace; use crate::config::{log_schema, telemetry}; use crate::{event::MaybeAsLogMut, ByteSizeOf}; use lookup::{metadata_path, path}; +use once_cell::sync::Lazy; +use vrl::owned_value_path; + +static VECTOR_SOURCE_TYPE_PATH: Lazy> = Lazy::new(|| { + Some(OwnedTargetPath::metadata(owned_value_path!( + "vector", + "source_type" + ))) +}); #[derive(Debug, Deserialize)] struct Inner { @@ -296,7 +305,7 @@ impl LogEvent { /// Retrieves the value of a field based on it's meaning. /// This will first check if the value has previously been dropped. It is worth being - /// aware that if the field has been dropped and then some how readded, we still fetch + /// aware that if the field has been dropped and then somehow re-added, we still fetch /// the dropped value here. pub fn get_by_meaning(&self, meaning: impl AsRef) -> Option<&Value> { if let Some(dropped) = self.metadata().dropped_field(&meaning) { @@ -309,12 +318,11 @@ impl LogEvent { } } - // TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed. - pub fn find_key_by_meaning(&self, meaning: impl AsRef) -> Option { + /// Retrieves the target path of a field based on the specified `meaning`. 
+ fn find_key_by_meaning(&self, meaning: impl AsRef) -> Option<&OwnedTargetPath> { self.metadata() .schema_definition() .meaning_path(meaning.as_ref()) - .map(std::string::ToString::to_string) } #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference @@ -452,45 +460,37 @@ impl LogEvent { impl LogEvent { /// Fetches the "message" path of the event. This is either from the "message" semantic meaning (Vector namespace) /// or from the message key set on the "Global Log Schema" (Legacy namespace). - // TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the - // "Global Log Schema" are updated to the new path lookup code - pub fn message_path(&self) -> Option { + pub fn message_path(&self) -> Option<&OwnedTargetPath> { match self.namespace() { LogNamespace::Vector => self.find_key_by_meaning("message"), - LogNamespace::Legacy => log_schema().message_key().map(ToString::to_string), + LogNamespace::Legacy => log_schema().message_key_target_path(), } } /// Fetches the "timestamp" path of the event. This is either from the "timestamp" semantic meaning (Vector namespace) /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace). - // TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the - // "Global Log Schema" are updated to the new path lookup code - pub fn timestamp_path(&self) -> Option { + pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> { match self.namespace() { LogNamespace::Vector => self.find_key_by_meaning("timestamp"), - LogNamespace::Legacy => log_schema().timestamp_key().map(ToString::to_string), + LogNamespace::Legacy => log_schema().timestamp_key_target_path(), } } /// Fetches the `host` path of the event. This is either from the "host" semantic meaning (Vector namespace) /// or from the host key set on the "Global Log Schema" (Legacy namespace). 
- // TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the - // "Global Log Schema" are updated to the new path lookup code - pub fn host_path(&self) -> Option { + pub fn host_path(&self) -> Option<&OwnedTargetPath> { match self.namespace() { LogNamespace::Vector => self.find_key_by_meaning("host"), - LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string), + LogNamespace::Legacy => log_schema().host_key_target_path(), } } /// Fetches the `source_type` path of the event. This is either from the `source_type` Vector metadata field (Vector namespace) /// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace). - // TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the - // "Global Log Schema" are updated to the new path lookup code - pub fn source_type_path(&self) -> Option { + pub fn source_type_path(&self) -> Option<&OwnedTargetPath> { match self.namespace() { - LogNamespace::Vector => Some("%vector.source_type".to_string()), - LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string), + LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(), + LogNamespace::Legacy => log_schema().source_type_key_target_path(), } } @@ -520,7 +520,8 @@ impl LogEvent { /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace). pub fn remove_timestamp(&mut self) -> Option { self.timestamp_path() - .and_then(|key| self.remove(key.as_str())) + .cloned() + .and_then(|key| self.remove(&key)) } /// Fetches the `host` of the event. 
This is either from the "host" semantic meaning (Vector namespace) diff --git a/lib/vector-lookup/src/lookup_v2/optional_path.rs b/lib/vector-lookup/src/lookup_v2/optional_path.rs index 5bbfe4ad082d4..de5f5d9d67ac0 100644 --- a/lib/vector-lookup/src/lookup_v2/optional_path.rs +++ b/lib/vector-lookup/src/lookup_v2/optional_path.rs @@ -1,5 +1,6 @@ use vector_config::configurable_component; use vrl::owned_value_path; +use vrl::path::PathPrefix; use crate::lookup_v2::PathParseError; use crate::{OwnedTargetPath, OwnedValuePath}; @@ -16,6 +17,25 @@ impl OptionalTargetPath { pub fn none() -> Self { Self { path: None } } + + pub fn event(path: &str) -> Self { + Self { + path: Some(OwnedTargetPath { + prefix: PathPrefix::Event, + path: owned_value_path!(path), + }), + } + } + + pub fn from(prefix: PathPrefix, path: Option) -> Self { + Self { + path: path.map(|path| OwnedTargetPath { prefix, path }), + } + } + + pub fn as_ref(&self) -> Option<&OwnedTargetPath> { + self.path.as_ref() + } } impl TryFrom for OptionalTargetPath { diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs index 19a7646c658b8..4a7a1a04facd7 100644 --- a/src/sinks/datadog/events/sink.rs +++ b/src/sinks/datadog/events/sink.rs @@ -58,25 +58,26 @@ async fn ensure_required_fields(event: Event) -> Option { if !log.contains("text") { let message_path = log .message_path() - .expect("message is required (make sure the \"message\" semantic meaning is set)"); - log.rename_key(message_path.as_str(), event_path!("text")) + .expect("message is required (make sure the \"message\" semantic meaning is set)") + .clone(); + log.rename_key(&message_path, event_path!("text")); } if !log.contains("host") { - if let Some(host_path) = log.host_path() { - log.rename_key(host_path.as_str(), event_path!("host")); + if let Some(host_path) = log.host_path().cloned().as_ref() { + log.rename_key(host_path, event_path!("host")); } } if !log.contains("date_happened") { - if let Some(timestamp_path) = 
log.timestamp_path() { - log.rename_key(timestamp_path.as_str(), "date_happened"); + if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() { + log.rename_key(timestamp_path, "date_happened"); } } if !log.contains("source_type_name") { - if let Some(source_type_path) = log.source_type_path() { - log.rename_key(source_type_path.as_str(), "source_type_name") + if let Some(source_type_path) = log.source_type_path().cloned().as_ref() { + log.rename_key(source_type_path, "source_type_name"); } } diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index ca1bb60e8de7a..1213b1d3c50c0 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -134,19 +134,21 @@ impl crate::sinks::util::encoding::Encoder> for JsonEncoding { let log = event.as_mut_log(); let message_path = log .message_path() - .expect("message is required (make sure the \"message\" semantic meaning is set)"); - log.rename_key(message_path.as_str(), event_path!("message")); + .expect("message is required (make sure the \"message\" semantic meaning is set)") + .clone(); + log.rename_key(&message_path, event_path!("message")); - if let Some(host_path) = log.host_path() { - log.rename_key(host_path.as_str(), event_path!("hostname")); + if let Some(host_path) = log.host_path().cloned().as_ref() { + log.rename_key(host_path, event_path!("hostname")); } - if let Some(Value::Timestamp(ts)) = log.remove( - log + let message_path = log .timestamp_path() - .expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)") - .as_str() - ) { + .expect( + "timestamp is required (make sure the \"timestamp\" semantic meaning is set)", + ) + .clone(); + if let Some(Value::Timestamp(ts)) = log.remove(&message_path) { log.insert( event_path!("timestamp"), Value::Integer(ts.timestamp_millis()), diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 1e6c9d708894a..9f50d79e44a2d 100644 --- 
a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -355,15 +355,12 @@ impl DataStreamConfig { /// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema. pub fn remap_timestamp(&self, log: &mut LogEvent) { - if let Some(timestamp_key) = log.timestamp_path() { - if timestamp_key == DATA_STREAM_TIMESTAMP_KEY { + if let Some(timestamp_key) = log.timestamp_path().cloned() { + if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY { return; } - log.rename_key( - timestamp_key.as_str(), - event_path!(DATA_STREAM_TIMESTAMP_KEY), - ) + log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY)); } } diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index cea355dabb487..777f8b7d903d7 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -4,10 +4,11 @@ use bytes::{Bytes, BytesMut}; use futures::SinkExt; use http::{Request, Uri}; use indoc::indoc; +use vrl::path::OwnedValuePath; use vrl::value::Kind; use lookup::lookup_v2::OptionalValuePath; -use lookup::{OwnedValuePath, PathPrefix}; +use lookup::PathPrefix; use vector_config::configurable_component; use vector_core::config::log_schema; use vector_core::schema; @@ -188,28 +189,24 @@ impl SinkConfig for InfluxDbLogsConfig { let host_key = self .host_key - .clone() - .and_then(|k| k.path) - .or(log_schema().host_key().cloned()) + .as_ref() + .and_then(|k| k.path.clone()) + .or_else(|| log_schema().host_key().cloned()) .expect("global log_schema.host_key to be valid path"); let message_key = self .message_key - .clone() - .and_then(|k| k.path) - .unwrap_or_else(|| { - log_schema() - .message_key() - .cloned() - .expect("global log_schema.message_key to be valid path") - }); + .as_ref() + .and_then(|k| k.path.clone()) + .or_else(|| log_schema().message_key().cloned()) + .expect("global log_schema.message_key to be valid path"); let source_type_key = self .source_type_key - .clone() - .and_then(|k| k.path) - 
.or(log_schema().source_type_key().cloned()) - .unwrap(); + .as_ref() + .and_then(|k| k.path.clone()) + .or_else(|| log_schema().source_type_key().cloned()) + .expect("global log_schema.source_type_key to be valid path"); let sink = InfluxDbLogsSink { uri, @@ -267,25 +264,19 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { // the original value that was assigned to the root. To avoid this we intentionally rename // the path that points to "message" such that it has a dedicated key. // TODO: add a `TargetPath::is_event_root()` to conditionally rename? - if let Some(message_path) = log.message_path() { - log.rename_key( - message_path.as_str(), - (PathPrefix::Event, &self.message_key), - ) + if let Some(message_path) = log.message_path().cloned().as_ref() { + log.rename_key(message_path, (PathPrefix::Event, &self.message_key)); } // Add the `host` and `source_type` to the HashSet of tags to include // Ensure those paths are on the event to be encoded, rather than metadata - if let Some(host_path) = log.host_path() { - self.tags.replace(host_path.clone()); - log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key)); + if let Some(host_path) = log.host_path().cloned().as_ref() { + self.tags.replace(host_path.path.to_string()); + log.rename_key(host_path, (PathPrefix::Event, &self.host_key)); } - if let Some(source_type_path) = log.source_type_path() { - self.tags.replace(source_type_path.clone()); - log.rename_key( - source_type_path.as_str(), - (PathPrefix::Event, &self.source_type_key), - ); + if let Some(source_type_path) = log.source_type_path().cloned().as_ref() { + self.tags.replace(source_type_path.path.to_string()); + log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key)); } self.tags.replace("metric_type".to_string()); diff --git a/src/sinks/mezmo.rs b/src/sinks/mezmo.rs index a79179ef3f0d0..97d96f40c8c40 100644 --- a/src/sinks/mezmo.rs +++ b/src/sinks/mezmo.rs @@ -253,12 +253,15 @@ impl HttpEventEncoder> for let line = 
log .message_path() - .and_then(|path| log.remove(path.as_str())) + .cloned() + .as_ref() + .and_then(|path| log.remove(path)) .unwrap_or_else(|| String::from("").into()); let timestamp: Value = log .timestamp_path() - .and_then(|path| log.remove(path.as_str())) + .cloned() + .and_then(|path| log.remove(&path)) .unwrap_or_else(|| chrono::Utc::now().into()); let mut map = serde_json::map::Map::new(); diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index e4a3d05b00452..e5484b5dbd859 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -143,12 +143,12 @@ fn map_timestamp(mut events: EventArray) -> EventArray { match &mut events { EventArray::Logs(logs) => { for log in logs { - if let Some(path) = log.timestamp_path() { - log.rename_key(path.as_str(), "@timestamp"); + if let Some(path) = log.timestamp_path().cloned().as_ref() { + log.rename_key(path, "@timestamp"); } - if let Some(path) = log.host_path() { - log.rename_key(path.as_str(), "os.host"); + if let Some(path) = log.host_path().cloned().as_ref() { + log.rename_key(path, "os.host"); } } }