diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index cf0b816..ec035a6 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -122,10 +122,10 @@ async fn process_tenant_historical_data( let filter_clone = filter.clone(); let total_docs = mongo_collection.count_documents(filter_clone, None).await?; - info!( - "Total documents in {}: {}", - tenant_config.mongo_collection, total_docs - ); + // info!( + // "Total documents in {}: {}", + // tenant_config.mongo_collection, total_docs + // ); let batch_size = app_state.config.batch_size; let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64; @@ -174,12 +174,60 @@ async fn process_tenant_historical_data( .and_then(|acc| acc.as_document_mut()) { if let Some(name) = account.get_mut("name") { - let anonymized_name = anonymize_data( - name, - &app_state.config.encryption_salt, - &tenant_config.name, - ); - *name = bson::Bson::String(anonymized_name); + if let bson::Bson::String(name_str) = name { + let anonymized_name = if name_str.contains(':') { + let parts: Vec<&str> = + name_str.split(':').collect(); + info!( + "{}", + &bson::Bson::String(parts[1].to_string()) + ); + if parts.len() == 2 { + anonymize_data( + &bson::Bson::String(parts[1].to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } else { + anonymize_data( + &bson::Bson::String(name_str.to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } + } else if name_str.contains('@') { + let parts: Vec<&str> = + name_str.split('@').collect(); + info!( + "{}", + &bson::Bson::String(parts[0].to_string()) + ); + if parts.len() == 2 { + anonymize_data( + &bson::Bson::String(parts[0].to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } else { + anonymize_data( + &bson::Bson::String(name_str.to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } + } else { + info!( + "{}", + &bson::Bson::String(name_str.to_string()) + ); + anonymize_data( + &bson::Bson::String(name_str.to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + }; + *name = bson::Bson::String(anonymized_name); + } } } } @@ -559,6 +607,7 @@ async fn main() -> Result<(), Box> { } } _ = shutdown_rx => { + println!("Program finished, shutting down gracefully.."); info!("Program finished, shutting down gracefully..."); } run_result = run_handle => { diff --git a/pg_init.psql b/pg_init.psql index 5177813..e220185 100644 --- a/pg_init.psql +++ b/pg_init.psql @@ -3,4 +3,14 @@ CREATE TABLE resume_token ( token BYTEA UNIQUE, tenant_name VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + + +CREATE TABLE failed_batches ( + ID SERIAL PRIMARY KEY, + tenant_name TEXT, + clickhouse_db TEXT, + clickhouse_table TEXT, + failed_batch TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 9559058..881d542 100644 --- a/src/main.rs +++ b/src/main.rs @@ -138,15 +138,63 @@ async fn process_tenant_records( .and_then(|acc| acc.as_document_mut()) { if let Some(name) = account.get_mut("name") { - let anonymized_name = anonymize_data( - name, - &app_state.config.encryption_salt, - &tenant_config.name, - ); - *name = Bson::String(anonymized_name); - info!("<<-- Modified statement: {:?}", statement); - } else { - warn!("Missing 'name' field in 'actor.account'"); + if let bson::Bson::String(name_str) = name { + let anonymized_name = if name_str.contains(':') { + let parts: Vec<&str> = + name_str.split(':').collect(); + info!( + "{}", + &bson::Bson::String(parts[1].to_string()) + ); + if parts.len() == 2 { + anonymize_data( + &bson::Bson::String(parts[1].to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } else { + anonymize_data( + &bson::Bson::String(name_str.to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } + } else if name_str.contains('@') { + let parts: Vec<&str> = + name_str.split('@').collect(); + info!( + "{}", + &bson::Bson::String(parts[0].to_string()) + ); + if parts.len() == 2 { + anonymize_data( + &bson::Bson::String(parts[0].to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } else { + anonymize_data( + &bson::Bson::String(name_str.to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + } + } else { + info!( + "{}", + &bson::Bson::String(name_str.to_string()) + ); + anonymize_data( + &bson::Bson::String(name_str.to_string()), + &app_state.config.encryption_salt, + &tenant_config.name, + ) + }; + *name = bson::Bson::String(anonymized_name); + // info!("<<-- Modified statement: {:?}", statement); + } else { + warn!("Missing 'name' field in 'actor.account'"); + } } } else { warn!("Missing 'account' field in 'actor'"); @@ -162,7 +210,7 @@ async fn process_tenant_records( continue; } }; - info!("Inserting statement into ClickHouse: {}", statement_str); + // info!("Inserting statement into ClickHouse: {}", statement_str); insert_into_clickhouse( &ch_pool,