From ca246c585c917b24cb58c5db9bbd4cbcd5c8ab51 Mon Sep 17 00:00:00 2001
From: Isanka
Date: Tue, 6 Aug 2024 02:39:10 +0900
Subject: [PATCH] working

---
 historical_data/src/main.rs |   5 +-
 src/main.rs                 | 688 +++++++++++++++++++++++-------------
 2 files changed, 453 insertions(+), 240 deletions(-)

diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs
index 76a7105..a178695 100644
--- a/historical_data/src/main.rs
+++ b/historical_data/src/main.rs
@@ -1,3 +1,4 @@
+// historical_data/src/main.rs
 use anyhow::{anyhow, Context, Result};
 use clickhouse_rs::Pool as ClickhousePool;
 use futures::stream::StreamExt;
@@ -376,7 +377,7 @@ fn anonymize_statement(
 
     // If we've made it this far without errors, update the original statement
     *statement = statement_copy;
-    println!("Anonymized statement: {:?}", statement);
+    // println!("Anonymized statement: {:?}", statement);
     Ok(())
 }
 
@@ -414,7 +415,7 @@ async fn insert_into_clickhouse(
     clickhouse_table: &str,
     pg_pool: &PgPool,
     tenant_name: &str,
-    batch_manager: &mut BatchSizeManager, // Added BatchSizeManager as mutable reference
+    batch_manager: &mut BatchSizeManager,
 ) -> Result<()> {
     let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);
 
diff --git a/src/main.rs b/src/main.rs
index fc0fcf1..39a69eb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,5 @@
-use anyhow::Result;
+// src/main.rs - Live data processing
+use anyhow::{anyhow, Context, Result};
 use bb8::Pool;
 use bb8_postgres::PostgresConnectionManager;
 use clickhouse_rs::Pool as ClickhousePool;
@@ -6,43 +7,28 @@ use config::{Config, File};
 use futures::{future::join_all, stream::StreamExt};
 use log::{error, info, warn};
 use mongodb::{
-    bson::{self, Bson, Document},
+    bson::{self, doc, Document},
     change_stream::event::{ChangeStreamEvent, ResumeToken},
     options::ChangeStreamOptions,
     Client as MongoClient,
 };
 use regex::Regex;
 use serde::Deserialize;
-use serde_json::{to_string, Value};
+use serde_json::to_string;
 use sha2::{Digest, Sha256};
-use std::{env, error::Error, sync::Arc, time::Duration};
-use tokio::task;
-use tokio_postgres::{Client as PostgresClient, NoTls};
+use std::{env, sync::Arc, time::Duration};
+use tokio::{signal, sync::oneshot, task};
 
 lazy_static::lazy_static! {
-    static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") {
-        Ok(regex) => regex,
-        Err(e) => {
-            error!("Failed to compile BACKSLASH_REGEX_1: {}", e);
-            panic!("Invalid regular expression pattern");
-        }
-    };
-    static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") {
-        Ok(regex) => regex,
-        Err(e) => {
-            error!("Failed to compile BACKSLASH_REGEX_2: {}", e);
-            panic!("Invalid regular expression pattern");
-        }
-    };
-    static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") {
-        Ok(regex) => regex,
-        Err(e) => {
-            error!("Failed to compile BACKSLASH_REGEX_3: {}", e);
-            panic!("Invalid regular expression pattern");
-        }
-    };
+    static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1");
+    static ref BACKSLASH_REGEX_2: Regex = Regex::new(r"\\(?:\\\\)*").expect("Failed to compile BACKSLASH_REGEX_2");
+    static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3");
 }
 
+const MAX_BATCH_SIZE: usize = 10000;
+const MAX_RETRIES: u32 = 5;
+const INITIAL_RETRY_DELAY: u64 = 1000;
+
 #[derive(Deserialize, Clone, Debug)]
 struct TenantConfig {
     name: String,
@@ -57,36 +43,68 @@ struct TenantConfig {
 
 #[derive(Deserialize, Debug)]
 struct AppConfig {
     tenants: Vec<TenantConfig>,
-    postgres_db: String,
     pg_database_url: String,
     encryption_salt: String,
+    batch_size: usize,
+    number_of_workers: usize,
 }
 
-type PostgresPool = Pool<PostgresConnectionManager<NoTls>>;
+type PostgresPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;
 type ClickhousePoolType = ClickhousePool;
 
-#[derive(Debug)]
 struct AppState {
     config: AppConfig,
     postgres_pool: PostgresPool,
     clickhouse_pools: Vec<ClickhousePoolType>,
 }
 
-async fn run(app_state: Arc<AppState>) -> Result<(), Box<dyn Error>> {
+struct BatchSizeManager {
+    current_size: usize,
+    min_size: usize,
+    max_size: usize,
+    performance_threshold: f64,
+}
+
+impl BatchSizeManager {
+    fn new(
+        initial_size: usize,
+        min_size: usize,
+        max_size: usize,
+        performance_threshold: f64,
+    ) -> Self {
+        BatchSizeManager {
+            current_size: initial_size,
+            min_size,
+            max_size,
+            performance_threshold,
+        }
+    }
+
+    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) {
+        let performance = docs_processed as f64 / time_taken.as_secs_f64();
+
+        if performance > self.performance_threshold {
+            self.current_size = (self.current_size * 2).min(self.max_size);
+        } else {
+            self.current_size = (self.current_size / 2).max(self.min_size);
+        }
+    }
+
+    fn get_current_size(&self) -> usize {
+        self.current_size
+    }
+}
+
+async fn run(app_state: Arc<AppState>) -> Result<()> {
     let tenants = app_state.config.tenants.clone();
     let mut tasks = Vec::new();
 
     for (index, tenant) in tenants.iter().enumerate() {
         let tenant_config = tenant.clone();
-        let tenant_config_cloned = tenant_config.clone();
         let app_state = app_state.clone();
         let task = task::spawn(async move {
-            if let Err(e) = process_tenant_records(tenant_config, app_state, index).await {
-                log::error!(
-                    "Error processing tenant {}: {}",
-                    tenant_config_cloned.name,
-                    e
-                );
+            if let Err(e) = process_tenant_records(tenant_config.clone(), app_state, index).await {
+                error!("Error processing tenant {}: {}", tenant_config.name, e);
             }
         });
         tasks.push(task);
@@ -96,18 +114,23 @@ async fn run(app_state: Arc<AppState>) -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
-async fn connect_to_mongo(mongo_uri: &str) -> Result<MongoClient, mongodb::error::Error> {
+async fn connect_to_mongo(mongo_uri: &str) -> Result<MongoClient> {
     let mut retry_delay = Duration::from_secs(1);
+    let mut attempts = 0;
     loop {
         match MongoClient::with_uri_str(mongo_uri).await {
             Ok(client) => return Ok(client),
             Err(e) => {
{}", e); + attempts += 1; + if attempts > MAX_RETRIES { + return Err(e.into()); + } + error!( + "Failed to connect to MongoDB: {}. Retrying in {:?}", + e, retry_delay + ); tokio::time::sleep(retry_delay).await; retry_delay *= 2; - if retry_delay > Duration::from_secs(60) { - return Err(e); - } } } } @@ -119,19 +142,14 @@ async fn process_tenant_records( pool_index: usize, ) -> Result<()> { let mongo_client = connect_to_mongo(&tenant_config.mongo_uri).await?; - //let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; - // println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri); - // println!("<<-- mongo_client {:?}", mongo_client); let mongo_db = mongo_client.database(&tenant_config.mongo_db); - // println!("<<-- mongo_db: {:?}", mongo_db); let mongo_collection: mongodb::Collection = mongo_db.collection(&tenant_config.mongo_collection); let pg_pool = &app_state.postgres_pool; let ch_pool = &app_state.clickhouse_pools[pool_index]; - // println!("pg_pool {:?}", pg_pool); + let pg_conn = pg_pool.get().await?; - // println!("pg_conn {:?}", pg_conn); let row = pg_conn .query_one( "SELECT token FROM resume_token WHERE tenant_name = $1 ORDER BY id DESC LIMIT 1", @@ -139,17 +157,19 @@ async fn process_tenant_records( ) .await .ok(); - // println!("row {:?}", row); - let mut options = mongodb::options::ChangeStreamOptions::default(); + + let mut options = ChangeStreamOptions::default(); if let Some(row) = row { let token_bytes: Vec = row.get("token"); if let Ok(resume_token) = bson::from_slice::(&token_bytes) { options.resume_after = Some(resume_token); } } - // println!("app_state {:?}", &app_state.config); - let change_stream_options = ChangeStreamOptions::default(); - let mut change_stream = mongo_collection.watch(None, change_stream_options).await?; + + let mut change_stream = mongo_collection.watch(None, options).await?; + let mut batch = Vec::with_capacity(app_state.config.batch_size); + let mut batch_manager = + BatchSizeManager::new(app_state.config.batch_size, 1000, MAX_BATCH_SIZE, 5000.0); while let Some(result) = change_stream.next().await { match result { @@ -165,79 +185,15 @@ async fn process_tenant_records( match (record_id, statement) { (Some(record_id), Some(statement)) => { let record_id_str = record_id.to_hex(); - info!("Record ID: {}", record_id_str); - let mut statement = statement.to_owned(); - if let Some(actor) = - statement.get_mut("actor").and_then(|a| a.as_document_mut()) - { - if let Some(account) = actor - .get_mut("account") - .and_then(|acc| acc.as_document_mut()) - { - if let Some(name) = account.get_mut("name") { - if let bson::Bson::String(name_str) = name { - let anonymized_name = if name_str.contains(':') { - let parts: Vec<&str> = - name_str.split(':').collect(); - info!( - "{}", - &bson::Bson::String(parts[1].to_string()) - ); - if parts.len() == 2 { - anonymize_data( - &bson::Bson::String(parts[1].to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - } else { - anonymize_data( - &bson::Bson::String(name_str.to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - } - } else if name_str.contains('@') { - let parts: Vec<&str> = - name_str.split('@').collect(); - info!( - "{}", - &bson::Bson::String(parts[0].to_string()) - ); - if parts.len() == 2 { - anonymize_data( - &bson::Bson::String(parts[0].to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - } else { - anonymize_data( - &bson::Bson::String(name_str.to_string()), - &app_state.config.encryption_salt, - 
&tenant_config.name, - ) - } - } else { - info!( - "{}", - &bson::Bson::String(name_str.to_string()) - ); - anonymize_data( - &bson::Bson::String(name_str.to_string()), - &app_state.config.encryption_salt, - &tenant_config.name, - ) - }; - *name = bson::Bson::String(anonymized_name); - } else { - warn!("Missing 'name' field in 'actor.account'"); - } - } - } else { - warn!("Missing 'account' field in 'actor'"); - } - } else { - warn!("Missing 'actor' field in 'statement'"); + + if let Err(e) = anonymize_statement( + &mut statement, + &app_state.config.encryption_salt, + &tenant_config.name, + ) { + warn!("Failed to anonymize statement: {}", e); + continue; } let statement_str = match to_string(&statement) { @@ -248,14 +204,35 @@ async fn process_tenant_records( } }; - insert_into_clickhouse( - &ch_pool, - &statement_str, - &record_id_str, - &tenant_config.clickhouse_db, - &tenant_config.clickhouse_table, - ) - .await; + batch.push((record_id_str, statement_str)); + + if batch.len() >= batch_manager.get_current_size() { + let batch_start_time = std::time::Instant::now(); + + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &batch, + &tenant_config.clickhouse_db, + &tenant_config.clickhouse_table, + pg_pool, + &tenant_config.name, + &mut batch_manager, + ) + .await + { + error!("Failed to insert batch into ClickHouse: {}", e); + } else { + let batch_duration = batch_start_time.elapsed(); + batch_manager.adjust_batch_size(batch.len(), batch_duration); + + info!( + "Processed {} documents. Current batch size: {}", + batch.len(), + batch_manager.get_current_size() + ); + } + batch.clear(); + } } (None, Some(_)) => { warn!("Missing '_id' field in the document"); @@ -269,35 +246,16 @@ async fn process_tenant_records( } if let Some(resume_token) = change_stream.resume_token() { - let token_bytes = match bson::to_vec(&resume_token) { - Ok(bytes) => bytes, - Err(e) => { - error!("Failed to serialize resume token: {}", e); - continue; - } - }; + let token_bytes = bson::to_vec(&resume_token)?; let tenant_name = tenant_config.name.clone(); let pg_pool = app_state.postgres_pool.clone(); - match pg_pool.get().await { - Ok(pg_conn) => { - if let Err(e) = pg_conn - .execute( - "INSERT INTO resume_token (token, tenant_name) VALUES ($1, $2) ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token", - &[&token_bytes, &tenant_name], - ) - .await - { - error!("Failed to update resume token in PostgreSQL: {}", e); - } - } - Err(e) => { - error!("Failed to get PostgreSQL connection: {}", e); - } - }; + if let Err(e) = + update_resume_token(&pg_pool, &token_bytes, &tenant_name).await + { + error!("Failed to update resume token in PostgreSQL: {}", e); + } } - } else { - warn!("Missing 'full_document' field in the change stream event"); } } Err(e) => { @@ -306,34 +264,100 @@ async fn process_tenant_records( } } + if !batch.is_empty() { + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &batch, + &tenant_config.clickhouse_db, + &tenant_config.clickhouse_table, + pg_pool, + &tenant_config.name, + &mut batch_manager, + ) + .await + { + error!("Failed to insert final batch into ClickHouse: {}", e); + } + } + Ok(()) +} + +async fn update_resume_token( + pg_pool: &PostgresPool, + token_bytes: &[u8], + tenant_name: &str, +) -> Result<()> { + let pg_conn = pg_pool.get().await?; + pg_conn + .execute( + "INSERT INTO resume_token (token, tenant_name) VALUES ($1, $2) ON CONFLICT (tenant_name) DO UPDATE SET token = EXCLUDED.token", + &[&token_bytes, &tenant_name], + ) + .await?; Ok(()) } -fn anonymize_data(data: &Bson, 
encryption_salt: &str, tenant_name: &str) -> String { - let mut hasher = Sha256::new(); - // let salt = &settings.encryption_salt; - let salt = format!("{}{}", encryption_salt, tenant_name); - hasher.update(salt.as_bytes()); - - let data_str = match data { - Bson::String(s) => s.as_str(), - Bson::Int32(i) => return i.to_string(), - Bson::Int64(i) => return i.to_string(), - _ => return String::new(), +fn anonymize_statement( + statement: &mut Document, + encryption_salt: &str, + tenant_name: &str, +) -> Result<()> { + // Create a deep copy of the statement + let mut statement_copy = statement.clone(); + + // Check if all required fields exist + if !statement_copy.contains_key("actor") + || !statement_copy + .get_document("actor")? + .contains_key("account") + || !statement_copy + .get_document("actor")? + .get_document("account")? + .contains_key("name") + { + return Err(anyhow!("Statement is missing required fields")); + } + + let name = statement_copy + .get_document("actor")? + .get_document("account")? + .get_str("name")?; + + let value_to_hash = if name.contains('@') { + name.split('@').next().unwrap_or("") + } else if name.contains(':') { + name.split(':').last().unwrap_or("") + } else { + name }; - hasher.update(data_str.as_bytes()); + if value_to_hash.is_empty() { + return Err(anyhow!("Empty value to hash for name: {}", name)); + } + + let mut hasher = Sha256::new(); + hasher.update(encryption_salt.as_bytes()); + hasher.update(tenant_name.as_bytes()); + hasher.update(value_to_hash.as_bytes()); let result = hasher.finalize(); - hex::encode(result) + let hashed_value = hex::encode(result); + + // Update the copy + statement_copy + .get_document_mut("actor")? + .get_document_mut("account")? + .insert("name", hashed_value); + + // If we've made it this far without errors, update the original statement + *statement = statement_copy; + Ok(()) } -async fn process_statement(statement: &str) -> Result> { - // Replace all double consecutive backslashes with four backslashes +async fn process_statement(statement: &str) -> Result { let output1 = BACKSLASH_REGEX_1 .replace_all(statement, "\\\\\\\\") .to_string(); - // Replace all single backslashes with two backslashes let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: ®ex::Captures| { if caps[0].len() % 2 == 1 { "\\\\".to_string() @@ -342,7 +366,6 @@ async fn process_statement(statement: &str) -> Result Result Result<()> { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - // let escaped_statement_str = statement_str.replace("'", "\\\'"); - let escaped_statement_str = match process_statement(statement_str).await { - Ok(escaped_str) => escaped_str, - Err(e) => { - error!("Failed to process statement: {}", e); - return; + for (chunk_index, chunk) in bulk_insert_values + .chunks(batch_manager.get_current_size()) + .enumerate() + { + let mut retry_count = 0; + let mut delay = INITIAL_RETRY_DELAY; + + loop { + match insert_batch(ch_pool, chunk, &full_table_name).await { + Ok(_) => { + info!( + "Successfully inserted batch {} of {} records", + chunk_index + 1, + chunk.len() + ); + break; + } + Err(e) => { + error!("Failed to insert batch {}: {}", chunk_index + 1, e); + retry_count += 1; + if retry_count >= MAX_RETRIES { + error!( + "Max retries reached for batch {}. 
Logging failed batch.", + chunk_index + 1 + ); + log_failed_batch( + pg_pool, + tenant_name, + clickhouse_db, + clickhouse_table, + chunk, + ) + .await + .context("Failed to log failed batch")?; + return Err(anyhow!( + "Max retries exceeded for batch {}", + chunk_index + 1 + )); + } else { + warn!("Retrying batch {} in {} ms...", chunk_index + 1, delay); + tokio::time::sleep(Duration::from_millis(delay)).await; + delay = delay.saturating_mul(2); // Exponential backoff with overflow protection + } + } + } } - }; + + let old_size = batch_manager.get_current_size(); + batch_manager.adjust_batch_size(chunk.len(), Duration::from_secs(1)); // Assume 1 second per batch, adjust as needed + let new_size = batch_manager.get_current_size(); + if old_size != new_size { + info!("Batch size adjusted from {} to {}", old_size, new_size); + } + } + + Ok(()) +} + +async fn insert_batch( + ch_pool: &ClickhousePool, + batch: &[(String, String)], + full_table_name: &str, +) -> Result<()> { + let mut client = ch_pool + .get_handle() + .await + .context("Failed to get client from ClickHouse pool")?; + + let insert_data: Result> = + futures::future::try_join_all(batch.iter().map(|(record_id, statement)| async move { + let processed_statement = process_statement(statement).await?; + Ok(format!( + "('{}', '{}', now())", + record_id, processed_statement + )) + })) + .await; + + let insert_data = insert_data.context("Failed to process statements")?; let insert_query = format!( - "INSERT INTO {} (id, statement) VALUES ('{}', '{}')", - full_table_name, record_id_str, escaped_statement_str + "INSERT INTO {} (id, statement, created_at) VALUES {}", + full_table_name, + insert_data.join(" , ") ); - let mut client = match ch_pool.get_handle().await { - Ok(client) => client, - Err(e) => { - error!("Failed to get client from ClickHouse pool: {}", e); - return; - } - }; + client + .execute(insert_query.as_str()) + .await + .context("Failed to execute insert query")?; - let max_retries = 3; - let mut retry_count = 0; + Ok(()) +} - while retry_count < max_retries { - match client.execute(insert_query.as_str()).await { - Ok(_) => { - info!("Successfully inserted statement into ClickHouse"); - return; - } - Err(e) => { - error!("Failed to insert statement into ClickHouse: {}", e); - retry_count += 1; +async fn log_failed_batch( + pg_pool: &PostgresPool, + tenant_name: &str, + clickhouse_db: &str, + clickhouse_table: &str, + failed_batch: &[(String, String)], +) -> Result<()> { + let failed_batch_json = + serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; + + let mut client = pg_pool + .get() + .await + .context("Failed to get client from PostgreSQL pool")?; - if retry_count == max_retries { - error!("Max retries reached. 
Giving up."); - return; + let statement = client + .prepare( + "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch) + VALUES ($1, $2, $3, $4)", + ) + .await + .context("Failed to prepare PostgreSQL statement")?; + + client + .execute( + &statement, + &[ + &tenant_name, + &clickhouse_db, + &clickhouse_table, + &failed_batch_json, + ], + ) + .await + .context("Failed to execute PostgreSQL statement")?; + + Ok(()) +} + +async fn retry_failed_batches(app_state: Arc) -> Result<()> { + let pg_pool = &app_state.postgres_pool; + + loop { + let mut client = pg_pool + .get() + .await + .context("Failed to get PostgreSQL client")?; + let statement = client + .prepare( + "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch + FROM failed_batches + ORDER BY created_at + LIMIT 100", + ) + .await + .context("Failed to prepare PostgreSQL statement")?; + + let rows = client + .query(&statement, &[]) + .await + .context("Failed to execute PostgreSQL query")?; + if rows.is_empty() { + tokio::time::sleep(Duration::from_secs(60)).await; + continue; + } + for row in rows { + let failed_batch_id: i32 = row.get(0); + let tenant_name: String = row.get(1); + let clickhouse_db: String = row.get(2); + let clickhouse_table: String = row.get(3); + let failed_batch: String = row.get(4); + + let tenant_config = app_state + .config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .cloned(); + + if let Some(tenant_config) = tenant_config { + let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); + let bulk_insert_values: Vec<(String, String)> = serde_json::from_str(&failed_batch) + .context("Failed to deserialize failed batch")?; + let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &bulk_insert_values, + &clickhouse_db, + &clickhouse_table, + pg_pool, + &tenant_name, + &mut batch_manager, + ) + .await + { + error!("Error retrying failed batch: {}", e); } else { - let delay_ms = 1000 * retry_count; - info!("Retrying in {} ms...", delay_ms); - tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + let delete_statement = client + .prepare("DELETE FROM failed_batches WHERE id = $1") + .await + .context("Failed to prepare delete statement")?; + client + .execute(&delete_statement, &[&failed_batch_id]) + .await + .context("Failed to delete processed failed batch")?; } } } + + tokio::time::sleep(Duration::from_secs(60)).await; } } #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<()> { env_logger::init(); - info!(target: "main", "Starting up"); let env = env::var("ENV").unwrap_or_else(|_| "dev".into()); @@ -425,32 +608,29 @@ async fn main() -> Result<(), Box> { "dev" => "config-dev.yml", "prod" => "config-prod.yml", _ => { - log::error!("Unsupported environment: {}", env); - return Err("Unsupported environment".into()); - } - }; - - let config_file = File::with_name(config_path); - let config: AppConfig = match Config::builder().add_source(config_file).build() { - Ok(config_builder) => match config_builder.try_deserialize() { - Ok(config) => config, - Err(err) => { - log::error!("Failed to deserialize config: {}", err); - return Err(err.into()); - } - }, - Err(err) => { - log::error!("Failed to build config: {}", err); - return Err(err.into()); + error!("Unsupported environment: {}", env); + return Err(anyhow!("Unsupported environment")); } }; - let postgres_manager = PostgresConnectionManager::new(config.pg_database_url.parse()?, 
-    let postgres_manager = PostgresConnectionManager::new(config.pg_database_url.parse()?, NoTls);
+    let config: AppConfig = Config::builder()
+        .add_source(File::with_name(config_path))
+        .build()?
+        .try_deserialize()
+        .context("Failed to deserialize config")?;
+
+    let postgres_manager = PostgresConnectionManager::new(
+        config
+            .pg_database_url
+            .parse()
+            .context("Invalid PostgreSQL URL")?,
+        tokio_postgres::NoTls,
+    );
     let postgres_pool = Pool::builder().build(postgres_manager).await?;
 
     let mut clickhouse_pools = Vec::new();
     for tenant in &config.tenants {
-        let clickhouse_pool = ClickhousePool::new(&*tenant.clickhouse_uri);
+        let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri.as_str());
         clickhouse_pools.push(clickhouse_pool);
     }
 
@@ -459,11 +639,43 @@ async fn main() -> Result<(), Box<dyn Error>> {
        postgres_pool,
        clickhouse_pools,
    });
-    // println!("app_state_main {:?}", app_state);
-    let _ = run(app_state).await;
-    tokio::signal::ctrl_c()
-        .await
-        .expect("Failed to listen for ctrl-c signal");
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+    let shutdown_tx = Arc::new(tokio::sync::Mutex::new(Some(shutdown_tx)));
+
+    let app_state_clone = app_state.clone();
+    let run_handle = tokio::spawn(async move {
+        if let Err(e) = run(app_state_clone).await {
+            error!("Error in main run loop: {}", e);
+        }
+    });
+
+    let app_state_clone = app_state.clone();
+    let retry_handle = tokio::spawn(async move {
+        if let Err(e) = retry_failed_batches(app_state_clone).await {
+            error!("Error in retry failed batches loop: {}", e);
+        }
+    });
+
+    tokio::select! {
+        _ = signal::ctrl_c() => {
+            info!("Received shutdown signal, shutting down gracefully...");
+            if let Some(tx) = shutdown_tx.lock().await.take() {
+                let _ = tx.send(());
+            }
+        }
+        _ = shutdown_rx => {
+            info!("Shutdown signal received from within the application...");
+        }
+        _ = run_handle => {
+            info!("Main run loop has completed.");
+        }
+        _ = retry_handle => {
+            info!("Retry failed batches loop has completed.");
+        }
+    }
+
+    info!("Shutting down...");
+    info!("Application shutdown complete.");
 
     Ok(())
 }
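Usage sketch for the adaptive batching introduced above (illustrative, not part of the diff): BatchSizeManager doubles the batch size when a batch's throughput in documents per second exceeds performance_threshold and halves it otherwise, clamped to [min_size, max_size]; process_tenant_records constructs it with the configured batch_size, a floor of 1000, a ceiling of MAX_BATCH_SIZE, and a threshold of 5000.0. The standalone Rust sketch below copies the struct from src/main.rs; the file name and the throughput figures in main are assumptions made only for the example.

// batch_size_sketch.rs - standalone illustration of BatchSizeManager (hypothetical file)
use std::time::Duration;

struct BatchSizeManager {
    current_size: usize,
    min_size: usize,
    max_size: usize,
    performance_threshold: f64, // documents per second
}

impl BatchSizeManager {
    fn new(initial_size: usize, min_size: usize, max_size: usize, performance_threshold: f64) -> Self {
        BatchSizeManager { current_size: initial_size, min_size, max_size, performance_threshold }
    }

    // Double the batch size on fast batches, halve it on slow ones,
    // clamped to [min_size, max_size].
    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) {
        let performance = docs_processed as f64 / time_taken.as_secs_f64();
        if performance > self.performance_threshold {
            self.current_size = (self.current_size * 2).min(self.max_size);
        } else {
            self.current_size = (self.current_size / 2).max(self.min_size);
        }
    }

    fn get_current_size(&self) -> usize {
        self.current_size
    }
}

fn main() {
    // Same shape of arguments as process_tenant_records uses in the patch:
    // initial batch_size, floor 1000, ceiling MAX_BATCH_SIZE, threshold 5000 docs/s.
    let mut manager = BatchSizeManager::new(10_000, 1000, 10_000, 5000.0);

    // A fast batch: 10_000 docs in 1 s = 10_000 docs/s > 5000.0, so the size
    // doubles but is capped at max_size (stays 10_000 here).
    manager.adjust_batch_size(10_000, Duration::from_secs(1));
    assert_eq!(manager.get_current_size(), 10_000);

    // A slow batch: 10_000 docs in 4 s = 2500 docs/s < 5000.0, so the size halves.
    manager.adjust_batch_size(10_000, Duration::from_secs(4));
    assert_eq!(manager.get_current_size(), 5_000);

    println!("final batch size: {}", manager.get_current_size());
}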
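A second sketch, covering the name-anonymization rule implemented by anonymize_statement above (again illustrative only): for names containing '@' the part before the '@' is hashed, for names containing ':' the part after the last ':' is hashed, otherwise the whole name; the hash is SHA-256 over encryption_salt, then tenant_name, then the selected value, hex-encoded. The helper name and the sample inputs below are assumptions for the example; sha2 and hex are the crates the patch already depends on.

// anonymize_sketch.rs - illustration of the hashing rule in anonymize_statement (hypothetical file)
use sha2::{Digest, Sha256};

// Hypothetical helper mirroring the field-selection and hashing logic of the patch.
fn anonymized_name(name: &str, encryption_salt: &str, tenant_name: &str) -> String {
    let value_to_hash = if name.contains('@') {
        name.split('@').next().unwrap_or("")
    } else if name.contains(':') {
        name.split(':').last().unwrap_or("")
    } else {
        name
    };

    let mut hasher = Sha256::new();
    hasher.update(encryption_salt.as_bytes());
    hasher.update(tenant_name.as_bytes());
    hasher.update(value_to_hash.as_bytes());
    hex::encode(hasher.finalize())
}

fn main() {
    // "user@example.com" hashes only "user"; "account:alice" hashes only "alice".
    println!("{}", anonymized_name("user@example.com", "some-salt", "tenant-a"));
    println!("{}", anonymized_name("account:alice", "some-salt", "tenant-a"));
    println!("{}", anonymized_name("plain-name", "some-salt", "tenant-a"));
}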