dynamic batch processing #33

Merged
merged 1 commit on Jul 3, 2024
79 changes: 70 additions & 9 deletions historical_data/src/main.rs
@@ -7,7 +7,7 @@ use mongodb::{
options::FindOptions,
Client as MongoClient,
};
use rayon::prelude::*;

use regex::Regex;
use serde::Deserialize;
use serde_json::to_string;
@@ -22,12 +22,15 @@ use std::{
fmt,
sync::{Arc, Mutex},
time::Duration,
time::Instant,
};
use tokio::{signal, sync::mpsc};
use tokio::{
sync::{broadcast, oneshot},
task::JoinHandle,
};


type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

lazy_static::lazy_static! {
@@ -36,6 +39,40 @@ lazy_static::lazy_static! {
static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3");
}

struct BatchSizeManager {
    current_size: usize,
    min_size: usize,
    max_size: usize,
    performance_threshold: f64, // in documents per second
}

impl BatchSizeManager {
    fn new(initial_size: usize, min_size: usize, max_size: usize, performance_threshold: f64) -> Self {
        BatchSizeManager {
            current_size: initial_size,
            min_size,
            max_size,
            performance_threshold,
        }
    }

    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: std::time::Duration) {
        let performance = docs_processed as f64 / time_taken.as_secs_f64();

        if performance > self.performance_threshold {
            self.current_size = (self.current_size * 2).min(self.max_size);
        } else {
            self.current_size = (self.current_size / 2).max(self.min_size);
        }
    }

    fn get_current_size(&self) -> usize {
        self.current_size
    }
}
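
As a side note (not part of the diff), here is a minimal sketch of how the adjustment rule behaves, assuming the BatchSizeManager above is in scope: a batch that clears the documents-per-second threshold doubles the batch size up to max_size, while a slower batch halves it down to min_size.

use std::time::Duration;

fn main() {
    // Same parameters the PR uses: start at 10_000 docs, clamp to [1_000, 100_000],
    // threshold of 5_000 documents per second.
    let mut mgr = BatchSizeManager::new(10_000, 1_000, 100_000, 5_000.0);

    // Fast batch: 10_000 docs in 1 s = 10_000 docs/s, above the threshold -> size doubles.
    mgr.adjust_batch_size(10_000, Duration::from_secs(1));
    assert_eq!(mgr.get_current_size(), 20_000);

    // Slow batch: 20_000 docs in 10 s = 2_000 docs/s, below the threshold -> size halves.
    mgr.adjust_batch_size(20_000, Duration::from_secs(10));
    assert_eq!(mgr.get_current_size(), 10_000);

    // Repeated slow batches bottom out at min_size and never go below it.
    for _ in 0..10 {
        mgr.adjust_batch_size(1_000, Duration::from_secs(10));
    }
    assert_eq!(mgr.get_current_size(), 1_000);
}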



const MAX_BATCH_SIZE: usize = 10000;
const MAX_RETRIES: u32 = 5;
const INITIAL_RETRY_DELAY: u64 = 1000;
@@ -153,9 +190,14 @@ info!("Total documents to process: {}", total_docs);
.find(filter, None)
.await
.context("Failed to create MongoDB cursor")?;
let mut batch = Vec::with_capacity(MAX_BATCH_SIZE);


let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
let mut batch = Vec::with_capacity(batch_manager.get_current_size());
let mut processed_docs = 0;

let start_time = Instant::now();

while let Some(result) = cursor.next().await {
let doc = result.context("Failed to get next document from MongoDB cursor")?;
let record_id = doc
@@ -179,21 +221,28 @@ info!("Total documents to process: {}", total_docs);
let statement_str = to_string(&statement).context("Failed to serialize statement to JSON")?;
batch.push((record_id_str, statement_str));

if batch.len() >= MAX_BATCH_SIZE {
if batch.len() >= batch_manager.get_current_size() {
let batch_start_time = Instant::now();

insert_into_clickhouse(
&ch_pool,
&batch,
&tenant_config.clickhouse_db,
&tenant_config.clickhouse_table,
&app_state.pg_pool,
&tenant_config.name,
&mut batch_manager, // Pass batch_manager as mutable reference
)
.await
.context("Failed to insert batch into ClickHouse")?;

let batch_duration = batch_start_time.elapsed();
batch_manager.adjust_batch_size(batch.len(), batch_duration);

processed_docs += batch.len();
info!(
"Processed {} out of {} documents",
processed_docs, total_docs
"Processed {} out of {} documents. Current batch size: {}",
processed_docs, total_docs, batch_manager.get_current_size()
);
batch.clear();
}
@@ -208,15 +257,17 @@ info!("Total documents to process: {}", total_docs);
&tenant_config.clickhouse_table,
&app_state.pg_pool,
&tenant_config.name,
&mut batch_manager,
)
.await
.context("Failed to insert final batch into ClickHouse")?;
processed_docs += batch.len();
}

let total_duration = start_time.elapsed();
info!(
"Completed processing {} out of {} documents",
processed_docs, total_docs
"Completed processing {} documents in {:?}. Final batch size: {}",
processed_docs, total_duration, batch_manager.get_current_size()
);

if processed_docs < total_docs {
@@ -271,10 +322,11 @@ async fn insert_into_clickhouse(
clickhouse_table: &str,
pg_pool: &PgPool,
tenant_name: &str,
batch_manager: &mut BatchSizeManager, // Added BatchSizeManager as mutable reference
) -> Result<()> {
let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);

for (chunk_index, chunk) in bulk_insert_values.chunks(MAX_BATCH_SIZE).enumerate() {
for (chunk_index, chunk) in bulk_insert_values.chunks(batch_manager.get_current_size()).enumerate() {
let mut retry_count = 0;
let mut delay = INITIAL_RETRY_DELAY;

@@ -317,6 +369,14 @@ async fn insert_into_clickhouse(
}
}
}

// Add logging for batch size changes here
let old_size = batch_manager.get_current_size();
batch_manager.adjust_batch_size(chunk.len(), Duration::from_secs(1)); // Assume 1 second per batch, adjust as needed
let new_size = batch_manager.get_current_size();
if old_size != new_size {
info!("Batch size adjusted from {} to {}", old_size, new_size);
}
}

Ok(())
@@ -475,14 +535,15 @@ async fn log_failed_batch(
let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri);
let bulk_insert_values: Vec<(String, String)> =
serde_json::from_str(&failed_batch).context("Failed to deserialize failed batch")?;

let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
if let Err(e) = insert_into_clickhouse(
&ch_pool,
&bulk_insert_values,
&clickhouse_db,
&clickhouse_table,
pg_pool,
&tenant_name,
&mut batch_manager,
)
.await
{