refactor: ingestion data flow #1100

Merged (20 commits) on Jan 23, 2025
5 changes: 1 addition & 4 deletions src/catalog/mod.rs
@@ -33,7 +33,6 @@ use crate::{
    query::PartialTimeFilter,
    storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
-use bytes::Bytes;
use chrono::{DateTime, Local, NaiveTime, Utc};
use relative_path::RelativePathBuf;
use std::io::Error as IOError;
@@ -412,13 +411,11 @@ pub async fn get_first_event(
            base_path_without_preceding_slash(),
            stream_name
        );
-        // Convert dates vector to Bytes object
-        let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
        let ingestor_first_event_at =
            handlers::http::cluster::send_retention_cleanup_request(
                &url,
                ingestor.clone(),
-                dates_bytes,
+                &dates,
            )
            .await?;
        if !ingestor_first_event_at.is_empty() {
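With this change the caller no longer builds a `Bytes` body; it hands the cluster helper `&dates` and lets serialization happen in one place. A minimal sketch of that pattern, using a hypothetical `build_cleanup_body` in place of the real `send_retention_cleanup_request`:

```rust
use bytes::Bytes;

// Hypothetical helper: accepts borrowed, typed data and owns the wire encoding.
fn build_cleanup_body(dates: &[String]) -> Bytes {
    // Serialization happens here, so callers never touch Bytes directly.
    Bytes::from(serde_json::to_vec(dates).expect("Vec<String> always serializes"))
}

fn main() {
    let dates = vec!["2025-01-01".to_owned(), "2025-01-02".to_owned()];
    let body = build_cleanup_body(&dates);
    assert_eq!(body.as_ref(), br#"["2025-01-01","2025-01-02"]"#);
}
```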
29 changes: 27 additions & 2 deletions src/event/format/mod.rs
@@ -19,6 +19,7 @@

use std::{
    collections::{HashMap, HashSet},
+    fmt::Display,
    sync::Arc,
};

@@ -29,7 +30,10 @@ use chrono::DateTime;
use serde::{Deserialize, Serialize};
use serde_json::Value;

-use crate::{metadata::SchemaVersion, utils::arrow::get_field};
+use crate::{
+    metadata::SchemaVersion,
+    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+};

use super::DEFAULT_TIMESTAMP_KEY;

@@ -73,6 +77,20 @@ impl From<&str> for LogSource {
    }
}

+impl Display for LogSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(match self {
+            LogSource::Kinesis => "kinesis",
+            LogSource::OtelLogs => "otel-logs",
+            LogSource::OtelMetrics => "otel-metrics",
+            LogSource::OtelTraces => "otel-traces",
+            LogSource::Json => "json",
+            LogSource::Pmeta => "pmeta",
+            LogSource::Custom(custom) => custom,
+        })
+    }
+}
+
// Global Trait for event format
// This trait is implemented by all the event formats
pub trait EventFormat: Sized {
@@ -126,7 +144,14 @@ }
        }
        new_schema =
            update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
-        let rb = Self::decode(data, new_schema.clone())?;
+
+        let mut rb = Self::decode(data, new_schema.clone())?;
+        rb = replace_columns(
+            rb.schema(),
+            &rb,
+            &[0],
+            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+        );

        Ok((rb, is_first))
    }
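The new `Display` impl means a `LogSource` can be rendered with `format!` or `to_string()` instead of ad-hoc string helpers. A trimmed, self-contained sketch (only two of the variants are kept; the full mapping is in the hunk above):

```rust
use std::fmt::{self, Display};

// Trimmed-down stand-in for the enum in the diff, to show what `Display` buys.
enum LogSource {
    Kinesis,
    Custom(String),
}

impl Display for LogSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            LogSource::Kinesis => "kinesis",
            LogSource::Custom(custom) => custom,
        })
    }
}

fn main() {
    // `Display` gives `to_string()` and `format!` support for free, which is
    // handy when the log source has to round-trip through a header or log line.
    assert_eq!(LogSource::Kinesis.to_string(), "kinesis");
    println!("source = {}", LogSource::Custom("my-agent".to_owned()));
}
```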
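In the last hunk of this file, the decoded `RecordBatch` no longer keeps whatever landed in its first column: `replace_columns` swaps it for a server-generated timestamp array of matching length. Below is a small sketch of that column swap using plain arrow APIs; `get_timestamp_array` here is a guess at the helper's behaviour, and the crate names (`arrow_array`, `arrow_schema`) plus the `p_timestamp` column name are assumptions rather than details taken from this PR.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, TimestampMillisecondArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use chrono::Utc;

// Hypothetical stand-in for the `get_timestamp_array` helper named in the diff:
// one "now" value repeated for every row of the batch.
fn get_timestamp_array(rows: usize) -> TimestampMillisecondArray {
    let now = Utc::now().timestamp_millis();
    TimestampMillisecondArray::from_iter_values(std::iter::repeat(now).take(rows))
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "p_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("value", DataType::Int64, false),
    ]));

    // Pretend this came out of `Self::decode`: the timestamp column is a placeholder.
    let decoded: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0_i64, 0]));
    let values: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2]));
    let rb = RecordBatch::try_new(schema.clone(), vec![decoded, values])?;

    // Overwrite column 0 with server-side ingestion timestamps, the same idea as
    // the `replace_columns(..., &[0], ...)` call in the diff.
    let p_timestamp: ArrayRef = Arc::new(get_timestamp_array(rb.num_rows()));
    let mut columns = rb.columns().to_vec();
    columns[0] = p_timestamp;
    let rb = RecordBatch::try_new(schema, columns)?;

    assert_eq!(rb.num_rows(), 2);
    Ok(())
}
```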
40 changes: 10 additions & 30 deletions src/event/mod.rs
@@ -28,7 +28,7 @@ use tracing::error;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
-use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
+use crate::{metadata, storage::StreamType};
use chrono::NaiveDateTime;
use std::collections::HashMap;

@@ -49,7 +49,7 @@ pub struct Event {

// Events holds the schema related to a each event for a single log stream
impl Event {
-    pub async fn process(&self) -> Result<(), EventError> {
+    pub async fn process(self) -> Result<(), EventError> {
        let mut key = get_schema_key(&self.rb.schema().fields);
        if self.time_partition.is_some() {
            let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -69,10 +69,10 @@ impl Event {
            commit_schema(&self.stream_name, self.rb.schema())?;
        }

-        Self::process_event(
+        STREAM_WRITERS.append_to_local(
            &self.stream_name,
            &key,
-            self.rb.clone(),
+            &self.rb,
            self.parsed_timestamp,
            &self.custom_partition_values,
            &self.stream_type,
@@ -98,44 +98,24 @@ impl Event {
        Ok(())
    }

-    pub fn process_unchecked(&self) -> Result<(), PostError> {
+    pub fn process_unchecked(&self) -> Result<(), EventError> {
        let key = get_schema_key(&self.rb.schema().fields);

-        Self::process_event(
+        STREAM_WRITERS.append_to_local(
            &self.stream_name,
            &key,
-            self.rb.clone(),
+            &self.rb,
            self.parsed_timestamp,
            &self.custom_partition_values,
            &self.stream_type,
-        )
-        .map_err(PostError::Event)
+        )?;
+
+        Ok(())
    }

    pub fn clear(&self, stream_name: &str) {
        STREAM_WRITERS.clear(stream_name);
    }
-
-    // event process all events after the 1st event. Concatenates record batches
-    // and puts them in memory store for each event.
-    fn process_event(
-        stream_name: &str,
-        schema_key: &str,
-        rb: RecordBatch,
-        parsed_timestamp: NaiveDateTime,
-        custom_partition_values: &HashMap<String, String>,
-        stream_type: &StreamType,
-    ) -> Result<(), EventError> {
-        STREAM_WRITERS.append_to_local(
-            stream_name,
-            schema_key,
-            rb,
-            parsed_timestamp,
-            custom_partition_values.clone(),
-            stream_type,
-        )?;
-        Ok(())
-    }
}

pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
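This file makes two related moves: `process` now consumes `self`, and both `process` and `process_unchecked` call `STREAM_WRITERS.append_to_local` directly with `&self.rb` instead of going through the deleted `process_event` wrapper with a cloned batch. A minimal sketch of what taking `self` by value buys, with a simplified stand-in for the real `Event` struct:

```rust
// Simplified stand-in for the Event type; the point is the ownership change only.
struct Event {
    stream_name: String,
}

impl Event {
    // Taking `self` by value means an event can be processed exactly once;
    // the compiler rejects any later use of the moved value.
    fn process(self) -> Result<(), String> {
        println!("processed one event for stream {}", self.stream_name);
        Ok(())
    }
}

fn main() {
    let event = Event { stream_name: "app-logs".to_owned() };
    event.process().unwrap();
    // event.process(); // error[E0382]: use of moved value: `event`
}
```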
6 changes: 3 additions & 3 deletions src/event/writer/mem_writer.rs
@@ -50,7 +50,7 @@ impl<const N: usize> Default for MemWriter<N> {
}

impl<const N: usize> MemWriter<N> {
-    pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
+    pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) {
        if !self.schema_map.contains(schema_key) {
            self.schema_map.insert(schema_key.to_owned());
            self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap();
@@ -97,7 +97,7 @@ pub struct MutableBuffer<const N: usize> {
}

impl<const N: usize> MutableBuffer<N> {
-    fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
+    fn push(&mut self, rb: &RecordBatch) -> Option<Vec<RecordBatch>> {
        if self.rows + rb.num_rows() >= N {
            let left = N - self.rows;
            let right = rb.num_rows() - left;
@@ -121,7 +121,7 @@ impl<const N: usize> MutableBuffer<N> {
            Some(inner)
        } else {
            self.rows += rb.num_rows();
-            self.inner.push(rb);
+            self.inner.push(rb.clone());
            None
        }
    }
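`MemWriter::push` and `MutableBuffer::push` now borrow the incoming `RecordBatch` and clone it only on the branch that actually retains it. A toy version of that borrow-then-clone shape, with `Vec<i64>` standing in for `RecordBatch` and without the batch-splitting logic the real buffer performs:

```rust
// Toy bounded buffer: callers lend the batch, the buffer clones only when it keeps it.
struct Buffer<const N: usize> {
    rows: usize,
    inner: Vec<Vec<i64>>,
}

impl<const N: usize> Default for Buffer<N> {
    fn default() -> Self {
        Self { rows: 0, inner: Vec::new() }
    }
}

impl<const N: usize> Buffer<N> {
    fn push(&mut self, batch: &Vec<i64>) -> Option<Vec<Vec<i64>>> {
        if self.rows + batch.len() >= N {
            // Threshold reached: hand back everything buffered so far plus the new batch.
            let mut flushed = std::mem::take(&mut self.inner);
            flushed.push(batch.clone());
            self.rows = 0;
            Some(flushed)
        } else {
            self.rows += batch.len();
            self.inner.push(batch.clone()); // clone only on the retain path
            None
        }
    }
}

fn main() {
    let mut buf: Buffer<4> = Buffer::default();
    assert!(buf.push(&vec![1, 2]).is_none());
    assert!(buf.push(&vec![3, 4]).is_some()); // 2 + 2 >= 4 triggers a flush
}
```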