diff --git a/helm/templates/service-monitor.yaml b/helm/templates/service-monitor.yaml index 49497ad98..0750ab4ec 100644 --- a/helm/templates/service-monitor.yaml +++ b/helm/templates/service-monitor.yaml @@ -4,7 +4,10 @@ kind: ServiceMonitor metadata: name: {{ include "parseable.fullname" . }} namespace: {{ default .Release.Namespace .Values.parseable.metrics.serviceMonitor.namespace | quote }} - labels: + labels: + {{- with .Values.parseable.metrics.serviceMonitor.labels }} + {{- toYaml . | nindent 4 }} + {{- end }} {{- include "parseable.labels" . | nindent 4 }} spec: {{ if .Values.parseable.metrics.serviceMonitor.spec.jobLabel }} diff --git a/helm/values.yaml b/helm/values.yaml index a7ba8d7ad..949ae2365 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -3,7 +3,7 @@ parseable: repository: containers.parseable.com/parseable/parseable tag: v1.6.3 pullPolicy: Always - ## object store can be local, s3 or blob. + ## object store can be local-store, s3-store or blob-store. ## local needs to be false if set to object store. store: local-store ## Set to true if you want to deploy Parseable in a HA mode (multiple ingestors) @@ -192,6 +192,7 @@ parseable: metrics: serviceMonitor: enabled: false + labels: {} namespace: "" spec: jobLabel: "" @@ -300,7 +301,7 @@ fluent-bit: replicaCount: 1 image: repository: parseable/fluent-bit - tag: "v1" + tag: "v2" pullPolicy: Always testFramework: enabled: true @@ -382,25 +383,21 @@ fluent-bit: [OUTPUT] Name parseable Match kube.* - P_Server parseable.parseable.svc.cluster.local - P_Port 80 - P_Username admin - P_Password admin - P_Stream $NAMESPACE + Server_Host parseable.parseable.svc.cluster.local + Username admin + Password admin + Server_Port 80 + Stream $NAMESPACE + Exclude_Namespaces kube-system, default [OUTPUT] - Name http + Name parseable Match k8s_events - host parseable.parseable.svc.cluster.local - http_User admin - http_Passwd admin - format json - port 80 - header Content-Type application/json - header X-P-Stream k8s-events - uri /api/v1/ingest - json_date_key timestamp - json_date_format iso8601 + Server_Host parseable.parseable.svc.cluster.local + Server_Port 80 + Username admin + Password admin + Stream k8s-events upstream: {} diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 9f72000b4..66cdd3cdd 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -18,7 +18,6 @@ use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; -use super::otel; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; use crate::event::{ @@ -27,7 +26,7 @@ use crate::event::{ format::{self, EventFormat}, }; use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; -use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY}; +use crate::handlers::STREAM_NAME_HEADER_KEY; use crate::localcache::CacheError; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::STREAM_INFO; @@ -115,25 +114,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result() - .unwrap(), - &ListArray::from_iter_primitive::(c_a) + rb.column_by_name("c_a").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, None, Some(1), Some(1)]) ); assert_eq!( - rb.column_by_name("c_b") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(c_b) + rb.column_by_name("c_b").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, None, None, Some(2)]) ); } } diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index f77d6335b..94b681adb 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -36,7 +36,6 @@ pub mod logstream; pub mod middleware; pub mod modal; pub mod oidc; -mod otel; pub mod query; pub mod rbac; pub mod role; diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 81ccfbd44..c21f22a10 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,10 +16,7 @@ * */ -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use actix_web::HttpRequest; use arrow_schema::Field; @@ -33,8 +30,8 @@ use crate::{ format::{self, EventFormat}, }, handlers::{ - http::{ingest::PostError, kinesis, otel}, - LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, + http::{ingest::PostError, kinesis}, + LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR, }, metadata::STREAM_INFO, storage::StreamType, @@ -46,26 +43,19 @@ pub async fn flatten_and_push_logs( body: Bytes, stream_name: String, ) -> Result<(), PostError> { - //flatten logs - if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) { - let mut json: Vec> = Vec::new(); - let log_source: String = log_source.to_str().unwrap().to_owned(); - match log_source.as_str() { - LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), - LOG_SOURCE_OTEL => { - json = otel::flatten_otel_logs(&body); - } - _ => { - log::warn!("Unknown log source: {}", log_source); - push_logs(stream_name.to_string(), req.clone(), body).await?; - } - } - for record in json.iter_mut() { + let log_source = req + .headers() + .get(LOG_SOURCE_KEY) + .map(|header| header.to_str().unwrap_or_default()) + .unwrap_or_default(); + if log_source == LOG_SOURCE_KINESIS { + let json = kinesis::flatten_kinesis_logs(&body); + for record in json.iter() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name.to_string(), req.clone(), body).await?; + push_logs(stream_name.clone(), req.clone(), body.clone()).await?; } } else { - push_logs(stream_name.to_string(), req, body).await?; + push_logs(stream_name, req, body).await?; } Ok(()) } diff --git a/src/handlers/http/otel.rs b/src/handlers/http/otel.rs deleted file mode 100644 index 09b3eaba1..000000000 --- a/src/handlers/http/otel.rs +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use bytes::Bytes; -use proto::common::v1::KeyValue; -use proto::logs::v1::LogRecord; -use serde_json::Value; -mod proto; -use crate::handlers::http::otel::proto::logs::v1::LogRecordFlags; -use crate::handlers::http::otel::proto::logs::v1::LogsData; -use crate::handlers::http::otel::proto::logs::v1::SeverityNumber; -use std::collections::BTreeMap; -// Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte -fn collect_json_from_any_value( - key: &String, - value: super::otel::proto::common::v1::Value, -) -> BTreeMap { - let mut value_json: BTreeMap = BTreeMap::new(); - if value.str_val.is_some() { - value_json.insert( - key.to_string(), - Value::String(value.str_val.as_ref().unwrap().to_owned()), - ); - } - if value.bool_val.is_some() { - value_json.insert(key.to_string(), Value::Bool(value.bool_val.unwrap())); - } - if value.int_val.is_some() { - value_json.insert( - key.to_string(), - Value::String(value.int_val.as_ref().unwrap().to_owned()), - ); - } - if value.double_val.is_some() { - value_json.insert( - key.to_string(), - Value::Number(serde_json::Number::from_f64(value.double_val.unwrap()).unwrap()), - ); - } - - //ArrayValue is a vector of AnyValue - //traverse by recursively calling the same function - if value.array_val.is_some() { - let array_val = value.array_val.as_ref().unwrap(); - let values = &array_val.values; - for value in values { - let array_value_json = collect_json_from_any_value(key, value.clone()); - for key in array_value_json.keys() { - value_json.insert( - format!( - "{}_{}", - key.to_owned(), - value_to_string(array_value_json[key].to_owned()) - ), - array_value_json[key].to_owned(), - ); - } - } - } - - //KeyValueList is a vector of KeyValue - //traverse through each element in the vector - if value.kv_list_val.is_some() { - let kv_list_val = value.kv_list_val.unwrap(); - for key_value in kv_list_val.values { - let value = key_value.value; - if value.is_some() { - let value = value.unwrap(); - let key_value_json = collect_json_from_any_value(key, value); - - for key in key_value_json.keys() { - value_json.insert( - format!( - "{}_{}_{}", - key.to_owned(), - key_value.key, - value_to_string(key_value_json[key].to_owned()) - ), - key_value_json[key].to_owned(), - ); - } - } - } - } - if value.bytes_val.is_some() { - value_json.insert( - key.to_string(), - Value::String(value.bytes_val.as_ref().unwrap().to_owned()), - ); - } - - value_json -} - -//traverse through Value by calling function ollect_json_from_any_value -fn collect_json_from_values( - values: &Option, - key: &String, -) -> BTreeMap { - let mut value_json: BTreeMap = BTreeMap::new(); - - for value in values.iter() { - value_json = collect_json_from_any_value(key, value.clone()); - } - - value_json -} - -fn value_to_string(value: serde_json::Value) -> String { - match value.clone() { - e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), - Value::String(s) => s, - _ => "".to_string(), - } -} - -pub fn flatten_attributes( - attributes: &Vec, - attribute_source_key: String, -) -> BTreeMap { - let mut attributes_json: BTreeMap = BTreeMap::new(); - for attribute in attributes { - let key = &attribute.key; - let value = &attribute.value; - let value_json = - collect_json_from_values(value, &format!("{}_{}", attribute_source_key, key)); - for key in value_json.keys() { - attributes_json.insert(key.to_owned(), value_json[key].to_owned()); - } - } - attributes_json -} - -pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { - let mut log_record_json: BTreeMap = BTreeMap::new(); - if log_record.time_unix_nano.is_some() { - log_record_json.insert( - "time_unix_nano".to_string(), - Value::String(log_record.time_unix_nano.as_ref().unwrap().to_string()), - ); - } - if log_record.observed_time_unix_nano.is_some() { - log_record_json.insert( - "observed_time_unix_nano".to_string(), - Value::String( - log_record - .observed_time_unix_nano - .as_ref() - .unwrap() - .to_string(), - ), - ); - } - if log_record.severity_number.is_some() { - let severity_number: i32 = log_record.severity_number.unwrap(); - log_record_json.insert( - "severity_number".to_string(), - Value::Number(serde_json::Number::from(severity_number)), - ); - if log_record.severity_text.is_none() { - log_record_json.insert( - "severity_text".to_string(), - Value::String(SeverityNumber::as_str_name(severity_number).to_string()), - ); - } - } - if log_record.severity_text.is_some() { - log_record_json.insert( - "severity_text".to_string(), - Value::String(log_record.severity_text.as_ref().unwrap().to_string()), - ); - } - - if log_record.body.is_some() { - let body = &log_record.body; - let body_json = collect_json_from_values(body, &"body".to_string()); - for key in body_json.keys() { - log_record_json.insert(key.to_owned(), body_json[key].to_owned()); - } - } - - if let Some(attributes) = log_record.attributes.as_ref() { - let attributes_json = flatten_attributes(attributes, "log_record".to_string()); - for key in attributes_json.keys() { - log_record_json.insert(key.to_owned(), attributes_json[key].to_owned()); - } - } - - if log_record.dropped_attributes_count.is_some() { - log_record_json.insert( - "log_record_dropped_attributes_count".to_string(), - Value::Number(serde_json::Number::from( - log_record.dropped_attributes_count.unwrap(), - )), - ); - } - - if log_record.flags.is_some() { - let flags: u32 = log_record.flags.unwrap(); - log_record_json.insert( - "flags_number".to_string(), - Value::Number(serde_json::Number::from(flags)), - ); - log_record_json.insert( - "flags_string".to_string(), - Value::String(LogRecordFlags::as_str_name(flags).to_string()), - ); - } - - if log_record.span_id.is_some() { - log_record_json.insert( - "span_id".to_string(), - Value::String(log_record.span_id.as_ref().unwrap().to_string()), - ); - } - - if log_record.trace_id.is_some() { - log_record_json.insert( - "trace_id".to_string(), - Value::String(log_record.trace_id.as_ref().unwrap().to_string()), - ); - } - - log_record_json -} - -pub fn flatten_otel_logs(body: &Bytes) -> Vec> { - let mut vec_otel_json: Vec> = Vec::new(); - let body_str = std::str::from_utf8(body).unwrap(); - let message: LogsData = serde_json::from_str(body_str).unwrap(); - - if let Some(records) = message.resource_logs.as_ref() { - let mut vec_resource_logs_json: Vec> = Vec::new(); - for record in records.iter() { - let mut resource_log_json: BTreeMap = BTreeMap::new(); - - if let Some(resource) = record.resource.as_ref() { - if let Some(attributes) = resource.attributes.as_ref() { - let attributes_json = flatten_attributes(attributes, "resource".to_string()); - for key in attributes_json.keys() { - resource_log_json.insert(key.to_owned(), attributes_json[key].to_owned()); - } - } - - if resource.dropped_attributes_count.is_some() { - resource_log_json.insert( - "resource_dropped_attributes_count".to_string(), - Value::Number(serde_json::Number::from( - resource.dropped_attributes_count.unwrap(), - )), - ); - } - } - - if let Some(scope_logs) = record.scope_logs.as_ref() { - let mut vec_scope_log_json: Vec> = Vec::new(); - for scope_log in scope_logs.iter() { - let mut scope_log_json: BTreeMap = BTreeMap::new(); - if scope_log.scope.is_some() { - let instrumentation_scope = scope_log.scope.as_ref().unwrap(); - if instrumentation_scope.name.is_some() { - scope_log_json.insert( - "instrumentation_scope_name".to_string(), - Value::String( - instrumentation_scope.name.as_ref().unwrap().to_string(), - ), - ); - } - if instrumentation_scope.version.is_some() { - scope_log_json.insert( - "instrumentation_scope_version".to_string(), - Value::String( - instrumentation_scope.version.as_ref().unwrap().to_string(), - ), - ); - } - - if let Some(attributes) = instrumentation_scope.attributes.as_ref() { - let attributes_json = - flatten_attributes(attributes, "instrumentation_scope".to_string()); - for key in attributes_json.keys() { - scope_log_json - .insert(key.to_owned(), attributes_json[key].to_owned()); - } - } - - if instrumentation_scope.dropped_attributes_count.is_some() { - scope_log_json.insert( - "instrumentation_scope_dropped_attributes_count".to_string(), - Value::Number(serde_json::Number::from( - instrumentation_scope.dropped_attributes_count.unwrap(), - )), - ); - } - } - if scope_log.schema_url.is_some() { - scope_log_json.insert( - "scope_log_schema_url".to_string(), - Value::String(scope_log.schema_url.as_ref().unwrap().to_string()), - ); - } - - for log_record in scope_log.log_records.iter() { - let log_record_json = flatten_log_record(log_record); - - for key in log_record_json.keys() { - scope_log_json.insert(key.to_owned(), log_record_json[key].to_owned()); - } - vec_scope_log_json.push(scope_log_json.clone()); - } - } - for scope_log_json in vec_scope_log_json.iter() { - vec_resource_logs_json.push(scope_log_json.clone()); - } - } - if record.schema_url.is_some() { - resource_log_json.insert( - "schema_url".to_string(), - Value::String(record.schema_url.as_ref().unwrap().to_string()), - ); - } - - for resource_logs_json in vec_resource_logs_json.iter_mut() { - for key in resource_log_json.keys() { - resource_logs_json.insert(key.to_owned(), resource_log_json[key].to_owned()); - } - } - } - - for resource_logs_json in vec_resource_logs_json.iter() { - vec_otel_json.push(resource_logs_json.clone()); - } - } - vec_otel_json -} diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 2232ce53a..4a4259354 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -48,9 +48,5 @@ const TRINO_USER: &str = "x-trino-user"; // constants for log Source values for known sources and formats const LOG_SOURCE_KINESIS: &str = "kinesis"; -// OpenTelemetry sends data in JSON format with -// specification as explained here https://opentelemetry.io/docs/specs/otel/logs/data-model/ -const LOG_SOURCE_OTEL: &str = "otel"; - // AWS Kinesis constants const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes"; diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 6e84ab0b7..43d12482d 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -304,6 +304,67 @@ pub fn flatten_array_objects( Ok(()) } +pub fn flatten_json(value: &Value) -> Vec { + match value { + Value::Array(arr) => { + let mut results = Vec::new(); + for item in arr { + results.extend(flatten_json(item)); + } + results + } + Value::Object(map) => { + let mut results = vec![map.clone()]; + for (key, val) in map { + if matches!(val, Value::Array(_)) { + if let Value::Array(arr) = val { + let mut new_results = Vec::new(); + for item in arr { + let flattened_items = flatten_json(item); + for flattened_item in flattened_items { + for result in &results { + let mut new_obj = result.clone(); + new_obj.insert(key.clone(), flattened_item.clone()); + new_results.push(new_obj); + } + } + } + results = new_results; + } + } else if matches!(val, Value::Object(_)) { + let nested_results = flatten_json(val); + let mut new_results = Vec::new(); + for nested_result in nested_results { + for result in &results { + let mut new_obj = result.clone(); + new_obj.insert(key.clone(), nested_result.clone()); + new_results.push(new_obj); + } + } + results = new_results; + } + } + results.into_iter().map(Value::Object).collect() + } + _ => vec![value.clone()], + } +} + +pub fn convert_to_array(flattened: Vec) -> Result { + let mut result = Vec::new(); + for item in flattened { + let mut map = Map::new(); + if let Some(item) = item.as_object() { + for (key, value) in item { + map.insert(key.clone(), value.clone()); + } + result.push(Value::Object(map)); + } else { + return Err(anyhow!("Expected object in array of objects")); + } + } + Ok(Value::Array(result)) +} #[cfg(test)] mod tests { use crate::utils::json::flatten::flatten_array_objects; diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 526fb532f..263a951e5 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -28,14 +28,17 @@ pub fn flatten_json_body( custom_partition: Option, validation_required: bool, ) -> Result { - flatten::flatten( - body, - "_", - time_partition, - time_partition_limit, - custom_partition, - validation_required, - ) + match flatten::convert_to_array(flatten::flatten_json(&body)) { + Ok(nested_value) => flatten::flatten( + nested_value, + "_", + time_partition, + time_partition_limit, + custom_partition, + validation_required, + ), + Err(err) => Err(err), + } } pub fn convert_array_to_object(