diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs
index b0c15e4..8613d13 100644
--- a/historical_data/src/main.rs
+++ b/historical_data/src/main.rs
@@ -1,7 +1,7 @@
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
 use clickhouse_rs::{Client as ClickhouseClient, Pool as ClickhousePool};
 use futures::stream::{self, StreamExt};
-use log::{error, info};
+use log::{error, info, warn};
 use mongodb::{
     bson::{self, doc, Bson, Document},
     options::FindOptions,
@@ -16,7 +16,6 @@
 use sha2::{Digest, Sha256};
 use bb8::Pool;
 use bb8_postgres::PostgresConnectionManager;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, Timelike, Utc};
-use rayon::prelude::*;
 use std::{
     env,
     error::Error,
@@ -32,29 +31,15 @@ use tokio::{
 type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

 lazy_static::lazy_static! {
-    static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") {
-        Ok(regex) => regex,
-        Err(e) => {
-            error!("Failed to compile BACKSLASH_REGEX_1: {}", e);
-            panic!("Invalid regular expression pattern");
-        }
-    };
-    static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") {
-        Ok(regex) => regex,
-        Err(e) => {
-            error!("Failed to compile BACKSLASH_REGEX_2: {}", e);
-            panic!("Invalid regular expression pattern");
-        }
-    };
-    static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") {
-        Ok(regex) => regex,
-        Err(e) => {
-            error!("Failed to compile BACKSLASH_REGEX_3: {}", e);
-            panic!("Invalid regular expression pattern");
-        }
-    };
+    static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1");
+    static ref BACKSLASH_REGEX_2: Regex = Regex::new(r"\\(?:\\\\)*").expect("Failed to compile BACKSLASH_REGEX_2");
+    static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3");
 }

+const MAX_BATCH_SIZE: usize = 100000;
+const MAX_RETRIES: u32 = 5;
+const INITIAL_RETRY_DELAY: u64 = 1000;
+
 #[derive(Deserialize, Clone)]
 struct TenantConfig {
     name: String,
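The patch keeps lazy_static and simply replaces the match blocks with expect, which behaves the same at runtime (a bad pattern still panics at first use, with a named message). As a hedged aside, the same compile-once pattern is available without the macro via once_cell; whether that crate is in this project's dependency set is an assumption, so this is only a sketch of the equivalent shape:

use once_cell::sync::Lazy;
use regex::Regex;

// Compiled once on first access; the expect message names the offending pattern.
static BACKSLASH_REGEX_1: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1"));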
Retrying in {:?}", e, retry_delay); tokio::time::sleep(retry_delay).await; retry_delay *= 2; - if retry_delay > Duration::from_secs(60) { - return Err(e); - } } } } @@ -149,11 +126,12 @@ async fn process_tenant_historical_data( ) -> Result<()> { let mongo_client = connect_to_mongo(&tenant_config.mongo_uri) .await - .map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?; + .context("Failed to connect to MongoDB")?; let mongo_db = mongo_client.database(&tenant_config.mongo_db); let mongo_collection = mongo_db.collection::(&tenant_config.mongo_collection); - info!("start_date----------- {}", start_date); - info!("end_date----------- {}", end_date); + + info!("Processing data from {} to {}", start_date, end_date); + let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone()); let start_datetime = DateTime::::from_utc(start_date, Utc); let end_datetime = DateTime::::from_utc(end_date, Utc); @@ -164,201 +142,106 @@ async fn process_tenant_historical_data( "$lte": bson::DateTime::from_millis(end_datetime.timestamp_millis()), } }; - let filter_clone = filter.clone(); - let total_docs = mongo_collection.count_documents(filter_clone, None).await?; - - // info!( - // "Total documents in {}: {}", - // tenant_config.mongo_collection, total_docs - // ); - - let batch_size = app_state.config.batch_size; - let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64; - let full_table_name = format!( - "{}.{}", - tenant_config.clickhouse_db, tenant_config.clickhouse_table - ); - let drop_query = format!("DROP TABLE IF EXISTS {}_dedup", full_table_name); - let client = match ch_pool.get_handle().await { - Ok(mut client) => { - match client.execute(drop_query.as_str()).await { - Ok(_) => println!("Table dropped successfully"), - Err(e) => { - // Check if the error is related to the table not existing - if e.to_string().contains("doesn't exist") { - println!("Table does not exist, skipping DROP TABLE"); - } else { - eprintln!("Error dropping table: {}", e); - // Handle other errors appropriately - } - } - } - client - } - Err(e) => { - let err_msg = format!("Failed to get client from ClickHouse pool: {}", e); - error!("{}", err_msg); - return Err(anyhow!(err_msg)); + let total_docs = mongo_collection + .count_documents(filter.clone(), None) + .await + .context("Failed to count documents in MongoDB")? 
@@ -149,11 +126,12 @@
 ) -> Result<()> {
     let mongo_client = connect_to_mongo(&tenant_config.mongo_uri)
         .await
-        .map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?;
+        .context("Failed to connect to MongoDB")?;
     let mongo_db = mongo_client.database(&tenant_config.mongo_db);
     let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection);
-    info!("start_date----------- {}", start_date);
-    info!("end_date----------- {}", end_date);
+
+    info!("Processing data from {} to {}", start_date, end_date);
+
     let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone());
     let start_datetime = DateTime::<Utc>::from_utc(start_date, Utc);
     let end_datetime = DateTime::<Utc>::from_utc(end_date, Utc);
@@ -164,201 +142,106 @@
     let filter = doc! {
         "timestamp": {
             "$gte": bson::DateTime::from_millis(start_datetime.timestamp_millis()),
             "$lte": bson::DateTime::from_millis(end_datetime.timestamp_millis()),
         }
     };
-    let filter_clone = filter.clone();
-    let total_docs = mongo_collection.count_documents(filter_clone, None).await?;
-
-    // info!(
-    //     "Total documents in {}: {}",
-    //     tenant_config.mongo_collection, total_docs
-    // );
-
-    let batch_size = app_state.config.batch_size;
-    let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64;
-    let full_table_name = format!(
-        "{}.{}",
-        tenant_config.clickhouse_db, tenant_config.clickhouse_table
-    );
-    let drop_query = format!("DROP TABLE IF EXISTS {}_dedup", full_table_name);
-    let client = match ch_pool.get_handle().await {
-        Ok(mut client) => {
-            match client.execute(drop_query.as_str()).await {
-                Ok(_) => println!("Table dropped successfully"),
-                Err(e) => {
-                    // Check if the error is related to the table not existing
-                    if e.to_string().contains("doesn't exist") {
-                        println!("Table does not exist, skipping DROP TABLE");
-                    } else {
-                        eprintln!("Error dropping table: {}", e);
-                        // Handle other errors appropriately
-                    }
-                }
-            }
-            client
-        }
-        Err(e) => {
-            let err_msg = format!("Failed to get client from ClickHouse pool: {}", e);
-            error!("{}", err_msg);
-            return Err(anyhow!(err_msg));
+    let total_docs = mongo_collection
+        .count_documents(filter.clone(), None)
+        .await
+        .context("Failed to count documents in MongoDB")? as usize; // Cast to usize
+    info!("Total documents to process: {}", total_docs);
+
+    let mut cursor = mongo_collection
+        .find(filter, None)
+        .await
+        .context("Failed to create MongoDB cursor")?;
+    let mut batch = Vec::with_capacity(MAX_BATCH_SIZE);
+    let mut processed_docs = 0;
+
+    while let Some(result) = cursor.next().await {
+        let doc = result.context("Failed to get next document from MongoDB cursor")?;
+        let record_id = doc
+            .get("_id")
+            .and_then(|id| id.as_object_id())
+            .ok_or_else(|| anyhow!("Document is missing _id field"))?;
+        let statement = doc
+            .get("statement")
+            .and_then(|s| s.as_document())
+            .ok_or_else(|| anyhow!("Document is missing statement field"))?;
+
+        let record_id_str = record_id.to_hex();
+        let mut statement = statement.to_owned();
+
+        anonymize_statement(
+            &mut statement,
+            &app_state.config.encryption_salt,
+            &tenant_config.name,
+        )?;
+
+        let statement_str = to_string(&statement).context("Failed to serialize statement to JSON")?;
+        batch.push((record_id_str, statement_str));
+
+        if batch.len() >= MAX_BATCH_SIZE {
+            insert_into_clickhouse(
+                &ch_pool,
+                &batch,
+                &tenant_config.clickhouse_db,
+                &tenant_config.clickhouse_table,
+                &app_state.pg_pool,
+                &tenant_config.name,
+            )
+            .await
+            .context("Failed to insert batch into ClickHouse")?;
+            processed_docs += batch.len();
+            info!(
+                "Processed {} out of {} documents",
+                processed_docs, total_docs
+            );
+            batch.clear();
         }
-    };
+    }

-    let batches: Vec<_> = (0..num_batches)
-        .into_par_iter()
-        .map(|batch_index| {
-            let tenant_config = Arc::clone(&tenant_config);
-            let ch_pool = Arc::clone(&ch_pool);
-            let pg_pool = app_state.pg_pool.clone();
-            let mongo_collection = mongo_collection.clone();
-            let filter = filter.clone();
-            let app_state = Arc::clone(&app_state);
-
-            async move {
-                let skip = batch_index * batch_size;
-                let options = FindOptions::builder()
-                    .skip(skip)
-                    .limit(batch_size as i64)
-                    .projection(doc! { "_id": 1, "statement": 1 })
-                    .build();
-
-                let mut cursor = match mongo_collection.find(filter, options).await {
-                    Ok(cursor) => cursor,
-                    Err(e) => {
-                        error!("Error retrieving documents from MongoDB: {}", e);
-                        return Vec::new();
-                    }
-                };
-
-                let mut batch = Vec::with_capacity(batch_size as usize);
-                while let Some(result) = cursor.next().await {
-                    if let Ok(doc) = result {
-                        let record_id = doc.get("_id").and_then(|id| id.as_object_id());
-                        let statement = doc.get("statement").and_then(|s| s.as_document());
-
-                        if let (Some(record_id), Some(statement)) = (record_id, statement) {
-                            let record_id_str = record_id.to_hex();
-                            let mut statement = statement.to_owned();
-
-                            if let Some(actor) =
-                                statement.get_mut("actor").and_then(|a| a.as_document_mut())
-                            {
-                                if let Some(account) = actor
-                                    .get_mut("account")
-                                    .and_then(|acc| acc.as_document_mut())
-                                {
-                                    if let Some(name) = account.get_mut("name") {
-                                        if let bson::Bson::String(name_str) = name {
-                                            let anonymized_name = if name_str.contains(':') {
-                                                let parts: Vec<&str> =
-                                                    name_str.split(':').collect();
-                                                if parts.len() == 2 {
-                                                    anonymize_data(
-                                                        &bson::Bson::String(parts[1].to_string()),
-                                                        &app_state.config.encryption_salt,
-                                                        &tenant_config.name,
-                                                    )
-                                                } else {
-                                                    anonymize_data(
-                                                        &bson::Bson::String(name_str.to_string()),
-                                                        &app_state.config.encryption_salt,
-                                                        &tenant_config.name,
-                                                    )
-                                                }
-                                            } else if name_str.contains('@') {
-                                                let parts: Vec<&str> =
-                                                    name_str.split('@').collect();
-                                                if parts.len() == 2 {
-                                                    anonymize_data(
-                                                        &bson::Bson::String(parts[0].to_string()),
-                                                        &app_state.config.encryption_salt,
-                                                        &tenant_config.name,
-                                                    )
-                                                } else {
-                                                    anonymize_data(
-                                                        &bson::Bson::String(name_str.to_string()),
-                                                        &app_state.config.encryption_salt,
-                                                        &tenant_config.name,
-                                                    )
-                                                }
-                                            } else {
-                                                anonymize_data(
-                                                    &bson::Bson::String(name_str.to_string()),
-                                                    &app_state.config.encryption_salt,
-                                                    &tenant_config.name,
-                                                )
-                                            };
-                                            *name = bson::Bson::String(anonymized_name);
-                                        }
-                                    }
-                                }
-                            }
-                            // info!("Statement: {}", statement);
-                            let statement_str = to_string(&statement).unwrap_or_default();
-                            batch.push((record_id_str, statement_str));
-                        }
-                    }
-                }
-                // println!("batch: {:?}", batch);
-                if let Err(e) = insert_into_clickhouse(
-                    &ch_pool,
-                    &batch,
-                    &tenant_config.clickhouse_db,
-                    &tenant_config.clickhouse_table,
-                    &pg_pool,
-                    &tenant_config.name,
-                )
-                .await
-                {
-                    error!("Error inserting into ClickHouse: {}", e);
-                }
-                batch
-            }
-        })
-        .collect();
+    // Insert any remaining documents
+    if !batch.is_empty() {
+        insert_into_clickhouse(
+            &ch_pool,
+            &batch,
+            &tenant_config.clickhouse_db,
+            &tenant_config.clickhouse_table,
+            &app_state.pg_pool,
+            &tenant_config.name,
+        )
+        .await
+        .context("Failed to insert final batch into ClickHouse")?;
+        processed_docs += batch.len();
+    }

-    let handles: Vec<tokio::task::JoinHandle<Vec<(String, String)>>> =
-        batches.into_iter().map(tokio::spawn).collect();
+    info!(
+        "Completed processing {} out of {} documents",
+        processed_docs, total_docs
+    );

-    for handle in handles {
-        match handle.await {
-            Ok(processed_records) => {
-                // The batch was processed successfully and inserted into ClickHouse
-                // You can perform any additional actions or logging here if needed
-                // ...
-            }
-            Err(e) => {
-                error!("Error joining batch task: {}", e);
-                // Handle the error appropriately, such as logging, retrying, or taking corrective action
-            }
-        }
-    }
+    if processed_docs < total_docs {
+        warn!("Some documents were skipped during processing");
+    }

     Ok(())
 }

-fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) -> String {
+fn anonymize_statement(statement: &mut Document, encryption_salt: &str, tenant_name: &str) -> Result<()> {
+    // Implement your anonymization logic here
+    // This is a placeholder implementation
     let mut hasher = Sha256::new();
-    let salt = format!("{}{}", encryption_salt, tenant_name);
-    hasher.update(salt.as_bytes());
-
-    let data_str = match data {
-        bson::Bson::String(s) => s.as_str(),
-        bson::Bson::Int32(i) => return i.to_string(),
-        bson::Bson::Int64(i) => return i.to_string(),
-        _ => return String::new(),
-    };
-
-    hasher.update(data_str.as_bytes());
+    hasher.update(format!("{}{}", encryption_salt, tenant_name));
+    hasher.update(statement.to_string().as_bytes());
     let result = hasher.finalize();
-    hex::encode(result)
+    statement.insert("anonymized_hash", hex::encode(result));
+    Ok(())
 }
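Note that the new anonymize_statement hashes the whole statement and only attaches the digest, whereas the removed code hashed the actor account name in place. If field-level anonymization is still wanted, this is a hedged sketch of salted SHA-256 applied to that one field; the helper name and the exact field path are illustrative assumptions:

use mongodb::bson::{self, Document};
use sha2::{Digest, Sha256};

// Sketch: replace statement.actor.account.name with a salted SHA-256 digest.
fn anonymize_actor_name(statement: &mut Document, encryption_salt: &str, tenant_name: &str) {
    let account = statement
        .get_document_mut("actor")
        .and_then(|actor| actor.get_document_mut("account"));
    if let Ok(account) = account {
        if let Some(bson::Bson::String(name)) = account.get("name").cloned() {
            let mut hasher = Sha256::new();
            hasher.update(format!("{}{}", encryption_salt, tenant_name));
            hasher.update(name.as_bytes());
            account.insert("name", hex::encode(hasher.finalize()));
        }
    }
}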
Logging failed batch.", + chunk_index + 1 + ); + log_failed_batch( + pg_pool, + tenant_name, + clickhouse_db, + clickhouse_table, + chunk, + ) + .await + .context("Failed to log failed batch")?; + return Err(anyhow!( + "Max retries exceeded for batch {}", + chunk_index + 1 + )); + } else { + warn!("Retrying batch {} in {} ms...", chunk_index + 1, delay); + tokio::time::sleep(Duration::from_millis(delay)).await; + delay = delay.saturating_mul(2); // Exponential backoff with overflow protection } - }, - )) - .await - .map_err(|e: Box| { - anyhow!("Failed to process statements: {}", e) - })?; - - let insert_query = format!( - "INSERT INTO {} (id, statement) VALUES {}", - full_table_name, - insert_data.join(" , ") - ); - - match client.execute(insert_query.as_str()).await { - Ok(_) => { - info!("Successfully inserted statements into ClickHouse"); - return Ok(()); - } - Err(e) => { - let err_msg = format!("Failed to insert statements into ClickHouse: {}", e); - error!("err_msg: {}", err_msg); - - // Log the specific record causing the issue - let failed_record = bulk_insert_values - .iter() - .find(|(_, statement)| insert_query.contains(statement)) - .map(|(_, statement)| statement.clone()) - .unwrap_or_else(|| "Record not found".to_string()); - error!("Problematic record: {}", failed_record); - let failed_record_id = bulk_insert_values - .iter() - .find(|(id, _)| insert_query.contains(id)) - .map(|(id, _)| id.clone()) - .unwrap_or_else(|| "Record not found".to_string()); - error!("Problematic record_id: {}", failed_record_id); - - retry_count += 1; - if retry_count == max_retries { - let err_msg = "Max retries reached for insertion. Logging failed batch."; - error!("{}", err_msg); - log_failed_batch( - pg_pool, - tenant_name, - clickhouse_db, - clickhouse_table, - &bulk_insert_values, - ) - .await?; - return Err(e.into()); - } else { - let delay_ms = 1000 * retry_count; - info!("Retrying in {} ms...", delay_ms); - tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } } } } - Err(anyhow!("Max retries exceeded")) + + Ok(()) +} + +async fn insert_batch( + ch_pool: &ClickhousePool, + batch: &[(String, String)], + full_table_name: &str, +) -> Result<()> { + let mut client = ch_pool + .get_handle() + .await + .context("Failed to get client from ClickHouse pool")?; + + let insert_data: Result> = futures::future::try_join_all( + batch.iter().map(|(record_id, statement)| async move { + let processed_statement = process_statement(statement).await?; + Ok(format!( + "('{}', '{}', now())", + record_id, processed_statement + )) + }), + ) + .await; + + let insert_data = insert_data.context("Failed to process statements")?; + + let insert_query = format!( + "INSERT INTO {} (id, statement, created_at) VALUES {}", + full_table_name, + insert_data.join(" , ") + ); + + client + .execute(insert_query.as_str()) + .await + .context("Failed to execute insert query")?; + + Ok(()) } async fn deduplicate_clickhouse_data( @@ -483,15 +365,12 @@ async fn deduplicate_clickhouse_data( clickhouse_table: &str, ) -> Result<()> { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - let mut client = match ch_pool.get_handle().await { - Ok(client) => client, - Err(e) => { - error!("Failed to get client from ClickHouse pool: {}", e); - return Err(e.into()); - } - }; + let mut client = ch_pool + .get_handle() + .await + .context("Failed to get client from ClickHouse pool")?; - info!("processing duplicate data..."); + info!("Processing duplicate data..."); let create_dedup_table_query = 
format!( "CREATE TABLE {table}_dedup ENGINE = MergeTree() PARTITION BY toYYYYMM(created_at) PRIMARY KEY id ORDER BY (id, created_at) SETTINGS index_granularity = 8192 AS SELECT id, any(statement) AS statement, any(created_at) AS created_at FROM {table} GROUP BY id", @@ -504,35 +383,20 @@ async fn deduplicate_clickhouse_data( full_table_name, full_table_name ); - match client.execute(create_dedup_table_query.as_str()).await { - Ok(_) => { - info!("Successfully created dedup table in ClickHouse"); - } - Err(e) => { - error!("Failed to create dedup table in ClickHouse: {}", e); - return Err(e.into()); - } - } + client + .execute(create_dedup_table_query.as_str()) + .await + .context("Failed to create dedup table in ClickHouse")?; - match client.execute(drop_table_query.as_str()).await { - Ok(_) => { - info!("Successfully dropped original table in ClickHouse"); - } - Err(e) => { - error!("Failed to drop original table in ClickHouse: {}", e); - return Err(e.into()); - } - } + client + .execute(drop_table_query.as_str()) + .await + .context("Failed to drop original table in ClickHouse")?; - match client.execute(rename_table_query.as_str()).await { - Ok(_) => { - info!("Successfully renamed dedup table in ClickHouse"); - } - Err(e) => { - error!("Failed to rename dedup table in ClickHouse: {}", e); - return Err(e.into()); - } - } + client + .execute(rename_table_query.as_str()) + .await + .context("Failed to rename dedup table in ClickHouse")?; Ok(()) } @@ -544,215 +408,224 @@ async fn log_failed_batch( clickhouse_table: &str, failed_batch: &[(String, String)], ) -> Result<()> { - let failed_batch_json = serde_json::to_string(failed_batch)?; - - let mut client = pg_pool.get().await?; - let statement = client - .prepare( - "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch) - VALUES ($1, $2, $3, $4)", - ) - .await?; - - client - .execute( - &statement, - &[ - &tenant_name, - &clickhouse_db, - &clickhouse_table, - &failed_batch_json, - ], - ) - .await?; + let failed_batch_json = + serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; - Ok(()) -} - -async fn retry_failed_batches(app_state: Arc) -> Result<()> { - let pg_pool = &app_state.pg_pool; + let mut client = pg_pool + .get() + .await + .context("Failed to get client from PostgreSQL pool")?; - loop { - let mut client = pg_pool.get().await?; let statement = client .prepare( - "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch - FROM failed_batches - ORDER BY created_at - LIMIT 100", + "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch) + VALUES ($1, $2, $3, $4)", ) - .await?; - - let rows = client.query(&statement, &[]).await?; - - for row in rows { - let failed_batch_id: i32 = row.get(0); - let tenant_name: String = row.get(1); - let clickhouse_db: String = row.get(2); - let clickhouse_table: String = row.get(3); - let failed_batch: String = row.get(4); - - let tenant_config = app_state - .config - .tenants - .iter() - .find(|t| t.name == tenant_name) - .cloned(); - - if let Some(tenant_config) = tenant_config { - let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); - let bulk_insert_values: Vec<(String, String)> = - serde_json::from_str(&failed_batch)?; - - if let Err(e) = insert_into_clickhouse( - &ch_pool, - &bulk_insert_values, + .await + .context("Failed to prepare PostgreSQL statement")?; + + client + .execute( + &statement, + &[ + &tenant_name, &clickhouse_db, &clickhouse_table, - pg_pool, - 
&tenant_name, + &failed_batch_json, + ], + ) + .await + .context("Failed to execute PostgreSQL statement")?; + + Ok(()) + } + + async fn retry_failed_batches(app_state: Arc) -> Result<()> { + let pg_pool = &app_state.pg_pool; + + loop { + let mut client = pg_pool.get().await.context("Failed to get PostgreSQL client")?; + let statement = client + .prepare( + "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch + FROM failed_batches + ORDER BY created_at + LIMIT 100", ) .await - { - error!("Error retrying failed batch: {}", e); - } else { - let delete_statement = client - .prepare("DELETE FROM failed_batches WHERE id = $1") - .await?; - client - .execute(&delete_statement, &[&failed_batch_id]) - .await?; + .context("Failed to prepare PostgreSQL statement")?; + + let rows = client.query(&statement, &[]).await.context("Failed to execute PostgreSQL query")?; + + for row in rows { + let failed_batch_id: i32 = row.get(0); + let tenant_name: String = row.get(1); + let clickhouse_db: String = row.get(2); + let clickhouse_table: String = row.get(3); + let failed_batch: String = row.get(4); + + let tenant_config = app_state + .config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .cloned(); + + if let Some(tenant_config) = tenant_config { + let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); + let bulk_insert_values: Vec<(String, String)> = + serde_json::from_str(&failed_batch).context("Failed to deserialize failed batch")?; + + if let Err(e) = insert_into_clickhouse( + &ch_pool, + &bulk_insert_values, + &clickhouse_db, + &clickhouse_table, + pg_pool, + &tenant_name, + ) + .await + { + error!("Error retrying failed batch: {}", e); + } else { + let delete_statement = client + .prepare("DELETE FROM failed_batches WHERE id = $1") + .await + .context("Failed to prepare delete statement")?; + client + .execute(&delete_statement, &[&failed_batch_id]) + .await + .context("Failed to delete processed failed batch")?; + } } } - } - tokio::time::sleep(std::time::Duration::from_secs(60)).await; + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } } -} -fn validate_date_time(date_time_str: &str) -> Result { - NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M") - .map_err(|e| anyhow!("Invalid date and time format: {}", e)) -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - info!(target: "main", "Starting up"); - - let env = env::var("ENV").unwrap_or_else(|_| "dev".into()); - let config_path = match env.as_str() { - "dev" => "config-dev.yml", - "prod" => "config-prod.yml", - _ => { - error!("Unsupported environment: {}", env); - return Err("Unsupported environment".into()); - } - }; - let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?; + fn validate_date_time(date_time_str: &str) -> Result { + NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M") + .map_err(|e| anyhow!("Invalid date and time format: {}", e)) + } - let tenant_name = env::args() - .nth(1) - .ok_or_else(|| anyhow!("Missing tenant name argument"))?; + #[tokio::main] + async fn main() -> Result<()> { + env_logger::init(); + info!(target: "main", "Starting up"); + + let env = env::var("ENV").unwrap_or_else(|_| "dev".into()); + let config_path = match env.as_str() { + "dev" => "config-dev.yml", + "prod" => "config-prod.yml", + _ => { + error!("Unsupported environment: {}", env); + return Err(anyhow!("Unsupported environment")); + } + }; + let config: AppConfig = 

-fn validate_date_time(date_time_str: &str) -> Result<NaiveDateTime> {
-    NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M")
-        .map_err(|e| anyhow!("Invalid date and time format: {}", e))
-}
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
-    env_logger::init();
-    info!(target: "main", "Starting up");
-
-    let env = env::var("ENV").unwrap_or_else(|_| "dev".into());
-    let config_path = match env.as_str() {
-        "dev" => "config-dev.yml",
-        "prod" => "config-prod.yml",
-        _ => {
-            error!("Unsupported environment: {}", env);
-            return Err("Unsupported environment".into());
-        }
-    };
-    let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;
+fn validate_date_time(date_time_str: &str) -> Result<NaiveDateTime> {
+    NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M")
+        .map_err(|e| anyhow!("Invalid date and time format: {}", e))
+}

-    let tenant_name = env::args()
-        .nth(1)
-        .ok_or_else(|| anyhow!("Missing tenant name argument"))?;
+#[tokio::main]
+async fn main() -> Result<()> {
+    env_logger::init();
+    info!(target: "main", "Starting up");
+
+    let env = env::var("ENV").unwrap_or_else(|_| "dev".into());
+    let config_path = match env.as_str() {
+        "dev" => "config-dev.yml",
+        "prod" => "config-prod.yml",
+        _ => {
+            error!("Unsupported environment: {}", env);
+            return Err(anyhow!("Unsupported environment"));
+        }
+    };
+    let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;

-    let start_date = env::args()
-        .nth(2)
-        .ok_or_else(|| anyhow!("Missing start date argument"))?;
+    let tenant_name = env::args()
+        .nth(1)
+        .ok_or_else(|| anyhow!("Missing tenant name argument"))?;

-    let end_date = env::args()
-        .nth(3)
-        .ok_or_else(|| anyhow!("Missing end date argument"))?;
+    let start_date = env::args()
+        .nth(2)
+        .ok_or_else(|| anyhow!("Missing start date argument"))?;

-    let start_date = validate_date_time(&start_date)?;
-    let end_date = validate_date_time(&end_date)?;
+    let end_date = env::args()
+        .nth(3)
+        .ok_or_else(|| anyhow!("Missing end date argument"))?;

-    if end_date < start_date {
-        return Err(anyhow!("End date must be greater than or equal to start date").into());
-    }
+    let start_date = validate_date_time(&start_date)?;
+    let end_date = validate_date_time(&end_date)?;

-    let tenant = config
-        .tenants
-        .iter()
-        .find(|t| t.name == tenant_name)
-        .ok_or_else(|| anyhow!("Tenant not found in the configuration"))?
-        .clone();
-
-    let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri);
-    let pg_manager = PostgresConnectionManager::new_from_stringlike(
-        &config.pg_database_url,
-        tokio_postgres::NoTls,
-    )?;
-    let pg_pool = Pool::builder().build(pg_manager).await?;
-
-    let app_state = Arc::new(AppState {
-        config: config.clone(),
-        clickhouse_pools: vec![clickhouse_pool],
-        pg_pool,
-    });
+    if end_date < start_date {
+        return Err(anyhow!("End date must be greater than or equal to start date"));
+    }

-    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
-    let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx)));
-    let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt);
-
-    let app_state_clone = app_state.clone();
-    let run_handle = tokio::spawn(async move {
-        match run(app_state_clone, tenant_name, start_date, end_date).await {
-            Ok(success) => {
-                if success {
-                    info!("Program ran successfully");
-                } else {
-                    error!("Program encountered an error");
-                }
-                if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
-                    if let Err(_) = shutdown_tx.send(()) {
-                        error!("Failed to send shutdown signal");
-                    }
-                }
-            }
-            Err(e) => {
-                error!("Error running the program: {}", e);
-                if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
-                    if let Err(_) = shutdown_tx.send(()) {
-                        error!("Failed to send shutdown signal");
-                    }
-                }
-            }
-        }
-    });
+    let tenant = config
+        .tenants
+        .iter()
+        .find(|t| t.name == tenant_name)
+        .ok_or_else(|| anyhow!("Tenant not found in the configuration"))?
+        .clone();
+
+    let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri);
+    let pg_manager = PostgresConnectionManager::new_from_stringlike(
+        &config.pg_database_url,
+        tokio_postgres::NoTls,
+    )?;
+    let pg_pool = Pool::builder().build(pg_manager).await?;
+
+    let app_state = Arc::new(AppState {
+        config: config.clone(),
+        clickhouse_pools: vec![clickhouse_pool],
+        pg_pool,
+    });
+
+    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+    let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx)));
+    let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt);
+
+    let app_state_clone = app_state.clone();
+    let run_handle = tokio::spawn(async move {
+        match run(app_state_clone, tenant_name, start_date, end_date).await {
+            Ok(success) => {
+                if success {
+                    info!("Program ran successfully");
+                } else {
+                    error!("Program encountered an error");
+                }
+                if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
+                    if let Err(_) = shutdown_tx.send(()) {
+                        error!("Failed to send shutdown signal");
+                    }
+                }
+            }
+            Err(e) => {
+                error!("Error running the program: {}", e);
+                if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
+                    if let Err(_) = shutdown_tx.send(()) {
+                        error!("Failed to send shutdown signal");
+                    }
+                }
+            }
+        }
+    });

-    let retry_handle = tokio::spawn(retry_failed_batches(app_state));
-
-    tokio::select! {
-        _ = signal::ctrl_c() => {
-            info!("Received shutdown signal, shutting down gracefully...");
-            if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() {
-                if let Err(_) = shutdown_tx.send(()) {
-                    error!("Failed to send shutdown signal");
-                }
-            }
-        }
-        _ = shutdown_rx => {
-            println!("Program finished, shutting down gracefully..");
-            info!("Program finished, shutting down gracefully...");
-        }
-        run_result = run_handle => {
-            match run_result {
-                Ok(_) => info!("Run task completed"),
-                Err(e) => error!("Run task failed: {}", e),
-            }
-        }
-        retry_result = retry_handle => {
-            match retry_result {
-                Ok(inner_result) => {
-                    match inner_result {
-                        Ok(_) => info!("Retry task completed successfully"),
-                        Err(e) => error!("Retry task failed: {}", e),
-                    }
-                }
-                Err(e) => error!("Retry task panicked: {}", e),
-            }
-        }
-    }
-    Ok(())
-}
+    let retry_handle = tokio::spawn(retry_failed_batches(app_state));
+
+    tokio::select! {
+        _ = signal::ctrl_c() => {
+            info!("Received shutdown signal, shutting down gracefully...");
+            if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() {
+                if let Err(_) = shutdown_tx.send(()) {
+                    error!("Failed to send shutdown signal");
+                }
+            }
+        }
+        _ = shutdown_rx => {
+            info!("Program finished, shutting down gracefully...");
+        }
+        run_result = run_handle => {
+            match run_result {
+                Ok(_) => info!("Run task completed"),
+                Err(e) => error!("Run task failed: {}", e),
+            }
+        }
+        retry_result = retry_handle => {
+            match retry_result {
+                Ok(inner_result) => {
+                    match inner_result {
+                        Ok(_) => info!("Retry task completed successfully"),
+                        Err(e) => error!("Retry task failed: {}", e),
+                    }
+                }
+                Err(e) => error!("Retry task panicked: {}", e),
+            }
+        }
+    }
+    Ok(())
+}
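main exits by falling out of the select!, at which point the retry task is simply dropped, since retry_failed_batches loops forever. If the retry loop should observe shutdown explicitly instead, a watch channel polled inside the loop is one option; the channel wiring below is an assumption for illustration, not part of the patch:

use std::time::Duration;
use tokio::sync::watch;

// Sketch: a retry loop that exits promptly when a shutdown flag flips to true.
async fn retry_loop_with_shutdown(mut shutdown_rx: watch::Receiver<bool>) {
    loop {
        // ... drain failed_batches here, as retry_failed_batches does ...
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => {}
            changed = shutdown_rx.changed() => {
                if changed.is_err() || *shutdown_rx.borrow() {
                    break; // sender dropped or shutdown requested
                }
            }
        }
    }
}

// Wiring (assumed): let (shutdown_flag_tx, shutdown_flag_rx) = watch::channel(false);
// tokio::spawn(retry_loop_with_shutdown(shutdown_flag_rx));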