Skip to content

Commit

Permalink
Merge pull request #30 from isankadn/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
isankadn authored Apr 26, 2024
2 parents b88324d + 3ecde46 commit e138544
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 32 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ serde_yaml = "0.9.33"
rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"
regex = "1.10.4"
lazy_static = "1.4.0"


[[bin]]
Expand Down
2 changes: 2 additions & 0 deletions historical_data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ serde_yaml = "0.9.33"
rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"
regex = "1.10.4"
lazy_static = "1.4.0"
141 changes: 120 additions & 21 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use mongodb::{
Client as MongoClient,
};
use rayon::prelude::*;
use regex::Regex;
use serde::Deserialize;
use serde_json::to_string;
use sha2::{Digest, Sha256};
Expand All @@ -21,6 +22,7 @@ use std::{
error::Error,
fmt,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{signal, sync::mpsc};
use tokio::{
Expand All @@ -29,6 +31,30 @@ use tokio::{
};
type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

// Regexes used by `process_statement` to normalize backslash escaping before
// values are embedded in a ClickHouse INSERT. Compiled once on first use.
//
// The patterns are string literals, so a compile failure is a programming
// error: `expect` panics with the pattern name. (The previous
// `match`/`error!`/`panic!` form duplicated `expect`, and the `error!` call
// would typically run before any logger is initialized, so it never logged.)
lazy_static::lazy_static! {
    // Matches runs of exactly two consecutive backslashes.
    static ref BACKSLASH_REGEX_1: Regex =
        Regex::new(r"\\{2}").expect("BACKSLASH_REGEX_1: invalid regex pattern");
    // Matches one backslash followed by zero or more backslash pairs,
    // i.e. odd-length runs of backslashes.
    static ref BACKSLASH_REGEX_2: Regex =
        Regex::new(r"\\(?:\\\\)*").expect("BACKSLASH_REGEX_2: invalid regex pattern");
    // Matches runs of four or more consecutive backslashes.
    static ref BACKSLASH_REGEX_3: Regex =
        Regex::new(r"\\{4,}").expect("BACKSLASH_REGEX_3: invalid regex pattern");
}

#[derive(Deserialize, Clone)]
struct TenantConfig {
name: String,
Expand Down Expand Up @@ -97,20 +123,37 @@ async fn run(
}
}

/// Connects to MongoDB at `mongo_uri`, retrying with exponential backoff
/// (1s, 2s, 4s, ... up to 32s between attempts).
///
/// Returns the last connection error once the backoff budget is exhausted.
/// Each failure is logged before retrying.
async fn connect_to_mongo(mongo_uri: &str) -> Result<MongoClient, mongodb::error::Error> {
    // Stop retrying once the *next* delay would exceed this cap.
    const MAX_DELAY: Duration = Duration::from_secs(60);
    let mut retry_delay = Duration::from_secs(1);
    loop {
        match MongoClient::with_uri_str(mongo_uri).await {
            Ok(client) => return Ok(client),
            Err(e) => {
                error!("Failed to connect to MongoDB: {}", e);
                // Check the budget BEFORE sleeping: the previous version
                // slept one final time (up to 32s) and then immediately
                // returned the error, wasting the whole last delay.
                if retry_delay * 2 > MAX_DELAY {
                    return Err(e);
                }
                tokio::time::sleep(retry_delay).await;
                retry_delay *= 2;
            }
        }
    }
}

async fn process_tenant_historical_data(
tenant_config: Arc<TenantConfig>,
app_state: Arc<AppState>,
pool_index: usize,
start_date: NaiveDateTime,
end_date: NaiveDateTime,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri)
let mongo_client = connect_to_mongo(&tenant_config.mongo_uri)
.await
.map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?;
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection);
println!("start_date----------- {}", start_date);
println!("end_date----------- {}", end_date);
info!("start_date----------- {}", start_date);
info!("end_date----------- {}", end_date);
let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone());
let start_datetime = DateTime::<Utc>::from_utc(start_date, Utc);
let end_datetime = DateTime::<Utc>::from_utc(end_date, Utc);
Expand All @@ -131,6 +174,34 @@ async fn process_tenant_historical_data(

let batch_size = app_state.config.batch_size;
let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64;
let full_table_name = format!(
"{}.{}",
tenant_config.clickhouse_db, tenant_config.clickhouse_table
);
let drop_query = format!("DROP TABLE IF EXISTS {}_dedup", full_table_name);

let client = match ch_pool.get_handle().await {
Ok(mut client) => {
match client.execute(drop_query.as_str()).await {
Ok(_) => println!("Table dropped successfully"),
Err(e) => {
// Check if the error is related to the table not existing
if e.to_string().contains("doesn't exist") {
println!("Table does not exist, skipping DROP TABLE");
} else {
eprintln!("Error dropping table: {}", e);
// Handle other errors appropriately
}
}
}
client
}
Err(e) => {
let err_msg = format!("Failed to get client from ClickHouse pool: {}", e);
error!("{}", err_msg);
return Err(anyhow!(err_msg));
}
};

let batches: Vec<_> = (0..num_batches)
.into_par_iter()
Expand Down Expand Up @@ -281,6 +352,36 @@ fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) -
hex::encode(result)
}

/// Normalizes backslash escaping in `statement` and escapes single quotes so
/// the result can be embedded inside a single-quoted ClickHouse SQL literal
/// (see `insert_into_clickhouse`).
///
/// NOTE(review): this function contains no `.await` — it does not need to be
/// `async` — and it never returns `Err`, so the `Result` is always `Ok`.
/// Both are kept for interface compatibility with existing callers.
async fn process_statement(statement: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
    // Replace all double consecutive backslashes with four backslashes
    // ("\\\\\\\\" in source is 4 literal backslashes).
    let output1 = BACKSLASH_REGEX_1
        .replace_all(statement, "\\\\\\\\")
        .to_string();

    // Replace all single backslashes with two backslashes.
    // The pattern matches one backslash followed by zero or more pairs, so
    // every match has odd length; the even-length arm below looks
    // unreachable — NOTE(review): confirm before simplifying, since the
    // regex can also match odd-length *prefixes* of longer runs.
    let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| {
        if caps[0].len() % 2 == 1 {
            "\\\\".to_string()
        } else {
            caps[0].to_string()
        }
    });

    // Replace all more than four backslashes with four backslashes
    // (caps runaway escaping produced by the two passes above).
    let output3 = BACKSLASH_REGEX_3
        .replace_all(&output2, "\\\\\\\\")
        .to_string();

    // Strip the surrounding quotes — presumably the statement arrives as a
    // JSON-serialized string wrapped in '"' — TODO confirm against caller.
    // Then escape quotes for a single-quoted SQL literal; order matters:
    // first protect already-escaped quotes (\' -> \\'), then escape every
    // remaining bare quote (' -> \'), which also rewrites the first pass's
    // output.
    let trimmed_statement = output3
        .trim_start_matches('"')
        .trim_end_matches('"')
        .replace("\\'", "\\\\'")
        .replace("'", "\\'")
        .to_string();

    Ok(trimmed_statement)
}

async fn insert_into_clickhouse(
ch_pool: &ClickhousePool,
bulk_insert_values: &[(String, String)],
Expand All @@ -304,26 +405,24 @@ async fn insert_into_clickhouse(
// info!("bulk_insert_values: {:?}", bulk_insert_values);
// println!("bulk_insert_values: {:?}", bulk_insert_values);
while retry_count < max_retries {
let insert_data: Vec<String> = bulk_insert_values
.iter()
.map(
|(record_id, statement)| match serde_json::to_string(statement) {
Ok(serialized_statement) => {
let trimmed_statement = statement
.trim_start_matches('"')
.trim_end_matches('"')
.replace("'", "\\'")
.replace("\\", "\\");
// println!("Inserting record: {}", trimmed_statement);
format!("('{}' , '{}')", record_id, trimmed_statement)
}
Err(e) => {
error!("Failed to serialize JSON: {}", e);
format!("('{}' , '')", record_id)
let insert_data: Vec<String> =
futures::future::try_join_all(bulk_insert_values.iter().map(
|(record_id, statement)| async move {
match process_statement(statement).await {
Ok(processed_statement) => {
Ok(format!("('{}' , '{}')", record_id, processed_statement))
}
Err(e) => {
error!("Failed to process statement: {}", e);
Ok(format!("('{}' , '')", record_id))
}
}
},
)
.collect();
))
.await
.map_err(|e: Box<dyn std::error::Error + Send + Sync>| {
anyhow!("Failed to process statements: {}", e)
})?;

let insert_query = format!(
"INSERT INTO {} (id, statement) VALUES {}",
Expand Down
Loading

0 comments on commit e138544

Please sign in to comment.