Skip to content

Commit

Permalink
working copy
Browse files Browse the repository at this point in the history
  • Loading branch information
isankadn committed Apr 26, 2024
1 parent b846af0 commit 3ecde46
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 19 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"
regex = "1.10.4"
lazy_static = "1.4.0"


[[bin]]
Expand Down
2 changes: 2 additions & 0 deletions historical_data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ serde_yaml = "0.9.33"
rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"
regex = "1.10.4"
lazy_static = "1.4.0"
63 changes: 53 additions & 10 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
error::Error,
fmt,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{signal, sync::mpsc};
use tokio::{
Expand All @@ -30,6 +31,30 @@ use tokio::{
};
type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

lazy_static::lazy_static! {
static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_1: {}", e);
panic!("Invalid regular expression pattern");
}
};
static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_2: {}", e);
panic!("Invalid regular expression pattern");
}
};
static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_3: {}", e);
panic!("Invalid regular expression pattern");
}
};
}

#[derive(Deserialize, Clone)]
struct TenantConfig {
name: String,
Expand Down Expand Up @@ -98,14 +123,31 @@ async fn run(
}
}

async fn connect_to_mongo(mongo_uri: &str) -> Result<MongoClient, mongodb::error::Error> {
let mut retry_delay = Duration::from_secs(1);
loop {
match MongoClient::with_uri_str(mongo_uri).await {
Ok(client) => return Ok(client),
Err(e) => {
error!("Failed to connect to MongoDB: {}", e);
tokio::time::sleep(retry_delay).await;
retry_delay *= 2;
if retry_delay > Duration::from_secs(60) {
return Err(e);
}
}
}
}
}

async fn process_tenant_historical_data(
tenant_config: Arc<TenantConfig>,
app_state: Arc<AppState>,
pool_index: usize,
start_date: NaiveDateTime,
end_date: NaiveDateTime,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri)
let mongo_client = connect_to_mongo(&tenant_config.mongo_uri)
.await
.map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?;
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
Expand Down Expand Up @@ -311,23 +353,24 @@ fn anonymize_data(data: &bson::Bson, encryption_salt: &str, tenant_name: &str) -
}

async fn process_statement(statement: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
// Step 1: Replace all double consecutive backslashes with four backslashes
let re1 = Regex::new(r"\\{2}")?;
let output1 = re1.replace_all(statement, "\\\\\\\\").to_string();
// Replace all double consecutive backslashes with four backslashes
let output1 = BACKSLASH_REGEX_1
.replace_all(statement, "\\\\\\\\")
.to_string();

// Step 2: Replace all single backslashes with two backslashes
let re2 = Regex::new(r"\\(?:\\\\)*")?;
let output2 = re2.replace_all(&output1, |caps: &regex::Captures| {
// Replace all single backslashes with two backslashes
let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| {
if caps[0].len() % 2 == 1 {
"\\\\".to_string()
} else {
caps[0].to_string()
}
});

// Step 3: Replace all more than four backslashes with four backslashes
let re3 = Regex::new(r"\\{4,}")?;
let output3 = re3.replace_all(&output2, "\\\\\\\\").to_string();
// Replace all more than four backslashes with four backslashes
let output3 = BACKSLASH_REGEX_3
.replace_all(&output2, "\\\\\\\\")
.to_string();

let trimmed_statement = output3
.trim_start_matches('"')
Expand Down
98 changes: 89 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,38 @@ use mongodb::{
options::ChangeStreamOptions,
Client as MongoClient,
};
use regex::Regex;
use serde::Deserialize;
use serde_json::{to_string, Value};
use sha2::{Digest, Sha256};
use std::{env, error::Error, sync::Arc};
use std::{env, error::Error, sync::Arc, time::Duration};
use tokio::task;
use tokio_postgres::{Client as PostgresClient, NoTls};

lazy_static::lazy_static! {
static ref BACKSLASH_REGEX_1: Regex = match Regex::new(r"\\{2}") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_1: {}", e);
panic!("Invalid regular expression pattern");
}
};
static ref BACKSLASH_REGEX_2: Regex = match Regex::new(r"\\(?:\\\\)*") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_2: {}", e);
panic!("Invalid regular expression pattern");
}
};
static ref BACKSLASH_REGEX_3: Regex = match Regex::new(r"\\{4,}") {
Ok(regex) => regex,
Err(e) => {
error!("Failed to compile BACKSLASH_REGEX_3: {}", e);
panic!("Invalid regular expression pattern");
}
};
}

#[derive(Deserialize, Clone, Debug)]
struct TenantConfig {
name: String,
Expand Down Expand Up @@ -71,12 +96,30 @@ async fn run(app_state: Arc<AppState>) -> Result<(), Box<dyn Error>> {
Ok(())
}

async fn connect_to_mongo(mongo_uri: &str) -> Result<MongoClient, mongodb::error::Error> {
let mut retry_delay = Duration::from_secs(1);
loop {
match MongoClient::with_uri_str(mongo_uri).await {
Ok(client) => return Ok(client),
Err(e) => {
error!("Failed to connect to MongoDB: {}", e);
tokio::time::sleep(retry_delay).await;
retry_delay *= 2;
if retry_delay > Duration::from_secs(60) {
return Err(e);
}
}
}
}
}

async fn process_tenant_records(
tenant_config: TenantConfig,
app_state: Arc<AppState>,
pool_index: usize,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?;
let mongo_client = connect_to_mongo(&tenant_config.mongo_uri).await?;
//let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?;
// println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri);
// println!("<<-- mongo_client {:?}", mongo_client);
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
Expand Down Expand Up @@ -118,12 +161,12 @@ async fn process_tenant_records(
{
let record_id = doc.get("_id").and_then(|id| id.as_object_id());
let statement = doc.get("statement").and_then(|s| s.as_document());

match (record_id, statement) {
(Some(record_id), Some(statement)) => {
let record_id_str = record_id.to_hex();
info!("Record ID: {}", record_id_str);

let mut statement = statement.to_owned();
if let Some(actor) =
statement.get_mut("actor").and_then(|a| a.as_document_mut())
Expand Down Expand Up @@ -196,15 +239,15 @@ async fn process_tenant_records(
} else {
warn!("Missing 'actor' field in 'statement'");
}

let statement_str = match to_string(&statement) {
Ok(s) => s,
Err(e) => {
error!("Failed to convert statement to string: {}", e);
continue;
}
};

insert_into_clickhouse(
&ch_pool,
&statement_str,
Expand All @@ -224,7 +267,7 @@ async fn process_tenant_records(
warn!("Missing both '_id' and 'statement' fields in the document");
}
}

if let Some(resume_token) = change_stream.resume_token() {
let token_bytes = match bson::to_vec(&resume_token) {
Ok(bytes) => bytes,
Expand All @@ -234,7 +277,7 @@ async fn process_tenant_records(
}
};
let tenant_name = tenant_config.name.clone();

let pg_pool = app_state.postgres_pool.clone();
match pg_pool.get().await {
Ok(pg_conn) => {
Expand Down Expand Up @@ -284,6 +327,36 @@ fn anonymize_data(data: &Bson, encryption_salt: &str, tenant_name: &str) -> Stri
hex::encode(result)
}

async fn process_statement(statement: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
// Replace all double consecutive backslashes with four backslashes
let output1 = BACKSLASH_REGEX_1
.replace_all(statement, "\\\\\\\\")
.to_string();

// Replace all single backslashes with two backslashes
let output2 = BACKSLASH_REGEX_2.replace_all(&output1, |caps: &regex::Captures| {
if caps[0].len() % 2 == 1 {
"\\\\".to_string()
} else {
caps[0].to_string()
}
});

// Replace all more than four backslashes with four backslashes
let output3 = BACKSLASH_REGEX_3
.replace_all(&output2, "\\\\\\\\")
.to_string();

let trimmed_statement = output3
.trim_start_matches('"')
.trim_end_matches('"')
.replace("\\'", "\\\\'")
.replace("'", "\\'")
.to_string();

Ok(trimmed_statement)
}

async fn insert_into_clickhouse(
ch_pool: &ClickhousePool,
statement_str: &str,
Expand All @@ -293,7 +366,14 @@ async fn insert_into_clickhouse(
) {
let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);

let escaped_statement_str = statement_str.replace("'", "\\\'");
// let escaped_statement_str = statement_str.replace("'", "\\\'");
let escaped_statement_str = match process_statement(statement_str).await {
Ok(escaped_str) => escaped_str,
Err(e) => {
error!("Failed to process statement: {}", e);
return;
}
};

let insert_query = format!(
"INSERT INTO {} (id, statement) VALUES ('{}', '{}')",
Expand Down

0 comments on commit 3ecde46

Please sign in to comment.