From 3ecde4606ecdf4bdc53440d46a422d25dec9ce31 Mon Sep 17 00:00:00 2001 From: Isanka Date: Fri, 26 Apr 2024 13:11:22 +0900 Subject: [PATCH] working copy --- Cargo.lock | 3 ++ Cargo.toml | 1 + historical_data/Cargo.toml | 2 + historical_data/src/main.rs | 63 ++++++++++++++++++++---- src/main.rs | 98 +++++++++++++++++++++++++++++++++---- 5 files changed, 148 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43f824d..f2a17f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -962,11 +962,13 @@ dependencies = [ "env_logger", "futures", "hex", + "lazy_static", "log", "mongodb", "r2d2", "r2d2_postgres", "rayon", + "regex", "serde", "serde_json", "serde_yaml", @@ -1292,6 +1294,7 @@ dependencies = [ "env_logger", "futures", "hex", + "lazy_static", "log", "mongodb", "r2d2", diff --git a/Cargo.toml b/Cargo.toml index d97e228..c35fc55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ rayon = "1.9.0" sqlx = "0.7.4" chrono = "0.4.35" regex = "1.10.4" +lazy_static = "1.4.0" [[bin]] diff --git a/historical_data/Cargo.toml b/historical_data/Cargo.toml index 3d0b938..255f504 100644 --- a/historical_data/Cargo.toml +++ b/historical_data/Cargo.toml @@ -27,3 +27,5 @@ serde_yaml = "0.9.33" rayon = "1.9.0" sqlx = "0.7.4" chrono = "0.4.35" +regex = "1.10.4" +lazy_static = "1.4.0" diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index d93e6c5..b0c15e4 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -22,6 +22,7 @@ use std::{ error::Error, fmt, sync::{Arc, Mutex}, + time::Duration, }; use tokio::{signal, sync::mpsc}; use tokio::{ @@ -30,6 +31,30 @@ use tokio::{ }; type PgPool = Pool>; +lazy_static::lazy_static! 
{ + static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_1: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_2: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_3: {}", e); + panic!("Invalid regular expression pattern"); + } + }; +} + #[derive(Deserialize, Clone)] struct TenantConfig { name: String, @@ -98,6 +123,23 @@ async fn run( } } +async fn connect_to_mongo(mongo_uri: &str) -> Result { + let mut retry_delay = Duration::from_secs(1); + loop { + match MongoClient::with_uri_str(mongo_uri).await { + Ok(client) => return Ok(client), + Err(e) => { + error!("Failed to connect to MongoDB: {}", e); + tokio::time::sleep(retry_delay).await; + retry_delay *= 2; + if retry_delay > Duration::from_secs(60) { + return Err(e); + } + } + } + } +} + async fn process_tenant_historical_data( tenant_config: Arc, app_state: Arc, @@ -105,7 +147,7 @@ async fn process_tenant_historical_data( start_date: NaiveDateTime, end_date: NaiveDateTime, ) -> Result<()> { - let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri) + let mongo_client = connect_to_mongo(&tenant_config.mongo_uri) .await .map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?; let mongo_db = mongo_client.database(&tenant_config.mongo_db); @@ -311,13 +353,13 @@ fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) - } async fn process_statement(statement: &str) -> Result> { - // Step 1: Replace all double consecutive backslashes with four backslashes - let re1 = Regex::new(r"\\{2}")?; - let output1 = re1.replace_all(statement, 
"\\\\\\\\").to_string(); + // Replace all double consecutive backslashes with four backslashes + let output1 = BACKSLASH_REGEX_1 + .replace_all(statement, "\\\\\\\\") + .to_string(); - // Step 2: Replace all single backslashes with two backslashes - let re2 = Regex::new(r"\\(?:\\\\)*")?; - let output2 = re2.replace_all(&output1, |caps: &regex::Captures| { + // Replace all single backslashes with two backslashes + let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| { if caps[0].len() % 2 == 1 { "\\\\".to_string() } else { @@ -325,9 +367,10 @@ async fn process_statement(statement: &str) -> Result regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_1: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_2: {}", e); + panic!("Invalid regular expression pattern"); + } + }; + static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") { + Ok(regex) => regex, + Err(e) => { + error!("Failed to compile BACKSLASH_REGEX_3: {}", e); + panic!("Invalid regular expression pattern"); + } + }; +} + #[derive(Deserialize, Clone, Debug)] struct TenantConfig { name: String, @@ -71,12 +96,30 @@ async fn run(app_state: Arc) -> Result<(), Box> { Ok(()) } +async fn connect_to_mongo(mongo_uri: &str) -> Result { + let mut retry_delay = Duration::from_secs(1); + loop { + match MongoClient::with_uri_str(mongo_uri).await { + Ok(client) => return Ok(client), + Err(e) => { + error!("Failed to connect to MongoDB: {}", e); + tokio::time::sleep(retry_delay).await; + retry_delay *= 2; + if retry_delay > Duration::from_secs(60) { + return Err(e); + } + } + } + } +} + async fn process_tenant_records( tenant_config: TenantConfig, app_state: Arc, pool_index: usize, ) -> Result<()> { - let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; + let mongo_client = 
connect_to_mongo(&tenant_config.mongo_uri).await?; + //let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?; // println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri); // println!("<<-- mongo_client {:?}", mongo_client); let mongo_db = mongo_client.database(&tenant_config.mongo_db); @@ -118,12 +161,12 @@ async fn process_tenant_records( { let record_id = doc.get("_id").and_then(|id| id.as_object_id()); let statement = doc.get("statement").and_then(|s| s.as_document()); - + match (record_id, statement) { (Some(record_id), Some(statement)) => { let record_id_str = record_id.to_hex(); info!("Record ID: {}", record_id_str); - + let mut statement = statement.to_owned(); if let Some(actor) = statement.get_mut("actor").and_then(|a| a.as_document_mut()) @@ -196,7 +239,7 @@ async fn process_tenant_records( } else { warn!("Missing 'actor' field in 'statement'"); } - + let statement_str = match to_string(&statement) { Ok(s) => s, Err(e) => { @@ -204,7 +247,7 @@ async fn process_tenant_records( continue; } }; - + insert_into_clickhouse( &ch_pool, &statement_str, @@ -224,7 +267,7 @@ async fn process_tenant_records( warn!("Missing both '_id' and 'statement' fields in the document"); } } - + if let Some(resume_token) = change_stream.resume_token() { let token_bytes = match bson::to_vec(&resume_token) { Ok(bytes) => bytes, @@ -234,7 +277,7 @@ async fn process_tenant_records( } }; let tenant_name = tenant_config.name.clone(); - + let pg_pool = app_state.postgres_pool.clone(); match pg_pool.get().await { Ok(pg_conn) => { @@ -284,6 +327,36 @@ fn anonymize_data(data: &Bson, encryption_salt: &str, tenant_name: &str) -> Stri hex::encode(result) } +async fn process_statement(statement: &str) -> Result> { + // Replace all double consecutive backslashes with four backslashes + let output1 = BACKSLASH_REGEX_1 + .replace_all(statement, "\\\\\\\\") + .to_string(); + + // Replace all single backslashes with two backslashes + let output2 = 
BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| { + if caps[0].len() % 2 == 1 { + "\\\\".to_string() + } else { + caps[0].to_string() + } + }); + + // Replace all more than four backslashes with four backslashes + let output3 = BACKSLASH_REGEX_3 + .replace_all(&output2, "\\\\\\\\") + .to_string(); + + let trimmed_statement = output3 + .trim_start_matches('"') + .trim_end_matches('"') + .replace("\\'", "\\\\'") + .replace("'", "\\'") + .to_string(); + + Ok(trimmed_statement) +} + async fn insert_into_clickhouse( ch_pool: &ClickhousePool, statement_str: &str, @@ -293,7 +366,14 @@ async fn insert_into_clickhouse( ) { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - let escaped_statement_str = statement_str.replace("'", "\\\'"); + // let escaped_statement_str = statement_str.replace("'", "\\\'"); + let escaped_statement_str = match process_statement(statement_str).await { + Ok(escaped_str) => escaped_str, + Err(e) => { + error!("Failed to process statement: {}", e); + return; + } + }; let insert_query = format!( "INSERT INTO {} (id, statement) VALUES ('{}', '{}')",