diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index e8da472..9e887c0 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -26,7 +26,7 @@ struct TenantConfig { clickhouse_table: String, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] struct AppConfig { tenants: Vec, encryption_salt: String, @@ -41,20 +41,25 @@ struct AppState { clickhouse_pools: Vec, } -async fn run(app_state: Arc) -> Result<(), Box> { - let tenants = app_state.config.tenants.clone(); - - let futures = tenants.into_iter().enumerate().map(|(index, tenant)| { - let app_state = app_state.clone(); - let tenant_cloned = tenant.clone(); - tokio::spawn(async move { - if let Err(e) = process_tenant_historical_data(tenant, app_state, index).await { - error!("Error processing tenant {}: {}", tenant_cloned.name, e); - } - }) - }); +async fn run(app_state: Arc, tenant_name: String) -> Result<(), Box> { + let tenant = app_state + .config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .ok_or_else(|| anyhow!("Tenant not found in the configuration"))?; + println!("tenant name {:?}", tenant.name); + if let Err(e) = process_tenant_historical_data(tenant.clone(), Arc::clone(&app_state), 0).await + { + error!("Error processing tenant {}: {}", tenant.name, e); + } - futures::future::join_all(futures).await; + let ch_pool = &app_state.clickhouse_pools[0]; + if let Err(e) = + deduplicate_clickhouse_data(ch_pool, &tenant.clickhouse_db, &tenant.clickhouse_table).await + { + error!("Error deduplicating data for tenant {}: {}", tenant.name, e); + } Ok(()) } @@ -89,6 +94,7 @@ async fn process_tenant_historical_data( match mongo_collection.find(None, options).await { Ok(mut cursor) => { while let Some(result) = cursor.next().await { + print!("tent info {:?}", &tenant_config.name); if let Ok(doc) = result { let record_id = doc.get("_id").and_then(|id| id.as_object_id()); let statement = doc.get("statement").and_then(|s| s.as_document()); @@ -217,6 +223,48 @@ async fn insert_into_clickhouse( Err(anyhow!("Max retries exceeded")) } +async fn deduplicate_clickhouse_data( + ch_pool: &ClickhousePool, + clickhouse_db: &str, + clickhouse_table: &str, +) -> Result<()> { + let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); + let mut client = match ch_pool.get_handle().await { + Ok(client) => client, + Err(e) => { + error!("Failed to get client from ClickHouse pool: {}", e); + return Err(e.into()); + } + }; + + let deduplicate_query = format!( + " + CREATE TABLE {table}_dedup AS + SELECT * FROM ( + SELECT *, row_number() OVER (PARTITION BY id ORDER BY id) AS row_num + FROM {table} + ) + WHERE row_num = 1; + + DROP TABLE {table}; + + RENAME TABLE {table}_dedup TO {table}; + ", + table = full_table_name + ); + + match client.execute(deduplicate_query.as_str()).await { + Ok(_) => { + info!("Successfully deduplicated data in ClickHouse"); + Ok(()) + } + Err(e) => { + error!("Failed to deduplicate data in ClickHouse: {}", e); + Err(e.into()) + } + } +} + #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); @@ -233,19 +281,25 @@ async fn main() -> Result<(), Box> { }; let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?; + let tenant_name = env::args() + .nth(1) + .ok_or_else(|| anyhow!("Missing tenant name argument"))?; - let mut clickhouse_pools = Vec::new(); - for tenant in &config.tenants { - let clickhouse_pool = ClickhousePool::new(&*tenant.clickhouse_uri); - clickhouse_pools.push(clickhouse_pool); - } + let tenant = config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .ok_or_else(|| anyhow!("Tenant not found in the configuration"))? + .clone(); + + let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri); let app_state = Arc::new(AppState { - config, - clickhouse_pools, + config: config.clone(), + clickhouse_pools: vec![clickhouse_pool], }); - run(app_state).await?; + run(app_state, tenant_name).await?; Ok(()) } diff --git a/src/main.rs b/src/main.rs index bbb8a1b..ca37d4d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -134,8 +134,11 @@ async fn process_tenant_records( .and_then(|acc| acc.as_document_mut()) { if let Some(name) = account.get_mut("name") { - let anonymized_name = - anonymize_data(name, &app_state.config.encryption_salt, &tenant_config.name); + let anonymized_name = anonymize_data( + name, + &app_state.config.encryption_salt, + &tenant_config.name, + ); *name = Bson::String(anonymized_name); info!("<<-- Modified statement: {:?}", statement); } else { @@ -286,7 +289,7 @@ async fn insert_into_clickhouse( error!("Max retries reached. Giving up."); return; } else { - let delay_ms = 1000 * retry_count; + let delay_ms = 1000 * retry_count; info!("Retrying in {} ms...", delay_ms); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } @@ -295,6 +298,48 @@ async fn insert_into_clickhouse( } } +async fn deduplicate_clickhouse_data( + ch_pool: &ClickhousePool, + clickhouse_db: &str, + clickhouse_table: &str, +) -> Result<()> { + let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); + let mut client = match ch_pool.get_handle().await { + Ok(client) => client, + Err(e) => { + error!("Failed to get client from ClickHouse pool: {}", e); + return Err(e.into()); + } + }; + + let deduplicate_query = format!( + " + CREATE TABLE {table}_dedup AS + SELECT * FROM ( + SELECT *, row_number() OVER (PARTITION BY id ORDER BY id) AS row_num + FROM {table} + ) + WHERE row_num = 1; + + DROP TABLE {table}; + + RENAME TABLE {table}_dedup TO {table}; + ", + table = full_table_name + ); + + match client.execute(deduplicate_query.as_str()).await { + Ok(_) => { + info!("Successfully deduplicated data in ClickHouse"); + Ok(()) + } + Err(e) => { + error!("Failed to deduplicate data in ClickHouse: {}", e); + Err(e.into()) + } + } +} + #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init();