Skip to content

Commit 8a7dafd

Browse files
restrict /api/v1/ingest api for otel
1 parent 1dd7c3d commit 8a7dafd

File tree

3 files changed

+46
-36
lines changed

3 files changed

+46
-36
lines changed

src/handlers/http/ingest.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use crate::event::{
2626
format::{self, EventFormat},
2727
};
2828
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
29-
use crate::handlers::STREAM_NAME_HEADER_KEY;
29+
use crate::handlers::{
30+
LOG_SOURCE_KEY, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, LOG_SOURCE_OTEL_TRACES,
31+
STREAM_NAME_HEADER_KEY,
32+
};
3033
use crate::metadata::error::stream_info::MetadataError;
3134
use crate::metadata::{SchemaVersion, STREAM_INFO};
3235
use crate::option::{Mode, CONFIG};
@@ -120,6 +123,16 @@ pub async fn handle_otel_logs_ingestion(
120123
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
121124
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
122125
};
126+
127+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
128+
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
129+
};
130+
if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_LOGS {
131+
return Err(PostError::Invalid(anyhow::anyhow!(
132+
"Please use x-p-log-source: otel-logs for ingesting otel logs"
133+
)));
134+
}
135+
123136
let stream_name = stream_name.to_str().unwrap().to_owned();
124137
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
125138

@@ -144,6 +157,14 @@ pub async fn handle_otel_metrics_ingestion(
144157
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
145158
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
146159
};
160+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
161+
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
162+
};
163+
if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_METRICS {
164+
return Err(PostError::Invalid(anyhow::anyhow!(
165+
"Please use x-p-log-source: otel-metrics for ingesting otel metrics"
166+
)));
167+
}
147168
let stream_name = stream_name.to_str().unwrap().to_owned();
148169
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
149170

@@ -168,6 +189,15 @@ pub async fn handle_otel_traces_ingestion(
168189
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
169190
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
170191
};
192+
193+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
194+
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
195+
};
196+
if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_TRACES {
197+
return Err(PostError::Invalid(anyhow::anyhow!(
198+
"Please use x-p-log-source: otel-traces for ingesting otel traces"
199+
)));
200+
}
171201
let stream_name = stream_name.to_str().unwrap().to_owned();
172202
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
173203

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,9 @@ use anyhow::anyhow;
2121
use arrow_schema::Field;
2222
use bytes::Bytes;
2323
use chrono::{DateTime, NaiveDateTime, Utc};
24-
use nom::AsBytes;
25-
use opentelemetry_proto::tonic::{
26-
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
27-
};
2824
use itertools::Itertools;
2925
use serde_json::Value;
30-
use std::{
31-
collections::{BTreeMap, HashMap},
32-
sync::Arc,
33-
};
26+
use std::{collections::HashMap, sync::Arc};
3427

3528
use crate::{
3629
event::{
@@ -39,11 +32,9 @@ use crate::{
3932
},
4033
handlers::{
4134
http::{ingest::PostError, kinesis},
42-
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS,
43-
LOG_SOURCE_OTEL_TRACES, PREFIX_META, PREFIX_TAGS, SEPARATOR,
35+
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
4436
},
4537
metadata::{SchemaVersion, STREAM_INFO},
46-
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
4738
storage::StreamType,
4839
utils::{header_parsing::collect_labelled_headers, json::convert_array_to_object},
4940
};
@@ -57,32 +48,19 @@ pub async fn flatten_and_push_logs(
5748
push_logs(stream_name, &req, &body).await?;
5849
return Ok(());
5950
};
60-
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
61-
match log_source.to_str().unwrap() {
62-
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
63-
//custom flattening required for otel logs
64-
LOG_SOURCE_OTEL_LOGS => {
65-
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
66-
json = flatten_otel_logs(&logs);
67-
}
68-
//custom flattening required for otel metrics
69-
LOG_SOURCE_OTEL_METRICS => {
70-
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
71-
json = flatten_otel_metrics(metrics);
72-
}
73-
//custom flattening required for otel traces
74-
LOG_SOURCE_OTEL_TRACES => {
75-
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
76-
json = flatten_otel_traces(&traces);
77-
}
78-
log_source => {
79-
tracing::warn!("Unknown log source: {}", log_source);
51+
let log_source = log_source.to_str().unwrap();
52+
if log_source == LOG_SOURCE_KINESIS {
53+
let json = kinesis::flatten_kinesis_logs(&body);
54+
for record in json.iter() {
55+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
8056
push_logs(stream_name, &req, &body).await?;
8157
}
82-
}
83-
84-
for record in json.iter_mut() {
85-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
58+
} else if log_source.contains("otel") {
59+
return Err(PostError::Invalid(anyhow!(
60+
"Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces"
61+
)));
62+
} else {
63+
tracing::warn!("Unknown log source: {}", log_source);
8664
push_logs(stream_name, &req, &body).await?;
8765
}
8866

src/utils/header_parsing.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ pub enum ParseHeaderError {
6868
SeperatorInValue(char),
6969
#[error("Stream name not found in header [x-p-stream]")]
7070
MissingStreamName,
71+
#[error("Log source not found in header [x-p-log-source]")]
72+
MissingLogSource,
7173
}
7274

7375
impl ResponseError for ParseHeaderError {

0 commit comments

Comments
 (0)