Skip to content
Open
548 changes: 540 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ edition = "2024"

[dependencies]
tokio = { version = "1.43", features = ["full"] }
foyer = {version ="0.17.3",features=["serde"]}
object_store = { version = "0.12.2", features = ["aws", "azure", "gcp"] }
datafusion = "46.0.0"
arrow = "54.2.0"
uuid = { version = "1.13", features = ["v4", "serde"] }
Expand All @@ -18,7 +20,7 @@ log = "0.4.25"
color-eyre = "0.6.3"
arrow-schema = "54.1.0"
regex = "1.11.1"
deltalake = { version = "0.25.0", features = ["datafusion", "s3"] }
deltalake = { version = "0.26.2", features = ["datafusion", "s3","azure", "gcs",] }
delta_kernel = { version = "0.8.0", features = [
"arrow-conversion",
"default-engine",
Expand Down Expand Up @@ -67,6 +69,9 @@ aws-sdk-s3 = "1.3.0"
url = "2.5.4"
datafusion-common = "46.0.0"
tokio-cron-scheduler = "0.10"
metrics = "0.24.2"
flate2 = "1.1.1"
async-stream = "0.3"

[dev-dependencies]
serial_test = "3.2.0"
Expand All @@ -75,5 +80,9 @@ scopeguard = "1.2.0"
rand = "0.8.5"

[features]
default = []
default = ["s3", "azure", "gcs"]
s3 = ["deltalake/s3", "object_store/aws"]
azure = ["deltalake/azure", "object_store/azure"]
gcs = ["deltalake/gcs", "object_store/gcp"]
test = []

176 changes: 88 additions & 88 deletions benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,51 +34,51 @@ fn bench_batch_ingestion(c: &mut Criterion) {
let mut records = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
records.push(IngestRecord {
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
instrumentation_scope: None,
errors: None,
tags: vec!["tag".to_string()],
errors: None,
tags: vec!["tag".to_string()],
});
}

Expand Down Expand Up @@ -110,51 +110,51 @@ fn bench_insertion_range(c: &mut Criterion) {
let mut records = Vec::with_capacity(size);
for _ in 0..size {
records.push(IngestRecord {
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
instrumentation_scope: None,
errors: None,
tags: vec!["tag".to_string()],
errors: None,
tags: vec!["tag".to_string()],
});
}

Expand Down
21 changes: 11 additions & 10 deletions src/batch_queue.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use anyhow::Result;
use crossbeam::queue::SegQueue;
use delta_kernel::arrow::record_batch::RecordBatch;
use tokio::sync::RwLock;
use tokio::time::interval;
use tokio::{sync::RwLock, time::interval};
use tracing::{error, info};

/// BatchQueue collects RecordBatches and processes them at intervals
#[derive(Debug)]
pub struct BatchQueue {
queue: Arc<SegQueue<RecordBatch>>,
queue: Arc<SegQueue<RecordBatch>>,
is_shutting_down: Arc<RwLock<bool>>,
}

Expand Down Expand Up @@ -105,14 +106,14 @@ async fn process_batches(db: &Arc<crate::database::Database>, queue: &Arc<SegQue

#[cfg(test)]
mod tests {
use super::*;
use crate::database::Database;
use crate::persistent_queue::OtelLogsAndSpans;
use chrono::Utc;
use serde_arrow::schema::SchemaLike;
use std::sync::Arc;

use chrono::Utc;
use tokio::time::sleep;

use super::*;
use crate::{database::Database, persistent_queue::OtelLogsAndSpans};

#[tokio::test]
async fn test_batch_queue() -> Result<()> {
dotenv::dotenv().ok();
Expand Down
Loading
Loading