From e76c9b378b2d768dda5e41d749cea1ebbcb8761f Mon Sep 17 00:00:00 2001 From: Isanka Date: Mon, 25 Mar 2024 14:38:06 +0900 Subject: [PATCH] na --- historical_data/src/main.rs | 95 ++++++++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 22 deletions(-) diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index fdfa5fd..cf0b816 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -16,7 +16,11 @@ use bb8::Pool; use bb8_postgres::PostgresConnectionManager; use chrono::{DateTime, NaiveDate, Timelike, Utc}; use rayon::prelude::*; -use std::{env, error::Error, sync::Arc}; +use std::{ + env, + error::Error, + sync::{Arc, Mutex}, +}; use tokio::{signal, sync::mpsc}; use tokio::{ sync::{broadcast, oneshot}, @@ -57,7 +61,7 @@ async fn run( tenant_name: String, start_date: chrono::NaiveDate, end_date: chrono::NaiveDate, -) -> Result<(), Box> { +) -> Result> { let tenant = app_state .config .tenants @@ -78,13 +82,18 @@ async fn run( } let ch_pool = &app_state.clickhouse_pools[0]; - if let Err(e) = - deduplicate_clickhouse_data(ch_pool, &tenant.clickhouse_db, &tenant.clickhouse_table).await + match deduplicate_clickhouse_data(ch_pool, &tenant.clickhouse_db, &tenant.clickhouse_table) + .await { - error!("Error deduplicating data for tenant {}: {}", tenant.name, e); + Ok(_) => { + info!("Successfully deduplicated data for tenant {}", tenant.name); + Ok(true) + } + Err(e) => { + error!("Error deduplicating data for tenant {}: {}", tenant.name, e); + Ok(false) + } } - - Ok(()) } async fn process_tenant_historical_data( @@ -468,6 +477,7 @@ async fn main() -> Result<(), Box> { }; let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?; + let tenant_name = env::args() .nth(1) .ok_or_else(|| anyhow!("Missing tenant name argument"))?; @@ -495,7 +505,6 @@ async fn main() -> Result<(), Box> { .clone(); let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri); - let pg_manager = PostgresConnectionManager::new_from_stringlike( &config.pg_database_url, tokio_postgres::NoTls, @@ -507,27 +516,69 @@ async fn main() -> Result<(), Box> { clickhouse_pools: vec![clickhouse_pool], pg_pool, }); + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx))); + let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt); let app_state_clone = app_state.clone(); - tokio::spawn(async move { - run(app_state_clone, tenant_name, start_date, end_date) - .await - .unwrap(); + let run_handle = tokio::spawn(async move { + match run(app_state_clone, tenant_name, start_date, end_date).await { + Ok(success) => { + if success { + info!("Program ran successfully"); + } else { + error!("Program encountered an error"); + } + if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { + if let Err(_) = shutdown_tx.send(()) { + error!("Failed to send shutdown signal"); + } + } + } + Err(e) => { + error!("Error running the program: {}", e); + if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { + if let Err(_) = shutdown_tx.send(()) { + error!("Failed to send shutdown signal"); + } + } + } + } }); - let handle = tokio::spawn(retry_failed_batches(app_state)); + let retry_handle = tokio::spawn(retry_failed_batches(app_state)); - if let Err(e) = handle.await { - error!("retry_failed_batches task failed: {}", e); - } else { - info!("Retry finished!"); + tokio::select! { + _ = signal::ctrl_c() => { + info!("Received shutdown signal, shutting down gracefully..."); + if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() { + if let Err(_) = shutdown_tx.send(()) { + error!("Failed to send shutdown signal"); + } + } + } + _ = shutdown_rx => { + info!("Program finished, shutting down gracefully..."); + } + run_result = run_handle => { + match run_result { + Ok(_) => info!("Run task completed"), + Err(e) => error!("Run task failed: {}", e), + } + } + retry_result = retry_handle => { + match retry_result { + Ok(inner_result) => { + match inner_result { + Ok(_) => info!("Retry task completed successfully"), + Err(e) => error!("Retry task failed: {}", e), + } + } + Err(e) => error!("Retry task panicked: {}", e), + } + } } - signal::ctrl_c().await?; - info!("Received shutdown signal, shutting down gracefully..."); - - shutdown_tx.send(()).unwrap(); - Ok(()) }