From a67bb1e3ee14a99b9b2087b9ecd0278a8eaa0a68 Mon Sep 17 00:00:00 2001
From: Isanka
Date: Tue, 6 Aug 2024 00:26:33 +0900
Subject: [PATCH] Inserting works, but hashing is not working yet

---
 historical_data/src/main.rs | 410 +++++++++++++++++++-----------------
 1 file changed, 217 insertions(+), 193 deletions(-)

diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs
index 757f02d..93135b9 100644
--- a/historical_data/src/main.rs
+++ b/historical_data/src/main.rs
@@ -30,7 +30,6 @@ use tokio::{
     task::JoinHandle,
 };
 
-
 type PgPool = Pool<PostgresConnectionManager<NoTls>>;
 
 lazy_static::lazy_static! {
@@ -47,7 +46,12 @@ struct BatchSizeManager {
 }
 
 impl BatchSizeManager {
-    fn new(initial_size: usize, min_size: usize, max_size: usize, performance_threshold: f64) -> Self {
+    fn new(
+        initial_size: usize,
+        min_size: usize,
+        max_size: usize,
+        performance_threshold: f64,
+    ) -> Self {
         BatchSizeManager {
             current_size: initial_size,
             min_size,
@@ -71,8 +75,6 @@ impl BatchSizeManager {
     }
 }
 
-
-
 const MAX_BATCH_SIZE: usize = 10000;
 const MAX_RETRIES: u32 = 5;
 const INITIAL_RETRY_DELAY: u64 = 1000;
@@ -146,7 +148,10 @@ async fn connect_to_mongo(mongo_uri: &str) -> Result<Client> {
             if attempts > MAX_RETRIES {
                 return Err(e.into());
             }
-            error!("Failed to connect to MongoDB: {}. Retrying in {:?}", e, retry_delay);
+            error!(
+                "Failed to connect to MongoDB: {}. Retrying in {:?}",
+                e, retry_delay
+            );
             tokio::time::sleep(retry_delay).await;
             retry_delay *= 2;
         }
     }
@@ -181,17 +186,16 @@ async fn process_tenant_historical_data(
     };
 
     let total_docs = mongo_collection
-    .count_documents(filter.clone(), None)
-    .await
-    .context("Failed to count documents in MongoDB")? as usize; // Cast to usize
-info!("Total documents to process: {}", total_docs);
+        .count_documents(filter.clone(), None)
+        .await
+        .context("Failed to count documents in MongoDB")? as usize; // Cast to usize
+    info!("Total documents to process: {}", total_docs);
 
     let mut cursor = mongo_collection
         .find(filter, None)
         .await
         .context("Failed to create MongoDB cursor")?;
 
-
     let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
     let mut batch = Vec::with_capacity(batch_manager.get_current_size());
     let mut processed_docs = 0;
@@ -218,7 +222,8 @@ info!("Total documents to process: {}", total_docs);
                 &tenant_config.name,
             )?;
 
-            let statement_str = to_string(&statement).context("Failed to serialize statement to JSON")?;
+            let statement_str =
+                to_string(&statement).context("Failed to serialize statement to JSON")?;
 
             batch.push((record_id_str, statement_str));
             if batch.len() >= batch_manager.get_current_size() {
@@ -231,7 +236,7 @@ info!("Total documents to process: {}", total_docs);
                     &tenant_config.clickhouse_table,
                     &app_state.pg_pool,
                     &tenant_config.name,
-                    &mut batch_manager,  // Pass batch_manager as mutable reference
+                    &mut batch_manager, // Pass batch_manager as mutable reference
                 )
                 .await
                 .context("Failed to insert batch into ClickHouse")?;
@@ -242,7 +247,9 @@ info!("Total documents to process: {}", total_docs);
                 processed_docs += batch.len();
                 info!(
                     "Processed {} out of {} documents. Current batch size: {}",
-                    processed_docs, total_docs, batch_manager.get_current_size()
+                    processed_docs,
+                    total_docs,
+                    batch_manager.get_current_size()
                 );
                 batch.clear();
             }
@@ -267,7 +274,9 @@ info!("Total documents to process: {}", total_docs);
     let total_duration = start_time.elapsed();
     info!(
         "Completed processing {} documents in {:?}. Final batch size: {}",
-        processed_docs, total_duration, batch_manager.get_current_size()
+        processed_docs,
+        total_duration,
+        batch_manager.get_current_size()
     );
 
     if processed_docs < total_docs {
@@ -277,7 +286,11 @@ info!("Total documents to process: {}", total_docs);
     Ok(())
 }
 
-fn anonymize_statement(statement: &mut Document, encryption_salt: &str, tenant_name: &str) -> Result<()> {
+fn anonymize_statement(
+    statement: &mut Document,
+    encryption_salt: &str,
+    tenant_name: &str,
+) -> Result<()> {
     // Implement your anonymization logic here
     // This is a placeholder implementation
     let mut hasher = Sha256::new();
@@ -285,6 +298,7 @@ fn anonymize_statement(statement: &mut Document, encryption_salt: &str, tenant_n
     hasher.update(statement.to_string().as_bytes());
     let result = hasher.finalize();
     statement.insert("anonymized_hash", hex::encode(result));
+    println!("Anonymized statement---: {}", statement);
     Ok(())
 }
 
@@ -322,11 +336,14 @@ async fn insert_into_clickhouse(
     clickhouse_table: &str,
     pg_pool: &PgPool,
     tenant_name: &str,
-    batch_manager: &mut BatchSizeManager,  // Added BatchSizeManager as mutable reference
+    batch_manager: &mut BatchSizeManager, // Added BatchSizeManager as mutable reference
 ) -> Result<()> {
     let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);
 
-    for (chunk_index, chunk) in bulk_insert_values.chunks(batch_manager.get_current_size()).enumerate() {
+    for (chunk_index, chunk) in bulk_insert_values
+        .chunks(batch_manager.get_current_size())
+        .enumerate()
+    {
         let mut retry_count = 0;
         let mut delay = INITIAL_RETRY_DELAY;
 
@@ -392,16 +409,15 @@ async fn insert_batch(
         .await
         .context("Failed to get client from ClickHouse pool")?;
 
-    let insert_data: Result<Vec<String>> = futures::future::try_join_all(
-        batch.iter().map(|(record_id, statement)| async move {
+    let insert_data: Result<Vec<String>> =
+        futures::future::try_join_all(batch.iter().map(|(record_id, statement)| async move {
             let processed_statement = process_statement(statement).await?;
             Ok(format!(
                 "('{}', '{}', now())",
                 record_id, processed_statement
             ))
-        }),
-    )
-    .await;
+        }))
+        .await;
 
     let insert_data = insert_data.context("Failed to process statements")?;
 
@@ -476,7 +492,7 @@ async fn log_failed_batch(
         .await
         .context("Failed to get client from PostgreSQL pool")?;
 
-     let statement = client
+    let statement = client
         .prepare(
             "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch)
              VALUES ($1, $2, $3, $4)",
@@ -484,209 +500,217 @@ async fn log_failed_batch(
         )
         .await
         .context("Failed to prepare PostgreSQL statement")?;
 
-     client
-         .execute(
-             &statement,
-             &[
-                 &tenant_name,
-                 &clickhouse_db,
-                 &clickhouse_table,
-                 &failed_batch_json,
-             ],
-         )
-         .await
-         .context("Failed to execute PostgreSQL statement")?;
+    client
+        .execute(
+            &statement,
+            &[
+                &tenant_name,
+                &clickhouse_db,
+                &clickhouse_table,
+                &failed_batch_json,
+            ],
+        )
+        .await
+        .context("Failed to execute PostgreSQL statement")?;
 
-     Ok(())
-     }
+    Ok(())
+}
 
-     async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {
-         let pg_pool = &app_state.pg_pool;
-
-         loop {
-             let mut client = pg_pool.get().await.context("Failed to get PostgreSQL client")?;
-             let statement = client
-                 .prepare(
-                     "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch
-                      FROM failed_batches ORDER BY created_at LIMIT 100",
-                 )
-                 .await
-                 .context("Failed to prepare PostgreSQL statement")?;
-
-             let rows = client.query(&statement, &[]).await.context("Failed to execute PostgreSQL query")?;
-
-             for row in rows {
-                 let failed_batch_id: i32 = row.get(0);
-                 let tenant_name: String = row.get(1);
-                 let clickhouse_db: String = row.get(2);
-                 let clickhouse_table: String = row.get(3);
-                 let failed_batch: String = row.get(4);
-
-                 let tenant_config = app_state
-                     .config
-                     .tenants
-                     .iter()
-                     .find(|t| t.name == tenant_name)
-                     .cloned();
-
-                 if let Some(tenant_config) = tenant_config {
-                     let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri);
-                     let bulk_insert_values: Vec<(String, String)> =
-                         serde_json::from_str(&failed_batch).context("Failed to deserialize failed batch")?;
-                     let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
-                     if let Err(e) = insert_into_clickhouse(
-                         &ch_pool,
-                         &bulk_insert_values,
-                         &clickhouse_db,
-                         &clickhouse_table,
-                         pg_pool,
-                         &tenant_name,
-                         &mut batch_manager,
-                     )
-                     .await
-                     {
-                         error!("Error retrying failed batch: {}", e);
-                     } else {
-                         let delete_statement = client
-                             .prepare("DELETE FROM failed_batches WHERE id = $1")
-                             .await
-                             .context("Failed to prepare delete statement")?;
-                         client
-                             .execute(&delete_statement, &[&failed_batch_id])
-                             .await
-                             .context("Failed to delete processed failed batch")?;
-                     }
-                 }
-             }
-
-             tokio::time::sleep(std::time::Duration::from_secs(60)).await;
-         }
-     }
+async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {
+    let pg_pool = &app_state.pg_pool;
+
+    loop {
+        let mut client = pg_pool
+            .get()
+            .await
+            .context("Failed to get PostgreSQL client")?;
+        let statement = client
+            .prepare(
+                "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch
+                 FROM failed_batches ORDER BY created_at LIMIT 100",
+            )
+            .await
+            .context("Failed to prepare PostgreSQL statement")?;
+
+        let rows = client
+            .query(&statement, &[])
+            .await
+            .context("Failed to execute PostgreSQL query")?;
+
+        for row in rows {
+            let failed_batch_id: i32 = row.get(0);
+            let tenant_name: String = row.get(1);
+            let clickhouse_db: String = row.get(2);
+            let clickhouse_table: String = row.get(3);
+            let failed_batch: String = row.get(4);
+
+            let tenant_config = app_state
+                .config
+                .tenants
+                .iter()
+                .find(|t| t.name == tenant_name)
+                .cloned();
+
+            if let Some(tenant_config) = tenant_config {
+                let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri);
+                let bulk_insert_values: Vec<(String, String)> = serde_json::from_str(&failed_batch)
+                    .context("Failed to deserialize failed batch")?;
+                let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
+                if let Err(e) = insert_into_clickhouse(
+                    &ch_pool,
+                    &bulk_insert_values,
+                    &clickhouse_db,
+                    &clickhouse_table,
+                    pg_pool,
+                    &tenant_name,
+                    &mut batch_manager,
+                )
+                .await
+                {
+                    error!("Error retrying failed batch: {}", e);
+                } else {
+                    let delete_statement = client
+                        .prepare("DELETE FROM failed_batches WHERE id = $1")
+                        .await
+                        .context("Failed to prepare delete statement")?;
+                    client
+                        .execute(&delete_statement, &[&failed_batch_id])
+                        .await
+                        .context("Failed to delete processed failed batch")?;
+                }
+            }
+        }
+
+        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
+    }
+}
 
-     fn validate_date_time(date_time_str: &str) -> Result<NaiveDateTime> {
-         NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M")
-             .map_err(|e| anyhow!("Invalid date and time format: {}", e))
-     }
+fn validate_date_time(date_time_str: &str) -> Result<NaiveDateTime> {
+    NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M")
+        .map_err(|e| anyhow!("Invalid date and time format: {}", e))
+}
 
-     #[tokio::main]
-     async fn main() -> Result<()> {
-         env_logger::init();
-         info!(target: "main", "Starting up");
-
-         let env = env::var("ENV").unwrap_or_else(|_| "dev".into());
-         let config_path = match env.as_str() {
-             "dev" => "config-dev.yml",
-             "prod" => "config-prod.yml",
-             _ => {
-                 error!("Unsupported environment: {}", env);
-                 return Err(anyhow!("Unsupported environment"));
-             }
-         };
-         let config: AppConfig =
-             serde_yaml::from_reader(std::fs::File::open(config_path)?)?;
-
-         let tenant_name = env::args()
-             .nth(1)
-             .ok_or_else(|| anyhow!("Missing tenant name argument"))?;
-
-         let start_date = env::args()
-             .nth(2)
-             .ok_or_else(|| anyhow!("Missing start date argument"))?;
-
-         let end_date = env::args()
-             .nth(3)
-             .ok_or_else(|| anyhow!("Missing end date argument"))?;
-
-         let start_date = validate_date_time(&start_date)?;
-         let end_date = validate_date_time(&end_date)?;
-
-         if end_date < start_date {
-             return Err(anyhow!("End date must be greater than or equal to start date"));
-         }
-
-         let tenant = config
-             .tenants
-             .iter()
-             .find(|t| t.name == tenant_name)
-             .ok_or_else(|| anyhow!("Tenant not found in the configuration"))?
-             .clone();
-
-         let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri);
-         let pg_manager = PostgresConnectionManager::new_from_stringlike(
-             &config.pg_database_url,
-             tokio_postgres::NoTls,
-         )?;
-         let pg_pool = Pool::builder().build(pg_manager).await?;
-
-         let app_state = Arc::new(AppState {
-             config: config.clone(),
-             clickhouse_pools: vec![clickhouse_pool],
-             pg_pool,
-         });
-
-         let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
-         let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx)));
-         let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt);
-
-         let app_state_clone = app_state.clone();
-         let run_handle = tokio::spawn(async move {
-             match run(app_state_clone, tenant_name, start_date, end_date).await {
-                 Ok(success) => {
-                     if success {
-                         info!("Program ran successfully");
-                     } else {
-                         error!("Program encountered an error");
-                     }
-                     if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
-                         if let Err(_) = shutdown_tx.send(()) {
-                             error!("Failed to send shutdown signal");
-                         }
-                     }
-                 }
-                 Err(e) => {
-                     error!("Error running the program: {}", e);
-                     if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
-                         if let Err(_) = shutdown_tx.send(()) {
-                             error!("Failed to send shutdown signal");
-                         }
-                     }
-                 }
-             }
-         });
-
-         let retry_handle = tokio::spawn(retry_failed_batches(app_state));
-
-         tokio::select! {
-             _ = signal::ctrl_c() => {
-                 info!("Received shutdown signal, shutting down gracefully...");
-                 if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() {
-                     if let Err(_) = shutdown_tx.send(()) {
-                         error!("Failed to send shutdown signal");
-                     }
-                 }
-             }
-             _ = shutdown_rx => {
-                 info!("Program finished, shutting down gracefully...");
-             }
-             run_result = run_handle => {
-                 match run_result {
-                     Ok(_) => info!("Run task completed"),
-                     Err(e) => error!("Run task failed: {}", e),
-                 }
-             }
-             retry_result = retry_handle => {
-                 match retry_result {
-                     Ok(inner_result) => {
-                         match inner_result {
-                             Ok(_) => info!("Retry task completed successfully"),
-                             Err(e) => error!("Retry task failed: {}", e),
-                         }
-                     }
-                     Err(e) => error!("Retry task panicked: {}", e),
-                 }
-             }
-         }
-
-         Ok(())
-     }
+#[tokio::main]
+async fn main() -> Result<()> {
+    env_logger::init();
+    info!(target: "main", "Starting up");
+
+    let env = env::var("ENV").unwrap_or_else(|_| "dev".into());
+    let config_path = match env.as_str() {
+        "dev" => "config-dev.yml",
+        "prod" => "config-prod.yml",
+        _ => {
+            error!("Unsupported environment: {}", env);
+            return Err(anyhow!("Unsupported environment"));
+        }
+    };
+    let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;
+
+    let tenant_name = env::args()
+        .nth(1)
+        .ok_or_else(|| anyhow!("Missing tenant name argument"))?;
+
+    let start_date = env::args()
+        .nth(2)
+        .ok_or_else(|| anyhow!("Missing start date argument"))?;
+
+    let end_date = env::args()
+        .nth(3)
+        .ok_or_else(|| anyhow!("Missing end date argument"))?;
+
+    let start_date = validate_date_time(&start_date)?;
+    let end_date = validate_date_time(&end_date)?;
+
+    if end_date < start_date {
+        return Err(anyhow!(
+            "End date must be greater than or equal to start date"
+        ));
+    }
+
+    let tenant = config
+        .tenants
+        .iter()
+        .find(|t| t.name == tenant_name)
+        .ok_or_else(|| anyhow!("Tenant not found in the configuration"))?
+        .clone();
+
+    let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri);
+    let pg_manager = PostgresConnectionManager::new_from_stringlike(
+        &config.pg_database_url,
+        tokio_postgres::NoTls,
+    )?;
+    let pg_pool = Pool::builder().build(pg_manager).await?;
+
+    let app_state = Arc::new(AppState {
+        config: config.clone(),
+        clickhouse_pools: vec![clickhouse_pool],
+        pg_pool,
+    });
+
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+    let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx)));
+    let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt);
+
+    let app_state_clone = app_state.clone();
+    let run_handle = tokio::spawn(async move {
+        match run(app_state_clone, tenant_name, start_date, end_date).await {
+            Ok(success) => {
+                if success {
+                    info!("Program ran successfully");
+                } else {
+                    error!("Program encountered an error");
+                }
+                if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
+                    if let Err(_) = shutdown_tx.send(()) {
+                        error!("Failed to send shutdown signal");
+                    }
+                }
+            }
+            Err(e) => {
+                error!("Error running the program: {}", e);
+                if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
+                    if let Err(_) = shutdown_tx.send(()) {
+                        error!("Failed to send shutdown signal");
+                    }
+                }
+            }
+        }
+    });
+
+    let retry_handle = tokio::spawn(retry_failed_batches(app_state));
+
+    tokio::select! {
+        _ = signal::ctrl_c() => {
+            info!("Received shutdown signal, shutting down gracefully...");
+            if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() {
+                if let Err(_) = shutdown_tx.send(()) {
+                    error!("Failed to send shutdown signal");
+                }
+            }
+        }
+        _ = shutdown_rx => {
+            info!("Program finished, shutting down gracefully...");
+        }
+        run_result = run_handle => {
+            match run_result {
+                Ok(_) => info!("Run task completed"),
+                Err(e) => error!("Run task failed: {}", e),
+            }
+        }
+        retry_result = retry_handle => {
+            match retry_result {
+                Ok(inner_result) => {
+                    match inner_result {
+                        Ok(_) => info!("Retry task completed successfully"),
+                        Err(e) => error!("Retry task failed: {}", e),
+                    }
+                }
+                Err(e) => error!("Retry task panicked: {}", e),
+            }
+        }
+    }
+
+    Ok(())
+}
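
Review note on the "hashing not working" part of the subject: as the diff stands, anonymize_statement hashes the entire serialized document (salt, tenant name, and all fields together) and stores the digest in an extra "anonymized_hash" field while every original field stays in the statement. Nothing is actually anonymized, and the digest changes whenever any unrelated field changes, which makes it hard to verify. Below is a minimal sketch of per-field hashing, assuming the intent is to replace sensitive values with deterministic salted digests. The field names "email" and "name" are hypothetical placeholders, not taken from the patch; Document is the BSON document type the file already uses, and sha2/hex are already dependencies here.

use anyhow::Result;
use bson::Document;
use sha2::{Digest, Sha256};

// Sketch only: replace each sensitive field's value with a salted digest so the
// original value no longer appears in the stored statement. The field list
// ("email", "name") is a hypothetical placeholder.
fn anonymize_statement(
    statement: &mut Document,
    encryption_salt: &str,
    tenant_name: &str,
) -> Result<()> {
    for key in ["email", "name"] {
        if let Some(value) = statement.get(key).cloned() {
            let mut hasher = Sha256::new();
            // Domain-separate by salt and tenant so equal values in different
            // tenants produce different digests.
            hasher.update(format!("{}:{}", encryption_salt, tenant_name).as_bytes());
            hasher.update(value.to_string().as_bytes());
            // Overwrite the original value with the hex digest.
            statement.insert(key, hex::encode(hasher.finalize()));
        }
    }
    Ok(())
}

Hashing field by field also makes the output stable across runs for unchanged values, so "is hashing working" becomes easy to spot-check, unlike the current whole-document digest.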
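A second thing worth flagging while the inserts are being verified: insert_batch builds each row with format!("('{}', '{}', now())", record_id, processed_statement), so a single quote or backslash inside the serialized statement will break the generated ClickHouse SQL. If row building stays string-interpolated, a minimal escaping helper along these lines could guard it; escape_clickhouse_string is a hypothetical helper name, not an API of the ClickHouse client.

// Sketch only: escape the two characters that terminate or escape ClickHouse
// single-quoted string literals before interpolating values into VALUES (...).
fn escape_clickhouse_string(raw: &str) -> String {
    // Escape backslashes first so the quote escaping below is not doubled.
    raw.replace('\\', "\\\\").replace('\'', "\\'")
}

Applied at the format! call this would read: format!("('{}', '{}', now())", escape_clickhouse_string(record_id), escape_clickhouse_string(&processed_statement)).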