diff --git a/Cargo.lock b/Cargo.lock index 69919e2..f2a17f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -962,11 +962,13 @@ dependencies = [ "env_logger", "futures", "hex", + "lazy_static", "log", "mongodb", "r2d2", "r2d2_postgres", "rayon", + "regex", "serde", "serde_json", "serde_yaml", @@ -1292,11 +1294,13 @@ dependencies = [ "env_logger", "futures", "hex", + "lazy_static", "log", "mongodb", "r2d2", "r2d2_postgres", "rayon", + "regex", "serde", "serde_json", "serde_yaml", @@ -1828,9 +1832,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", diff --git a/Cargo.toml b/Cargo.toml index 208cd68..c35fc55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ serde_yaml = "0.9.33" rayon = "1.9.0" sqlx = "0.7.4" chrono = "0.4.35" +regex = "1.10.4" +lazy_static = "1.4.0" [[bin]] diff --git a/historical_data/Cargo.toml b/historical_data/Cargo.toml index 3d0b938..255f504 100644 --- a/historical_data/Cargo.toml +++ b/historical_data/Cargo.toml @@ -27,3 +27,5 @@ serde_yaml = "0.9.33" rayon = "1.9.0" sqlx = "0.7.4" chrono = "0.4.35" +regex = "1.10.4" +lazy_static = "1.4.0" diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index 6619b4c..b0c15e4 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -8,6 +8,7 @@ use mongodb::{ Client as MongoClient, }; use rayon::prelude::*; +use regex::Regex; use serde::Deserialize; use serde_json::to_string; use sha2::{Digest, Sha256}; @@ -21,6 +22,7 @@ use std::{ error::Error, fmt, sync::{Arc, Mutex}, + time::Duration, }; use tokio::{signal, sync::mpsc}; use tokio::{ @@ -29,6 +31,30 @@ use tokio::{ }; type PgPool = Pool>; +lazy_static::lazy_static! 
{ + static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_1: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_2: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_3: {}", e); + panic!("Invalid regular expression pattern"); + } + }; +} + #[derive(Deserialize, Clone)] struct TenantConfig { name: String, @@ -97,6 +123,23 @@ async fn run( } } +async fn connect_to_mongo(mongo_uri: &str) -> Result { + let mut retry_delay = Duration::from_secs(1); + loop { + match MongoClient::with_uri_str(mongo_uri).await { + Ok(client) => return Ok(client), + Err(e) => { + error!("Failed to connect to MongoDB: {}", e); + tokio::time::sleep(retry_delay).await; + retry_delay *= 2; + if retry_delay > Duration::from_secs(60) { + return Err(e); + } + } + } + } +} + async fn process_tenant_historical_data( tenant_config: Arc, app_state: Arc, @@ -104,13 +147,13 @@ async fn process_tenant_historical_data( start_date: NaiveDateTime, end_date: NaiveDateTime, ) -> Result<()> { - let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri) + let mongo_client = connect_to_mongo(&tenant_config.mongo_uri) .await .map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?; let mongo_db = mongo_client.database(&tenant_config.mongo_db); let mongo_collection = mongo_db.collection::(&tenant_config.mongo_collection); - println!("start_date----------- {}", start_date); - println!("end_date----------- {}", end_date); + info!("start_date----------- {}", start_date); + info!("end_date----------- {}", end_date); let ch_pool = 
Arc::new(app_state.clickhouse_pools[pool_index].clone()); let start_datetime = DateTime::::from_utc(start_date, Utc); let end_datetime = DateTime::::from_utc(end_date, Utc); @@ -131,6 +174,34 @@ async fn process_tenant_historical_data( let batch_size = app_state.config.batch_size; let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64; + let full_table_name = format!( + "{}.{}", + tenant_config.clickhouse_db, tenant_config.clickhouse_table + ); + let drop_query = format!("DROP TABLE IF EXISTS {}_dedup", full_table_name); + + let client = match ch_pool.get_handle().await { + Ok(mut client) => { + match client.execute(drop_query.as_str()).await { + Ok(_) => println!("Table dropped successfully"), + Err(e) => { + // Check if the error is related to the table not existing + if e.to_string().contains("doesn't exist") { + println!("Table does not exist, skipping DROP TABLE"); + } else { + eprintln!("Error dropping table: {}", e); + // Handle other errors appropriately + } + } + } + client + } + Err(e) => { + let err_msg = format!("Failed to get client from ClickHouse pool: {}", e); + error!("{}", err_msg); + return Err(anyhow!(err_msg)); + } + }; let batches: Vec<_> = (0..num_batches) .into_par_iter() @@ -281,6 +352,36 @@ fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) - hex::encode(result) } +async fn process_statement(statement: &str) -> Result> { + // Replace all double consecutive backslashes with four backslashes + let output1 = BACKSLASH_REGEX_1 + .replace_all(statement, "\\\\\\\\") + .to_string(); + + // Replace all single backslashes with two backslashes + let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| { + if caps[0].len() % 2 == 1 { + "\\\\".to_string() + } else { + caps[0].to_string() + } + }); + + // Replace all more than four backslashes with four backslashes + let output3 = BACKSLASH_REGEX_3 + .replace_all(&output2, "\\\\\\\\") + .to_string(); + + let trimmed_statement = output3 
.trim_start_matches('"') + .trim_end_matches('"') + .replace("\\'", "\\\\'") + .replace("'", "\\'") + .to_string(); + + Ok(trimmed_statement) +} + async fn insert_into_clickhouse( ch_pool: &ClickhousePool, bulk_insert_values: &[(String, String)], @@ -304,26 +405,24 @@ async fn insert_into_clickhouse( // info!("bulk_insert_values: {:?}", bulk_insert_values); // println!("bulk_insert_values: {:?}", bulk_insert_values); while retry_count < max_retries { - let insert_data: Vec = bulk_insert_values - .iter() - .map( - |(record_id, statement)| match serde_json::to_string(statement) { - Ok(serialized_statement) => { - let trimmed_statement = statement - .trim_start_matches('"') - .trim_end_matches('"') - .replace("'", "\\'") - .replace("\\", "\\"); - // println!("Inserting record: {}", trimmed_statement); - format!("('{}' , '{}')", record_id, trimmed_statement) - } - Err(e) => { - error!("Failed to serialize JSON: {}", e); - format!("('{}' , '')", record_id) + let insert_data: Vec = + futures::future::try_join_all(bulk_insert_values.iter().map( + |(record_id, statement)| async move { + match process_statement(statement).await { + Ok(processed_statement) => { + Ok(format!("('{}' , '{}')", record_id, processed_statement)) + } + Err(e) => { + error!("Failed to process statement: {}", e); + Ok(format!("('{}' , '')", record_id)) + } } }, - ) - .collect(); + )) + .await + .map_err(|e: Box| { + anyhow!("Failed to process statements: {}", e) + })?; let insert_query = format!( "INSERT INTO {} (id, statement) VALUES {}", diff --git a/src/main.rs b/src/main.rs index b3a1aef..fc0fcf1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,13 +11,38 @@ use mongodb::{ options::ChangeStreamOptions, Client as MongoClient, }; +use regex::Regex; use serde::Deserialize; use serde_json::{to_string, Value}; use sha2::{Digest, Sha256}; -use std::{env, error::Error, sync::Arc}; +use std::{env, error::Error, sync::Arc, time::Duration}; use tokio::task; use tokio_postgres::{Client as PostgresClient, 
NoTls}; +lazy_static::lazy_static! { + static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_1: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_2: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_3: {}", e); + panic!("Invalid regular expression pattern"); + } + }; +} + #[derive(Deserialize, Clone, Debug)] struct TenantConfig { name: String, @@ -71,12 +96,30 @@ async fn run(app_state: Arc) -> Result<(), Box> { Ok(()) } +async fn connect_to_mongo(mongo_uri: &str) -> Result { + let mut retry_delay = Duration::from_secs(1); + loop { + match MongoClient::with_uri_str(mongo_uri).await { + Ok(client) => return Ok(client), + Err(e) => { + error!("Failed to connect to MongoDB: {}", e); + tokio::time::sleep(retry_delay).await; + retry_delay *= 2; + if retry_delay > Duration::from_secs(60) { + return Err(e); + } + } + } + } +} + async fn process_tenant_records( tenant_config: TenantConfig, app_state: Arc, pool_index: usize, ) -> Result<()> { - let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; + let mongo_client = connect_to_mongo(&tenant_config.mongo_uri).await?; + //let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; // println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri); // println!("<<-- mongo_client {:?}", mongo_client); let mongo_db = mongo_client.database(&tenant_config.mongo_db); @@ -118,12 +161,12 @@ async fn process_tenant_records( { let record_id = doc.get("_id").and_then(|id| id.as_object_id()); let statement = doc.get("statement").and_then(|s| s.as_document()); - + match 
(record_id, statement) { (Some(record_id), Some(statement)) => { let record_id_str = record_id.to_hex(); info!("Record ID: {}", record_id_str); - + let mut statement = statement.to_owned(); if let Some(actor) = statement.get_mut("actor").and_then(|a| a.as_document_mut()) @@ -196,7 +239,7 @@ async fn process_tenant_records( } else { warn!("Missing 'actor' field in 'statement'"); } - + let statement_str = match to_string(&statement) { Ok(s) => s, Err(e) => { @@ -204,7 +247,7 @@ async fn process_tenant_records( continue; } }; - + insert_into_clickhouse( &ch_pool, &statement_str, @@ -224,7 +267,7 @@ async fn process_tenant_records( warn!("Missing both '_id' and 'statement' fields in the document"); } } - + if let Some(resume_token) = change_stream.resume_token() { let token_bytes = match bson::to_vec(&resume_token) { Ok(bytes) => bytes, @@ -234,7 +277,7 @@ async fn process_tenant_records( } }; let tenant_name = tenant_config.name.clone(); - + let pg_pool = app_state.postgres_pool.clone(); match pg_pool.get().await { Ok(pg_conn) => { @@ -284,6 +327,36 @@ fn anonymize_data(data: &Bson, encryption_salt: &str, tenant_name: &str) -> Stri hex::encode(result) } +async fn process_statement(statement: &str) -> Result> { + // Replace all double consecutive backslashes with four backslashes + let output1 = BACKSLASH_REGEX_1 + .replace_all(statement, "\\\\\\\\") + .to_string(); + + // Replace all single backslashes with two backslashes + let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| { + if caps[0].len() % 2 == 1 { + "\\\\".to_string() + } else { + caps[0].to_string() + } + }); + + // Replace all more than four backslashes with four backslashes + let output3 = BACKSLASH_REGEX_3 + .replace_all(&output2, "\\\\\\\\") + .to_string(); + + let trimmed_statement = output3 + .trim_start_matches('"') + .trim_end_matches('"') + .replace("\\'", "\\\\'") + .replace("'", "\\'") + .to_string(); + + Ok(trimmed_statement) +} + async fn insert_into_clickhouse( ch_pool: 
&ClickhousePool, statement_str: &str, @@ -293,7 +366,14 @@ async fn insert_into_clickhouse( ) { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - let escaped_statement_str = statement_str.replace("'", "\\\'"); + // let escaped_statement_str = statement_str.replace("'", "\\\'"); + let escaped_statement_str = match process_statement(statement_str).await { + Ok(escaped_str) => escaped_str, + Err(e) => { + error!("Failed to process statement: {}", e); + return; + } + }; let insert_query = format!( "INSERT INTO {} (id, statement) VALUES ('{}', '{}')",