Dev #30

Merged (3 commits, Apr 26, 2024)
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -27,6 +27,8 @@ serde_yaml = "0.9.33"
rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"
regex = "1.10.4"
lazy_static = "1.4.0"


[[bin]]
2 changes: 2 additions & 0 deletions historical_data/Cargo.toml
@@ -27,3 +27,5 @@ serde_yaml = "0.9.33"
rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"
regex = "1.10.4"
lazy_static = "1.4.0"
141 changes: 120 additions & 21 deletions historical_data/src/main.rs
@@ -8,6 +8,7 @@ use mongodb::{
Client as MongoClient,
};
use rayon::prelude::*;
use regex::Regex;
use serde::Deserialize;
use serde_json::to_string;
use sha2::{Digest, Sha256};
@@ -21,6 +22,7 @@ use std::{
error::Error,
fmt,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{signal, sync::mpsc};
use tokio::{
@@ -29,6 +31,30 @@
};
type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

lazy_static::lazy_static! {
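// Precompiled patterns used by process_statement() below to normalize runs of
// backslashes before statements are embedded in ClickHouse string literals.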
static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_1: {}", e);
panic!("Invalid regular expression pattern");
}
};
static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_2: {}", e);
panic!("Invalid regular expression pattern");
}
};
static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_3: {}", e);
panic!("Invalid regular expression pattern");
}
};
}

#[derive(Deserialize, Clone)]
struct TenantConfig {
name: String,
@@ -97,20 +123,37 @@ async fn run(
}
}

async fn connect_to_mongo(mongo_uri: &str) -> Result<MongoClient, mongodb::error::Error> {
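// Retry with exponential backoff: sleep 1 s after the first failure, doubling
// each time, and give up once the delay would exceed 60 s.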
let mut retry_delay = Duration::from_secs(1);
loop {
match MongoClient::with_uri_str(mongo_uri).await {
Ok(client) => return Ok(client),
Err(e) => {
error!("Failed to connect to MongoDB: {}", e);
tokio::time::sleep(retry_delay).await;
retry_delay *= 2;
if retry_delay > Duration::from_secs(60) {
return Err(e);
}
}
}
}
}

async fn process_tenant_historical_data(
tenant_config: Arc<TenantConfig>,
app_state: Arc<AppState>,
pool_index: usize,
start_date: NaiveDateTime,
end_date: NaiveDateTime,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri)
let mongo_client = connect_to_mongo(&tenant_config.mongo_uri)
.await
.map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?;
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection);
println!("start_date----------- {}", start_date);
println!("end_date----------- {}", end_date);
info!("start_date----------- {}", start_date);
info!("end_date----------- {}", end_date);
let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone());
let start_datetime = DateTime::<Utc>::from_utc(start_date, Utc);
let end_datetime = DateTime::<Utc>::from_utc(end_date, Utc);
@@ -131,6 +174,34 @@ async fn process_tenant_historical_data(

let batch_size = app_state.config.batch_size;
let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64;
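// Drop any existing `<db>.<table>_dedup` table first (presumably left over from an earlier run).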
let full_table_name = format!(
"{}.{}",
tenant_config.clickhouse_db, tenant_config.clickhouse_table
);
let drop_query = format!("DROP TABLE IF EXISTS {}_dedup", full_table_name);

let client = match ch_pool.get_handle().await {
Ok(mut client) => {
match client.execute(drop_query.as_str()).await {
Ok(_) => println!("Table dropped successfully"),
Err(e) => {
// Check if the error is related to the table not existing
if e.to_string().contains("doesn't exist") {
println!("Table does not exist, skipping DROP TABLE");
} else {
eprintln!("Error dropping table: {}", e);
// Handle other errors appropriately
}
}
}
client
}
Err(e) => {
let err_msg = format!("Failed to get client from ClickHouse pool: {}", e);
error!("{}", err_msg);
return Err(anyhow!(err_msg));
}
};

let batches: Vec<_> = (0..num_batches)
.into_par_iter()
@@ -281,6 +352,36 @@ fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) -
hex::encode(result)
}

async fn process_statement(statement: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
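// Worked examples (derived by hand from the replacement rules below; not part of the PR):
//   process_statement(r#""it's""#).await == Ok(String::from(r"it\'s"))
//   process_statement(r"a\b").await == Ok(String::from(r"a\\b"))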
// Replace all double consecutive backslashes with four backslashes
let output1 = BACKSLASH_REGEX_1
.replace_all(statement, "\\\\\\\\")
.to_string();

// Replace all single backslashes with two backslashes
let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| {
if caps[0].len() % 2 == 1 {
"\\\\".to_string()
} else {
caps[0].to_string()
}
});

// Replace all more than four backslashes with four backslashes
let output3 = BACKSLASH_REGEX_3
.replace_all(&output2, "\\\\\\\\")
.to_string();

let trimmed_statement = output3
.trim_start_matches('"')
.trim_end_matches('"')
.replace("\\'", "\\\\'")
.replace("'", "\\'")
.to_string();

Ok(trimmed_statement)
}

async fn insert_into_clickhouse(
ch_pool: &ClickhousePool,
bulk_insert_values: &[(String, String)],
@@ -304,26 +405,24 @@ async fn insert_into_clickhouse(
// info!("bulk_insert_values: {:?}", bulk_insert_values);
// println!("bulk_insert_values: {:?}", bulk_insert_values);
while retry_count < max_retries {
let insert_data: Vec<String> = bulk_insert_values
.iter()
.map(
|(record_id, statement)| match serde_json::to_string(statement) {
Ok(serialized_statement) => {
let trimmed_statement = statement
.trim_start_matches('"')
.trim_end_matches('"')
.replace("'", "\\'")
.replace("\\", "\\");
// println!("Inserting record: {}", trimmed_statement);
format!("('{}' , '{}')", record_id, trimmed_statement)
}
Err(e) => {
error!("Failed to serialize JSON: {}", e);
format!("('{}' , '')", record_id)
let insert_data: Vec<String> =
futures::future::try_join_all(bulk_insert_values.iter().map(
|(record_id, statement)| async move {
match process_statement(statement).await {
Ok(processed_statement) => {
Ok(format!("('{}' , '{}')", record_id, processed_statement))
}
Err(e) => {
error!("Failed to process statement: {}", e);
Ok(format!("('{}' , '')", record_id))
}
}
},
)
.collect();
))
.await
.map_err(|e: Box<dyn std::error::Error + Send + Sync>| {
anyhow!("Failed to process statements: {}", e)
})?;

let insert_query = format!(
"INSERT INTO {} (id, statement) VALUES {}",