diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs
index 93135b9..76a7105 100644
--- a/historical_data/src/main.rs
+++ b/historical_data/src/main.rs
@@ -1,10 +1,9 @@
 use anyhow::{anyhow, Context, Result};
-use clickhouse_rs::{Client as ClickhouseClient, Pool as ClickhousePool};
-use futures::stream::{self, StreamExt};
+use clickhouse_rs::Pool as ClickhousePool;
+use futures::stream::StreamExt;
 use log::{error, info, warn};
 use mongodb::{
-    bson::{self, doc, Bson, Document},
-    options::FindOptions,
+    bson::{self, doc, Document},
     Client as MongoClient,
 };
 
@@ -15,21 +14,16 @@ use sha2::{Digest, Sha256};
 use bb8::Pool;
 use bb8_postgres::PostgresConnectionManager;
 
-use chrono::{DateTime, NaiveDate, NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, NaiveDateTime, Utc};
+
 use std::{
     env,
-    error::Error,
-    fmt,
     sync::{Arc, Mutex},
     time::Duration,
     time::Instant,
 };
-use tokio::{signal, sync::mpsc};
-use tokio::{
-    sync::{broadcast, oneshot},
-    task::JoinHandle,
-};
-
+use tokio::signal;
+use tokio::sync::oneshot;
 type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;
 
 lazy_static::lazy_static! {
@@ -188,7 +182,7 @@ async fn process_tenant_historical_data(
     let total_docs = mongo_collection
         .count_documents(filter.clone(), None)
         .await
-        .context("Failed to count documents in MongoDB")? as usize; // Cast to usize
+        .context("Failed to count documents in MongoDB")? as usize;
     info!("Total documents to process: {}", total_docs);
 
     let mut cursor = mongo_collection
@@ -199,65 +193,103 @@ async fn process_tenant_historical_data(
     let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
     let mut batch = Vec::with_capacity(batch_manager.get_current_size());
     let mut processed_docs = 0;
-
+    let mut failed_docs = 0;
     let start_time = Instant::now();
 
     while let Some(result) = cursor.next().await {
-        let doc = result.context("Failed to get next document from MongoDB cursor")?;
-        let record_id = doc
-            .get("_id")
-            .and_then(|id| id.as_object_id())
-            .ok_or_else(|| anyhow!("Document is missing _id field"))?;
-        let statement = doc
-            .get("statement")
-            .and_then(|s| s.as_document())
-            .ok_or_else(|| anyhow!("Document is missing statement field"))?;
-
-        let record_id_str = record_id.to_hex();
-        let mut statement = statement.to_owned();
-
-        anonymize_statement(
-            &mut statement,
-            &app_state.config.encryption_salt,
-            &tenant_config.name,
-        )?;
-
-        let statement_str =
-            to_string(&statement).context("Failed to serialize statement to JSON")?;
-        batch.push((record_id_str, statement_str));
-
-        if batch.len() >= batch_manager.get_current_size() {
-            let batch_start_time = Instant::now();
-
-            insert_into_clickhouse(
-                &ch_pool,
-                &batch,
-                &tenant_config.clickhouse_db,
-                &tenant_config.clickhouse_table,
-                &app_state.pg_pool,
-                &tenant_config.name,
-                &mut batch_manager, // Pass batch_manager as mutable reference
-            )
-            .await
-            .context("Failed to insert batch into ClickHouse")?;
-
-            let batch_duration = batch_start_time.elapsed();
-            batch_manager.adjust_batch_size(batch.len(), batch_duration);
+        match result {
+            Ok(doc) => {
+                let record_id = match doc.get("_id").and_then(|id| id.as_object_id()) {
+                    Some(id) => id.to_hex(),
+                    None => {
+                        warn!("Document is missing _id field, skipping");
+                        failed_docs += 1;
+                        continue;
+                    }
+                };
+
+                let statement = match doc.get("statement").and_then(|s| s.as_document()) {
+                    Some(s) => s.to_owned(),
+                    None => {
+                        warn!(
+                            "Document {} is missing statement field, skipping",
+                            record_id
+                        );
+                        failed_docs += 1;
+                        continue;
+                    }
+                };
+
+                let mut statement = statement.to_owned();
+
+                if let Err(e) = anonymize_statement(
+                    &mut statement,
+                    &app_state.config.encryption_salt,
+                    &tenant_config.name,
+                ) {
+                    warn!(
+                        "Failed to anonymize statement for document {}: {}",
+                        record_id, e
+                    );
+                    failed_docs += 1;
+                    continue;
+                }
 
-            processed_docs += batch.len();
-            info!(
-                "Processed {} out of {} documents. Current batch size: {}",
-                processed_docs,
-                total_docs,
-                batch_manager.get_current_size()
-            );
-            batch.clear();
+                let statement_str = match to_string(&statement) {
+                    Ok(s) => s,
+                    Err(e) => {
+                        warn!(
+                            "Failed to serialize statement to JSON for document {}: {}",
+                            record_id, e
+                        );
+                        failed_docs += 1;
+                        continue;
+                    }
+                };
+
+                batch.push((record_id, statement_str));
+
+                if batch.len() >= batch_manager.get_current_size() {
+                    let batch_start_time = Instant::now();
+
+                    if let Err(e) = insert_into_clickhouse(
+                        &ch_pool,
+                        &batch,
+                        &tenant_config.clickhouse_db,
+                        &tenant_config.clickhouse_table,
+                        &app_state.pg_pool,
+                        &tenant_config.name,
+                        &mut batch_manager,
+                    )
+                    .await
+                    {
+                        error!("Failed to insert batch into ClickHouse: {}", e);
+                        failed_docs += batch.len();
+                    } else {
+                        let batch_duration = batch_start_time.elapsed();
+                        batch_manager.adjust_batch_size(batch.len(), batch_duration);
+
+                        processed_docs += batch.len();
+                        info!(
+                            "Processed {} out of {} documents. Current batch size: {}",
+                            processed_docs,
+                            total_docs,
+                            batch_manager.get_current_size()
+                        );
+                    }
+                    batch.clear();
+                }
+            }
+            Err(e) => {
+                warn!("Error fetching document from cursor: {}", e);
+                failed_docs += 1;
+            }
         }
     }
 
     // Insert any remaining documents
     if !batch.is_empty() {
-        insert_into_clickhouse(
+        if let Err(e) = insert_into_clickhouse(
             &ch_pool,
             &batch,
             &tenant_config.clickhouse_db,
@@ -267,19 +299,24 @@ async fn process_tenant_historical_data(
             &mut batch_manager,
         )
         .await
-        .context("Failed to insert final batch into ClickHouse")?;
-        processed_docs += batch.len();
+        {
+            error!("Failed to insert final batch into ClickHouse: {}", e);
+            failed_docs += batch.len();
+        } else {
+            processed_docs += batch.len();
+        }
     }
 
     let total_duration = start_time.elapsed();
 
     info!(
-        "Completed processing {} documents in {:?}. Final batch size: {}",
+        "Completed processing. Total processed: {}, Total failed: {}, Duration: {:?}, Final batch size: {}",
         processed_docs,
+        failed_docs,
        total_duration,
         batch_manager.get_current_size()
     );
 
-    if processed_docs < total_docs {
+    if processed_docs + failed_docs < total_docs {
         warn!("Some documents were skipped during processing");
     }
@@ -291,14 +328,55 @@ fn anonymize_statement(
     encryption_salt: &str,
     tenant_name: &str,
 ) -> Result<()> {
-    // Implement your anonymization logic here
-    // This is a placeholder implementation
+    // Create a deep copy of the statement
+    let mut statement_copy = statement.clone();
+
+    // Check if all required fields exist
+    if !statement_copy.contains_key("actor")
+        || !statement_copy
+            .get_document("actor")?
+            .contains_key("account")
+        || !statement_copy
+            .get_document("actor")?
+            .get_document("account")?
+            .contains_key("name")
+    {
+        return Err(anyhow!("Statement is missing required fields"));
+    }
+
+    let name = statement_copy
+        .get_document("actor")?
+        .get_document("account")?
+ .get_str("name")?; + + let value_to_hash = if name.contains('@') { + name.split('@').next().unwrap_or("") + } else if name.contains(':') { + name.split(':').last().unwrap_or("") + } else { + name + }; + + if value_to_hash.is_empty() { + return Err(anyhow!("Empty value to hash for name: {}", name)); + } + let mut hasher = Sha256::new(); - hasher.update(format!("{}{}", encryption_salt, tenant_name)); - hasher.update(statement.to_string().as_bytes()); + hasher.update(encryption_salt.as_bytes()); + hasher.update(tenant_name.as_bytes()); + hasher.update(value_to_hash.as_bytes()); let result = hasher.finalize(); - statement.insert("anonymized_hash", hex::encode(result)); - println!("Anonymized statement---: {}", statement); + let hashed_value = hex::encode(result); + + // Update the copy + statement_copy + .get_document_mut("actor")? + .get_document_mut("account")? + .insert("name", hashed_value); + + // If we've made it this far without errors, update the original statement + *statement = statement_copy; + println!("Anonymized statement: {:?}", statement); Ok(()) } @@ -435,48 +513,6 @@ async fn insert_batch( Ok(()) } -async fn deduplicate_clickhouse_data( - ch_pool: &ClickhousePool, - clickhouse_db: &str, - clickhouse_table: &str, -) -> Result<()> { - let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - let mut client = ch_pool - .get_handle() - .await - .context("Failed to get client from ClickHouse pool")?; - - info!("Processing duplicate data..."); - - let create_dedup_table_query = format!( - "CREATE TABLE {table}_dedup ENGINE = MergeTree() PARTITION BY toYYYYMM(created_at) PRIMARY KEY id ORDER BY (id, created_at) SETTINGS index_granularity = 8192 AS SELECT id, any(statement) AS statement, any(created_at) AS created_at FROM {table} GROUP BY id", - table = full_table_name - ); - - let drop_table_query = format!("DROP TABLE {}", full_table_name); - let rename_table_query = format!( - "RENAME TABLE {}_dedup TO {}", - full_table_name, full_table_name - ); - - client - .execute(create_dedup_table_query.as_str()) - .await - .context("Failed to create dedup table in ClickHouse")?; - - client - .execute(drop_table_query.as_str()) - .await - .context("Failed to drop original table in ClickHouse")?; - - client - .execute(rename_table_query.as_str()) - .await - .context("Failed to rename dedup table in ClickHouse")?; - - Ok(()) -} - async fn log_failed_batch( pg_pool: &PgPool, tenant_name: &str,