From a67bb1e3ee14a99b9b2087b9ecd0278a8eaa0a68 Mon Sep 17 00:00:00 2001 From: Isanka Date: Tue, 6 Aug 2024 00:26:33 +0900 Subject: [PATCH 1/4] working means inserting but hashing not working --- historical_data/src/main.rs | 410 +++++++++++++++++++----------------- 1 file changed, 217 insertions(+), 193 deletions(-) diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index 757f02d..93135b9 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -30,7 +30,6 @@ use tokio::{ task::JoinHandle, }; - type PgPool = Pool>; lazy_static::lazy_static! { @@ -47,7 +46,12 @@ struct BatchSizeManager { } impl BatchSizeManager { - fn new(initial_size: usize, min_size: usize, max_size: usize, performance_threshold: f64) -> Self { + fn new( + initial_size: usize, + min_size: usize, + max_size: usize, + performance_threshold: f64, + ) -> Self { BatchSizeManager { current_size: initial_size, min_size, @@ -71,8 +75,6 @@ impl BatchSizeManager { } } - - const MAX_BATCH_SIZE: usize = 10000; const MAX_RETRIES: u32 = 5; const INITIAL_RETRY_DELAY: u64 = 1000; @@ -146,7 +148,10 @@ async fn connect_to_mongo(mongo_uri: &str) -> Result { if attempts > MAX_RETRIES { return Err(e.into()); } - error!("Failed to connect to MongoDB: {}. Retrying in {:?}", e, retry_delay); + error!( + "Failed to connect to MongoDB: {}. Retrying in {:?}", + e, retry_delay + ); tokio::time::sleep(retry_delay).await; retry_delay *= 2; } @@ -181,17 +186,16 @@ async fn process_tenant_historical_data( }; let total_docs = mongo_collection - .count_documents(filter.clone(), None) - .await - .context("Failed to count documents in MongoDB")? as usize; // Cast to usize -info!("Total documents to process: {}", total_docs); + .count_documents(filter.clone(), None) + .await + .context("Failed to count documents in MongoDB")? as usize; // Cast to usize + info!("Total documents to process: {}", total_docs); let mut cursor = mongo_collection .find(filter, None) .await .context("Failed to create MongoDB cursor")?; - let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); let mut batch = Vec::with_capacity(batch_manager.get_current_size()); let mut processed_docs = 0; @@ -218,7 +222,8 @@ info!("Total documents to process: {}", total_docs); &tenant_config.name, )?; - let statement_str = to_string(&statement).context("Failed to serialize statement to JSON")?; + let statement_str = + to_string(&statement).context("Failed to serialize statement to JSON")?; batch.push((record_id_str, statement_str)); if batch.len() >= batch_manager.get_current_size() { @@ -231,7 +236,7 @@ info!("Total documents to process: {}", total_docs); &tenant_config.clickhouse_table, &app_state.pg_pool, &tenant_config.name, - &mut batch_manager, // Pass batch_manager as mutable reference + &mut batch_manager, // Pass batch_manager as mutable reference ) .await .context("Failed to insert batch into ClickHouse")?; @@ -242,7 +247,9 @@ info!("Total documents to process: {}", total_docs); processed_docs += batch.len(); info!( "Processed {} out of {} documents. Current batch size: {}", - processed_docs, total_docs, batch_manager.get_current_size() + processed_docs, + total_docs, + batch_manager.get_current_size() ); batch.clear(); } @@ -267,7 +274,9 @@ info!("Total documents to process: {}", total_docs); let total_duration = start_time.elapsed(); info!( "Completed processing {} documents in {:?}. 
Final batch size: {}", - processed_docs, total_duration, batch_manager.get_current_size() + processed_docs, + total_duration, + batch_manager.get_current_size() ); if processed_docs < total_docs { @@ -277,7 +286,11 @@ info!("Total documents to process: {}", total_docs); Ok(()) } -fn anonymize_statement(statement: &mut Document, encryption_salt: &str, tenant_name: &str) -> Result<()> { +fn anonymize_statement( + statement: &mut Document, + encryption_salt: &str, + tenant_name: &str, +) -> Result<()> { // Implement your anonymization logic here // This is a placeholder implementation let mut hasher = Sha256::new(); @@ -285,6 +298,7 @@ fn anonymize_statement(statement: &mut Document, encryption_salt: &str, tenant_n hasher.update(statement.to_string().as_bytes()); let result = hasher.finalize(); statement.insert("anonymized_hash", hex::encode(result)); + println!("Anonymized statement---: {}", statement); Ok(()) } @@ -322,11 +336,14 @@ async fn insert_into_clickhouse( clickhouse_table: &str, pg_pool: &PgPool, tenant_name: &str, - batch_manager: &mut BatchSizeManager, // Added BatchSizeManager as mutable reference + batch_manager: &mut BatchSizeManager, // Added BatchSizeManager as mutable reference ) -> Result<()> { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - for (chunk_index, chunk) in bulk_insert_values.chunks(batch_manager.get_current_size()).enumerate() { + for (chunk_index, chunk) in bulk_insert_values + .chunks(batch_manager.get_current_size()) + .enumerate() + { let mut retry_count = 0; let mut delay = INITIAL_RETRY_DELAY; @@ -392,16 +409,15 @@ async fn insert_batch( .await .context("Failed to get client from ClickHouse pool")?; - let insert_data: Result> = futures::future::try_join_all( - batch.iter().map(|(record_id, statement)| async move { + let insert_data: Result> = + futures::future::try_join_all(batch.iter().map(|(record_id, statement)| async move { let processed_statement = process_statement(statement).await?; Ok(format!( "('{}', '{}', now())", record_id, processed_statement )) - }), - ) - .await; + })) + .await; let insert_data = insert_data.context("Failed to process statements")?; @@ -476,7 +492,7 @@ async fn log_failed_batch( .await .context("Failed to get client from PostgreSQL pool")?; - let statement = client + let statement = client .prepare( "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch) VALUES ($1, $2, $3, $4)", @@ -484,209 +500,217 @@ async fn log_failed_batch( .await .context("Failed to prepare PostgreSQL statement")?; - client - .execute( - &statement, - &[ - &tenant_name, - &clickhouse_db, - &clickhouse_table, - &failed_batch_json, - ], - ) - .await - .context("Failed to execute PostgreSQL statement")?; + client + .execute( + &statement, + &[ + &tenant_name, + &clickhouse_db, + &clickhouse_table, + &failed_batch_json, + ], + ) + .await + .context("Failed to execute PostgreSQL statement")?; - Ok(()) - } + Ok(()) +} - async fn retry_failed_batches(app_state: Arc) -> Result<()> { - let pg_pool = &app_state.pg_pool; +async fn retry_failed_batches(app_state: Arc) -> Result<()> { + let pg_pool = &app_state.pg_pool; - loop { - let mut client = pg_pool.get().await.context("Failed to get PostgreSQL client")?; - let statement = client - .prepare( - "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch + loop { + let mut client = pg_pool + .get() + .await + .context("Failed to get PostgreSQL client")?; + let statement = client + .prepare( + "SELECT id, tenant_name, clickhouse_db, 
clickhouse_table, failed_batch FROM failed_batches ORDER BY created_at LIMIT 100", + ) + .await + .context("Failed to prepare PostgreSQL statement")?; + + let rows = client + .query(&statement, &[]) + .await + .context("Failed to execute PostgreSQL query")?; + + for row in rows { + let failed_batch_id: i32 = row.get(0); + let tenant_name: String = row.get(1); + let clickhouse_db: String = row.get(2); + let clickhouse_table: String = row.get(3); + let failed_batch: String = row.get(4); + + let tenant_config = app_state + .config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .cloned(); + + if let Some(tenant_config) = tenant_config { + let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); + let bulk_insert_values: Vec<(String, String)> = serde_json::from_str(&failed_batch) + .context("Failed to deserialize failed batch")?; + let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &bulk_insert_values, + &clickhouse_db, + &clickhouse_table, + pg_pool, + &tenant_name, + &mut batch_manager, ) .await - .context("Failed to prepare PostgreSQL statement")?; - - let rows = client.query(&statement, &[]).await.context("Failed to execute PostgreSQL query")?; - - for row in rows { - let failed_batch_id: i32 = row.get(0); - let tenant_name: String = row.get(1); - let clickhouse_db: String = row.get(2); - let clickhouse_table: String = row.get(3); - let failed_batch: String = row.get(4); - - let tenant_config = app_state - .config - .tenants - .iter() - .find(|t| t.name == tenant_name) - .cloned(); - - if let Some(tenant_config) = tenant_config { - let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); - let bulk_insert_values: Vec<(String, String)> = - serde_json::from_str(&failed_batch).context("Failed to deserialize failed batch")?; - let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); - if let Err(e) = insert_into_clickhouse( - &ch_pool, - &bulk_insert_values, - &clickhouse_db, - &clickhouse_table, - pg_pool, - &tenant_name, - &mut batch_manager, - ) - .await - { - error!("Error retrying failed batch: {}", e); - } else { - let delete_statement = client - .prepare("DELETE FROM failed_batches WHERE id = $1") - .await - .context("Failed to prepare delete statement")?; - client - .execute(&delete_statement, &[&failed_batch_id]) - .await - .context("Failed to delete processed failed batch")?; - } + { + error!("Error retrying failed batch: {}", e); + } else { + let delete_statement = client + .prepare("DELETE FROM failed_batches WHERE id = $1") + .await + .context("Failed to prepare delete statement")?; + client + .execute(&delete_statement, &[&failed_batch_id]) + .await + .context("Failed to delete processed failed batch")?; } } - - tokio::time::sleep(std::time::Duration::from_secs(60)).await; } - } - fn validate_date_time(date_time_str: &str) -> Result { - NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M") - .map_err(|e| anyhow!("Invalid date and time format: {}", e)) + tokio::time::sleep(std::time::Duration::from_secs(60)).await; } +} - #[tokio::main] - async fn main() -> Result<()> { - env_logger::init(); - info!(target: "main", "Starting up"); - - let env = env::var("ENV").unwrap_or_else(|_| "dev".into()); - let config_path = match env.as_str() { - "dev" => "config-dev.yml", - "prod" => "config-prod.yml", - _ => { - error!("Unsupported environment: {}", env); - return Err(anyhow!("Unsupported environment")); - } - }; - let config: AppConfig = 
serde_yaml::from_reader(std::fs::File::open(config_path)?)?; +fn validate_date_time(date_time_str: &str) -> Result { + NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M") + .map_err(|e| anyhow!("Invalid date and time format: {}", e)) +} - let tenant_name = env::args() - .nth(1) - .ok_or_else(|| anyhow!("Missing tenant name argument"))?; +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + info!(target: "main", "Starting up"); + + let env = env::var("ENV").unwrap_or_else(|_| "dev".into()); + let config_path = match env.as_str() { + "dev" => "config-dev.yml", + "prod" => "config-prod.yml", + _ => { + error!("Unsupported environment: {}", env); + return Err(anyhow!("Unsupported environment")); + } + }; + let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?; - let start_date = env::args() - .nth(2) - .ok_or_else(|| anyhow!("Missing start date argument"))?; + let tenant_name = env::args() + .nth(1) + .ok_or_else(|| anyhow!("Missing tenant name argument"))?; - let end_date = env::args() - .nth(3) - .ok_or_else(|| anyhow!("Missing end date argument"))?; + let start_date = env::args() + .nth(2) + .ok_or_else(|| anyhow!("Missing start date argument"))?; - let start_date = validate_date_time(&start_date)?; - let end_date = validate_date_time(&end_date)?; + let end_date = env::args() + .nth(3) + .ok_or_else(|| anyhow!("Missing end date argument"))?; - if end_date < start_date { - return Err(anyhow!("End date must be greater than or equal to start date")); - } + let start_date = validate_date_time(&start_date)?; + let end_date = validate_date_time(&end_date)?; - let tenant = config - .tenants - .iter() - .find(|t| t.name == tenant_name) - .ok_or_else(|| anyhow!("Tenant not found in the configuration"))? - .clone(); - - let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri); - let pg_manager = PostgresConnectionManager::new_from_stringlike( - &config.pg_database_url, - tokio_postgres::NoTls, - )?; - let pg_pool = Pool::builder().build(pg_manager).await?; - - let app_state = Arc::new(AppState { - config: config.clone(), - clickhouse_pools: vec![clickhouse_pool], - pg_pool, - }); - - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx))); - let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt); - - let app_state_clone = app_state.clone(); - let run_handle = tokio::spawn(async move { - match run(app_state_clone, tenant_name, start_date, end_date).await { - Ok(success) => { - if success { - info!("Program ran successfully"); - } else { - error!("Program encountered an error"); - } - if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { - if let Err(_) = shutdown_tx.send(()) { - error!("Failed to send shutdown signal"); - } - } + if end_date < start_date { + return Err(anyhow!( + "End date must be greater than or equal to start date" + )); + } + + let tenant = config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .ok_or_else(|| anyhow!("Tenant not found in the configuration"))? 
+ .clone(); + + let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri); + let pg_manager = PostgresConnectionManager::new_from_stringlike( + &config.pg_database_url, + tokio_postgres::NoTls, + )?; + let pg_pool = Pool::builder().build(pg_manager).await?; + + let app_state = Arc::new(AppState { + config: config.clone(), + clickhouse_pools: vec![clickhouse_pool], + pg_pool, + }); + + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx))); + let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt); + + let app_state_clone = app_state.clone(); + let run_handle = tokio::spawn(async move { + match run(app_state_clone, tenant_name, start_date, end_date).await { + Ok(success) => { + if success { + info!("Program ran successfully"); + } else { + error!("Program encountered an error"); } - Err(e) => { - error!("Error running the program: {}", e); - if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { - if let Err(_) = shutdown_tx.send(()) { - error!("Failed to send shutdown signal"); - } + if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { + if let Err(_) = shutdown_tx.send(()) { + error!("Failed to send shutdown signal"); } } } - }); - - let retry_handle = tokio::spawn(retry_failed_batches(app_state)); - - tokio::select! { - _ = signal::ctrl_c() => { - info!("Received shutdown signal, shutting down gracefully..."); - if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() { + Err(e) => { + error!("Error running the program: {}", e); + if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { if let Err(_) = shutdown_tx.send(()) { error!("Failed to send shutdown signal"); } } } - _ = shutdown_rx => { - info!("Program finished, shutting down gracefully..."); - } - run_result = run_handle => { - match run_result { - Ok(_) => info!("Run task completed"), - Err(e) => error!("Run task failed: {}", e), + } + }); + + let retry_handle = tokio::spawn(retry_failed_batches(app_state)); + + tokio::select! 
{ + _ = signal::ctrl_c() => { + info!("Received shutdown signal, shutting down gracefully..."); + if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() { + if let Err(_) = shutdown_tx.send(()) { + error!("Failed to send shutdown signal"); } } - retry_result = retry_handle => { - match retry_result { - Ok(inner_result) => { - match inner_result { - Ok(_) => info!("Retry task completed successfully"), - Err(e) => error!("Retry task failed: {}", e), - } + } + _ = shutdown_rx => { + info!("Program finished, shutting down gracefully..."); + } + run_result = run_handle => { + match run_result { + Ok(_) => info!("Run task completed"), + Err(e) => error!("Run task failed: {}", e), + } + } + retry_result = retry_handle => { + match retry_result { + Ok(inner_result) => { + match inner_result { + Ok(_) => info!("Retry task completed successfully"), + Err(e) => error!("Retry task failed: {}", e), } - Err(e) => error!("Retry task panicked: {}", e), } + Err(e) => error!("Retry task panicked: {}", e), } } - - Ok(()) } + + Ok(()) +} From 7af6f659c3b772f27af47b0bc168c0c73bb6fc98 Mon Sep 17 00:00:00 2001 From: Isanka Date: Tue, 6 Aug 2024 00:59:38 +0900 Subject: [PATCH 2/4] seems perfectly working fine code --- historical_data/src/main.rs | 268 ++++++++++++++++++++---------------- 1 file changed, 152 insertions(+), 116 deletions(-) diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index 93135b9..76a7105 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -1,10 +1,9 @@ use anyhow::{anyhow, Context, Result}; -use clickhouse_rs::{Client as ClickhouseClient, Pool as ClickhousePool}; -use futures::stream::{self, StreamExt}; +use clickhouse_rs::Pool as ClickhousePool; +use futures::stream::StreamExt; use log::{error, info, warn}; use mongodb::{ - bson::{self, doc, Bson, Document}, - options::FindOptions, + bson::{self, doc, Document}, Client as MongoClient, }; @@ -15,21 +14,16 @@ use sha2::{Digest, Sha256}; use bb8::Pool; use bb8_postgres::PostgresConnectionManager; -use chrono::{DateTime, NaiveDate, NaiveDateTime, Timelike, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; + use std::{ env, - error::Error, - fmt, sync::{Arc, Mutex}, time::Duration, time::Instant, }; -use tokio::{signal, sync::mpsc}; -use tokio::{ - sync::{broadcast, oneshot}, - task::JoinHandle, -}; - +use tokio::signal; +use tokio::sync::oneshot; type PgPool = Pool>; lazy_static::lazy_static! { @@ -188,7 +182,7 @@ async fn process_tenant_historical_data( let total_docs = mongo_collection .count_documents(filter.clone(), None) .await - .context("Failed to count documents in MongoDB")? as usize; // Cast to usize + .context("Failed to count documents in MongoDB")? 
as usize; info!("Total documents to process: {}", total_docs); let mut cursor = mongo_collection @@ -199,65 +193,103 @@ async fn process_tenant_historical_data( let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); let mut batch = Vec::with_capacity(batch_manager.get_current_size()); let mut processed_docs = 0; - + let mut failed_docs = 0; let start_time = Instant::now(); while let Some(result) = cursor.next().await { - let doc = result.context("Failed to get next document from MongoDB cursor")?; - let record_id = doc - .get("_id") - .and_then(|id| id.as_object_id()) - .ok_or_else(|| anyhow!("Document is missing _id field"))?; - let statement = doc - .get("statement") - .and_then(|s| s.as_document()) - .ok_or_else(|| anyhow!("Document is missing statement field"))?; - - let record_id_str = record_id.to_hex(); - let mut statement = statement.to_owned(); - - anonymize_statement( - &mut statement, - &app_state.config.encryption_salt, - &tenant_config.name, - )?; - - let statement_str = - to_string(&statement).context("Failed to serialize statement to JSON")?; - batch.push((record_id_str, statement_str)); - - if batch.len() >= batch_manager.get_current_size() { - let batch_start_time = Instant::now(); - - insert_into_clickhouse( - &ch_pool, - &batch, - &tenant_config.clickhouse_db, - &tenant_config.clickhouse_table, - &app_state.pg_pool, - &tenant_config.name, - &mut batch_manager, // Pass batch_manager as mutable reference - ) - .await - .context("Failed to insert batch into ClickHouse")?; - - let batch_duration = batch_start_time.elapsed(); - batch_manager.adjust_batch_size(batch.len(), batch_duration); + match result { + Ok(doc) => { + let record_id = match doc.get("_id").and_then(|id| id.as_object_id()) { + Some(id) => id.to_hex(), + None => { + warn!("Document is missing _id field, skipping"); + failed_docs += 1; + continue; + } + }; + + let statement = match doc.get("statement").and_then(|s| s.as_document()) { + Some(s) => s.to_owned(), + None => { + warn!( + "Document {} is missing statement field, skipping", + record_id + ); + failed_docs += 1; + continue; + } + }; + + let mut statement = statement.to_owned(); + + if let Err(e) = anonymize_statement( + &mut statement, + &app_state.config.encryption_salt, + &tenant_config.name, + ) { + warn!( + "Failed to anonymize statement for document {}: {}", + record_id, e + ); + failed_docs += 1; + continue; + } - processed_docs += batch.len(); - info!( - "Processed {} out of {} documents. Current batch size: {}", - processed_docs, - total_docs, - batch_manager.get_current_size() - ); - batch.clear(); + let statement_str = match to_string(&statement) { + Ok(s) => s, + Err(e) => { + warn!( + "Failed to serialize statement to JSON for document {}: {}", + record_id, e + ); + failed_docs += 1; + continue; + } + }; + + batch.push((record_id, statement_str)); + + if batch.len() >= batch_manager.get_current_size() { + let batch_start_time = Instant::now(); + + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &batch, + &tenant_config.clickhouse_db, + &tenant_config.clickhouse_table, + &app_state.pg_pool, + &tenant_config.name, + &mut batch_manager, + ) + .await + { + error!("Failed to insert batch into ClickHouse: {}", e); + failed_docs += batch.len(); + } else { + let batch_duration = batch_start_time.elapsed(); + batch_manager.adjust_batch_size(batch.len(), batch_duration); + + processed_docs += batch.len(); + info!( + "Processed {} out of {} documents. 
Current batch size: {}", + processed_docs, + total_docs, + batch_manager.get_current_size() + ); + } + batch.clear(); + } + } + Err(e) => { + warn!("Error fetching document from cursor: {}", e); + failed_docs += 1; + } } } // Insert any remaining documents if !batch.is_empty() { - insert_into_clickhouse( + if let Err(e) = insert_into_clickhouse( &ch_pool, &batch, &tenant_config.clickhouse_db, @@ -267,19 +299,24 @@ async fn process_tenant_historical_data( &mut batch_manager, ) .await - .context("Failed to insert final batch into ClickHouse")?; - processed_docs += batch.len(); + { + error!("Failed to insert final batch into ClickHouse: {}", e); + failed_docs += batch.len(); + } else { + processed_docs += batch.len(); + } } let total_duration = start_time.elapsed(); info!( - "Completed processing {} documents in {:?}. Final batch size: {}", + "Completed processing. Total processed: {}, Total failed: {}, Duration: {:?}, Final batch size: {}", processed_docs, + failed_docs, total_duration, batch_manager.get_current_size() ); - if processed_docs < total_docs { + if processed_docs + failed_docs < total_docs { warn!("Some documents were skipped during processing"); } @@ -291,14 +328,55 @@ fn anonymize_statement( encryption_salt: &str, tenant_name: &str, ) -> Result<()> { - // Implement your anonymization logic here - // This is a placeholder implementation + // Create a deep copy of the statement + let mut statement_copy = statement.clone(); + + // Check if all required fields exist + if !statement_copy.contains_key("actor") + || !statement_copy + .get_document("actor")? + .contains_key("account") + || !statement_copy + .get_document("actor")? + .get_document("account")? + .contains_key("name") + { + return Err(anyhow!("Statement is missing required fields")); + } + + let name = statement_copy + .get_document("actor")? + .get_document("account")? + .get_str("name")?; + + let value_to_hash = if name.contains('@') { + name.split('@').next().unwrap_or("") + } else if name.contains(':') { + name.split(':').last().unwrap_or("") + } else { + name + }; + + if value_to_hash.is_empty() { + return Err(anyhow!("Empty value to hash for name: {}", name)); + } + let mut hasher = Sha256::new(); - hasher.update(format!("{}{}", encryption_salt, tenant_name)); - hasher.update(statement.to_string().as_bytes()); + hasher.update(encryption_salt.as_bytes()); + hasher.update(tenant_name.as_bytes()); + hasher.update(value_to_hash.as_bytes()); let result = hasher.finalize(); - statement.insert("anonymized_hash", hex::encode(result)); - println!("Anonymized statement---: {}", statement); + let hashed_value = hex::encode(result); + + // Update the copy + statement_copy + .get_document_mut("actor")? + .get_document_mut("account")? 
+ .insert("name", hashed_value); + + // If we've made it this far without errors, update the original statement + *statement = statement_copy; + println!("Anonymized statement: {:?}", statement); Ok(()) } @@ -435,48 +513,6 @@ async fn insert_batch( Ok(()) } -async fn deduplicate_clickhouse_data( - ch_pool: &ClickhousePool, - clickhouse_db: &str, - clickhouse_table: &str, -) -> Result<()> { - let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - let mut client = ch_pool - .get_handle() - .await - .context("Failed to get client from ClickHouse pool")?; - - info!("Processing duplicate data..."); - - let create_dedup_table_query = format!( - "CREATE TABLE {table}_dedup ENGINE = MergeTree() PARTITION BY toYYYYMM(created_at) PRIMARY KEY id ORDER BY (id, created_at) SETTINGS index_granularity = 8192 AS SELECT id, any(statement) AS statement, any(created_at) AS created_at FROM {table} GROUP BY id", - table = full_table_name - ); - - let drop_table_query = format!("DROP TABLE {}", full_table_name); - let rename_table_query = format!( - "RENAME TABLE {}_dedup TO {}", - full_table_name, full_table_name - ); - - client - .execute(create_dedup_table_query.as_str()) - .await - .context("Failed to create dedup table in ClickHouse")?; - - client - .execute(drop_table_query.as_str()) - .await - .context("Failed to drop original table in ClickHouse")?; - - client - .execute(rename_table_query.as_str()) - .await - .context("Failed to rename dedup table in ClickHouse")?; - - Ok(()) -} - async fn log_failed_batch( pg_pool: &PgPool, tenant_name: &str, From ca246c585c917b24cb58c5db9bbd4cbcd5c8ab51 Mon Sep 17 00:00:00 2001 From: Isanka Date: Tue, 6 Aug 2024 02:39:10 +0900 Subject: [PATCH 3/4] working --- historical_data/src/main.rs | 5 +- src/main.rs | 688 +++++++++++++++++++++++------------- 2 files changed, 453 insertions(+), 240 deletions(-) diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index 76a7105..a178695 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -1,3 +1,4 @@ +// historical_data/src/main.rs use anyhow::{anyhow, Context, Result}; use clickhouse_rs::Pool as ClickhousePool; use futures::stream::StreamExt; @@ -376,7 +377,7 @@ fn anonymize_statement( // If we've made it this far without errors, update the original statement *statement = statement_copy; - println!("Anonymized statement: {:?}", statement); + // println!("Anonymized statement: {:?}", statement); Ok(()) } @@ -414,7 +415,7 @@ async fn insert_into_clickhouse( clickhouse_table: &str, pg_pool: &PgPool, tenant_name: &str, - batch_manager: &mut BatchSizeManager, // Added BatchSizeManager as mutable reference + batch_manager: &mut BatchSizeManager, ) -> Result<()> { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); diff --git a/src/main.rs b/src/main.rs index fc0fcf1..39a69eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ -use anyhow::Result; +// src/main.rs - Live data processing +use anyhow::{anyhow, Context, Result}; use bb8::Pool; use bb8_postgres::PostgresConnectionManager; use clickhouse_rs::Pool as ClickhousePool; @@ -6,43 +7,28 @@ use config::{Config, File}; use futures::{future::join_all, stream::StreamExt}; use log::{error, info, warn}; use mongodb::{ - bson::{self, Bson, Document}, + bson::{self, doc, Document}, change_stream::event::{ChangeStreamEvent, ResumeToken}, options::ChangeStreamOptions, Client as MongoClient, }; use regex::Regex; use serde::Deserialize; -use serde_json::{to_string, Value}; +use serde_json::to_string; 
use sha2::{Digest, Sha256}; -use std::{env, error::Error, sync::Arc, time::Duration}; -use tokio::task; -use tokio_postgres::{Client as PostgresClient, NoTls}; +use std::{env, sync::Arc, time::Duration}; +use tokio::{signal, sync::oneshot, task}; lazy_static::lazy_static! { - static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") { - Ok(regex) => regex, - Err(e) => { - error!("Failed to compile BACKSLASH_REGEX_1: {}", e); - panic!("Invalid regular expression pattern"); - } - }; - static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") { - Ok(regex) => regex, - Err(e) => { - error!("Failed to compile BACKSLASH_REGEX_2: {}", e); - panic!("Invalid regular expression pattern"); - } - }; - static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") { - Ok(regex) => regex, - Err(e) => { - error!("Failed to compile BACKSLASH_REGEX_3: {}", e); - panic!("Invalid regular expression pattern"); - } - }; + static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1"); + static ref BACKSLASH_REGEX_2: Regex = Regex::new(r"\\(?:\\\\)*").expect("Failed to compile BACKSLASH_REGEX_2"); + static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3"); } +const MAX_BATCH_SIZE: usize = 10000; +const MAX_RETRIES: u32 = 5; +const INITIAL_RETRY_DELAY: u64 = 1000; + #[derive(Deserialize, Clone, Debug)] struct TenantConfig { name: String, @@ -57,36 +43,68 @@ struct TenantConfig { #[derive(Deserialize, Debug)] struct AppConfig { tenants: Vec, - postgres_db: String, pg_database_url: String, encryption_salt: String, + batch_size: usize, + number_of_workers: usize, } -type PostgresPool = Pool>; +type PostgresPool = Pool>; type ClickhousePoolType = ClickhousePool; -#[derive(Debug)] struct AppState { config: AppConfig, postgres_pool: PostgresPool, clickhouse_pools: Vec, } -async fn run(app_state: Arc) -> Result<(), Box> { +struct BatchSizeManager { + current_size: usize, + min_size: usize, + max_size: usize, + performance_threshold: f64, +} + +impl BatchSizeManager { + fn new( + initial_size: usize, + min_size: usize, + max_size: usize, + performance_threshold: f64, + ) -> Self { + BatchSizeManager { + current_size: initial_size, + min_size, + max_size, + performance_threshold, + } + } + + fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) { + let performance = docs_processed as f64 / time_taken.as_secs_f64(); + + if performance > self.performance_threshold { + self.current_size = (self.current_size * 2).min(self.max_size); + } else { + self.current_size = (self.current_size / 2).max(self.min_size); + } + } + + fn get_current_size(&self) -> usize { + self.current_size + } +} + +async fn run(app_state: Arc) -> Result<()> { let tenants = app_state.config.tenants.clone(); let mut tasks = Vec::new(); for (index, tenant) in tenants.iter().enumerate() { let tenant_config = tenant.clone(); - let tenant_config_cloned = tenant_config.clone(); let app_state = app_state.clone(); let task = task::spawn(async move { - if let Err(e) = process_tenant_records(tenant_config, app_state, index).await { - log::error!( - "Error processing tenant {}: {}", - tenant_config_cloned.name, - e - ); + if let Err(e) = process_tenant_records(tenant_config.clone(), app_state, index).await { + error!("Error processing tenant {}: {}", tenant_config.name, e); } }); tasks.push(task); @@ -96,18 +114,23 @@ async fn run(app_state: Arc) -> Result<(), Box> { Ok(()) } -async fn connect_to_mongo(mongo_uri: &str) -> Result { 
+async fn connect_to_mongo(mongo_uri: &str) -> Result { let mut retry_delay = Duration::from_secs(1); + let mut attempts = 0; loop { match MongoClient::with_uri_str(mongo_uri).await { Ok(client) => return Ok(client), Err(e) => { - error!("Failed to connect to MongoDB: {}", e); + attempts += 1; + if attempts > MAX_RETRIES { + return Err(e.into()); + } + error!( + "Failed to connect to MongoDB: {}. Retrying in {:?}", + e, retry_delay + ); tokio::time::sleep(retry_delay).await; retry_delay *= 2; - if retry_delay > Duration::from_secs(60) { - return Err(e); - } } } } @@ -119,19 +142,14 @@ async fn process_tenant_records( pool_index: usize, ) -> Result<()> { let mongo_client = connect_to_mongo(&tenant_config.mongo_uri).await?; - //let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; - // println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri); - // println!("<<-- mongo_client {:?}", mongo_client); let mongo_db = mongo_client.database(&tenant_config.mongo_db); - // println!("<<-- mongo_db: {:?}", mongo_db); let mongo_collection: mongodb::Collection = mongo_db.collection(&tenant_config.mongo_collection); let pg_pool = &app_state.postgres_pool; let ch_pool = &app_state.clickhouse_pools[pool_index]; - // println!("pg_pool {:?}", pg_pool); + let pg_conn = pg_pool.get().await?; - // println!("pg_conn {:?}", pg_conn); let row = pg_conn .query_one( "SELECT token FROM resume_token WHERE tenant_name = $1 ORDER BY id DESC LIMIT 1", @@ -139,17 +157,19 @@ async fn process_tenant_records( ) .await .ok(); - // println!("row {:?}", row); - let mut options = mongodb::options::ChangeStreamOptions::default(); + + let mut options = ChangeStreamOptions::default(); if let Some(row) = row { let token_bytes: Vec = row.get("token"); if let Ok(resume_token) = bson::from_slice::(&token_bytes) { options.resume_after = Some(resume_token); } } - // println!("app_state {:?}", &app_state.config); - let change_stream_options = ChangeStreamOptions::default(); - let mut change_stream = mongo_collection.watch(None, change_stream_options).await?; + + let mut change_stream = mongo_collection.watch(None, options).await?; + let mut batch = Vec::with_capacity(app_state.config.batch_size); + let mut batch_manager = + BatchSizeManager::new(app_state.config.batch_size, 1000, MAX_BATCH_SIZE, 5000.0); while let Some(result) = change_stream.next().await { match result { @@ -165,79 +185,15 @@ async fn process_tenant_records( match (record_id, statement) { (Some(record_id), Some(statement)) => { let record_id_str = record_id.to_hex(); - info!("Record ID: {}", record_id_str); - let mut statement = statement.to_owned(); - if let Some(actor) = - statement.get_mut("actor").and_then(|a| a.as_document_mut()) - { - if let Some(account) = actor - .get_mut("account") - .and_then(|acc| acc.as_document_mut()) - { - if let Some(name) = account.get_mut("name") { - if let bson::Bson::String(name_str) = name { - let anonymized_name = if name_str.contains(':') { - let parts: Vec<&str> = - name_str.split(':').collect(); - info!( - "{}", - &bson::Bson::String(parts[1].to_string()) - ); - if parts.len() == 2 { - anonymize_data( - &bson::Bson::String(parts[1].to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - } else { - anonymize_data( - &bson::Bson::String(name_str.to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - } - } else if name_str.contains('@') { - let parts: Vec<&str> = - name_str.split('@').collect(); - info!( - "{}", - &bson::Bson::String(parts[0].to_string()) - ); - 
if parts.len() == 2 { - anonymize_data( - &bson::Bson::String(parts[0].to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - } else { - anonymize_data( - &bson::Bson::String(name_str.to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - } - } else { - info!( - "{}", - &bson::Bson::String(name_str.to_string()) - ); - anonymize_data( - &bson::Bson::String(name_str.to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - }; - *name = bson::Bson::String(anonymized_name); - } else { - warn!("Missing 'name' field in 'actor.account'"); - } - } - } else { - warn!("Missing 'account' field in 'actor'"); - } - } else { - warn!("Missing 'actor' field in 'statement'"); + + if let Err(e) = anonymize_statement( + &mut statement, + &app_state.config.encryption_salt, + &tenant_config.name, + ) { + warn!("Failed to anonymize statement: {}", e); + continue; } let statement_str = match to_string(&statement) { @@ -248,14 +204,35 @@ async fn process_tenant_records( } }; - insert_into_clickhouse( - &ch_pool, - &statement_str, - &record_id_str, - &tenant_config.clickhouse_db, - &tenant_config.clickhouse_table, - ) - .await; + batch.push((record_id_str, statement_str)); + + if batch.len() >= batch_manager.get_current_size() { + let batch_start_time = std::time::Instant::now(); + + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &batch, + &tenant_config.clickhouse_db, + &tenant_config.clickhouse_table, + pg_pool, + &tenant_config.name, + &mut batch_manager, + ) + .await + { + error!("Failed to insert batch into ClickHouse: {}", e); + } else { + let batch_duration = batch_start_time.elapsed(); + batch_manager.adjust_batch_size(batch.len(), batch_duration); + + info!( + "Processed {} documents. Current batch size: {}", + batch.len(), + batch_manager.get_current_size() + ); + } + batch.clear(); + } } (None, Some(_)) => { warn!("Missing '_id' field in the document"); @@ -269,35 +246,16 @@ async fn process_tenant_records( } if let Some(resume_token) = change_stream.resume_token() { - let token_bytes = match bson::to_vec(&resume_token) { - Ok(bytes) => bytes, - Err(e) => { - error!("Failed to serialize resume token: {}", e); - continue; - } - }; + let token_bytes = bson::to_vec(&resume_token)?; let tenant_name = tenant_config.name.clone(); let pg_pool = app_state.postgres_pool.clone(); - match pg_pool.get().await { - Ok(pg_conn) => { - if let Err(e) = pg_conn - .execute( - "INSERT INTO resume_token (token, tenant_name) VALUES ($1, $2) ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token", - &[&token_bytes, &tenant_name], - ) - .await - { - error!("Failed to update resume token in PostgreSQL: {}", e); - } - } - Err(e) => { - error!("Failed to get PostgreSQL connection: {}", e); - } - }; + if let Err(e) = + update_resume_token(&pg_pool, &token_bytes, &tenant_name).await + { + error!("Failed to update resume token in PostgreSQL: {}", e); + } } - } else { - warn!("Missing 'full_document' field in the change stream event"); } } Err(e) => { @@ -306,34 +264,100 @@ async fn process_tenant_records( } } + if !batch.is_empty() { + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &batch, + &tenant_config.clickhouse_db, + &tenant_config.clickhouse_table, + pg_pool, + &tenant_config.name, + &mut batch_manager, + ) + .await + { + error!("Failed to insert final batch into ClickHouse: {}", e); + } + } + Ok(()) +} + +async fn update_resume_token( + pg_pool: &PostgresPool, + token_bytes: &[u8], + tenant_name: &str, +) -> Result<()> { + let pg_conn = 
pg_pool.get().await?; + pg_conn + .execute( + "INSERT INTO resume_token (token, tenant_name) VALUES ($1, $2) ON CONFLICT (tenant_name) DO UPDATE SET token = EXCLUDED.token", + &[&token_bytes, &tenant_name], + ) + .await?; Ok(()) } -fn anonymize_data(data: &Bson, encryption_salt: &str, tenant_name: &str) -> String { - let mut hasher = Sha256::new(); - // let salt = &settings.encryption_salt; - let salt = format!("{}{}", encryption_salt, tenant_name); - hasher.update(salt.as_bytes()); - - let data_str = match data { - Bson::String(s) => s.as_str(), - Bson::Int32(i) => return i.to_string(), - Bson::Int64(i) => return i.to_string(), - _ => return String::new(), +fn anonymize_statement( + statement: &mut Document, + encryption_salt: &str, + tenant_name: &str, +) -> Result<()> { + // Create a deep copy of the statement + let mut statement_copy = statement.clone(); + + // Check if all required fields exist + if !statement_copy.contains_key("actor") + || !statement_copy + .get_document("actor")? + .contains_key("account") + || !statement_copy + .get_document("actor")? + .get_document("account")? + .contains_key("name") + { + return Err(anyhow!("Statement is missing required fields")); + } + + let name = statement_copy + .get_document("actor")? + .get_document("account")? + .get_str("name")?; + + let value_to_hash = if name.contains('@') { + name.split('@').next().unwrap_or("") + } else if name.contains(':') { + name.split(':').last().unwrap_or("") + } else { + name }; - hasher.update(data_str.as_bytes()); + if value_to_hash.is_empty() { + return Err(anyhow!("Empty value to hash for name: {}", name)); + } + + let mut hasher = Sha256::new(); + hasher.update(encryption_salt.as_bytes()); + hasher.update(tenant_name.as_bytes()); + hasher.update(value_to_hash.as_bytes()); let result = hasher.finalize(); - hex::encode(result) + let hashed_value = hex::encode(result); + + // Update the copy + statement_copy + .get_document_mut("actor")? + .get_document_mut("account")? 
+ .insert("name", hashed_value); + + // If we've made it this far without errors, update the original statement + *statement = statement_copy; + Ok(()) } -async fn process_statement(statement: &str) -> Result> { - // Replace all double consecutive backslashes with four backslashes +async fn process_statement(statement: &str) -> Result { let output1 = BACKSLASH_REGEX_1 .replace_all(statement, "\\\\\\\\") .to_string(); - // Replace all single backslashes with two backslashes let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: ®ex::Captures| { if caps[0].len() % 2 == 1 { "\\\\".to_string() @@ -342,7 +366,6 @@ async fn process_statement(statement: &str) -> Result Result Result<()> { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - // let escaped_statement_str = statement_str.replace("'", "\\\'"); - let escaped_statement_str = match process_statement(statement_str).await { - Ok(escaped_str) => escaped_str, - Err(e) => { - error!("Failed to process statement: {}", e); - return; + for (chunk_index, chunk) in bulk_insert_values + .chunks(batch_manager.get_current_size()) + .enumerate() + { + let mut retry_count = 0; + let mut delay = INITIAL_RETRY_DELAY; + + loop { + match insert_batch(ch_pool, chunk, &full_table_name).await { + Ok(_) => { + info!( + "Successfully inserted batch {} of {} records", + chunk_index + 1, + chunk.len() + ); + break; + } + Err(e) => { + error!("Failed to insert batch {}: {}", chunk_index + 1, e); + retry_count += 1; + if retry_count >= MAX_RETRIES { + error!( + "Max retries reached for batch {}. Logging failed batch.", + chunk_index + 1 + ); + log_failed_batch( + pg_pool, + tenant_name, + clickhouse_db, + clickhouse_table, + chunk, + ) + .await + .context("Failed to log failed batch")?; + return Err(anyhow!( + "Max retries exceeded for batch {}", + chunk_index + 1 + )); + } else { + warn!("Retrying batch {} in {} ms...", chunk_index + 1, delay); + tokio::time::sleep(Duration::from_millis(delay)).await; + delay = delay.saturating_mul(2); // Exponential backoff with overflow protection + } + } + } } - }; + + let old_size = batch_manager.get_current_size(); + batch_manager.adjust_batch_size(chunk.len(), Duration::from_secs(1)); // Assume 1 second per batch, adjust as needed + let new_size = batch_manager.get_current_size(); + if old_size != new_size { + info!("Batch size adjusted from {} to {}", old_size, new_size); + } + } + + Ok(()) +} + +async fn insert_batch( + ch_pool: &ClickhousePool, + batch: &[(String, String)], + full_table_name: &str, +) -> Result<()> { + let mut client = ch_pool + .get_handle() + .await + .context("Failed to get client from ClickHouse pool")?; + + let insert_data: Result> = + futures::future::try_join_all(batch.iter().map(|(record_id, statement)| async move { + let processed_statement = process_statement(statement).await?; + Ok(format!( + "('{}', '{}', now())", + record_id, processed_statement + )) + })) + .await; + + let insert_data = insert_data.context("Failed to process statements")?; let insert_query = format!( - "INSERT INTO {} (id, statement) VALUES ('{}', '{}')", - full_table_name, record_id_str, escaped_statement_str + "INSERT INTO {} (id, statement, created_at) VALUES {}", + full_table_name, + insert_data.join(" , ") ); - let mut client = match ch_pool.get_handle().await { - Ok(client) => client, - Err(e) => { - error!("Failed to get client from ClickHouse pool: {}", e); - return; - } - }; + client + .execute(insert_query.as_str()) + .await + .context("Failed to execute insert query")?; - let 
max_retries = 3; - let mut retry_count = 0; + Ok(()) +} - while retry_count < max_retries { - match client.execute(insert_query.as_str()).await { - Ok(_) => { - info!("Successfully inserted statement into ClickHouse"); - return; - } - Err(e) => { - error!("Failed to insert statement into ClickHouse: {}", e); - retry_count += 1; +async fn log_failed_batch( + pg_pool: &PostgresPool, + tenant_name: &str, + clickhouse_db: &str, + clickhouse_table: &str, + failed_batch: &[(String, String)], +) -> Result<()> { + let failed_batch_json = + serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; + + let mut client = pg_pool + .get() + .await + .context("Failed to get client from PostgreSQL pool")?; - if retry_count == max_retries { - error!("Max retries reached. Giving up."); - return; + let statement = client + .prepare( + "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch) + VALUES ($1, $2, $3, $4)", + ) + .await + .context("Failed to prepare PostgreSQL statement")?; + + client + .execute( + &statement, + &[ + &tenant_name, + &clickhouse_db, + &clickhouse_table, + &failed_batch_json, + ], + ) + .await + .context("Failed to execute PostgreSQL statement")?; + + Ok(()) +} + +async fn retry_failed_batches(app_state: Arc) -> Result<()> { + let pg_pool = &app_state.postgres_pool; + + loop { + let mut client = pg_pool + .get() + .await + .context("Failed to get PostgreSQL client")?; + let statement = client + .prepare( + "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch + FROM failed_batches + ORDER BY created_at + LIMIT 100", + ) + .await + .context("Failed to prepare PostgreSQL statement")?; + + let rows = client + .query(&statement, &[]) + .await + .context("Failed to execute PostgreSQL query")?; + if rows.is_empty() { + tokio::time::sleep(Duration::from_secs(60)).await; + continue; + } + for row in rows { + let failed_batch_id: i32 = row.get(0); + let tenant_name: String = row.get(1); + let clickhouse_db: String = row.get(2); + let clickhouse_table: String = row.get(3); + let failed_batch: String = row.get(4); + + let tenant_config = app_state + .config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .cloned(); + + if let Some(tenant_config) = tenant_config { + let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); + let bulk_insert_values: Vec<(String, String)> = serde_json::from_str(&failed_batch) + .context("Failed to deserialize failed batch")?; + let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &bulk_insert_values, + &clickhouse_db, + &clickhouse_table, + pg_pool, + &tenant_name, + &mut batch_manager, + ) + .await + { + error!("Error retrying failed batch: {}", e); } else { - let delay_ms = 1000 * retry_count; - info!("Retrying in {} ms...", delay_ms); - tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + let delete_statement = client + .prepare("DELETE FROM failed_batches WHERE id = $1") + .await + .context("Failed to prepare delete statement")?; + client + .execute(&delete_statement, &[&failed_batch_id]) + .await + .context("Failed to delete processed failed batch")?; } } } + + tokio::time::sleep(Duration::from_secs(60)).await; } } #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<()> { env_logger::init(); - info!(target: "main", "Starting up"); let env = env::var("ENV").unwrap_or_else(|_| "dev".into()); @@ -425,32 +608,29 @@ async fn main() -> 
Result<(), Box> { "dev" => "config-dev.yml", "prod" => "config-prod.yml", _ => { - log::error!("Unsupported environment: {}", env); - return Err("Unsupported environment".into()); - } - }; - - let config_file = File::with_name(config_path); - let config: AppConfig = match Config::builder().add_source(config_file).build() { - Ok(config_builder) => match config_builder.try_deserialize() { - Ok(config) => config, - Err(err) => { - log::error!("Failed to deserialize config: {}", err); - return Err(err.into()); - } - }, - Err(err) => { - log::error!("Failed to build config: {}", err); - return Err(err.into()); + error!("Unsupported environment: {}", env); + return Err(anyhow!("Unsupported environment")); } }; - let postgres_manager = PostgresConnectionManager::new(config.pg_database_url.parse()?, NoTls); + let config: AppConfig = Config::builder() + .add_source(File::with_name(config_path)) + .build()? + .try_deserialize() + .context("Failed to deserialize config")?; + + let postgres_manager = PostgresConnectionManager::new( + config + .pg_database_url + .parse() + .context("Invalid PostgreSQL URL")?, + tokio_postgres::NoTls, + ); let postgres_pool = Pool::builder().build(postgres_manager).await?; let mut clickhouse_pools = Vec::new(); for tenant in &config.tenants { - let clickhouse_pool = ClickhousePool::new(&*tenant.clickhouse_uri); + let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri.as_str()); clickhouse_pools.push(clickhouse_pool); } @@ -459,11 +639,43 @@ async fn main() -> Result<(), Box> { postgres_pool, clickhouse_pools, }); - // println!("app_state_main {:?}", app_state); - let _ = run(app_state).await; - tokio::signal::ctrl_c() - .await - .expect("Failed to listen for ctrl-c signal"); + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let shutdown_tx = Arc::new(tokio::sync::Mutex::new(Some(shutdown_tx))); + + let app_state_clone = app_state.clone(); + let run_handle = tokio::spawn(async move { + if let Err(e) = run(app_state_clone).await { + error!("Error in main run loop: {}", e); + } + }); + + let app_state_clone = app_state.clone(); + let retry_handle = tokio::spawn(async move { + if let Err(e) = retry_failed_batches(app_state_clone).await { + error!("Error in retry failed batches loop: {}", e); + } + }); + + tokio::select! 
{ + _ = signal::ctrl_c() => { + info!("Received shutdown signal, shutting down gracefully..."); + if let Some(tx) = shutdown_tx.lock().await.take() { + let _ = tx.send(()); + } + } + _ = shutdown_rx => { + info!("Shutdown signal received from within the application..."); + } + _ = run_handle => { + info!("Main run loop has completed."); + } + _ = retry_handle => { + info!("Retry failed batches loop has completed."); + } + } + + info!("Shutting down..."); + info!("Application shutdown complete."); Ok(()) } From e9685fba17ef8713081d4b133f1d0e692a8f5bea Mon Sep 17 00:00:00 2001 From: Isanka Date: Tue, 6 Aug 2024 11:28:13 +0900 Subject: [PATCH 4/4] working code --- .gitignore | 1 + Cargo.lock | 345 ++++++++++++++++++++++++------- Cargo.toml | 5 +- Dockerfile | 14 +- Dockerfile.dev | 37 ++++ historical_data/Cargo.toml | 3 +- src/main.rs | 414 +++++++++++++++++++++---------------- 7 files changed, 553 insertions(+), 266 deletions(-) create mode 100644 Dockerfile.dev diff --git a/.gitignore b/.gitignore index 24db069..309bcb6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ config.yml config-dev.yml config-prod.yml +app/ # Compiled files /target/ diff --git a/Cargo.lock b/Cargo.lock index f2a17f1..8ce2073 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,9 +110,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.81" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "async-trait" @@ -167,6 +167,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.6.0" @@ -198,6 +204,26 @@ dependencies = [ "tokio-postgres", ] +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.4.2", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.52", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -273,11 +299,35 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cc" -version = "1.0.90" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +dependencies = [ + "jobserver", + "libc", +] + +[[package]] +name = "cexpr" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", 
+] [[package]] name = "cfg-if" @@ -321,6 +371,17 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clickhouse-rs" version = "1.1.0-alpha.1" @@ -374,6 +435,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.14.0" @@ -664,7 +734,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21cdad81446a7f7dc43f6a77409efeb9733d2fa65553efef6018ef257c959b73" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "syn 1.0.109", @@ -722,9 +792,14 @@ dependencies = [ [[package]] name = "event-listener" -version = "2.5.3" +version = "5.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] [[package]] name = "fallible-iterator" @@ -734,9 +809,9 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" [[package]] name = "fastrand" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "finl_unicode" @@ -903,6 +978,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "hashbrown" version = "0.13.2" @@ -911,9 +992,9 @@ checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" [[package]] name = "hashbrown" -version = "0.14.3" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", "allocator-api2", @@ -921,11 +1002,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.8.4" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -933,9 +1014,12 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" @@ -1074,12 +1158,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.5" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" +checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -1115,6 +1199,15 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -1144,11 +1237,27 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" -version = "0.2.153" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" + +[[package]] +name = "libloading" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" +dependencies = [ + "cfg-if", + "windows-targets 0.52.4", +] [[package]] name = "libm" @@ -1156,11 +1265,38 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "librocksdb-sys" +version = "0.16.0+8.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce3d60bc059831dc1c83903fb45c103f75db65c5a7bf22272764d9cc683e348c" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + [[package]] name = "libsqlite3-sys" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "libz-sys" +version = "1.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c15da26e5af7e25c90b37a2d75cdbf940cf4a55316de9d84c679c9b8bfabf82e" dependencies = [ "cc", "pkg-config", @@ -1251,9 +1387,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "minimal-lexical" @@ -1301,11 +1437,13 @@ dependencies = [ "r2d2_postgres", "rayon", "regex", + "rocksdb", "serde", "serde_json", "serde_yaml", "sha2", "sqlx", + "time", "tokio", "tokio-postgres", ] @@ -1455,6 +1593,12 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = 
"parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1884,6 +2028,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rocksdb" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd13e55d6d7b8cd0ea569161127567cd587676c99f4472f779a0279aa60a7a7" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "ron" version = "0.8.1" @@ -1932,6 +2086,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.2.3" @@ -2058,9 +2218,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.197" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" dependencies = [ "serde_derive", ] @@ -2076,9 +2236,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.197" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", @@ -2087,22 +2247,35 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.114" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "indexmap", "itoa", + "memchr", "ryu", "serde", ] [[package]] name = "serde_spanned" -version = "0.6.5" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb5b1b31579f3811bf615c144393417496f152e12ac8b7663bf664f4a815306d" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ + "form_urlencoded", + "itoa", + "ryu", "serde", ] @@ -2174,6 +2347,12 @@ dependencies = [ "digest", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -2213,6 +2392,9 @@ name = "smallvec" version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +dependencies = [ + "serde", +] [[package]] name = "socket2" @@ -2272,9 +2454,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.7.4" +version = "0.8.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9a2ccff1a000a5a59cd33da541d9f2fdcd9e6e8229cc200565942bff36d0aaa" +checksum = "27144619c6e5802f1380337a209d2ac1c431002dd74c6e60aebff3c506dc4f0c" dependencies = [ "sqlx-core", "sqlx-macros", @@ -2285,11 +2467,10 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24ba59a9342a3d9bab6c56c118be528b27c9b60e490080e9711a04dccac83ef6" +checksum = "a999083c1af5b5d6c071d34a708a19ba3e02106ad82ef7bbd69f5e48266b613b" dependencies = [ - "ahash", "atoi", "byteorder", "bytes", @@ -2302,6 +2483,7 @@ dependencies = [ "futures-intrusive", "futures-io", "futures-util", + "hashbrown 0.14.5", "hashlink", "hex", "indexmap", @@ -2322,26 +2504,26 @@ dependencies = [ [[package]] name = "sqlx-macros" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea40e2345eb2faa9e1e5e326db8c34711317d2b5e08d0d5741619048a803127" +checksum = "a23217eb7d86c584b8cbe0337b9eacf12ab76fe7673c513141ec42565698bb88" dependencies = [ "proc-macro2", "quote", "sqlx-core", "sqlx-macros-core", - "syn 1.0.109", + "syn 2.0.52", ] [[package]] name = "sqlx-macros-core" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5833ef53aaa16d860e92123292f1f6a3d53c34ba8b1969f152ef1a7bb803f3c8" +checksum = "1a099220ae541c5db479c6424bdf1b200987934033c2584f79a0e1693601e776" dependencies = [ "dotenvy", "either", - "heck", + "heck 0.5.0", "hex", "once_cell", "proc-macro2", @@ -2351,20 +2533,21 @@ dependencies = [ "sha2", "sqlx-core", "sqlx-mysql", + "sqlx-postgres", "sqlx-sqlite", - "syn 1.0.109", + "syn 2.0.52", "tempfile", "url", ] [[package]] name = "sqlx-mysql" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ed31390216d20e538e447a7a9b959e06ed9fc51c37b514b46eb758016ecd418" +checksum = "5afe4c38a9b417b6a9a5eeffe7235d0a106716495536e7727d1c7f4b1ff3eba6" dependencies = [ "atoi", - "base64 0.21.7", + "base64 0.22.1", "bitflags 2.4.2", "byteorder", "bytes", @@ -2401,12 +2584,12 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e" +checksum = "b1dbb157e65f10dbe01f729339c06d239120221c9ad9fa0ba8408c4cc18ecf21" dependencies = [ "atoi", - "base64 0.21.7", + "base64 0.22.1", "bitflags 2.4.2", "byteorder", "crc", @@ -2439,9 +2622,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa" +checksum = "9b2cdd83c008a622d94499c0006d8ee5f821f36c89b7d625c900e5dc30b5c5ee" dependencies = [ "atoi", "flume", @@ -2454,10 +2637,10 @@ dependencies = [ "log", "percent-encoding", "serde", + "serde_urlencoded", "sqlx-core", "tracing", "url", - "urlencoding", ] [[package]] @@ -2531,18 +2714,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.58" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = 
"thiserror-impl" -version = "1.0.58" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", @@ -2551,9 +2734,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.34" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", @@ -2572,9 +2755,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ "num-conv", "time-core", @@ -2687,9 +2870,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.11" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af06656561d28735e9c1cd63dfd57132c8155426aa6af24f36a00a351f88c48e" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" dependencies = [ "serde", "serde_spanned", @@ -2699,18 +2882,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.5" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" dependencies = [ "serde", ] [[package]] name = "toml_edit" -version = "0.22.7" +version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18769cd1cec395d70860ceb4d932812a0b4d06b1a4bb336745a4d21b9496e992" +checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ "indexmap", "serde", @@ -2866,21 +3049,15 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.0" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", "idna 0.5.0", "percent-encoding", ] -[[package]] -name = "urlencoding" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" - [[package]] name = "utf8parse" version = "0.2.1" @@ -3173,9 +3350,9 @@ checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" [[package]] name = "winnow" -version = "0.6.5" +version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8" +checksum = "68a9bda4691f099d435ad181000724da8e5899daa10713c2d432552b9ccd3a6f" dependencies = [ "memchr", ] @@ -3233,3 +3410,13 @@ name = "zeroize" version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" + +[[package]] +name = 
"zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index c35fc55..a243475 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,10 +25,13 @@ bb8-postgres = "0.8.1" serde_json = "1.0.114" serde_yaml = "0.9.33" rayon = "1.9.0" -sqlx = "0.7.4" +sqlx = "0.8.0" chrono = "0.4.35" regex = "1.10.4" lazy_static = "1.4.0" +rocksdb = "0.22.0" +time = "0.3.36" + [[bin]] diff --git a/Dockerfile b/Dockerfile index 32bebde..4609591 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,12 @@ # Build Stage FROM rust:latest as builder +RUN apt-get update && apt-get install -y \ + librocksdb-dev \ + libclang-dev \ + clang \ + && rm -rf /var/lib/apt/lists/* + WORKDIR /app COPY Cargo.toml Cargo.lock ./ @@ -13,7 +19,11 @@ ENV RUSTFLAGS="-C target-cpu=native" RUN cargo build --release # Production Stage -FROM rust:slim +FROM rust:latest + +RUN apt-get update && apt-get install -y \ + librocksdb-dev \ + && rm -rf /var/lib/apt/lists/* WORKDIR /app @@ -22,7 +32,7 @@ COPY --from=builder /app/target/release/historical_data . COPY config-prod.yml ./config-prod.yml COPY config-dev.yml ./config-dev.yml - +RUN mkdir -p /data/rocksdb ENV RUSTFLAGS="-C target-cpu=native" ENV RUST_ENV=prod CMD ["./mongo-to-clickhouse"] \ No newline at end of file diff --git a/Dockerfile.dev b/Dockerfile.dev new file mode 100644 index 0000000..656a2d8 --- /dev/null +++ b/Dockerfile.dev @@ -0,0 +1,37 @@ +# Development Stage +FROM rust:latest + +RUN apt-get update && apt-get install -y \ + librocksdb-dev \ + libclang-dev \ + clang \ + && rm -rf /var/lib/apt/lists/* + +# Install cargo-watch for auto-reloading +RUN cargo install cargo-watch + +WORKDIR /app + +# Copy only the dependency files first to leverage Docker cache +COPY Cargo.toml Cargo.lock ./ + +# Create dummy src directory and file to build dependencies +RUN mkdir src && echo "fn main() {}" > src/main.rs + +# Build dependencies +RUN cargo build + +# Remove the dummy src directory and file +RUN rm -rf src + +# Copy the rest of the source code +COPY src ./src +COPY historical_data ./historical_data +COPY config-prod.yml ./config-prod.yml +COPY config-dev.yml ./config-dev.yml + +ENV RUSTFLAGS="-C target-cpu=native" +ENV RUST_ENV=dev + +# Use cargo-watch to auto-reload on file changes +CMD ["cargo", "watch", "-x", "run"] \ No newline at end of file diff --git a/historical_data/Cargo.toml b/historical_data/Cargo.toml index 255f504..5c11764 100644 --- a/historical_data/Cargo.toml +++ b/historical_data/Cargo.toml @@ -25,7 +25,8 @@ bb8-postgres = "0.8.1" serde_json = "1.0.114" serde_yaml = "0.9.33" rayon = "1.9.0" -sqlx = "0.7.4" +sqlx = "0.8.0" chrono = "0.4.35" regex = "1.10.4" lazy_static = "1.4.0" + diff --git a/src/main.rs b/src/main.rs index 39a69eb..7763d89 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,12 +13,14 @@ use mongodb::{ Client as MongoClient, }; use regex::Regex; +use rocksdb::{Options, DB}; use serde::Deserialize; use serde_json::to_string; use sha2::{Digest, Sha256}; -use std::{env, sync::Arc, time::Duration}; -use tokio::{signal, sync::oneshot, task}; +use std::path::Path; +use std::{env, sync::Arc, time::Duration, time::Instant}; +use tokio::{signal, sync::oneshot, task}; lazy_static::lazy_static! 
{ static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1"); static ref BACKSLASH_REGEX_2: Regex = Regex::new(r"\\(?:\\\\)*").expect("Failed to compile BACKSLASH_REGEX_2"); @@ -28,7 +30,29 @@ lazy_static::lazy_static! { const MAX_BATCH_SIZE: usize = 10000; const MAX_RETRIES: u32 = 5; const INITIAL_RETRY_DELAY: u64 = 1000; +struct RocksDBResumeTokenStore { + db: Arc, +} + +impl RocksDBResumeTokenStore { + pub fn new>(path: P) -> Result { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_max_open_files(10000); + opts.set_use_fsync(true); + let db = DB::open(&opts, path)?; + Ok(Self { db: Arc::new(db) }) + } + pub fn update_resume_token(&self, token_bytes: &[u8], tenant_name: &str) -> Result<()> { + self.db.put(tenant_name.as_bytes(), token_bytes)?; + Ok(()) + } + + pub fn get_resume_token(&self, tenant_name: &str) -> Result>> { + Ok(self.db.get(tenant_name.as_bytes())?) + } +} #[derive(Deserialize, Clone, Debug)] struct TenantConfig { name: String, @@ -43,7 +67,6 @@ struct TenantConfig { #[derive(Deserialize, Debug)] struct AppConfig { tenants: Vec, - pg_database_url: String, encryption_salt: String, batch_size: usize, number_of_workers: usize, @@ -54,8 +77,8 @@ type ClickhousePoolType = ClickhousePool; struct AppState { config: AppConfig, - postgres_pool: PostgresPool, clickhouse_pools: Vec, + resume_token_store: Arc, } struct BatchSizeManager { @@ -91,11 +114,12 @@ impl BatchSizeManager { } fn get_current_size(&self) -> usize { - self.current_size + self.current_size // if enable this applicaitn will automatically adjust the batch size based on the system performance. } } async fn run(app_state: Arc) -> Result<()> { + info!("Starting main run loop"); let tenants = app_state.config.tenants.clone(); let mut tasks = Vec::new(); @@ -103,37 +127,43 @@ async fn run(app_state: Arc) -> Result<()> { let tenant_config = tenant.clone(); let app_state = app_state.clone(); let task = task::spawn(async move { - if let Err(e) = process_tenant_records(tenant_config.clone(), app_state, index).await { - error!("Error processing tenant {}: {}", tenant_config.name, e); + loop { + match process_tenant_records(tenant_config.clone(), app_state.clone(), index).await + { + Ok(_) => { + info!("Tenant {} processing completed", tenant_config.name); + break; + } + Err(e) => { + error!( + "Error processing tenant {}: {}. Retrying in 60 seconds...", + tenant_config.name, e + ); + tokio::time::sleep(Duration::from_secs(60)).await; + } + } } }); tasks.push(task); } - join_all(tasks).await; + // Wait for all tasks to complete, but don't stop if one fails + for task in tasks { + if let Err(e) = task.await { + error!("A tenant processing task panicked: {:?}", e); + } + } + Ok(()) } async fn connect_to_mongo(mongo_uri: &str) -> Result { - let mut retry_delay = Duration::from_secs(1); - let mut attempts = 0; - loop { - match MongoClient::with_uri_str(mongo_uri).await { - Ok(client) => return Ok(client), - Err(e) => { - attempts += 1; - if attempts > MAX_RETRIES { - return Err(e.into()); - } - error!( - "Failed to connect to MongoDB: {}. 
Retrying in {:?}", - e, retry_delay - ); - tokio::time::sleep(retry_delay).await; - retry_delay *= 2; - } - } - } + let client = MongoClient::with_uri_str(mongo_uri).await?; + + // Perform a simple operation to check if the connection is actually working + client.list_database_names(None, None).await?; + info!("Connected to MongoDB at {}", mongo_uri); + Ok(client) } async fn process_tenant_records( @@ -141,37 +171,57 @@ async fn process_tenant_records( app_state: Arc, pool_index: usize, ) -> Result<()> { - let mongo_client = connect_to_mongo(&tenant_config.mongo_uri).await?; + let mongo_client = match connect_to_mongo(&tenant_config.mongo_uri).await { + Ok(client) => client, + Err(e) => { + error!( + "Failed to connect to MongoDB for tenant {}: {}", + tenant_config.name, e + ); + return Err(e.into()); + } + }; + let mongo_db = mongo_client.database(&tenant_config.mongo_db); let mongo_collection: mongodb::Collection = mongo_db.collection(&tenant_config.mongo_collection); - let pg_pool = &app_state.postgres_pool; let ch_pool = &app_state.clickhouse_pools[pool_index]; + let resume_token_store = &app_state.resume_token_store; - let pg_conn = pg_pool.get().await?; - let row = pg_conn - .query_one( - "SELECT token FROM resume_token WHERE tenant_name = $1 ORDER BY id DESC LIMIT 1", - &[&tenant_config.name], - ) - .await - .ok(); + let resume_token = resume_token_store.get_resume_token(&tenant_config.name)?; + info!("Resuming change stream for tenant: {}", tenant_config.name); + info!("Resume token: {:?}", resume_token); let mut options = ChangeStreamOptions::default(); - if let Some(row) = row { - let token_bytes: Vec = row.get("token"); + if let Some(token_bytes) = resume_token { if let Ok(resume_token) = bson::from_slice::(&token_bytes) { options.resume_after = Some(resume_token); } } - let mut change_stream = mongo_collection.watch(None, options).await?; + let mut change_stream = match mongo_collection.watch(None, options).await { + Ok(stream) => stream, + Err(e) => { + error!( + "Failed to create change stream for tenant {}: {}", + tenant_config.name, e + ); + return Err(e.into()); + } + }; + let mut batch = Vec::with_capacity(app_state.config.batch_size); + let mut batch_start_time = Instant::now(); + info!( + "Starting change stream processing batch size: {}", + batch.capacity() + ); let mut batch_manager = - BatchSizeManager::new(app_state.config.batch_size, 1000, MAX_BATCH_SIZE, 5000.0); + BatchSizeManager::new(app_state.config.batch_size, 1, MAX_BATCH_SIZE, 5000.0); while let Some(result) = change_stream.next().await { + info!("Processing change event"); match result { Ok(change_event) => { if let ChangeStreamEvent { @@ -205,33 +255,40 @@ async fn process_tenant_records( }; batch.push((record_id_str, statement_str)); + let should_process = batch.len() >= batch_manager.get_current_size() + || batch_start_time.elapsed() >= Duration::from_secs(5); - if batch.len() >= batch_manager.get_current_size() { - let batch_start_time = std::time::Instant::now(); - - if let Err(e) = insert_into_clickhouse( + if should_process { + let batch_duration = batch_start_time.elapsed(); + if let Err(e) = process_batch( &ch_pool, &batch, - &tenant_config.clickhouse_db, - &tenant_config.clickhouse_table, - pg_pool, - &tenant_config.name, + &tenant_config, + resume_token_store, &mut batch_manager, + batch_duration, ) .await { - error!("Failed to insert batch into ClickHouse: {}", e); - } else { - let batch_duration = batch_start_time.elapsed(); - batch_manager.adjust_batch_size(batch.len(), batch_duration); - - 
info!( - "Processed {} documents. Current batch size: {}", - batch.len(), - batch_manager.get_current_size() - ); + error!("Failed to process batch: {}", e); + // Don't update resume token if batch processing failed + continue; + } + + // Update resume token only after successful batch processing + if let Some(resume_token) = change_stream.resume_token() { + let token_bytes = bson::to_vec(&resume_token)?; + let tenant_name = tenant_config.name.clone(); + + if let Err(e) = resume_token_store + .update_resume_token(&token_bytes, &tenant_name) + { + error!("Failed to update resume token in RocksDB: {}", e); + } } + batch.clear(); + batch_start_time = Instant::now(); } } (None, Some(_)) => { @@ -244,22 +301,14 @@ async fn process_tenant_records( warn!("Missing both '_id' and 'statement' fields in the document"); } } - - if let Some(resume_token) = change_stream.resume_token() { - let token_bytes = bson::to_vec(&resume_token)?; - let tenant_name = tenant_config.name.clone(); - - let pg_pool = app_state.postgres_pool.clone(); - if let Err(e) = - update_resume_token(&pg_pool, &token_bytes, &tenant_name).await - { - error!("Failed to update resume token in PostgreSQL: {}", e); - } - } } } Err(e) => { - error!("Change stream error: {}", e); + error!( + "Change stream error for tenant {}: {}. Restarting stream...", + tenant_config.name, e + ); + return Err(e.into()); } } } @@ -270,30 +319,57 @@ async fn process_tenant_records( &batch, &tenant_config.clickhouse_db, &tenant_config.clickhouse_table, - pg_pool, + resume_token_store, &tenant_config.name, &mut batch_manager, ) .await { error!("Failed to insert final batch into ClickHouse: {}", e); + } else { + // Update resume token after successful processing of the final batch + if let Some(resume_token) = change_stream.resume_token() { + let token_bytes = bson::to_vec(&resume_token)?; + let tenant_name = tenant_config.name.clone(); + + if let Err(e) = resume_token_store.update_resume_token(&token_bytes, &tenant_name) { + error!("Failed to update final resume token in RocksDB: {}", e); + } + } } } Ok(()) } -async fn update_resume_token( - pg_pool: &PostgresPool, - token_bytes: &[u8], - tenant_name: &str, +async fn process_batch( + ch_pool: &ClickhousePoolType, + batch: &[(String, String)], + tenant_config: &TenantConfig, + resume_token_store: &Arc, + batch_manager: &mut BatchSizeManager, + batch_duration: Duration, ) -> Result<()> { - let pg_conn = pg_pool.get().await?; - pg_conn - .execute( - "INSERT INTO resume_token (token, tenant_name) VALUES ($1, $2) ON CONFLICT (tenant_name) DO UPDATE SET token = EXCLUDED.token", - &[&token_bytes, &tenant_name], - ) - .await?; + if let Err(e) = insert_into_clickhouse( + ch_pool, + batch, + &tenant_config.clickhouse_db, + &tenant_config.clickhouse_table, + resume_token_store, + &tenant_config.name, + batch_manager, + ) + .await + { + error!("Failed to insert batch into ClickHouse: {}", e); + } else { + batch_manager.adjust_batch_size(batch.len(), batch_duration); + info!( + "Processed {} documents in {:?}. 
Current batch size: {}", + batch.len(), + batch_duration, + batch_manager.get_current_size() + ); + } Ok(()) } @@ -385,7 +461,7 @@ async fn insert_into_clickhouse( bulk_insert_values: &[(String, String)], clickhouse_db: &str, clickhouse_table: &str, - pg_pool: &PostgresPool, + resume_token_store: &RocksDBResumeTokenStore, tenant_name: &str, batch_manager: &mut BatchSizeManager, ) -> Result<()> { @@ -417,7 +493,7 @@ async fn insert_into_clickhouse( chunk_index + 1 ); log_failed_batch( - pg_pool, + resume_token_store, tenant_name, clickhouse_db, clickhouse_table, @@ -486,7 +562,7 @@ async fn insert_batch( } async fn log_failed_batch( - pg_pool: &PostgresPool, + resume_token_store: &RocksDBResumeTokenStore, tenant_name: &str, clickhouse_db: &str, clickhouse_table: &str, @@ -495,101 +571,80 @@ async fn log_failed_batch( let failed_batch_json = serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; - let mut client = pg_pool - .get() - .await - .context("Failed to get client from PostgreSQL pool")?; - - let statement = client - .prepare( - "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch) - VALUES ($1, $2, $3, $4)", - ) - .await - .context("Failed to prepare PostgreSQL statement")?; - - client - .execute( - &statement, - &[ - &tenant_name, - &clickhouse_db, - &clickhouse_table, - &failed_batch_json, - ], + resume_token_store.db.put( + format!( + "failed_batch:{}:{}:{}", + tenant_name, clickhouse_db, clickhouse_table ) - .await - .context("Failed to execute PostgreSQL statement")?; + .as_bytes(), + failed_batch_json.as_bytes(), + )?; Ok(()) } async fn retry_failed_batches(app_state: Arc) -> Result<()> { - let pg_pool = &app_state.postgres_pool; + let resume_token_store = &app_state.resume_token_store; loop { - let mut client = pg_pool - .get() - .await - .context("Failed to get PostgreSQL client")?; - let statement = client - .prepare( - "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch - FROM failed_batches - ORDER BY created_at - LIMIT 100", - ) - .await - .context("Failed to prepare PostgreSQL statement")?; - - let rows = client - .query(&statement, &[]) - .await - .context("Failed to execute PostgreSQL query")?; - if rows.is_empty() { - tokio::time::sleep(Duration::from_secs(60)).await; - continue; - } - for row in rows { - let failed_batch_id: i32 = row.get(0); - let tenant_name: String = row.get(1); - let clickhouse_db: String = row.get(2); - let clickhouse_table: String = row.get(3); - let failed_batch: String = row.get(4); - - let tenant_config = app_state - .config - .tenants - .iter() - .find(|t| t.name == tenant_name) - .cloned(); - - if let Some(tenant_config) = tenant_config { - let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); - let bulk_insert_values: Vec<(String, String)> = serde_json::from_str(&failed_batch) - .context("Failed to deserialize failed batch")?; - let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); - if let Err(e) = insert_into_clickhouse( - &ch_pool, - &bulk_insert_values, - &clickhouse_db, - &clickhouse_table, - pg_pool, - &tenant_name, - &mut batch_manager, - ) - .await - { - error!("Error retrying failed batch: {}", e); + let iter = resume_token_store.db.iterator(rocksdb::IteratorMode::Start); + let failed_batch_prefix = b"failed_batch:"; + + for item in iter { + let (key, value) = item?; + if key.starts_with(failed_batch_prefix) { + let key_str = String::from_utf8_lossy(&key); + let parts: Vec<&str> = key_str.splitn(4, 
':').collect(); + if parts.len() != 4 { + error!("Invalid failed batch key format: {}", key_str); + continue; + } + + let tenant_name = parts[1]; + let clickhouse_db = parts[2]; + let clickhouse_table = parts[3]; + + let failed_batch: Vec<(String, String)> = + serde_json::from_slice(&value).context("Failed to deserialize failed batch")?; + + let tenant_config = app_state + .config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .cloned(); + + if let Some(tenant_config) = tenant_config { + let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri.as_str()); + let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); + + match insert_into_clickhouse( + &ch_pool, + &failed_batch, + clickhouse_db, + clickhouse_table, + resume_token_store, + tenant_name, + &mut batch_manager, + ) + .await + { + Ok(_) => { + info!( + "Successfully retried failed batch for tenant: {}", + tenant_name + ); + resume_token_store.db.delete(key)?; + } + Err(e) => { + error!( + "Error retrying failed batch for tenant {}: {}", + tenant_name, e + ); + } + } } else { - let delete_statement = client - .prepare("DELETE FROM failed_batches WHERE id = $1") - .await - .context("Failed to prepare delete statement")?; - client - .execute(&delete_statement, &[&failed_batch_id]) - .await - .context("Failed to delete processed failed batch")?; + error!("Tenant config not found for tenant: {}", tenant_name); } } } @@ -619,25 +674,18 @@ async fn main() -> Result<()> { .try_deserialize() .context("Failed to deserialize config")?; - let postgres_manager = PostgresConnectionManager::new( - config - .pg_database_url - .parse() - .context("Invalid PostgreSQL URL")?, - tokio_postgres::NoTls, - ); - let postgres_pool = Pool::builder().build(postgres_manager).await?; - let mut clickhouse_pools = Vec::new(); for tenant in &config.tenants { let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri.as_str()); clickhouse_pools.push(clickhouse_pool); } + let resume_token_store = Arc::new(RocksDBResumeTokenStore::new("/app/data/rocksdb")?); + let app_state = Arc::new(AppState { config, - postgres_pool, clickhouse_pools, + resume_token_store, }); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
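
A minimal standalone sketch of the RocksDB-backed resume-token storage that this patch switches to (replacing the PostgreSQL resume_token table). This is illustrative only, not part of the patch: the "/tmp/resume_token_example" path and "tenant_a" key are assumptions for the example; the actual code opens the database at /app/data/rocksdb, keys tokens by tenant_config.name, and stores raw BSON resume-token bytes. It uses only rocksdb crate calls already added in Cargo.toml (DB::open, put, get).

// sketch.rs — exercise the put/get pattern used by process_tenant_records()
use rocksdb::{Options, DB};

fn main() -> anyhow::Result<()> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    // Assumed scratch path for the example; the service uses /app/data/rocksdb.
    let db = DB::open(&opts, "/tmp/resume_token_example")?;

    // Store a resume token (raw BSON bytes in the real code) keyed by tenant name.
    db.put(b"tenant_a", b"example-token-bytes")?;

    // Read it back; a missing key yields Ok(None), which the patch treats as
    // "start the change stream from the beginning" (no resume_after option set).
    match db.get(b"tenant_a")? {
        Some(bytes) => println!("resume token found: {} bytes", bytes.len()),
        None => println!("no resume token stored yet"),
    }
    Ok(())
}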