From 3bfc262d4e2a14341f02323bfa3f86cde7f003bd Mon Sep 17 00:00:00 2001
From: Isanka
Date: Wed, 3 Jul 2024 17:20:37 +0900
Subject: [PATCH] dynamic batch processing

---
 historical_data/src/main.rs | 79 ++++++++++++++++++++++++++++++++-----
 1 file changed, 70 insertions(+), 9 deletions(-)

diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs
index ccc0ba0..757f02d 100644
--- a/historical_data/src/main.rs
+++ b/historical_data/src/main.rs
@@ -7,7 +7,7 @@ use mongodb::{
     options::FindOptions,
     Client as MongoClient,
 };
-use rayon::prelude::*;
+
 use regex::Regex;
 use serde::Deserialize;
 use serde_json::to_string;
@@ -22,12 +22,15 @@ use std::{
     fmt,
     sync::{Arc, Mutex},
     time::Duration,
+    time::Instant,
 };
 use tokio::{signal, sync::mpsc};
 use tokio::{
     sync::{broadcast, oneshot},
     task::JoinHandle,
 };
+
+
 type PgPool = Pool>;

 lazy_static::lazy_static! {
@@ -36,6 +39,40 @@
     static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3");
 }

+struct BatchSizeManager {
+    current_size: usize,
+    min_size: usize,
+    max_size: usize,
+    performance_threshold: f64, // in documents per second
+}
+
+impl BatchSizeManager {
+    fn new(initial_size: usize, min_size: usize, max_size: usize, performance_threshold: f64) -> Self {
+        BatchSizeManager {
+            current_size: initial_size,
+            min_size,
+            max_size,
+            performance_threshold,
+        }
+    }
+
+    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: std::time::Duration) {
+        let performance = docs_processed as f64 / time_taken.as_secs_f64();
+
+        if performance > self.performance_threshold {
+            self.current_size = (self.current_size * 2).min(self.max_size);
+        } else {
+            self.current_size = (self.current_size / 2).max(self.min_size);
+        }
+    }
+
+    fn get_current_size(&self) -> usize {
+        self.current_size
+    }
+}
+
+
+
 const MAX_BATCH_SIZE: usize = 10000;
 const MAX_RETRIES: u32 = 5;
 const INITIAL_RETRY_DELAY: u64 = 1000;
@@ -153,9 +190,14 @@ info!("Total documents to process: {}", total_docs);
         .find(filter, None)
         .await
         .context("Failed to create MongoDB cursor")?;
-    let mut batch = Vec::with_capacity(MAX_BATCH_SIZE);
+
+
+    let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
+    let mut batch = Vec::with_capacity(batch_manager.get_current_size());
     let mut processed_docs = 0;

+    let start_time = Instant::now();
+
     while let Some(result) = cursor.next().await {
         let doc = result.context("Failed to get next document from MongoDB cursor")?;
         let record_id = doc
@@ -179,7 +221,9 @@ info!("Total documents to process: {}", total_docs);
         let statement_str = to_string(&statement).context("Failed to serialize statement to JSON")?;

         batch.push((record_id_str, statement_str));
-        if batch.len() >= MAX_BATCH_SIZE {
+        if batch.len() >= batch_manager.get_current_size() {
+            let batch_start_time = Instant::now();
+
             insert_into_clickhouse(
                 &ch_pool,
                 &batch,
@@ -187,13 +231,18 @@ info!("Total documents to process: {}", total_docs);
                 &tenant_config.clickhouse_table,
                 &app_state.pg_pool,
                 &tenant_config.name,
+                &mut batch_manager, // Pass batch_manager as mutable reference
             )
             .await
             .context("Failed to insert batch into ClickHouse")?;
+
+            let batch_duration = batch_start_time.elapsed();
+            batch_manager.adjust_batch_size(batch.len(), batch_duration);
+
             processed_docs += batch.len();
             info!(
-                "Processed {} out of {} documents",
-                processed_docs, total_docs
+                "Processed {} out of {} documents. Current batch size: {}",
+                processed_docs, total_docs, batch_manager.get_current_size()
             );
             batch.clear();
         }
@@ -208,15 +257,17 @@ info!("Total documents to process: {}", total_docs);
             &tenant_config.clickhouse_table,
             &app_state.pg_pool,
             &tenant_config.name,
+            &mut batch_manager,
         )
         .await
         .context("Failed to insert final batch into ClickHouse")?;
         processed_docs += batch.len();
     }

+    let total_duration = start_time.elapsed();
     info!(
-        "Completed processing {} out of {} documents",
-        processed_docs, total_docs
+        "Completed processing {} documents in {:?}. Final batch size: {}",
+        processed_docs, total_duration, batch_manager.get_current_size()
     );

     if processed_docs < total_docs {
@@ -271,10 +322,11 @@ async fn insert_into_clickhouse(
     clickhouse_table: &str,
     pg_pool: &PgPool,
     tenant_name: &str,
+    batch_manager: &mut BatchSizeManager, // Added BatchSizeManager as mutable reference
 ) -> Result<()> {
     let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);

-    for (chunk_index, chunk) in bulk_insert_values.chunks(MAX_BATCH_SIZE).enumerate() {
+    for (chunk_index, chunk) in bulk_insert_values.chunks(batch_manager.get_current_size()).enumerate() {
         let mut retry_count = 0;
         let mut delay = INITIAL_RETRY_DELAY;

@@ -317,6 +369,14 @@ async fn insert_into_clickhouse(
                 }
             }
         }
+
+        // Add logging for batch size changes here
+        let old_size = batch_manager.get_current_size();
+        batch_manager.adjust_batch_size(chunk.len(), Duration::from_secs(1)); // Assume 1 second per batch, adjust as needed
+        let new_size = batch_manager.get_current_size();
+        if old_size != new_size {
+            info!("Batch size adjusted from {} to {}", old_size, new_size);
+        }
     }

     Ok(())
@@ -475,7 +535,7 @@ async fn log_failed_batch(
     let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri);
     let bulk_insert_values: Vec<(String, String)> =
         serde_json::from_str(&failed_batch).context("Failed to deserialize failed batch")?;
-
+    let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
     if let Err(e) = insert_into_clickhouse(
         &ch_pool,
         &bulk_insert_values,
@@ -483,6 +543,7 @@ async fn log_failed_batch(
         &clickhouse_table,
         pg_pool,
         &tenant_name,
+        &mut batch_manager,
     )
     .await
     {
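
Not part of the patch itself: below is a minimal, self-contained sketch of how the sizing rule introduced by BatchSizeManager behaves, assuming the same parameters the patch uses (start at 10000, clamp to [1000, 100000], 5000 docs/sec threshold). The struct and adjustment logic are copied from the diff above; the main() driver and the concrete timings are illustrative assumptions only.

use std::time::Duration;

struct BatchSizeManager {
    current_size: usize,
    min_size: usize,
    max_size: usize,
    performance_threshold: f64, // documents per second
}

impl BatchSizeManager {
    fn new(initial_size: usize, min_size: usize, max_size: usize, performance_threshold: f64) -> Self {
        BatchSizeManager { current_size: initial_size, min_size, max_size, performance_threshold }
    }

    // Double the batch size when measured throughput exceeds the threshold,
    // halve it otherwise, always clamped to [min_size, max_size].
    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) {
        let performance = docs_processed as f64 / time_taken.as_secs_f64();
        if performance > self.performance_threshold {
            self.current_size = (self.current_size * 2).min(self.max_size);
        } else {
            self.current_size = (self.current_size / 2).max(self.min_size);
        }
    }
}

fn main() {
    // Hypothetical timings, chosen only to show both branches of the rule.
    let mut mgr = BatchSizeManager::new(10_000, 1_000, 100_000, 5_000.0);

    // Fast batch: 10_000 docs in 1 s = 10_000 docs/sec > 5_000 -> size doubles.
    mgr.adjust_batch_size(10_000, Duration::from_secs(1));
    assert_eq!(mgr.current_size, 20_000);

    // Slow batch: 20_000 docs in 10 s = 2_000 docs/sec < 5_000 -> size halves.
    mgr.adjust_batch_size(20_000, Duration::from_secs(10));
    assert_eq!(mgr.current_size, 10_000);
}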