diff --git a/Cargo.lock b/Cargo.lock
index 75cd269..69919e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -287,9 +287,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.35"
+version = "0.4.37"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a"
+checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
@@ -1308,9 +1308,9 @@ dependencies = [
 
 [[package]]
 name = "mongodb"
-version = "2.8.1"
+version = "2.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de59562e5c71656c098d8e966641b31da87b89dc3dcb6e761d3b37dcdfa0cb72"
+checksum = "ef206acb1b72389b49bc9985efe7eb1f8a9bb18e5680d262fac26c07f44025f1"
 dependencies = [
  "async-trait",
  "base64 0.13.1",
@@ -2126,9 +2126,9 @@ dependencies = [
 
 [[package]]
 name = "serde_yaml"
-version = "0.9.33"
+version = "0.9.34+deprecated"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a0623d197252096520c6f2a5e1171ee436e5af99a5d7caa2891e55e61950e6d9"
+checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
 dependencies = [
  "indexmap",
  "itoa",
@@ -2602,9 +2602,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.36.0"
+version = "1.37.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
+checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
 dependencies = [
  "backtrace",
  "bytes",
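
A note on the dependency bumps before the code changes below: serde_yaml 0.9.34+deprecated is that crate's final release (it is now unmaintained), and current chrono 0.4.x releases deprecate the DateTime::<Utc>::from_utc constructor that the next file still calls. A minimal sketch of the non-deprecated equivalent, assuming nothing beyond chrono itself (the helper name is illustrative, not part of this patch):

    // Hypothetical helper: builds the same DateTime<Utc> values as the
    // deprecated DateTime::<Utc>::from_utc(naive, Utc) used below.
    use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};

    fn to_utc(naive: NaiveDateTime) -> DateTime<Utc> {
        // Equivalent alternative: DateTime::from_naive_utc_and_offset(naive, Utc)
        Utc.from_utc_datetime(&naive)
    }
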
{ "timestamp": { @@ -164,8 +166,8 @@ async fn process_tenant_historical_data( if let (Some(record_id), Some(statement)) = (record_id, statement) { let record_id_str = record_id.to_hex(); - let mut statement = statement.to_owned(); + if let Some(actor) = statement.get_mut("actor").and_then(|a| a.as_document_mut()) { @@ -178,10 +180,6 @@ async fn process_tenant_historical_data( let anonymized_name = if name_str.contains(':') { let parts: Vec<&str> = name_str.split(':').collect(); - info!( - "{}", - &bson::Bson::String(parts[1].to_string()) - ); if parts.len() == 2 { anonymize_data( &bson::Bson::String(parts[1].to_string()), @@ -198,10 +196,6 @@ async fn process_tenant_historical_data( } else if name_str.contains('@') { let parts: Vec<&str> = name_str.split('@').collect(); - info!( - "{}", - &bson::Bson::String(parts[0].to_string()) - ); if parts.len() == 2 { anonymize_data( &bson::Bson::String(parts[0].to_string()), @@ -216,10 +210,6 @@ async fn process_tenant_historical_data( ) } } else { - info!( - "{}", - &bson::Bson::String(name_str.to_string()) - ); anonymize_data( &bson::Bson::String(name_str.to_string()), &app_state.config.encryption_salt, @@ -231,13 +221,13 @@ async fn process_tenant_historical_data( } } } - + // info!("Statement: {}", statement); let statement_str = to_string(&statement).unwrap_or_default(); batch.push((record_id_str, statement_str)); } } } - + // println!("batch: {:?}", batch); if let Err(e) = insert_into_clickhouse( &ch_pool, &batch, @@ -250,7 +240,6 @@ async fn process_tenant_historical_data( { error!("Error inserting into ClickHouse: {}", e); } - batch } }) @@ -259,7 +248,17 @@ async fn process_tenant_historical_data( let handles: Vec>> = batches.into_iter().map(tokio::spawn).collect(); for handle in handles { - handle.await.unwrap(); + match handle.await { + Ok(processed_records) => { + // The batch was processed successfully and inserted into ClickHouse + // You can perform any additional actions or logging here if needed + // ... 
@@ -303,21 +302,35 @@ async fn insert_into_clickhouse(
     let max_retries = 5;
     let mut retry_count = 0;
     // info!("bulk_insert_values: {:?}", bulk_insert_values);
+    // println!("bulk_insert_values: {:?}", bulk_insert_values);
     while retry_count < max_retries {
         let insert_data: Vec<String> = bulk_insert_values
             .iter()
-            .map(|(record_id, statement)| {
-                let escaped_statement = statement.replace("'", "\\'");
-                format!("('{}', '{}')", record_id, escaped_statement)
-            })
+            .map(
+                |(record_id, statement)| match serde_json::to_string(statement) {
+                    Ok(serialized_statement) => {
+                        // serde_json already escaped backslashes and double quotes; only ' needs escaping for the SQL literal
+                        let trimmed_statement = serialized_statement
+                            .trim_start_matches('"')
+                            .trim_end_matches('"')
+                            .replace('\'', "\\'");
+                        // println!("Inserting record: {}", trimmed_statement);
+                        format!("('{}' , '{}')", record_id, trimmed_statement)
+                    }
+                    Err(e) => {
+                        error!("Failed to serialize JSON: {}", e);
+                        format!("('{}' , '')", record_id)
+                    }
+                },
+            )
             .collect();
         let insert_query = format!(
             "INSERT INTO {} (id, statement) VALUES {}",
             full_table_name,
-            insert_data.join(",")
+            insert_data.join(" , ")
         );
 
-        // info!("insert_query: {:?}", insert_query);
+
         match client.execute(insert_query.as_str()).await {
             Ok(_) => {
                 info!("Successfully inserted statements into ClickHouse");
@@ -325,7 +338,22 @@
             }
             Err(e) => {
                 let err_msg = format!("Failed to insert statements into ClickHouse: {}", e);
-                error!("{}", err_msg);
+                error!("err_msg: {}", err_msg);
+
+                // Log the specific record causing the issue
+                let failed_record = bulk_insert_values
+                    .iter()
+                    .find(|(_, statement)| insert_query.contains(statement))
+                    .map(|(_, statement)| statement.clone())
+                    .unwrap_or_else(|| "Record not found".to_string());
+                error!("Problematic record: {}", failed_record);
+                let failed_record_id = bulk_insert_values
+                    .iter()
+                    .find(|(id, _)| insert_query.contains(id))
+                    .map(|(id, _)| id.clone())
+                    .unwrap_or_else(|| "Record not found".to_string());
+                error!("Problematic record_id: {}", failed_record_id);
+
                 retry_count += 1;
                 if retry_count == max_retries {
                     let err_msg = "Max retries reached for insertion. Logging failed batch.";
@@ -347,7 +375,6 @@
             }
         }
     }
-
     Err(anyhow!("Max retries exceeded"))
 }
 
@@ -504,9 +531,9 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {
     }
 }
 
-fn validate_date(date_str: &str) -> Result<NaiveDate> {
-    chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
-        .map_err(|e| anyhow!("Invalid date format: {}", e))
+fn validate_date_time(date_time_str: &str) -> Result<NaiveDateTime> {
+    NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M")
+        .map_err(|e| anyhow!("Invalid date and time format: {}", e))
 }
 
 #[tokio::main]
@@ -523,7 +550,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
             return Err("Unsupported environment".into());
         }
     };
-
     let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;
 
     let tenant_name = env::args()
@@ -538,8 +564,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
         .nth(3)
        .ok_or_else(|| anyhow!("Missing end date argument"))?;
 
-    let start_date = validate_date(&start_date)?;
-    let end_date = validate_date(&end_date)?;
+    let start_date = validate_date_time(&start_date)?;
+    let end_date = validate_date_time(&end_date)?;
 
     if end_date < start_date {
         return Err(anyhow!("End date must be greater than or equal to start date").into());
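
The escaping rework above is the riskiest part of this diff, so a word on why it is shaped this way. The original added code bound serialized_statement but then escaped the raw statement, and its .replace("\\", "\\") was a no-op; both are fixed in the hunk as shown. The idea behind the serde_json pass: it re-escapes backslashes and double quotes (\ becomes \\, " becomes \"), and ClickHouse applies C-style unescaping inside '...' literals, so those escapes round-trip; the single quote is the one character left to handle by hand. A standalone sketch of the same logic (the function name is hypothetical, not part of the patch; requires the serde_json crate):

    // Escape a JSON payload for embedding in a ClickHouse '...' literal.
    // serde_json emits \\ and \" (and \n, \t, ... for control characters),
    // all of which ClickHouse's literal parser unescapes back; only a bare
    // single quote would otherwise terminate the literal early.
    fn escape_for_clickhouse(json_text: &str) -> String {
        let quoted = serde_json::to_string(json_text).expect("serializing a &str cannot fail");
        quoted
            .trim_start_matches('"')
            .trim_end_matches('"')
            .replace('\'', "\\'")
    }

Hand-built VALUES strings remain easy to get wrong; a typed or parameterized insert API (both the clickhouse and clickhouse-rs crates offer one) would be the sturdier long-term fix.
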
diff --git a/src/main.rs b/src/main.rs
index 881d542..b3a1aef 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -109,7 +109,6 @@ async fn process_tenant_records(
     let mut change_stream = mongo_collection.watch(None, change_stream_options).await?;
 
     while let Some(result) = change_stream.next().await {
-        // println!(">>--- Change event: {:?}", result);
         match result {
             Ok(change_event) => {
                 if let ChangeStreamEvent {
@@ -119,16 +118,12 @@
                 {
                     let record_id = doc.get("_id").and_then(|id| id.as_object_id());
                     let statement = doc.get("statement").and_then(|s| s.as_document());
-
-                    // Extract additional fields as needed
-                    // let additional_field1 = doc.get("additional_field1").and_then(|f| f.as_str());
-                    // let additional_field2 = doc.get("additional_field2").and_then(|f| f.as_i32());
-
+
                     match (record_id, statement) {
                         (Some(record_id), Some(statement)) => {
                             let record_id_str = record_id.to_hex();
                             info!("Record ID: {}", record_id_str);
-
+
                             let mut statement = statement.to_owned();
                             if let Some(actor) =
                                 statement.get_mut("actor").and_then(|a| a.as_document_mut())
@@ -191,7 +186,6 @@
                                     )
                                 };
                                 *name = bson::Bson::String(anonymized_name);
-                                // info!("<<-- Modified statement: {:?}", statement);
                             } else {
                                 warn!("Missing 'name' field in 'actor.account'");
                             }
@@ -202,7 +196,7 @@
                             } else {
                                 warn!("Missing 'actor' field in 'statement'");
                             }
-
+
                             let statement_str = match to_string(&statement) {
                                 Ok(s) => s,
                                 Err(e) => {
@@ -210,8 +204,7 @@
                                     continue;
                                 }
                             };
-                            // info!("Inserting statement into ClickHouse: {}", statement_str);
-
+
                             insert_into_clickhouse(
                                 &ch_pool,
                                 &statement_str,
@@ -220,23 +213,18 @@
                                 &tenant_config.clickhouse_table,
                             )
                             .await;
-
-                            // println!(">>-- Statement: {}", statement_str);
                         }
                         (None, Some(_)) => {
                             warn!("Missing '_id' field in the document");
-                            // Handle the missing '_id' field, e.g., generate a unique identifier or skip the document
                         }
                         (Some(_), None) => {
                             warn!("Missing 'statement' field in the document");
-                            // Handle the missing 'statement' field, e.g., skip the document or use default values
                         }
                         (None, None) => {
                             warn!("Missing both '_id' and 'statement' fields in the document");
-                            // Handle the case when both fields are missing
                         }
                     }
-
+
                     if let Some(resume_token) = change_stream.resume_token() {
                         let token_bytes = match bson::to_vec(&resume_token) {
                             Ok(bytes) => bytes,
@@ -246,7 +234,7 @@
                             }
                         };
                         let tenant_name = tenant_config.name.clone();
-
+
                         let pg_pool = app_state.postgres_pool.clone();
                         match pg_pool.get().await {
                             Ok(pg_conn) => {
@@ -258,23 +246,19 @@
                                     .await
                                 {
                                     error!("Failed to update resume token in PostgreSQL: {}", e);
-                                    // Handle the error: retry or log the failure
                                 }
                             }
                             Err(e) => {
                                 error!("Failed to get PostgreSQL connection: {}", e);
-                                // Handle the error: retry or log the failure
                             }
                         };
                     }
                 } else {
                     warn!("Missing 'full_document' field in the change stream event");
-                    // Handle the missing 'full_document' field: skip the event or use default values
                 }
             }
             Err(e) => {
                 error!("Change stream error: {}", e);
-                // Handle change stream errors: implement retry logic or log the error
             }
         }
     }
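
One behavioral note for anyone driving the historical runner after the validate_date to validate_date_time switch: the two positional date arguments must now carry a time component, and date-only values are rejected. A quick self-contained sketch of what the new parser accepts (the argument values are illustrative):

    // Standalone check of the "%Y-%m-%dT%H:%M" format now used by
    // validate_date_time; seconds are absent from the format and default to 0.
    use chrono::NaiveDateTime;

    fn main() {
        // Accepted: minute-resolution datetime
        assert!(NaiveDateTime::parse_from_str("2024-03-01T00:00", "%Y-%m-%dT%H:%M").is_ok());
        // Rejected: the old date-only argument shape
        assert!(NaiveDateTime::parse_from_str("2024-03-01", "%Y-%m-%dT%H:%M").is_err());
    }
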