From 9b5d8298d3db7e8e9cfbc27eff47bacdcd925842 Mon Sep 17 00:00:00 2001 From: Isanka Date: Thu, 25 Apr 2024 11:17:38 +0900 Subject: [PATCH 1/3] working code --- Cargo.lock | 5 ++- Cargo.toml | 1 + historical_data/src/main.rs | 85 +++++++++++++++++++++++++++++++++++-- 3 files changed, 85 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69919e2..43f824d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1297,6 +1297,7 @@ dependencies = [ "r2d2", "r2d2_postgres", "rayon", + "regex", "serde", "serde_json", "serde_yaml", @@ -1828,9 +1829,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", diff --git a/Cargo.toml b/Cargo.toml index 208cd68..d97e228 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ serde_yaml = "0.9.33" rayon = "1.9.0" sqlx = "0.7.4" chrono = "0.4.35" +regex = "1.10.4" [[bin]] diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index 6619b4c..bce9e89 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -8,6 +8,7 @@ use mongodb::{ Client as MongoClient, }; use rayon::prelude::*; +use regex::Regex; use serde::Deserialize; use serde_json::to_string; use sha2::{Digest, Sha256}; @@ -109,8 +110,8 @@ async fn process_tenant_historical_data( .map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?; let mongo_db = mongo_client.database(&tenant_config.mongo_db); let mongo_collection = mongo_db.collection::(&tenant_config.mongo_collection); - println!("start_date----------- {}", start_date); - println!("end_date----------- {}", end_date); + info!("start_date----------- {}", start_date); + info!("end_date----------- {}", end_date); let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone()); let start_datetime = DateTime::::from_utc(start_date, Utc); let end_datetime = DateTime::::from_utc(end_date, Utc); @@ -131,6 +132,34 @@ async fn process_tenant_historical_data( let batch_size = app_state.config.batch_size; let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64; + let full_table_name = format!( + "{}.{}", + tenant_config.clickhouse_db, tenant_config.clickhouse_table + ); + let drop_query = format!("DROP TABLE IF EXISTS {}_dedup", full_table_name); + + let client = match ch_pool.get_handle().await { + Ok(mut client) => { + match client.execute(drop_query.as_str()).await { + Ok(_) => println!("Table dropped successfully"), + Err(e) => { + // Check if the error is related to the table not existing + if e.to_string().contains("doesn't exist") { + println!("Table does not exist, skipping DROP TABLE"); + } else { + eprintln!("Error dropping table: {}", e); + // Handle other errors appropriately + } + } + } + client + } + Err(e) => { + let err_msg = format!("Failed to get client from ClickHouse pool: {}", e); + error!("{}", err_msg); + return Err(anyhow!(err_msg)); + } + }; let batches: Vec<_> = (0..num_batches) .into_par_iter() @@ -281,6 +310,35 @@ fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) - hex::encode(result) } +async fn process_statement(statement: &str) -> Result> { + // Step 1: Replace all double consecutive backslashes with four backslashes + let re1 = Regex::new(r"\\{2}")?; + let output1 = re1.replace_all(statement, "\\\\\\\\").to_string(); + + // Step 2: Replace all single backslashes with two backslashes + let re2 = Regex::new(r"\\(?:\\\\)*")?; + let output2 = re2.replace_all(&output1, |caps: ®ex::Captures| { + if caps[0].len() % 2 == 1 { + "\\\\".to_string() + } else { + caps[0].to_string() + } + }); + + // Step 3: Replace all more than four backslashes with four backslashes + let re3 = Regex::new(r"\\{4,}")?; + let output3 = re3.replace_all(&output2, "\\\\\\\\").to_string(); + + let trimmed_statement = output3 + .trim_start_matches('"') + .trim_end_matches('"') + .replace("\\'", "\\\\'") + .replace("'", "\\'") + .to_string(); + + Ok(trimmed_statement) +} + async fn insert_into_clickhouse( ch_pool: &ClickhousePool, bulk_insert_values: &[(String, String)], @@ -309,11 +367,30 @@ async fn insert_into_clickhouse( .map( |(record_id, statement)| match serde_json::to_string(statement) { Ok(serialized_statement) => { - let trimmed_statement = statement + let re1 = Regex::new(r"\\{2}").unwrap(); + let output1 = re1.replace_all(statement, "\\\\\\\\").to_string(); + + // Step 2: Replace all single backslashes with two backslashes + let re2 = Regex::new(r"\\(?:\\\\)*").unwrap(); + let output2 = re2.replace_all(&output1, |caps: ®ex::Captures| { + if caps[0].len() % 2 == 1 { + "\\\\".to_string() + } else { + caps[0].to_string() + } + }); + + // Step 3: Replace all more than four backslashes with four backslashes + let re3 = Regex::new(r"\\{4,}").unwrap(); + let output3 = re3.replace_all(&output2, "\\\\\\\\").to_string(); + + let trimmed_statement = output2 .trim_start_matches('"') .trim_end_matches('"') + .replace("\\'", "\\\\'") .replace("'", "\\'") - .replace("\\", "\\"); + .to_string(); + // println!("Inserting record: {}", trimmed_statement); format!("('{}' , '{}')", record_id, trimmed_statement) } From b846af04b2914a35b1f7c1a425772a562305ba4d Mon Sep 17 00:00:00 2001 From: Isanka Date: Thu, 25 Apr 2024 13:56:58 +0900 Subject: [PATCH 2/3] working code --- historical_data/src/main.rs | 53 +++++++++++-------------------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index bce9e89..d93e6c5 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -362,45 +362,24 @@ async fn insert_into_clickhouse( // info!("bulk_insert_values: {:?}", bulk_insert_values); // println!("bulk_insert_values: {:?}", bulk_insert_values); while retry_count < max_retries { - let insert_data: Vec = bulk_insert_values - .iter() - .map( - |(record_id, statement)| match serde_json::to_string(statement) { - Ok(serialized_statement) => { - let re1 = Regex::new(r"\\{2}").unwrap(); - let output1 = re1.replace_all(statement, "\\\\\\\\").to_string(); - - // Step 2: Replace all single backslashes with two backslashes - let re2 = Regex::new(r"\\(?:\\\\)*").unwrap(); - let output2 = re2.replace_all(&output1, |caps: ®ex::Captures| { - if caps[0].len() % 2 == 1 { - "\\\\".to_string() - } else { - caps[0].to_string() - } - }); - - // Step 3: Replace all more than four backslashes with four backslashes - let re3 = Regex::new(r"\\{4,}").unwrap(); - let output3 = re3.replace_all(&output2, "\\\\\\\\").to_string(); - - let trimmed_statement = output2 - .trim_start_matches('"') - .trim_end_matches('"') - .replace("\\'", "\\\\'") - .replace("'", "\\'") - .to_string(); - - // println!("Inserting record: {}", trimmed_statement); - format!("('{}' , '{}')", record_id, trimmed_statement) - } - Err(e) => { - error!("Failed to serialize JSON: {}", e); - format!("('{}' , '')", record_id) + let insert_data: Vec = + futures::future::try_join_all(bulk_insert_values.iter().map( + |(record_id, statement)| async move { + match process_statement(statement).await { + Ok(processed_statement) => { + Ok(format!("('{}' , '{}')", record_id, processed_statement)) + } + Err(e) => { + error!("Failed to process statement: {}", e); + Ok(format!("('{}' , '')", record_id)) + } } }, - ) - .collect(); + )) + .await + .map_err(|e: Box| { + anyhow!("Failed to process statements: {}", e) + })?; let insert_query = format!( "INSERT INTO {} (id, statement) VALUES {}", From 3ecde4606ecdf4bdc53440d46a422d25dec9ce31 Mon Sep 17 00:00:00 2001 From: Isanka Date: Fri, 26 Apr 2024 13:11:22 +0900 Subject: [PATCH 3/3] working copy --- Cargo.lock | 3 ++ Cargo.toml | 1 + historical_data/Cargo.toml | 2 + historical_data/src/main.rs | 63 ++++++++++++++++++++---- src/main.rs | 98 +++++++++++++++++++++++++++++++++---- 5 files changed, 148 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43f824d..f2a17f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -962,11 +962,13 @@ dependencies = [ "env_logger", "futures", "hex", + "lazy_static", "log", "mongodb", "r2d2", "r2d2_postgres", "rayon", + "regex", "serde", "serde_json", "serde_yaml", @@ -1292,6 +1294,7 @@ dependencies = [ "env_logger", "futures", "hex", + "lazy_static", "log", "mongodb", "r2d2", diff --git a/Cargo.toml b/Cargo.toml index d97e228..c35fc55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ rayon = "1.9.0" sqlx = "0.7.4" chrono = "0.4.35" regex = "1.10.4" +lazy_static = "1.4.0" [[bin]] diff --git a/historical_data/Cargo.toml b/historical_data/Cargo.toml index 3d0b938..255f504 100644 --- a/historical_data/Cargo.toml +++ b/historical_data/Cargo.toml @@ -27,3 +27,5 @@ serde_yaml = "0.9.33" rayon = "1.9.0" sqlx = "0.7.4" chrono = "0.4.35" +regex = "1.10.4" +lazy_static = "1.4.0" diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index d93e6c5..b0c15e4 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -22,6 +22,7 @@ use std::{ error::Error, fmt, sync::{Arc, Mutex}, + time::Duration, }; use tokio::{signal, sync::mpsc}; use tokio::{ @@ -30,6 +31,30 @@ use tokio::{ }; type PgPool = Pool>; +lazy_static::lazy_static! { + static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_1: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_2: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_3: {}", e); + panic!("Invalid regular expression pattern"); + } + }; +} + #[derive(Deserialize, Clone)] struct TenantConfig { name: String, @@ -98,6 +123,23 @@ async fn run( } } +async fn connect_to_mongo(mongo_uri: &str) -> Result { + let mut retry_delay = Duration::from_secs(1); + loop { + match MongoClient::with_uri_str(mongo_uri).await { + Ok(client) => return Ok(client), + Err(e) => { + error!("Failed to connect to MongoDB: {}", e); + tokio::time::sleep(retry_delay).await; + retry_delay *= 2; + if retry_delay > Duration::from_secs(60) { + return Err(e); + } + } + } + } +} + async fn process_tenant_historical_data( tenant_config: Arc, app_state: Arc, @@ -105,7 +147,7 @@ async fn process_tenant_historical_data( start_date: NaiveDateTime, end_date: NaiveDateTime, ) -> Result<()> { - let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri) + let mongo_client = connect_to_mongo(&tenant_config.mongo_uri) .await .map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?; let mongo_db = mongo_client.database(&tenant_config.mongo_db); @@ -311,13 +353,13 @@ fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) - } async fn process_statement(statement: &str) -> Result> { - // Step 1: Replace all double consecutive backslashes with four backslashes - let re1 = Regex::new(r"\\{2}")?; - let output1 = re1.replace_all(statement, "\\\\\\\\").to_string(); + // Replace all double consecutive backslashes with four backslashes + let output1 = BACKSLASH_REGEX_1 + .replace_all(statement, "\\\\\\\\") + .to_string(); - // Step 2: Replace all single backslashes with two backslashes - let re2 = Regex::new(r"\\(?:\\\\)*")?; - let output2 = re2.replace_all(&output1, |caps: ®ex::Captures| { + // Replace all single backslashes with two backslashes + let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: ®ex::Captures| { if caps[0].len() % 2 == 1 { "\\\\".to_string() } else { @@ -325,9 +367,10 @@ async fn process_statement(statement: &str) -> Result regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_1: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_2: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_3: {}", e); + panic!("Invalid regular expression pattern"); + } + }; +} + #[derive(Deserialize, Clone, Debug)] struct TenantConfig { name: String, @@ -71,12 +96,30 @@ async fn run(app_state: Arc) -> Result<(), Box> { Ok(()) } +async fn connect_to_mongo(mongo_uri: &str) -> Result { + let mut retry_delay = Duration::from_secs(1); + loop { + match MongoClient::with_uri_str(mongo_uri).await { + Ok(client) => return Ok(client), + Err(e) => { + error!("Failed to connect to MongoDB: {}", e); + tokio::time::sleep(retry_delay).await; + retry_delay *= 2; + if retry_delay > Duration::from_secs(60) { + return Err(e); + } + } + } + } +} + async fn process_tenant_records( tenant_config: TenantConfig, app_state: Arc, pool_index: usize, ) -> Result<()> { - let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; + let mongo_client = connect_to_mongo(&tenant_config.mongo_uri).await?; + //let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; // println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri); // println!("<<-- mongo_client {:?}", mongo_client); let mongo_db = mongo_client.database(&tenant_config.mongo_db); @@ -118,12 +161,12 @@ async fn process_tenant_records( { let record_id = doc.get("_id").and_then(|id| id.as_object_id()); let statement = doc.get("statement").and_then(|s| s.as_document()); - + match (record_id, statement) { (Some(record_id), Some(statement)) => { let record_id_str = record_id.to_hex(); info!("Record ID: {}", record_id_str); - + let mut statement = statement.to_owned(); if let Some(actor) = statement.get_mut("actor").and_then(|a| a.as_document_mut()) @@ -196,7 +239,7 @@ async fn process_tenant_records( } else { warn!("Missing 'actor' field in 'statement'"); } - + let statement_str = match to_string(&statement) { Ok(s) => s, Err(e) => { @@ -204,7 +247,7 @@ async fn process_tenant_records( continue; } }; - + insert_into_clickhouse( &ch_pool, &statement_str, @@ -224,7 +267,7 @@ async fn process_tenant_records( warn!("Missing both '_id' and 'statement' fields in the document"); } } - + if let Some(resume_token) = change_stream.resume_token() { let token_bytes = match bson::to_vec(&resume_token) { Ok(bytes) => bytes, @@ -234,7 +277,7 @@ async fn process_tenant_records( } }; let tenant_name = tenant_config.name.clone(); - + let pg_pool = app_state.postgres_pool.clone(); match pg_pool.get().await { Ok(pg_conn) => { @@ -284,6 +327,36 @@ fn anonymize_data(data: &Bson, encryption_salt: &str, tenant_name: &str) -> Stri hex::encode(result) } +async fn process_statement(statement: &str) -> Result> { + // Replace all double consecutive backslashes with four backslashes + let output1 = BACKSLASH_REGEX_1 + .replace_all(statement, "\\\\\\\\") + .to_string(); + + // Replace all single backslashes with two backslashes + let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: ®ex::Captures| { + if caps[0].len() % 2 == 1 { + "\\\\".to_string() + } else { + caps[0].to_string() + } + }); + + // Replace all more than four backslashes with four backslashes + let output3 = BACKSLASH_REGEX_3 + .replace_all(&output2, "\\\\\\\\") + .to_string(); + + let trimmed_statement = output3 + .trim_start_matches('"') + .trim_end_matches('"') + .replace("\\'", "\\\\'") + .replace("'", "\\'") + .to_string(); + + Ok(trimmed_statement) +} + async fn insert_into_clickhouse( ch_pool: &ClickhousePool, statement_str: &str, @@ -293,7 +366,14 @@ async fn insert_into_clickhouse( ) { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - let escaped_statement_str = statement_str.replace("'", "\\\'"); + // let escaped_statement_str = statement_str.replace("'", "\\\'"); + let escaped_statement_str = match process_statement(statement_str).await { + Ok(escaped_str) => escaped_str, + Err(e) => { + error!("Failed to process statement: {}", e); + return; + } + }; let insert_query = format!( "INSERT INTO {} (id, statement) VALUES ('{}', '{}')",