Skip to content

Commit

Permalink
Merge branch 'main' into hottier-perf
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Dec 6, 2024
2 parents 1b41e6b + 4af4e6c commit 00c4b71
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 444 deletions.
5 changes: 4 additions & 1 deletion helm/templates/service-monitor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ kind: ServiceMonitor
metadata:
name: {{ include "parseable.fullname" . }}
namespace: {{ default .Release.Namespace .Values.parseable.metrics.serviceMonitor.namespace | quote }}
labels:
labels:
{{- with .Values.parseable.metrics.serviceMonitor.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
{{- include "parseable.labels" . | nindent 4 }}
spec:
{{ if .Values.parseable.metrics.serviceMonitor.spec.jobLabel }}
Expand Down
33 changes: 15 additions & 18 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ parseable:
repository: containers.parseable.com/parseable/parseable
tag: v1.6.3
pullPolicy: Always
## object store can be local, s3 or blob.
## object store can be local-store, s3-store or blob-store.
## the local flag needs to be set to false if store is set to an object store (s3-store or blob-store).
store: local-store
## Set to true if you want to deploy Parseable in a HA mode (multiple ingestors)
Expand Down Expand Up @@ -192,6 +192,7 @@ parseable:
metrics:
serviceMonitor:
enabled: false
labels: {}
namespace: ""
spec:
jobLabel: ""
Expand Down Expand Up @@ -300,7 +301,7 @@ fluent-bit:
replicaCount: 1
image:
repository: parseable/fluent-bit
tag: "v1"
tag: "v2"
pullPolicy: Always
testFramework:
enabled: true
Expand Down Expand Up @@ -382,25 +383,21 @@ fluent-bit:
[OUTPUT]
Name parseable
Match kube.*
P_Server parseable.parseable.svc.cluster.local
P_Port 80
P_Username admin
P_Password admin
P_Stream $NAMESPACE
Server_Host parseable.parseable.svc.cluster.local
Username admin
Password admin
Server_Port 80
Stream $NAMESPACE
Exclude_Namespaces kube-system, default
[OUTPUT]
Name http
Name parseable
Match k8s_events
host parseable.parseable.svc.cluster.local
http_User admin
http_Passwd admin
format json
port 80
header Content-Type application/json
header X-P-Stream k8s-events
uri /api/v1/ingest
json_date_key timestamp
json_date_format iso8601
Server_Host parseable.parseable.svc.cluster.local
Server_Port 80
Username admin
Password admin
Stream k8s-events
upstream: {}

Expand Down
46 changes: 7 additions & 39 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::otel;
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
use crate::event::{
Expand All @@ -27,7 +26,7 @@ use crate::event::{
format::{self, EventFormat},
};
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::STREAM_INFO;
Expand Down Expand Up @@ -115,25 +114,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
{
let log_source: String = log_source.to_str().unwrap().to_owned();
if log_source == LOG_SOURCE_OTEL {
let mut json = otel::flatten_otel_logs(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
return Err(PostError::CustomError("Unknown log source".to_string()));
}
} else {
return Err(PostError::CustomError(
"log source key header is missing".to_string(),
));
}
push_logs(stream_name.to_string(), req.clone(), body).await?;
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
Expand Down Expand Up @@ -293,9 +274,7 @@ mod tests {
use std::{collections::HashMap, sync::Arc};

use actix_web::test::TestRequest;
use arrow_array::{
types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray,
};
use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
use arrow_schema::{DataType, Field};
use serde_json::json;

Expand Down Expand Up @@ -689,25 +668,14 @@ mod tests {
])
);

let c_a = vec![None, None, Some(vec![Some(1i64)]), Some(vec![Some(1)])];
let c_b = vec![None, None, None, Some(vec![Some(2i64)])];

assert_eq!(
rb.column_by_name("c_a")
.unwrap()
.as_any()
.downcast_ref::<ListArray>()
.unwrap(),
&ListArray::from_iter_primitive::<Int64Type, _, _>(c_a)
rb.column_by_name("c_a").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, None, Some(1), Some(1)])
);

assert_eq!(
rb.column_by_name("c_b")
.unwrap()
.as_any()
.downcast_ref::<ListArray>()
.unwrap(),
&ListArray::from_iter_primitive::<Int64Type, _, _>(c_b)
rb.column_by_name("c_b").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, None, None, Some(2)])
);
}
}
1 change: 0 additions & 1 deletion src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub mod logstream;
pub mod middleware;
pub mod modal;
pub mod oidc;
mod otel;
pub mod query;
pub mod rbac;
pub mod role;
Expand Down
36 changes: 13 additions & 23 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
*
*/

use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};

use actix_web::HttpRequest;
use arrow_schema::Field;
Expand All @@ -33,8 +30,8 @@ use crate::{
format::{self, EventFormat},
},
handlers::{
http::{ingest::PostError, kinesis, otel},
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
http::{ingest::PostError, kinesis},
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
},
metadata::STREAM_INFO,
storage::StreamType,
Expand All @@ -46,26 +43,19 @@ pub async fn flatten_and_push_logs(
body: Bytes,
stream_name: String,
) -> Result<(), PostError> {
//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
LOG_SOURCE_OTEL => {
json = otel::flatten_otel_logs(&body);
}
_ => {
log::warn!("Unknown log source: {}", log_source);
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
}
for record in json.iter_mut() {
let log_source = req
.headers()
.get(LOG_SOURCE_KEY)
.map(|header| header.to_str().unwrap_or_default())
.unwrap_or_default();
if log_source == LOG_SOURCE_KINESIS {
let json = kinesis::flatten_kinesis_logs(&body);
for record in json.iter() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
push_logs(stream_name.clone(), req.clone(), body.clone()).await?;
}
} else {
push_logs(stream_name.to_string(), req, body).await?;
push_logs(stream_name, req, body).await?;
}
Ok(())
}
Expand Down
Loading

0 comments on commit 00c4b71

Please sign in to comment.