diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 297c01099..e5a03277f 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -26,7 +26,10 @@ use crate::event::{ format::{self, EventFormat}, }; use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; -use crate::handlers::STREAM_NAME_HEADER_KEY; +use crate::handlers::{ + LOG_SOURCE_KEY, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, LOG_SOURCE_OTEL_TRACES, + STREAM_NAME_HEADER_KEY, +}; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::{SchemaVersion, STREAM_INFO}; use crate::option::{Mode, CONFIG}; @@ -120,6 +123,16 @@ pub async fn handle_otel_logs_ingestion( let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; + + let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingLogSource)); + }; + if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_LOGS { + return Err(PostError::Invalid(anyhow::anyhow!( + "Please use x-p-log-source: otel-logs for ingesting otel logs" + ))); + } + let stream_name = stream_name.to_str().unwrap().to_owned(); create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?; @@ -144,6 +157,14 @@ pub async fn handle_otel_metrics_ingestion( let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; + let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingLogSource)); + }; + if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_METRICS { + return Err(PostError::Invalid(anyhow::anyhow!( + "Please use x-p-log-source: otel-metrics for ingesting otel metrics" + ))); + } let stream_name = stream_name.to_str().unwrap().to_owned(); create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?; @@ -168,6 +189,15 @@ pub async fn handle_otel_traces_ingestion( let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; + + let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingLogSource)); + }; + if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_TRACES { + return Err(PostError::Invalid(anyhow::anyhow!( + "Please use x-p-log-source: otel-traces for ingesting otel traces" + ))); + } let stream_name = stream_name.to_str().unwrap().to_owned(); create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?; diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 058e74710..f790e74f4 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -21,16 +21,9 @@ use anyhow::anyhow; use arrow_schema::Field; use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, Utc}; -use nom::AsBytes; -use opentelemetry_proto::tonic::{ - logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, -}; use itertools::Itertools; use serde_json::Value; -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use crate::{ event::{ @@ -39,11 +32,9 @@ use crate::{ }, handlers::{ http::{ingest::PostError, kinesis}, - LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, - LOG_SOURCE_OTEL_TRACES, PREFIX_META, PREFIX_TAGS, SEPARATOR, + LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR, }, metadata::{SchemaVersion, STREAM_INFO}, - otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, storage::StreamType, utils::{header_parsing::collect_labelled_headers, json::convert_array_to_object}, }; @@ -57,32 +48,19 @@ pub async fn flatten_and_push_logs( push_logs(stream_name, &req, &body).await?; return Ok(()); }; - let mut json: Vec> = Vec::new(); - match log_source.to_str().unwrap() { - LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), - //custom flattening required for otel logs - LOG_SOURCE_OTEL_LOGS => { - let logs: LogsData = serde_json::from_slice(body.as_bytes())?; - json = flatten_otel_logs(&logs); - } - //custom flattening required for otel metrics - LOG_SOURCE_OTEL_METRICS => { - let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?; - json = flatten_otel_metrics(metrics); - } - //custom flattening required for otel traces - LOG_SOURCE_OTEL_TRACES => { - let traces: TracesData = serde_json::from_slice(body.as_bytes())?; - json = flatten_otel_traces(&traces); - } - log_source => { - tracing::warn!("Unknown log source: {}", log_source); + let log_source = log_source.to_str().unwrap(); + if log_source == LOG_SOURCE_KINESIS { + let json = kinesis::flatten_kinesis_logs(&body); + for record in json.iter() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); push_logs(stream_name, &req, &body).await?; } - } - - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); + } else if log_source.contains("otel") { + return Err(PostError::Invalid(anyhow!( + "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" + ))); + } else { + tracing::warn!("Unknown log source: {}", log_source); push_logs(stream_name, &req, &body).await?; } diff --git a/src/utils/header_parsing.rs b/src/utils/header_parsing.rs index 8d4feab1e..89caf4d9e 100644 --- a/src/utils/header_parsing.rs +++ b/src/utils/header_parsing.rs @@ -68,6 +68,8 @@ pub enum ParseHeaderError { SeperatorInValue(char), #[error("Stream name not found in header [x-p-stream]")] MissingStreamName, + #[error("Log source not found in header [x-p-log-source]")] + MissingLogSource, } impl ResponseError for ParseHeaderError {