Skip to content

Commit

Permalink
Merge pull request #27 from isankadn/dev
Browse files Browse the repository at this point in the history
na
  • Loading branch information
isankadn authored Mar 25, 2024
2 parents 7e99dcf + e76c9b3 commit 487ef8b
Showing 1 changed file with 73 additions and 22 deletions.
95 changes: 73 additions & 22 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use rayon::prelude::*;
use std::{env, error::Error, sync::Arc};
use std::{
env,
error::Error,
sync::{Arc, Mutex},
};
use tokio::{signal, sync::mpsc};
use tokio::{
sync::{broadcast, oneshot},
Expand Down Expand Up @@ -57,7 +61,7 @@ async fn run(
tenant_name: String,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
) -> Result<(), Box<dyn Error>> {
) -> Result<bool, Box<dyn Error>> {
let tenant = app_state
.config
.tenants
Expand All @@ -78,13 +82,18 @@ async fn run(
}

let ch_pool = &app_state.clickhouse_pools[0];
if let Err(e) =
deduplicate_clickhouse_data(ch_pool, &tenant.clickhouse_db, &tenant.clickhouse_table).await
match deduplicate_clickhouse_data(ch_pool, &tenant.clickhouse_db, &tenant.clickhouse_table)
.await
{
error!("Error deduplicating data for tenant {}: {}", tenant.name, e);
Ok(_) => {
info!("Successfully deduplicated data for tenant {}", tenant.name);
Ok(true)
}
Err(e) => {
error!("Error deduplicating data for tenant {}: {}", tenant.name, e);
Ok(false)
}
}

Ok(())
}

async fn process_tenant_historical_data(
Expand Down Expand Up @@ -468,6 +477,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
};

let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;

let tenant_name = env::args()
.nth(1)
.ok_or_else(|| anyhow!("Missing tenant name argument"))?;
Expand Down Expand Up @@ -495,7 +505,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
.clone();

let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri);

let pg_manager = PostgresConnectionManager::new_from_stringlike(
&config.pg_database_url,
tokio_postgres::NoTls,
Expand All @@ -507,27 +516,69 @@ async fn main() -> Result<(), Box<dyn Error>> {
clickhouse_pools: vec![clickhouse_pool],
pg_pool,
});

let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx)));
let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt);

let app_state_clone = app_state.clone();
tokio::spawn(async move {
run(app_state_clone, tenant_name, start_date, end_date)
.await
.unwrap();
let run_handle = tokio::spawn(async move {
match run(app_state_clone, tenant_name, start_date, end_date).await {
Ok(success) => {
if success {
info!("Program ran successfully");
} else {
error!("Program encountered an error");
}
if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
if let Err(_) = shutdown_tx.send(()) {
error!("Failed to send shutdown signal");
}
}
}
Err(e) => {
error!("Error running the program: {}", e);
if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() {
if let Err(_) = shutdown_tx.send(()) {
error!("Failed to send shutdown signal");
}
}
}
}
});

let handle = tokio::spawn(retry_failed_batches(app_state));
let retry_handle = tokio::spawn(retry_failed_batches(app_state));

if let Err(e) = handle.await {
error!("retry_failed_batches task failed: {}", e);
} else {
info!("Retry finished!");
tokio::select! {
_ = signal::ctrl_c() => {
info!("Received shutdown signal, shutting down gracefully...");
if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() {
if let Err(_) = shutdown_tx.send(()) {
error!("Failed to send shutdown signal");
}
}
}
_ = shutdown_rx => {
info!("Program finished, shutting down gracefully...");
}
run_result = run_handle => {
match run_result {
Ok(_) => info!("Run task completed"),
Err(e) => error!("Run task failed: {}", e),
}
}
retry_result = retry_handle => {
match retry_result {
Ok(inner_result) => {
match inner_result {
Ok(_) => info!("Retry task completed successfully"),
Err(e) => error!("Retry task failed: {}", e),
}
}
Err(e) => error!("Retry task panicked: {}", e),
}
}
}

signal::ctrl_c().await?;
info!("Received shutdown signal, shutting down gracefully...");

shutdown_tx.send(()).unwrap();

Ok(())
}

0 comments on commit 487ef8b

Please sign in to comment.