Skip to content

Commit eef2360

Browse files
committed
implement a batch queue and make insert query handling async
1 parent 3243f9e commit eef2360

File tree

8 files changed

+365
-85
lines changed

8 files changed

+365
-85
lines changed

.env.example

+10
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,13 @@ AWS_SECRET_ACCESS_KEY=
55
PGWIRE_PORT=5432
66
PORT=80
77
TIMEFUSION_TABLE_PREFIX=timefusion
8+
9+
# Batch insert configuration
10+
# Interval between batch inserts in milliseconds (default: 1000)
11+
BATCH_INTERVAL_MS=1000
12+
# Maximum number of rows to process in a single batch (default: 1000)
13+
MAX_BATCH_SIZE=1000
14+
# Set to "true" to enable batching queue (default: false = direct insertion)
15+
ENABLE_BATCH_QUEUE=false
16+
# Maximum number of concurrent PostgreSQL connections (default: 100)
17+
MAX_PG_CONNECTIONS=100

Cargo.lock

+34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,12 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
4040
tracing = "0.1.41"
4141
dotenv = "0.15.0"
4242
task = "0.0.1"
43+
crossbeam = "0.8.4"
4344
sqlparser = "0.55.0"
4445
rustls-pemfile = "2.2.0"
4546
rustls = "0.23.23"
47+
tokio-stream = { version = "0.1.17", features = ["net"] }
48+
tap = "1.0.1"
4649
actix-service = "2.0.2"
4750
lazy_static = "1.5.0"
4851
bcrypt = "0.17.0"

README.md

+13-8
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@ Timefusion speaks the postgres dialect, so you can insert and read from it using
1111

1212
Timefusion can be configured using the following environment variables:
1313

14-
| Variable | Description | Default |
15-
| --------------------- | ----------------------------- | -------------------------- |
16-
| `PORT` | HTTP server port | `80` |
17-
| `PGWIRE_PORT` | PostgreSQL wire protocol port | `5432` |
18-
| `AWS_S3_BUCKET` | AWS S3 bucket name | Required |
19-
| `AWS_S3_ENDPOINT` | AWS S3 endpoint URL | `https://s3.amazonaws.com` |
20-
| AWS_ACCESS_KEY_ID | AWS access key | - |
21-
| AWS_SECRET_ACCESS_KEY | AWS secret key | - |
| Variable                  | Description                                         | Default                    |
| ------------------------- | --------------------------------------------------- | -------------------------- |
| `PORT`                    | HTTP server port                                    | `80`                       |
| `PGWIRE_PORT`             | PostgreSQL wire protocol port                       | `5432`                     |
| `AWS_S3_BUCKET`           | AWS S3 bucket name                                  | Required                   |
| `AWS_S3_ENDPOINT`         | AWS S3 endpoint URL                                 | `https://s3.amazonaws.com` |
| `AWS_ACCESS_KEY_ID`       | AWS access key                                      | -                          |
| `AWS_SECRET_ACCESS_KEY`   | AWS secret key                                      | -                          |
| `TIMEFUSION_TABLE_PREFIX` | Prefix for Delta tables                             | `timefusion`               |
| `BATCH_INTERVAL_MS`       | Interval between batch inserts in milliseconds      | `1000`                     |
| `MAX_BATCH_SIZE`          | Maximum number of rows in a single batch            | `1000`                     |
| `ENABLE_BATCH_QUEUE`      | Whether to use batch queue for inserts              | `false` (direct insertion) |
| `MAX_PG_CONNECTIONS`      | Maximum number of concurrent PostgreSQL connections | `100`                      |
2227

2328
For local development, you can set `QUEUE_DB_PATH` to a location in your development environment.
2429

src/batch_queue.rs

+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use std::sync::Arc;
2+
use std::time::{Duration, Instant};
3+
4+
use anyhow::Result;
5+
use crossbeam::queue::SegQueue;
6+
use delta_kernel::arrow::record_batch::RecordBatch;
7+
use tokio::sync::RwLock;
8+
use tokio::time::interval;
9+
use tracing::{error, info};
10+
11+
/// BatchQueue collects RecordBatches and processes them at intervals
#[derive(Debug)]
pub struct BatchQueue {
    // Lock-free queue of pending batches; pushed by `queue()`, drained by
    // the background task spawned in `BatchQueue::new`.
    queue: Arc<SegQueue<RecordBatch>>,
    // Set to true by `shutdown()`; checked by `queue()` to reject new
    // batches and by the background task to know when to stop ticking.
    is_shutting_down: Arc<RwLock<bool>>,
}
17+
18+
impl BatchQueue {
19+
pub fn new(db: Arc<crate::database::Database>, interval_ms: u64, max_rows: usize) -> Self {
20+
let queue = Arc::new(SegQueue::new());
21+
let is_shutting_down = Arc::new(RwLock::new(false));
22+
23+
let queue_clone = Arc::clone(&queue);
24+
let shutdown_flag = Arc::clone(&is_shutting_down);
25+
26+
tokio::spawn(async move {
27+
let mut ticker = interval(Duration::from_millis(interval_ms));
28+
29+
loop {
30+
ticker.tick().await;
31+
32+
if *shutdown_flag.read().await {
33+
process_batches(&db, &queue_clone, max_rows).await;
34+
break;
35+
}
36+
37+
process_batches(&db, &queue_clone, max_rows).await;
38+
}
39+
});
40+
41+
Self { queue, is_shutting_down }
42+
}
43+
44+
/// Add a batch to the queue
45+
pub fn queue(&self, batch: RecordBatch) -> Result<()> {
46+
if let Ok(flag) = self.is_shutting_down.try_read() {
47+
if *flag {
48+
return Err(anyhow::anyhow!("BatchQueue is shutting down"));
49+
}
50+
}
51+
52+
self.queue.push(batch);
53+
Ok(())
54+
}
55+
56+
/// Signal shutdown and wait for queue to drain
57+
pub async fn shutdown(&self) {
58+
let mut guard = self.is_shutting_down.write().await;
59+
*guard = true;
60+
}
61+
}
62+
63+
/// Process batches from the queue
64+
async fn process_batches(db: &Arc<crate::database::Database>, queue: &Arc<SegQueue<RecordBatch>>, max_rows: usize) {
65+
if queue.is_empty() {
66+
return;
67+
}
68+
69+
let mut batches = Vec::new();
70+
let mut total_rows = 0;
71+
72+
// Take batches up to max_rows
73+
while !queue.is_empty() && total_rows < max_rows {
74+
if let Some(batch) = queue.pop() {
75+
total_rows += batch.num_rows();
76+
batches.push(batch);
77+
} else {
78+
break;
79+
}
80+
}
81+
82+
if batches.is_empty() {
83+
return;
84+
}
85+
86+
// Measure and log the insertion performance
87+
let start = Instant::now();
88+
89+
// Use skip_queue=true to force direct insertion and avoid infinite loop
90+
match db.insert_records_batch("", batches.clone(), true).await {
91+
Ok(_) => {
92+
let elapsed = start.elapsed();
93+
info!(
94+
batches_count = batches.len(),
95+
rows_count = total_rows,
96+
duration_ms = elapsed.as_millis(),
97+
"Batch insert completed"
98+
);
99+
}
100+
Err(e) => {
101+
error!("Failed to insert batches: {}", e);
102+
}
103+
}
104+
}
105+
106+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::persistent_queue::OtelLogsAndSpans;
    use chrono::Utc;
    use serde_arrow::schema::SchemaLike;
    use std::sync::Arc;
    use tokio::time::sleep;

    /// End-to-end smoke test: queue one small RecordBatch, give the
    /// background task time to flush it, then shut the queue down.
    #[tokio::test]
    async fn test_batch_queue() -> Result<()> {
        dotenv::dotenv().ok();

        // Isolate this run's Delta tables under a unique prefix.
        let table_prefix = format!("test-batch-{}", uuid::Uuid::new_v4());
        unsafe {
            std::env::set_var("TIMEFUSION_TABLE_PREFIX", &table_prefix);
        }

        let database = Arc::new(Database::new().await?);

        // Short 100ms interval and a 10-row cap so the flush fires well
        // within the sleeps below.
        let bq = BatchQueue::new(Arc::clone(&database), 100, 10);

        // Build five minimal records and convert them into a RecordBatch.
        let ts = Utc::now();
        let rows: Vec<_> = (0..5)
            .map(|i| OtelLogsAndSpans {
                project_id: "default".to_string(),
                timestamp: ts,
                id: format!("test-{}", i),
                hashes: vec![],
                date: ts.date_naive(),
                ..Default::default()
            })
            .collect();

        let fields = OtelLogsAndSpans::fields()?;
        let record_batch = serde_arrow::to_record_batch(&fields, &rows)?;

        // Enqueue, then wait two intervals for the background flush.
        bq.queue(record_batch)?;
        sleep(Duration::from_millis(200)).await;

        // Signal shutdown and give the task time to finish its drain.
        bq.shutdown().await;
        sleep(Duration::from_millis(200)).await;

        Ok(())
    }
}
157+

0 commit comments

Comments
 (0)