Commit 3bfc262: dynamic batch processing

isankadn committed Jul 3, 2024
1 parent b49fc63 commit 3bfc262
Showing 1 changed file with 70 additions and 9 deletions.
historical_data/src/main.rs: 79 changes (70 additions, 9 deletions)
@@ -7,7 +7,7 @@ use mongodb::{
options::FindOptions,
Client as MongoClient,
};
use rayon::prelude::*;

use regex::Regex;
use serde::Deserialize;
use serde_json::to_string;
@@ -22,12 +22,15 @@ use std::{
fmt,
sync::{Arc, Mutex},
time::Duration,
time::Instant,
};
use tokio::{signal, sync::mpsc};
use tokio::{
sync::{broadcast, oneshot},
task::JoinHandle,
};


type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

lazy_static::lazy_static! {
@@ -36,6 +39,40 @@
static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3");
}

/// Adapts the batch size between a floor and a ceiling based on observed throughput.
struct BatchSizeManager {
current_size: usize,
min_size: usize,
max_size: usize,
performance_threshold: f64, // in documents per second
}

impl BatchSizeManager {
fn new(initial_size: usize, min_size: usize, max_size: usize, performance_threshold: f64) -> Self {
BatchSizeManager {
current_size: initial_size,
min_size,
max_size,
performance_threshold,
}
}

    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: std::time::Duration) {
        // Guard against a zero-length measurement to avoid dividing by zero.
        let secs = time_taken.as_secs_f64().max(f64::EPSILON);
        let performance = docs_processed as f64 / secs; // documents per second

        if performance > self.performance_threshold {
            // Fast enough: double the batch size, capped at max_size.
            self.current_size = (self.current_size * 2).min(self.max_size);
        } else {
            // Too slow: halve the batch size, floored at min_size.
            self.current_size = (self.current_size / 2).max(self.min_size);
        }
    }

fn get_current_size(&self) -> usize {
self.current_size
}
}
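
// Not part of the commit: a minimal sketch of the policy above. Throughput
// over the threshold doubles the batch, throughput under it halves the batch,
// always clamped to [min_size, max_size]. The values below are illustrative.
#[cfg(test)]
mod batch_size_manager_tests {
    use super::BatchSizeManager;
    use std::time::Duration;

    #[test]
    fn doubles_when_fast_and_halves_when_slow() {
        let mut mgr = BatchSizeManager::new(10_000, 1_000, 100_000, 5_000.0);

        // 10_000 docs in 1 s = 10_000 docs/s, above the 5_000 docs/s threshold.
        mgr.adjust_batch_size(10_000, Duration::from_secs(1));
        assert_eq!(mgr.get_current_size(), 20_000);

        // 1_000 docs in 1 s = 1_000 docs/s, below the threshold.
        mgr.adjust_batch_size(1_000, Duration::from_secs(1));
        assert_eq!(mgr.get_current_size(), 10_000);
    }
}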



const MAX_BATCH_SIZE: usize = 10000;
const MAX_RETRIES: u32 = 5;
const INITIAL_RETRY_DELAY: u64 = 1000;
@@ -153,9 +190,14 @@ info!("Total documents to process: {}", total_docs);
.find(filter, None)
.await
.context("Failed to create MongoDB cursor")?;
let mut batch = Vec::with_capacity(MAX_BATCH_SIZE);


let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); // initial 10k docs, clamp to [1k, 100k], threshold 5k docs/s
let mut batch = Vec::with_capacity(batch_manager.get_current_size());
let mut processed_docs = 0;

let start_time = Instant::now();

while let Some(result) = cursor.next().await {
let doc = result.context("Failed to get next document from MongoDB cursor")?;
let record_id = doc
@@ -179,21 +221,28 @@ info!("Total documents to process: {}", total_docs);
let statement_str = to_string(&statement).context("Failed to serialize statement to JSON")?;
batch.push((record_id_str, statement_str));

if batch.len() >= MAX_BATCH_SIZE {
if batch.len() >= batch_manager.get_current_size() {
let batch_start_time = Instant::now();

insert_into_clickhouse(
&ch_pool,
&batch,
&tenant_config.clickhouse_db,
&tenant_config.clickhouse_table,
&app_state.pg_pool,
&tenant_config.name,
&mut batch_manager, // the insert path re-tunes the batch size per chunk
)
.await
.context("Failed to insert batch into ClickHouse")?;

let batch_duration = batch_start_time.elapsed();
batch_manager.adjust_batch_size(batch.len(), batch_duration);

processed_docs += batch.len();
info!(
"Processed {} out of {} documents",
processed_docs, total_docs
"Processed {} out of {} documents. Current batch size: {}",
processed_docs, total_docs, batch_manager.get_current_size()
);
batch.clear();
}
@@ -208,15 +257,17 @@ info!("Total documents to process: {}", total_docs);
&tenant_config.clickhouse_table,
&app_state.pg_pool,
&tenant_config.name,
&mut batch_manager,
)
.await
.context("Failed to insert final batch into ClickHouse")?;
processed_docs += batch.len();
}

let total_duration = start_time.elapsed();
info!(
"Completed processing {} out of {} documents",
processed_docs, total_docs
"Completed processing {} documents in {:?}. Final batch size: {}",
processed_docs, total_duration, batch_manager.get_current_size()
);

if processed_docs < total_docs {
@@ -271,10 +322,11 @@ async fn insert_into_clickhouse(
clickhouse_table: &str,
pg_pool: &PgPool,
tenant_name: &str,
batch_manager: &mut BatchSizeManager, // re-tuned after each chunk insert
) -> Result<()> {
let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);

for (chunk_index, chunk) in bulk_insert_values.chunks(MAX_BATCH_SIZE).enumerate() {
for (chunk_index, chunk) in bulk_insert_values.chunks(batch_manager.get_current_size()).enumerate() {
let mut retry_count = 0;
let mut delay = INITIAL_RETRY_DELAY;

@@ -317,6 +369,14 @@ async fn insert_into_clickhouse(
}
}
}

// Log batch size changes after each chunk.
let old_size = batch_manager.get_current_size();
// NOTE: assumes each chunk takes about one second; measuring the actual insert duration would give a truer throughput figure.
batch_manager.adjust_batch_size(chunk.len(), Duration::from_secs(1));
let new_size = batch_manager.get_current_size();
if old_size != new_size {
info!("Batch size adjusted from {} to {}", old_size, new_size);
}
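// Not part of the commit: a sketch of how the fixed one-second estimate above
// could be replaced by a real measurement, timing the insert attempt itself:
//
//     let chunk_started = std::time::Instant::now();
//     // ... run the retry loop above for this chunk ...
//     batch_manager.adjust_batch_size(chunk.len(), chunk_started.elapsed());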
}

Ok(())
@@ -475,14 +535,15 @@ async fn log_failed_batch(
let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri);
let bulk_insert_values: Vec<(String, String)> =
serde_json::from_str(&failed_batch).context("Failed to deserialize failed batch")?;

let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); // same tuning as the main ingest path
if let Err(e) = insert_into_clickhouse(
&ch_pool,
&bulk_insert_values,
&clickhouse_db,
&clickhouse_table,
pg_pool,
&tenant_name,
&mut batch_manager,
)
.await
{
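The control flow this commit introduces can be exercised end to end without MongoDB or ClickHouse. The sketch below is illustrative only: fake_sink and its 50 µs per-document cost are invented stand-ins for insert_into_clickhouse, and the BatchSizeManager type from the diff is assumed to be in scope.

use std::time::Instant;

fn main() {
    // Illustrative tuning: start at 100 docs, clamp to [10, 1_000], target 5_000 docs/s.
    let mut manager = BatchSizeManager::new(100, 10, 1_000, 5_000.0);
    let mut batch: Vec<u64> = Vec::with_capacity(manager.get_current_size());

    for doc in 0..10_000u64 {
        batch.push(doc);
        if batch.len() >= manager.get_current_size() {
            let started = Instant::now();
            fake_sink(&batch); // stand-in for insert_into_clickhouse
            manager.adjust_batch_size(batch.len(), started.elapsed());
            println!(
                "flushed {} docs, next batch size {}",
                batch.len(),
                manager.get_current_size()
            );
            batch.clear();
        }
    }
    if !batch.is_empty() {
        fake_sink(&batch); // final partial batch, mirroring the tail flush in the commit
    }
}

// Invented sink that pretends each document costs ~50 microseconds to insert.
fn fake_sink(batch: &[u64]) {
    std::thread::sleep(std::time::Duration::from_micros(50 * batch.len() as u64));
}

Because the fake sink sustains roughly 20,000 docs/s, well above the 5,000 docs/s threshold, the batch size doubles after each flush until it reaches the 1,000-document ceiling.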
