refactor: ingestion data flow (#1100)

* refactor: kinesis message construction may panic

* refactor: replace `BTreeMap` with `serde_json::Map`

* refactor: get rid of clone

* refactor: use `Value` for JSON data

* refactor: `HeaderMap::get` and `let Some else`

* refactor: ingest utils don't need http context anymore

* refactor: more descriptive error variants

* refactor: PUT stream header extraction

* refactor: use Path and Json extractor

* don't extract where not required

* refactor: serde `date_list`

* refactor: serde `DefaultPrivilege`

* refactor: serde `Dashboard`

* refactor: serde `Filter`

* refactor: move up `p_timestamp` addition to recordbatch

* refactor: refer over clone

* fix: don't hog write privileges

* refactor: DRY stream writer creation

* refactor: serde `StreamType`
de-sh authored Jan 23, 2025
1 parent c5964fc commit 4d2897e
Showing 29 changed files with 597 additions and 618 deletions.
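Several bullets in the commit message above ("serde `DefaultPrivilege`", "serde `Dashboard`", "serde `Filter`", "serde `StreamType`") follow the same pattern: let serde derive the wire format instead of hand-rolled string conversion. A minimal sketch of that pattern follows; the variant names and rename rule are illustrative, not taken from the repository.

```rust
use serde::{Deserialize, Serialize};

// Illustrative enum: serde derives replace manual to/from string helpers.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum StreamType {
    UserDefined,
    Internal,
}

fn main() -> Result<(), serde_json::Error> {
    assert_eq!(serde_json::to_string(&StreamType::Internal)?, "\"internal\"");
    assert_eq!(
        serde_json::from_str::<StreamType>("\"userdefined\"")?,
        StreamType::UserDefined
    );
    Ok(())
}
```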
5 changes: 1 addition & 4 deletions src/catalog/mod.rs
@@ -33,7 +33,6 @@ use crate::{
     query::PartialTimeFilter,
     storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
 };
-use bytes::Bytes;
 use chrono::{DateTime, Local, NaiveTime, Utc};
 use relative_path::RelativePathBuf;
 use std::io::Error as IOError;
@@ -412,13 +411,11 @@ pub async fn get_first_event(
             base_path_without_preceding_slash(),
             stream_name
         );
-        // Convert dates vector to Bytes object
-        let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
         let ingestor_first_event_at =
             handlers::http::cluster::send_retention_cleanup_request(
                 &url,
                 ingestor.clone(),
-                dates_bytes,
+                &dates,
             )
             .await?;
         if !ingestor_first_event_at.is_empty() {
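The change above moves serialization out of the call site: instead of building a `Bytes` payload by hand, the caller passes `&dates` and the cleanup request helper serializes it internally. A minimal sketch of that shape; the helper name and signature below are assumptions, not the repository's actual API.

```rust
use serde::Serialize;

// Hypothetical helper: serialize whatever the caller borrows, so the caller
// no longer constructs an intermediate `Bytes` payload itself.
fn cleanup_request_body<T: Serialize + ?Sized>(dates: &T) -> Result<Vec<u8>, serde_json::Error> {
    serde_json::to_vec(dates)
}

fn main() -> Result<(), serde_json::Error> {
    let dates = vec!["2025-01-20".to_string(), "2025-01-21".to_string()];
    let body = cleanup_request_body(&dates)?;
    assert_eq!(body, br#"["2025-01-20","2025-01-21"]"#.to_vec());
    Ok(())
}
```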
29 changes: 27 additions & 2 deletions src/event/format/mod.rs
@@ -19,6 +19,7 @@
 
 use std::{
     collections::{HashMap, HashSet},
+    fmt::Display,
     sync::Arc,
 };
 
@@ -29,7 +30,10 @@ use chrono::DateTime;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
-use crate::{metadata::SchemaVersion, utils::arrow::get_field};
+use crate::{
+    metadata::SchemaVersion,
+    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+};
 
 use super::DEFAULT_TIMESTAMP_KEY;
 
Expand Down Expand Up @@ -73,6 +77,20 @@ impl From<&str> for LogSource {
}
}

impl Display for LogSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
LogSource::Kinesis => "kinesis",
LogSource::OtelLogs => "otel-logs",
LogSource::OtelMetrics => "otel-metrics",
LogSource::OtelTraces => "otel-traces",
LogSource::Json => "json",
LogSource::Pmeta => "pmeta",
LogSource::Custom(custom) => custom,
})
}
}

// Global Trait for event format
// This trait is implemented by all the event formats
pub trait EventFormat: Sized {
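A short usage sketch for the `Display` impl added above: formatting now round-trips with the existing `From<&str>` conversion. The enum below is trimmed to two variants for illustration and is not the repository's full definition.

```rust
use std::fmt::{self, Display};

// Trimmed-down stand-in for the real enum, for illustration only.
enum LogSource {
    OtelLogs,
    Custom(String),
}

impl Display for LogSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            LogSource::OtelLogs => "otel-logs",
            LogSource::Custom(custom) => custom,
        })
    }
}

fn main() {
    // `to_string` comes for free from the blanket `ToString` impl.
    assert_eq!(LogSource::OtelLogs.to_string(), "otel-logs");
    assert_eq!(LogSource::Custom("my-agent".to_owned()).to_string(), "my-agent");
}
```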
@@ -126,7 +144,14 @@
         }
         new_schema =
             update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
-        let rb = Self::decode(data, new_schema.clone())?;
+
+        let mut rb = Self::decode(data, new_schema.clone())?;
+        rb = replace_columns(
+            rb.schema(),
+            &rb,
+            &[0],
+            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+        );
 
         Ok((rb, is_first))
     }
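The hunk above moves the `p_timestamp` population into the record-batch construction step: after decoding, column 0 is swapped for a freshly generated timestamp array. The project's `replace_columns` and `get_timestamp_array` helpers are not shown in this diff, so the sketch below reproduces the idea with plain arrow APIs; the function name, schema, and millisecond unit are assumptions.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use chrono::Utc;

// Stand-in for `replace_columns(..., &[0], &[get_timestamp_array(n)])`:
// rebuild the batch with column 0 replaced by "now", one value per row.
fn with_p_timestamp(rb: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let now = Utc::now().timestamp_millis();
    let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![now; rb.num_rows()]));
    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
    columns[0] = ts;
    RecordBatch::try_new(rb.schema(), columns)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "p_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("message", DataType::Utf8, false),
    ]));
    // Placeholder timestamps such as a decode step might leave behind.
    let placeholder: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0i64; 2]));
    let messages: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    let decoded = RecordBatch::try_new(schema, vec![placeholder, messages])?;

    let stamped = with_p_timestamp(&decoded)?;
    assert_eq!(stamped.num_rows(), 2);
    Ok(())
}
```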
40 changes: 10 additions & 30 deletions src/event/mod.rs
@@ -28,7 +28,7 @@ use tracing::error;
 
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
-use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
+use crate::{metadata, storage::StreamType};
 use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
@@ -49,7 +49,7 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub async fn process(&self) -> Result<(), EventError> {
+    pub async fn process(self) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
             let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -69,10 +69,10 @@ impl Event {
             commit_schema(&self.stream_name, self.rb.schema())?;
         }
 
-        Self::process_event(
+        STREAM_WRITERS.append_to_local(
             &self.stream_name,
             &key,
-            self.rb.clone(),
+            &self.rb,
             self.parsed_timestamp,
             &self.custom_partition_values,
             &self.stream_type,
@@ -98,44 +98,24 @@ impl Event {
         Ok(())
     }
 
-    pub fn process_unchecked(&self) -> Result<(), PostError> {
+    pub fn process_unchecked(&self) -> Result<(), EventError> {
         let key = get_schema_key(&self.rb.schema().fields);
 
-        Self::process_event(
+        STREAM_WRITERS.append_to_local(
             &self.stream_name,
             &key,
-            self.rb.clone(),
+            &self.rb,
             self.parsed_timestamp,
             &self.custom_partition_values,
             &self.stream_type,
-        )
-        .map_err(PostError::Event)
+        )?;
+
+        Ok(())
     }
 
     pub fn clear(&self, stream_name: &str) {
         STREAM_WRITERS.clear(stream_name);
     }
-
-    // event process all events after the 1st event. Concatenates record batches
-    // and puts them in memory store for each event.
-    fn process_event(
-        stream_name: &str,
-        schema_key: &str,
-        rb: RecordBatch,
-        parsed_timestamp: NaiveDateTime,
-        custom_partition_values: &HashMap<String, String>,
-        stream_type: &StreamType,
-    ) -> Result<(), EventError> {
-        STREAM_WRITERS.append_to_local(
-            stream_name,
-            schema_key,
-            rb,
-            parsed_timestamp,
-            custom_partition_values.clone(),
-            stream_type,
-        )?;
-        Ok(())
-    }
 }
 
 pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
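Taken together, the hunks above collapse the old `process_event` helper into a direct `STREAM_WRITERS.append_to_local(...)` call that borrows the batch, and `process` now consumes the event. A stand-in sketch of that shape; the types below are simplified placeholders, not the repository's.

```rust
use std::sync::Mutex;

// Simplified stand-in for arrow's RecordBatch (the real one is cheap to clone).
#[derive(Clone)]
struct RecordBatch(Vec<u64>);

#[derive(Default)]
struct StreamWriters {
    buffers: Mutex<Vec<RecordBatch>>,
}

impl StreamWriters {
    // Borrow the batch; clone only at the point it is actually retained.
    fn append_to_local(&self, rb: &RecordBatch) {
        self.buffers.lock().unwrap().push(rb.clone());
    }
}

struct Event {
    rb: RecordBatch,
}

impl Event {
    // Consuming `self` makes it explicit that the event is spent after
    // processing, and there is no `self.rb.clone()` at the call site.
    fn process(self, writers: &StreamWriters) {
        writers.append_to_local(&self.rb);
    }
}

fn main() {
    let writers = StreamWriters::default();
    Event { rb: RecordBatch(vec![1, 2, 3]) }.process(&writers);
    assert_eq!(writers.buffers.lock().unwrap().len(), 1);
}
```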
6 changes: 3 additions & 3 deletions src/event/writer/mem_writer.rs
@@ -50,7 +50,7 @@ impl<const N: usize> Default for MemWriter<N> {
 }
 
 impl<const N: usize> MemWriter<N> {
-    pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
+    pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) {
         if !self.schema_map.contains(schema_key) {
             self.schema_map.insert(schema_key.to_owned());
             self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap();
@@ -97,7 +97,7 @@ pub struct MutableBuffer<const N: usize> {
 }
 
 impl<const N: usize> MutableBuffer<N> {
-    fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
+    fn push(&mut self, rb: &RecordBatch) -> Option<Vec<RecordBatch>> {
         if self.rows + rb.num_rows() >= N {
             let left = N - self.rows;
             let right = rb.num_rows() - left;
@@ -121,7 +121,7 @@ impl<const N: usize> MutableBuffer<N> {
             Some(inner)
         } else {
             self.rows += rb.num_rows();
-            self.inner.push(rb);
+            self.inner.push(rb.clone());
             None
         }
     }
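The `MemWriter`/`MutableBuffer` changes above take `&RecordBatch` and call `rb.clone()` only on the branch that actually retains the batch. That clone is cheap: arrow's `RecordBatch::clone` is shallow and only bumps the `Arc` reference counts on the column data. A small demonstration using the umbrella `arrow` crate:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let values: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let rb = RecordBatch::try_new(schema, vec![values])?;

    // Cloning the batch shares the column allocation instead of copying it.
    let retained = rb.clone();
    assert_eq!(Arc::strong_count(rb.column(0)), 2);
    drop(retained);
    assert_eq!(Arc::strong_count(rb.column(0)), 1);
    Ok(())
}
```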