diff --git a/Cargo.lock b/Cargo.lock index 8ce2073..b9422e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,6 +140,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom", + "instant", + "rand", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -1053,6 +1064,7 @@ dependencies = [ "r2d2_postgres", "rayon", "regex", + "rocksdb", "serde", "serde_json", "serde_yaml", @@ -1060,6 +1072,7 @@ dependencies = [ "sqlx", "tokio", "tokio-postgres", + "tracing", ] [[package]] @@ -1166,6 +1179,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipconfig" version = "0.3.2" @@ -1422,6 +1444,7 @@ name = "mongo-to-clickhouse" version = "0.1.0" dependencies = [ "anyhow", + "backoff", "bb8", "bb8-postgres", "chrono", @@ -1446,6 +1469,7 @@ dependencies = [ "time", "tokio", "tokio-postgres", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a243475..4cbb3b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,8 @@ regex = "1.10.4" lazy_static = "1.4.0" rocksdb = "0.22.0" time = "0.3.36" +tracing = "0.1.40" +backoff = "0.4.0" diff --git a/historical_data/Cargo.toml b/historical_data/Cargo.toml index 5c11764..27e3f6c 100644 --- a/historical_data/Cargo.toml +++ b/historical_data/Cargo.toml @@ -29,4 +29,6 @@ sqlx = "0.8.0" chrono = "0.4.35" regex = "1.10.4" lazy_static = "1.4.0" +rocksdb = "0.22.0" +tracing = "0.1.40" diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index a178695..bce0762 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -1,31 +1,33 @@ -// historical_data/src/main.rs +// src/main.rs - Live data processing use anyhow::{anyhow, Context, Result}; +use chrono::{DateTime, NaiveDateTime, Utc}; use clickhouse_rs::Pool as ClickhousePool; +use config::{Config, File}; use futures::stream::StreamExt; -use log::{error, info, warn}; +// use log::{error, info, warn}; use mongodb::{ bson::{self, doc, Document}, + change_stream::event::{ChangeStreamEvent, ResumeToken}, + options::ChangeStreamOptions, Client as MongoClient, }; - use regex::Regex; +use rocksdb::{Error as RocksError, Options, DB}; use serde::Deserialize; use serde_json::to_string; use sha2::{Digest, Sha256}; +use std::collections::HashSet; +use std::path::Path; -use bb8::Pool; -use bb8_postgres::PostgresConnectionManager; -use chrono::{DateTime, NaiveDateTime, Utc}; +use std::{env, sync::Arc, time::Duration, time::Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::RwLock; +use tracing::{debug, error, info, instrument, warn}; -use std::{ - env, - sync::{Arc, Mutex}, - time::Duration, - time::Instant, -}; -use tokio::signal; -use tokio::sync::oneshot; -type PgPool = Pool>; +use std::fs; +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::{signal, sync::oneshot, task}; lazy_static::lazy_static! 
{ static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1"); @@ -33,46 +35,129 @@ lazy_static::lazy_static! { static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3"); } -struct BatchSizeManager { - current_size: usize, - min_size: usize, - max_size: usize, - performance_threshold: f64, // in documents per second +const MAX_BATCH_SIZE: usize = 10000; +const MAX_RETRIES: u32 = 5; +const INITIAL_RETRY_DELAY: u64 = 1000; +const MAX_RETRY_COUNT: usize = 5; + +#[derive(Debug)] +pub enum StoreError { + RocksDB(RocksError), + OpenFailed, +} +struct RocksDBResumeTokenStore { + db: Arc, +} +impl std::fmt::Display for StoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StoreError::RocksDB(e) => write!(f, "RocksDB error: {}", e), + StoreError::OpenFailed => write!(f, "Failed to open database after multiple attempts"), + } + } } -impl BatchSizeManager { - fn new( - initial_size: usize, - min_size: usize, - max_size: usize, - performance_threshold: f64, - ) -> Self { - BatchSizeManager { - current_size: initial_size, - min_size, - max_size, - performance_threshold, +impl std::error::Error for StoreError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + StoreError::RocksDB(e) => Some(e), + StoreError::OpenFailed => None, } } +} - fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: std::time::Duration) { - let performance = docs_processed as f64 / time_taken.as_secs_f64(); +impl From for StoreError { + fn from(error: RocksError) -> Self { + StoreError::RocksDB(error) + } +} - if performance > self.performance_threshold { - self.current_size = (self.current_size * 2).min(self.max_size); - } else { - self.current_size = (self.current_size / 2).max(self.min_size); +impl RocksDBResumeTokenStore { + #[instrument(skip(path), err)] + pub fn new(path: &str) -> Result { + info!("Creating new RocksDBResumeTokenStore"); + let db = Self::open_db_aggressive(path)?; + Ok(Self { db: Arc::new(db) }) + } + + #[instrument(skip(path), err)] + fn open_db_aggressive(path: &str) -> Result { + let db_path = Path::new(path); + let lock_file_path = db_path.join("LOCK"); + + // Step 1: Try to open normally + match Self::try_open_db(path) { + Ok(db) => return Ok(db), + Err(e) => warn!("Failed to open RocksDB normally: {}", e), } + + // Step 2: Remove LOCK file if it exists + if lock_file_path.exists() { + info!("Removing existing LOCK file"); + if let Err(e) = fs::remove_file(&lock_file_path) { + warn!("Failed to remove LOCK file: {}", e); + } + } + + // Step 3: Try to open again after removing LOCK file + match Self::try_open_db(path) { + Ok(db) => return Ok(db), + Err(e) => warn!("Failed to open RocksDB after removing LOCK file: {}", e), + } + + // Step 4: If all else fails, delete the entire database and create a new one + warn!("Recreating the entire database"); + if db_path.exists() { + if let Err(e) = fs::remove_dir_all(db_path) { + error!("Failed to remove existing database directory: {}", e); + return Err(StoreError::OpenFailed); + } + } + + Self::try_open_db(path) } - fn get_current_size(&self) -> usize { - self.current_size + fn try_open_db(path: &str) -> Result { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_max_open_files(10_000); + opts.set_keep_log_file_num(10); + opts.set_max_total_wal_size(64 * 1024 * 1024); // 64MB + opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB + 
opts.set_max_write_buffer_number(3); + opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB + opts.set_level_zero_file_num_compaction_trigger(8); + opts.set_level_zero_slowdown_writes_trigger(17); + opts.set_level_zero_stop_writes_trigger(24); + opts.set_num_levels(4); + opts.set_max_bytes_for_level_base(512 * 1024 * 1024); // 512MB + opts.set_max_bytes_for_level_multiplier(8.0); + + DB::open(&opts, path).map_err(|e| { + error!("Failed to open RocksDB: {}", e); + StoreError::RocksDB(e) + }) } -} -const MAX_BATCH_SIZE: usize = 10000; -const MAX_RETRIES: u32 = 5; -const INITIAL_RETRY_DELAY: u64 = 1000; + #[instrument(skip(self, key), err)] + pub fn get_resume_token(&self, key: &str) -> Result>, StoreError> { + debug!("Getting resume token"); + self.db.get(key).map_err(|e| { + error!("Failed to get resume token: {}", e); + StoreError::from(e) + }) + } + + #[instrument(skip(self, key, value), err)] + pub fn set_resume_token(&self, key: &str, value: &[u8]) -> Result<(), StoreError> { + debug!("Setting resume token for key: {}", key); + self.db.put(key.as_bytes(), value).map_err(|e| { + error!("Failed to set resume token: {}", e); + StoreError::from(e) + }) + } + // Add other methods as needed +} #[derive(Deserialize, Clone)] struct TenantConfig { @@ -83,15 +168,14 @@ struct TenantConfig { clickhouse_uri: String, clickhouse_db: String, clickhouse_table: String, + clickhouse_table_opt_out: String, } #[derive(Deserialize, Clone)] struct AppConfig { tenants: Vec, encryption_salt: String, - batch_size: u64, - number_of_workers: usize, - pg_database_url: String, + batch_size: usize, } type ClickhousePoolType = ClickhousePool; @@ -99,7 +183,89 @@ type ClickhousePoolType = ClickhousePool; struct AppState { config: AppConfig, clickhouse_pools: Vec, - pg_pool: PgPool, + resume_token_store: Arc, + cached_hashes: Arc>, +} + +struct CachedHashSet { + data: Arc>>, + last_updated: Arc>>, +} + +impl CachedHashSet { + async fn new(ch_pool: &ClickhousePool) -> Result { + let data = Arc::new(RwLock::new(HashSet::new())); + let last_updated = Arc::new(RwLock::new(Utc::now())); + let cache = Self { data, last_updated }; + cache.refresh(ch_pool).await?; + Ok(cache) + } + + #[instrument(skip(self, ch_pool))] + async fn refresh(&self, ch_pool: &ClickhousePool) -> Result<()> { + info!("Refreshing cached HashSet"); + let mut client = ch_pool.get_handle().await?; + let query = "SELECT email, hashed_moodle_id FROM default.moodle_ids WHERE (email, version) IN ( SELECT email, MAX(version) AS max_version FROM default.moodle_ids GROUP BY email )"; + let mut cursor = client.query(query).stream(); + let mut new_data = HashSet::new(); + + while let Some(row) = cursor.next().await { + let row = row.context("Failed to fetch row")?; + let hash: String = row.get("hashed_moodle_id")?; + debug!("Inserting hashed value: {}", hash); // Added debug log + new_data.insert(hash); + } + + let mut data = self.data.write().await; + *data = new_data; + let mut last_updated = self.last_updated.write().await; + *last_updated = Utc::now(); + + info!("Cached HashSet refreshed successfully"); + debug!("Total number of hashed values: {}", data.len()); // Added debug log + Ok(()) + } + async fn contains(&self, value: &str) -> bool { + let data = self.data.read().await; + data.contains(value) + } +} + +struct BatchSizeManager { + current_size: usize, + min_size: usize, + max_size: usize, + performance_threshold: f64, +} + +impl BatchSizeManager { + fn new( + initial_size: usize, + min_size: usize, + max_size: usize, + performance_threshold: f64, + 
) -> Self { + BatchSizeManager { + current_size: initial_size, + min_size, + max_size, + performance_threshold, + } + } + + fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) { + let performance = docs_processed as f64 / time_taken.as_secs_f64(); + + if performance > self.performance_threshold { + self.current_size = (self.current_size * 2).min(self.max_size); + } else { + self.current_size = (self.current_size / 2).max(self.min_size); + } + } + + fn get_current_size(&self) -> usize { + self.current_size // If enabled, the batch size is adjusted automatically based on observed throughput. + } } async fn run( @@ -167,9 +333,13 @@ async fn process_tenant_historical_data( let mongo_db = mongo_client.database(&tenant_config.mongo_db); let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection); - info!("Processing data from {} to {}", start_date, end_date); + info!( + "Processing historical data for tenant {} from {} to {}", + tenant_config.name, start_date, end_date + ); - let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone()); + let ch_pool = &app_state.clickhouse_pools[pool_index]; + let resume_token_store = &app_state.resume_token_store; let start_datetime = DateTime::<Utc>::from_utc(start_date, Utc); let end_datetime = DateTime::<Utc>::from_utc(end_date, Utc); @@ -191,8 +361,11 @@ async fn process_tenant_historical_data( .await .context("Failed to create MongoDB cursor")?; - let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); - let mut batch = Vec::with_capacity(batch_manager.get_current_size()); + let mut batch: Vec<(String, String, String)> = Vec::with_capacity(app_state.config.batch_size); + let mut batch_start_time = Instant::now(); + let mut batch_manager = + BatchSizeManager::new(app_state.config.batch_size, 1, MAX_BATCH_SIZE, 5000.0); + let mut processed_docs = 0; let mut failed_docs = 0; let start_time = Instant::now(); @@ -221,20 +394,31 @@ async fn process_tenant_historical_data( } }; + let timestamp = match doc.get("timestamp").and_then(|ts| ts.as_datetime()) { + Some(ts) => ts, + None => { + warn!("Document is missing timestamp field, skipping"); + failed_docs += 1; + continue; + } + }; + println!("timestamp----------: {:?}", timestamp); let mut statement = statement.to_owned(); - - if let Err(e) = anonymize_statement( + let hashed_value = match anonymize_statement( &mut statement, &app_state.config.encryption_salt, &tenant_config.name, ) { - warn!( - "Failed to anonymize statement for document {}: {}", - record_id, e - ); - failed_docs += 1; - continue; - } + Ok(hashed) => hashed, + Err(e) => { + warn!( + "Failed to anonymize statement for document {}: {}", + record_id, e + ); + failed_docs += 1; + continue; + } + }; let statement_str = match to_string(&statement) { Ok(s) => s, @@ -248,28 +432,27 @@ async fn process_tenant_historical_data( } }; - batch.push((record_id, statement_str)); + batch.push((record_id, statement_str, hashed_value)); - if batch.len() >= batch_manager.get_current_size() { - let batch_start_time = Instant::now(); + let should_process = batch.len() >= batch_manager.get_current_size() + || batch_start_time.elapsed() >= Duration::from_secs(5); - if let Err(e) = insert_into_clickhouse( - &ch_pool, + if should_process { + let batch_duration = batch_start_time.elapsed(); + if let Err(e) = process_batch( + ch_pool, &batch, - &tenant_config.clickhouse_db, - &tenant_config.clickhouse_table, - &app_state.pg_pool, - &tenant_config.name, + &tenant_config, + resume_token_store,
&mut batch_manager, + batch_duration, + &app_state, ) .await { - error!("Failed to insert batch into ClickHouse: {}", e); + error!("Failed to process batch: {}", e); failed_docs += batch.len(); } else { - let batch_duration = batch_start_time.elapsed(); - batch_manager.adjust_batch_size(batch.len(), batch_duration); - processed_docs += batch.len(); info!( "Processed {} out of {} documents. Current batch size: {}", @@ -278,7 +461,9 @@ async fn process_tenant_historical_data( batch_manager.get_current_size() ); } + batch.clear(); + batch_start_time = Instant::now(); } } Err(e) => { @@ -288,20 +473,20 @@ async fn process_tenant_historical_data( } } - // Insert any remaining documents + // Process any remaining documents in the batch if !batch.is_empty() { - if let Err(e) = insert_into_clickhouse( - &ch_pool, + if let Err(e) = process_batch( + ch_pool, &batch, - &tenant_config.clickhouse_db, - &tenant_config.clickhouse_table, - &app_state.pg_pool, - &tenant_config.name, + &tenant_config, + resume_token_store, &mut batch_manager, + batch_start_time.elapsed(), + &app_state, ) .await { - error!("Failed to insert final batch into ClickHouse: {}", e); + error!("Failed to process final batch: {}", e); failed_docs += batch.len(); } else { processed_docs += batch.len(); @@ -310,7 +495,8 @@ async fn process_tenant_historical_data( let total_duration = start_time.elapsed(); info!( - "Completed processing. Total processed: {}, Total failed: {}, Duration: {:?}, Final batch size: {}", + "Completed processing for tenant {}. Total processed: {}, Total failed: {}, Duration: {:?}, Final batch size: {}", + tenant_config.name, processed_docs, failed_docs, total_duration, @@ -324,39 +510,56 @@ async fn process_tenant_historical_data( Ok(()) } -fn anonymize_statement( - statement: &mut Document, - encryption_salt: &str, - tenant_name: &str, +async fn process_batch( + ch_pool: &ClickhousePool, + batch: &[(String, String, String)], + tenant_config: &TenantConfig, + resume_token_store: &RocksDBResumeTokenStore, + batch_manager: &mut BatchSizeManager, + batch_duration: Duration, + app_state: &Arc, ) -> Result<()> { - // Create a deep copy of the statement - let mut statement_copy = statement.clone(); - - // Check if all required fields exist - if !statement_copy.contains_key("actor") - || !statement_copy - .get_document("actor")? - .contains_key("account") - || !statement_copy - .get_document("actor")? - .get_document("account")? - .contains_key("name") + if let Err(e) = insert_into_clickhouse( + ch_pool, + batch, + &tenant_config.clickhouse_db, + &tenant_config.clickhouse_table, + &tenant_config.clickhouse_table_opt_out, + resume_token_store, + &tenant_config.name, + batch_manager, + app_state, + ) + .await { - return Err(anyhow!("Statement is missing required fields")); + error!("Failed to insert batch into ClickHouse: {}", e); + Err(e) + } else { + batch_manager.adjust_batch_size(batch.len(), batch_duration); + Ok(()) } +} - let name = statement_copy - .get_document("actor")? - .get_document("account")? 
- .get_str("name")?; - - let value_to_hash = if name.contains('@') { - name.split('@').next().unwrap_or("") - } else if name.contains(':') { - name.split(':').last().unwrap_or("") - } else { - name - }; +fn anonymize_statement( + statement: &mut Document, + encryption_salt: &str, + tenant_name: &str, +) -> Result { + let actor = statement + .get_document_mut("actor") + .context("Missing 'actor' field")?; + let account = actor + .get_document_mut("account") + .context("Missing 'account' field in actor")?; + let name = account + .get_str("name") + .context("Missing 'name' field in account")?; + + let value_to_hash = name + .split('@') + .next() + .or_else(|| name.split(':').last()) + .unwrap_or(name); if value_to_hash.is_empty() { return Err(anyhow!("Empty value to hash for name: {}", name)); @@ -366,19 +569,10 @@ fn anonymize_statement( hasher.update(encryption_salt.as_bytes()); hasher.update(tenant_name.as_bytes()); hasher.update(value_to_hash.as_bytes()); - let result = hasher.finalize(); - let hashed_value = hex::encode(result); - - // Update the copy - statement_copy - .get_document_mut("actor")? - .get_document_mut("account")? - .insert("name", hashed_value); - - // If we've made it this far without errors, update the original statement - *statement = statement_copy; - // println!("Anonymized statement: {:?}", statement); - Ok(()) + let hashed_value = hex::encode(hasher.finalize()); + + account.insert("name", hashed_value.clone()); + Ok(hashed_value) } async fn process_statement(statement: &str) -> Result { @@ -410,14 +604,17 @@ async fn process_statement(statement: &str) -> Result { async fn insert_into_clickhouse( ch_pool: &ClickhousePool, - bulk_insert_values: &[(String, String)], + bulk_insert_values: &[(String, String, String)], clickhouse_db: &str, clickhouse_table: &str, - pg_pool: &PgPool, + clickhouse_table_opt_out: &str, + resume_token_store: &RocksDBResumeTokenStore, tenant_name: &str, batch_manager: &mut BatchSizeManager, + app_state: &Arc, ) -> Result<()> { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); + let full_table_name_opt_out = format!("{}.{}", clickhouse_db, clickhouse_table_opt_out); for (chunk_index, chunk) in bulk_insert_values .chunks(batch_manager.get_current_size()) @@ -427,7 +624,15 @@ async fn insert_into_clickhouse( let mut delay = INITIAL_RETRY_DELAY; loop { - match insert_batch(ch_pool, chunk, &full_table_name).await { + match insert_batch( + ch_pool, + chunk, + &full_table_name, + &full_table_name_opt_out, + &app_state.cached_hashes, + ) + .await + { Ok(_) => { info!( "Successfully inserted batch {} of {} records", @@ -445,10 +650,11 @@ async fn insert_into_clickhouse( chunk_index + 1 ); log_failed_batch( - pg_pool, + resume_token_store, tenant_name, clickhouse_db, clickhouse_table, + clickhouse_table_opt_out, chunk, ) .await @@ -466,7 +672,6 @@ async fn insert_into_clickhouse( } } - // Add logging for batch size changes here let old_size = batch_manager.get_current_size(); batch_manager.adjust_batch_size(chunk.len(), Duration::from_secs(1)); // Assume 1 second per batch, adjust as needed let new_size = batch_manager.get_current_size(); @@ -478,149 +683,169 @@ async fn insert_into_clickhouse( Ok(()) } +async fn log_failed_batch( + resume_token_store: &RocksDBResumeTokenStore, + tenant_name: &str, + clickhouse_db: &str, + clickhouse_table: &str, + clickhouse_table_opt_out: &str, + failed_batch: &[(String, String, String)], +) -> Result<()> { + let failed_batch_json = + serde_json::to_string(failed_batch).context("Failed to 
serialize failed batch to JSON")?; + + let key = format!( + "failed_batch:{}:{}:{}:{}", + tenant_name, clickhouse_db, clickhouse_table, clickhouse_table_opt_out + ); + + resume_token_store + .set_resume_token(&key, failed_batch_json.as_bytes()) + .context("Failed to store failed batch in RocksDB")?; + + Ok(()) +} + async fn insert_batch( ch_pool: &ClickhousePool, - batch: &[(String, String)], + batch: &[(String, String, String)], full_table_name: &str, + full_table_name_opt_out: &str, + cached_hashes: &Arc>, ) -> Result<()> { let mut client = ch_pool .get_handle() .await .context("Failed to get client from ClickHouse pool")?; - let insert_data: Result> = - futures::future::try_join_all(batch.iter().map(|(record_id, statement)| async move { - let processed_statement = process_statement(statement).await?; - Ok(format!( - "('{}', '{}', now())", - record_id, processed_statement - )) - })) - .await; - - let insert_data = insert_data.context("Failed to process statements")?; - - let insert_query = format!( - "INSERT INTO {} (id, statement, created_at) VALUES {}", - full_table_name, - insert_data.join(" , ") + let mut insert_data = Vec::new(); + let mut insert_data_opt_out = Vec::new(); + + let cached_hashes_guard = cached_hashes.read().await; + debug!( + "Cached HashSet last updated: {:?}", + *cached_hashes_guard.last_updated.read().await ); - client - .execute(insert_query.as_str()) - .await - .context("Failed to execute insert query")?; + for (record_id, statement, hashed_value) in batch { + let processed_statement = process_statement(statement).await?; + let is_opt_out = cached_hashes_guard.contains(hashed_value).await; - Ok(()) -} + debug!( + "Processed statement: {}, is_opt_out: {}", + processed_statement, is_opt_out + ); -async fn log_failed_batch( - pg_pool: &PgPool, - tenant_name: &str, - clickhouse_db: &str, - clickhouse_table: &str, - failed_batch: &[(String, String)], -) -> Result<()> { - let failed_batch_json = - serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; + let formatted = format!("('{}', '{}', now())", record_id, processed_statement); - let mut client = pg_pool - .get() - .await - .context("Failed to get client from PostgreSQL pool")?; + if is_opt_out { + insert_data_opt_out.push(formatted); + } else { + insert_data.push(formatted); + } + } - let statement = client - .prepare( - "INSERT INTO failed_batches (tenant_name, clickhouse_db, clickhouse_table, failed_batch) - VALUES ($1, $2, $3, $4)", - ) + drop(cached_hashes_guard); // Release the read lock + + // Insert into full_table_name + if !insert_data.is_empty() { + let insert_query = format!( + "INSERT INTO {} (id, statement, created_at) VALUES {}", + full_table_name, + insert_data.join(", ") + ); + client + .execute(insert_query.as_str()) .await - .context("Failed to prepare PostgreSQL statement")?; - - client - .execute( - &statement, - &[ - &tenant_name, - &clickhouse_db, - &clickhouse_table, - &failed_batch_json, - ], - ) - .await - .context("Failed to execute PostgreSQL statement")?; + .context("Failed to execute insert query for full_table_name")?; + } + + // Insert into full_table_name_opt_out + if !insert_data_opt_out.is_empty() { + let insert_query_opt_out = format!( + "INSERT INTO {} (id, statement, created_at) VALUES {}", + full_table_name_opt_out, + insert_data_opt_out.join(", ") + ); + client + .execute(insert_query_opt_out.as_str()) + .await + .context("Failed to execute insert query for full_table_name_opt_out")?; + } Ok(()) } async fn retry_failed_batches(app_state: 
Arc) -> Result<()> { - let pg_pool = &app_state.pg_pool; - + let resume_token_store = &app_state.resume_token_store; loop { - let mut client = pg_pool - .get() - .await - .context("Failed to get PostgreSQL client")?; - let statement = client - .prepare( - "SELECT id, tenant_name, clickhouse_db, clickhouse_table, failed_batch - FROM failed_batches - ORDER BY created_at - LIMIT 100", - ) - .await - .context("Failed to prepare PostgreSQL statement")?; + let iter = resume_token_store.db.iterator(rocksdb::IteratorMode::Start); + let failed_batch_prefix = b"failed_batch:"; + + for item in iter { + let (key, value) = item?; + if key.starts_with(failed_batch_prefix) { + let key_str = String::from_utf8_lossy(&key); + info!("Processing failed batch: {}", key_str); + let parts: Vec<&str> = key_str.splitn(5, ':').collect(); + if parts.len() != 5 { + error!("Invalid failed batch key format: {}", key_str); + continue; + } - let rows = client - .query(&statement, &[]) - .await - .context("Failed to execute PostgreSQL query")?; - - for row in rows { - let failed_batch_id: i32 = row.get(0); - let tenant_name: String = row.get(1); - let clickhouse_db: String = row.get(2); - let clickhouse_table: String = row.get(3); - let failed_batch: String = row.get(4); - - let tenant_config = app_state - .config - .tenants - .iter() - .find(|t| t.name == tenant_name) - .cloned(); - - if let Some(tenant_config) = tenant_config { - let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri); - let bulk_insert_values: Vec<(String, String)> = serde_json::from_str(&failed_batch) - .context("Failed to deserialize failed batch")?; - let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); - if let Err(e) = insert_into_clickhouse( - &ch_pool, - &bulk_insert_values, - &clickhouse_db, - &clickhouse_table, - pg_pool, - &tenant_name, - &mut batch_manager, - ) - .await - { - error!("Error retrying failed batch: {}", e); + let tenant_name = parts[1]; + let clickhouse_db = parts[2]; + let clickhouse_table = parts[3]; + let clickhouse_table_opt_out = parts[4]; + + let failed_batch: Vec<(String, String, String)> = + serde_json::from_slice(&value).context("Failed to deserialize failed batch")?; + + let tenant_config = app_state + .config + .tenants + .iter() + .find(|t| t.name == tenant_name) + .cloned(); + + if let Some(tenant_config) = tenant_config { + let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri.as_str()); + let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); + + match insert_into_clickhouse( + &ch_pool, + &failed_batch, + clickhouse_db, + clickhouse_table, + clickhouse_table_opt_out, + resume_token_store, + tenant_name, + &mut batch_manager, + &app_state, + ) + .await + { + Ok(_) => { + info!( + "Successfully retried failed batch for tenant: {}", + tenant_name + ); + resume_token_store.db.delete(key)?; + } + Err(e) => { + error!( + "Error retrying failed batch for tenant {}: {}", + tenant_name, e + ); + } + } } else { - let delete_statement = client - .prepare("DELETE FROM failed_batches WHERE id = $1") - .await - .context("Failed to prepare delete statement")?; - client - .execute(&delete_statement, &[&failed_batch_id]) - .await - .context("Failed to delete processed failed batch")?; + error!("Tenant config not found for tenant: {}", tenant_name); } } } - tokio::time::sleep(std::time::Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs(60)).await; } } @@ -643,7 +868,11 @@ async fn main() -> Result<()> { return Err(anyhow!("Unsupported 
environment")); } }; - let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?; + let config: AppConfig = Config::builder() + .add_source(File::with_name(config_path)) + .build()? + .try_deserialize() + .context("Failed to deserialize config")?; let tenant_name = env::args() .nth(1) @@ -673,22 +902,23 @@ async fn main() -> Result<()> { .ok_or_else(|| anyhow!("Tenant not found in the configuration"))? .clone(); - let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri); - let pg_manager = PostgresConnectionManager::new_from_stringlike( - &config.pg_database_url, - tokio_postgres::NoTls, - )?; - let pg_pool = Pool::builder().build(pg_manager).await?; + let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri.as_str()); + let resume_token_store = Arc::new( + RocksDBResumeTokenStore::new("/app/data/rocksdb_historical_data") + .context("Failed to create resume token store for RocksDB historical data")?, + ); + let cached_hashes = Arc::new(RwLock::new(CachedHashSet::new(&clickhouse_pool).await?)); let app_state = Arc::new(AppState { - config: config.clone(), + config, clickhouse_pools: vec![clickhouse_pool], - pg_pool, + resume_token_store, + cached_hashes: Arc::clone(&cached_hashes), }); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - let shutdown_tx_opt = Arc::new(Mutex::new(Some(shutdown_tx))); - let shutdown_tx_opt_clone = Arc::clone(&shutdown_tx_opt); + let shutdown_tx = Arc::new(tokio::sync::Mutex::new(Some(shutdown_tx))); + let shutdown_tx_clone = Arc::clone(&shutdown_tx); let app_state_clone = app_state.clone(); let run_handle = tokio::spawn(async move { @@ -699,16 +929,16 @@ async fn main() -> Result<()> { } else { error!("Program encountered an error"); } - if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { - if let Err(_) = shutdown_tx.send(()) { + if let Some(tx) = shutdown_tx.lock().await.take() { + if tx.send(()).is_err() { error!("Failed to send shutdown signal"); } } } Err(e) => { error!("Error running the program: {}", e); - if let Some(shutdown_tx) = shutdown_tx_opt.lock().unwrap().take() { - if let Err(_) = shutdown_tx.send(()) { + if let Some(tx) = shutdown_tx.lock().await.take() { + if tx.send(()).is_err() { error!("Failed to send shutdown signal"); } } @@ -716,13 +946,13 @@ async fn main() -> Result<()> { } }); - let retry_handle = tokio::spawn(retry_failed_batches(app_state)); + let retry_handle = tokio::spawn(retry_failed_batches(app_state.clone())); tokio::select! 
{ _ = signal::ctrl_c() => { info!("Received shutdown signal, shutting down gracefully..."); - if let Some(shutdown_tx) = shutdown_tx_opt_clone.lock().unwrap().take() { - if let Err(_) = shutdown_tx.send(()) { + if let Some(tx) = shutdown_tx_clone.lock().await.take() { + if tx.send(()).is_err() { error!("Failed to send shutdown signal"); } } @@ -731,17 +961,17 @@ async fn main() -> Result<()> { info!("Program finished, shutting down gracefully..."); } run_result = run_handle => { - match run_result { - Ok(_) => info!("Run task completed"), - Err(e) => error!("Run task failed: {}", e), + if let Err(e) = run_result { + error!("Run task panicked: {}", e); } } retry_result = retry_handle => { match retry_result { Ok(inner_result) => { - match inner_result { - Ok(_) => info!("Retry task completed successfully"), - Err(e) => error!("Retry task failed: {}", e), + if let Err(e) = inner_result { + error!("Retry task failed: {}", e); + } else { + info!("Retry task completed successfully"); } } Err(e) => error!("Retry task panicked: {}", e), @@ -749,5 +979,8 @@ async fn main() -> Result<()> { } } + // Perform cleanup + app_state.resume_token_store.db.flush()?; + Ok(()) } diff --git a/src/main.rs b/src/main.rs index 7763d89..e11495b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,10 @@ // src/main.rs - Live data processing use anyhow::{anyhow, Context, Result}; -use bb8::Pool; -use bb8_postgres::PostgresConnectionManager; +use chrono::{DateTime, Utc}; use clickhouse_rs::Pool as ClickhousePool; use config::{Config, File}; -use futures::{future::join_all, stream::StreamExt}; -use log::{error, info, warn}; +use futures::stream::StreamExt; +// use log::{error, info, warn}; use mongodb::{ bson::{self, doc, Document}, change_stream::event::{ChangeStreamEvent, ResumeToken}, @@ -13,14 +12,24 @@ use mongodb::{ Client as MongoClient, }; use regex::Regex; -use rocksdb::{Options, DB}; +use rocksdb::{Error as RocksError, Options, DB}; use serde::Deserialize; use serde_json::to_string; use sha2::{Digest, Sha256}; +use std::collections::HashSet; use std::path::Path; -use std::{env, sync::Arc, time::Duration, time::Instant}; +use std::{env, sync::Arc, time::Duration, time::Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::RwLock; +use tracing::{debug, error, info, instrument, warn}; + +use futures::future::join_all; +use std::fs; +use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::{signal, sync::oneshot, task}; + lazy_static::lazy_static! { static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1"); static ref BACKSLASH_REGEX_2: Regex = Regex::new(r"\\(?:\\\\)*").expect("Failed to compile BACKSLASH_REGEX_2"); @@ -30,29 +39,127 @@ lazy_static::lazy_static! 
{ const MAX_BATCH_SIZE: usize = 10000; const MAX_RETRIES: u32 = 5; const INITIAL_RETRY_DELAY: u64 = 1000; +const MAX_RETRY_COUNT: usize = 5; + +#[derive(Debug)] +pub enum StoreError { + RocksDB(RocksError), + OpenFailed, +} struct RocksDBResumeTokenStore { db: Arc, } +impl std::fmt::Display for StoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StoreError::RocksDB(e) => write!(f, "RocksDB error: {}", e), + StoreError::OpenFailed => write!(f, "Failed to open database after multiple attempts"), + } + } +} + +impl std::error::Error for StoreError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + StoreError::RocksDB(e) => Some(e), + StoreError::OpenFailed => None, + } + } +} + +impl From for StoreError { + fn from(error: RocksError) -> Self { + StoreError::RocksDB(error) + } +} impl RocksDBResumeTokenStore { - pub fn new>(path: P) -> Result { + #[instrument(skip(path), err)] + pub fn new(path: &str) -> Result { + info!("Creating new RocksDBResumeTokenStore"); + let db = Self::open_db_aggressive(path)?; + Ok(Self { db: Arc::new(db) }) + } + + #[instrument(skip(path), err)] + fn open_db_aggressive(path: &str) -> Result { + let db_path = Path::new(path); + let lock_file_path = db_path.join("LOCK"); + + // Step 1: Try to open normally + match Self::try_open_db(path) { + Ok(db) => return Ok(db), + Err(e) => warn!("Failed to open RocksDB normally: {}", e), + } + + // Step 2: Remove LOCK file if it exists + if lock_file_path.exists() { + info!("Removing existing LOCK file"); + if let Err(e) = fs::remove_file(&lock_file_path) { + warn!("Failed to remove LOCK file: {}", e); + } + } + + // Step 3: Try to open again after removing LOCK file + match Self::try_open_db(path) { + Ok(db) => return Ok(db), + Err(e) => warn!("Failed to open RocksDB after removing LOCK file: {}", e), + } + + // Step 4: If all else fails, delete the entire database and create a new one + warn!("Recreating the entire database"); + if db_path.exists() { + if let Err(e) = fs::remove_dir_all(db_path) { + error!("Failed to remove existing database directory: {}", e); + return Err(StoreError::OpenFailed); + } + } + + Self::try_open_db(path) + } + + fn try_open_db(path: &str) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); - opts.set_max_open_files(10000); - opts.set_use_fsync(true); - let db = DB::open(&opts, path)?; - Ok(Self { db: Arc::new(db) }) + opts.set_max_open_files(10_000); + opts.set_keep_log_file_num(10); + opts.set_max_total_wal_size(64 * 1024 * 1024); // 64MB + opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB + opts.set_max_write_buffer_number(3); + opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB + opts.set_level_zero_file_num_compaction_trigger(8); + opts.set_level_zero_slowdown_writes_trigger(17); + opts.set_level_zero_stop_writes_trigger(24); + opts.set_num_levels(4); + opts.set_max_bytes_for_level_base(512 * 1024 * 1024); // 512MB + opts.set_max_bytes_for_level_multiplier(8.0); + + DB::open(&opts, path).map_err(|e| { + error!("Failed to open RocksDB: {}", e); + StoreError::RocksDB(e) + }) } - pub fn update_resume_token(&self, token_bytes: &[u8], tenant_name: &str) -> Result<()> { - self.db.put(tenant_name.as_bytes(), token_bytes)?; - Ok(()) + #[instrument(skip(self, key), err)] + pub fn get_resume_token(&self, key: &str) -> Result>, StoreError> { + debug!("Getting resume token"); + self.db.get(key).map_err(|e| { + error!("Failed to get resume token: {}", e); + StoreError::from(e) + }) } - 
pub fn get_resume_token(&self, tenant_name: &str) -> Result>> { - Ok(self.db.get(tenant_name.as_bytes())?) + #[instrument(skip(self, key, value), err)] + pub fn set_resume_token(&self, key: &str, value: &[u8]) -> Result<(), StoreError> { + debug!("Setting resume token for key: {}", key); + self.db.put(key.as_bytes(), value).map_err(|e| { + error!("Failed to set resume token: {}", e); + StoreError::from(e) + }) } + // Add other methods as needed } + #[derive(Deserialize, Clone, Debug)] struct TenantConfig { name: String, @@ -62,6 +169,7 @@ struct TenantConfig { clickhouse_uri: String, clickhouse_db: String, clickhouse_table: String, + clickhouse_table_opt_out: String, } #[derive(Deserialize, Debug)] @@ -69,16 +177,16 @@ struct AppConfig { tenants: Vec, encryption_salt: String, batch_size: usize, - number_of_workers: usize, + clickhouse_uri: String, } -type PostgresPool = Pool>; type ClickhousePoolType = ClickhousePool; struct AppState { config: AppConfig, clickhouse_pools: Vec, resume_token_store: Arc, + cached_hashes: Arc>, } struct BatchSizeManager { @@ -171,6 +279,7 @@ async fn process_tenant_records( app_state: Arc, pool_index: usize, ) -> Result<()> { + println!("mongo_uri: {}", tenant_config.mongo_uri); let mongo_client = match connect_to_mongo(&tenant_config.mongo_uri).await { Ok(client) => client, Err(e) => { @@ -211,7 +320,7 @@ async fn process_tenant_records( } }; - let mut batch = Vec::with_capacity(app_state.config.batch_size); + let mut batch: Vec<(String, String, String)> = Vec::with_capacity(app_state.config.batch_size); let mut batch_start_time = Instant::now(); info!( "Starting change stream processing batch size: {}", @@ -236,15 +345,18 @@ async fn process_tenant_records( (Some(record_id), Some(statement)) => { let record_id_str = record_id.to_hex(); let mut statement = statement.to_owned(); - - if let Err(e) = anonymize_statement( + // println!("statement------->: {:?}", statement); + let hashed_value = match anonymize_statement( &mut statement, &app_state.config.encryption_salt, &tenant_config.name, ) { - warn!("Failed to anonymize statement: {}", e); - continue; - } + Ok(hashed) => hashed, + Err(e) => { + warn!("Failed to anonymize statement: {}", e); + continue; + } + }; let statement_str = match to_string(&statement) { Ok(s) => s, @@ -254,19 +366,20 @@ async fn process_tenant_records( } }; - batch.push((record_id_str, statement_str)); + batch.push((record_id_str, statement_str, hashed_value)); let should_process = batch.len() >= batch_manager.get_current_size() || batch_start_time.elapsed() >= Duration::from_secs(5); if should_process { let batch_duration = batch_start_time.elapsed(); if let Err(e) = process_batch( - &ch_pool, + ch_pool, &batch, &tenant_config, resume_token_store, &mut batch_manager, batch_duration, + &app_state, ) .await { @@ -281,7 +394,7 @@ async fn process_tenant_records( let tenant_name = tenant_config.name.clone(); if let Err(e) = resume_token_store - .update_resume_token(&token_bytes, &tenant_name) + .set_resume_token(&tenant_name, &token_bytes) { error!("Failed to update resume token in RocksDB: {}", e); } @@ -315,13 +428,15 @@ async fn process_tenant_records( if !batch.is_empty() { if let Err(e) = insert_into_clickhouse( - &ch_pool, + ch_pool, &batch, &tenant_config.clickhouse_db, &tenant_config.clickhouse_table, + &tenant_config.clickhouse_table_opt_out, resume_token_store, &tenant_config.name, &mut batch_manager, + &app_state, ) .await { @@ -332,7 +447,7 @@ async fn process_tenant_records( let token_bytes = bson::to_vec(&resume_token)?; let 
tenant_name = tenant_config.name.clone(); - if let Err(e) = resume_token_store.update_resume_token(&token_bytes, &tenant_name) { + if let Err(e) = resume_token_store.set_resume_token(&tenant_name, &token_bytes) { error!("Failed to update final resume token in RocksDB: {}", e); } } @@ -343,20 +458,23 @@ async fn process_tenant_records( async fn process_batch( ch_pool: &ClickhousePoolType, - batch: &[(String, String)], + batch: &[(String, String, String)], tenant_config: &TenantConfig, resume_token_store: &Arc, batch_manager: &mut BatchSizeManager, batch_duration: Duration, + app_state: &Arc, ) -> Result<()> { if let Err(e) = insert_into_clickhouse( ch_pool, batch, &tenant_config.clickhouse_db, &tenant_config.clickhouse_table, + &tenant_config.clickhouse_table_opt_out, resume_token_store, &tenant_config.name, batch_manager, + &app_state, ) .await { @@ -377,35 +495,22 @@ fn anonymize_statement( statement: &mut Document, encryption_salt: &str, tenant_name: &str, -) -> Result<()> { - // Create a deep copy of the statement - let mut statement_copy = statement.clone(); - - // Check if all required fields exist - if !statement_copy.contains_key("actor") - || !statement_copy - .get_document("actor")? - .contains_key("account") - || !statement_copy - .get_document("actor")? - .get_document("account")? - .contains_key("name") - { - return Err(anyhow!("Statement is missing required fields")); - } - - let name = statement_copy - .get_document("actor")? - .get_document("account")? - .get_str("name")?; - - let value_to_hash = if name.contains('@') { - name.split('@').next().unwrap_or("") - } else if name.contains(':') { - name.split(':').last().unwrap_or("") - } else { - name - }; +) -> Result { + let actor = statement + .get_document_mut("actor") + .context("Missing 'actor' field")?; + let account = actor + .get_document_mut("account") + .context("Missing 'account' field in actor")?; + let name = account + .get_str("name") + .context("Missing 'name' field in account")?; + + let value_to_hash = name + .split('@') + .next() + .or_else(|| name.split(':').last()) + .unwrap_or(name); if value_to_hash.is_empty() { return Err(anyhow!("Empty value to hash for name: {}", name)); @@ -415,18 +520,10 @@ fn anonymize_statement( hasher.update(encryption_salt.as_bytes()); hasher.update(tenant_name.as_bytes()); hasher.update(value_to_hash.as_bytes()); - let result = hasher.finalize(); - let hashed_value = hex::encode(result); - - // Update the copy - statement_copy - .get_document_mut("actor")? - .get_document_mut("account")? 
- .insert("name", hashed_value); + let hashed_value = hex::encode(hasher.finalize()); - // If we've made it this far without errors, update the original statement - *statement = statement_copy; - Ok(()) + account.insert("name", hashed_value.clone()); + Ok(hashed_value) } async fn process_statement(statement: &str) -> Result { @@ -458,14 +555,17 @@ async fn process_statement(statement: &str) -> Result { async fn insert_into_clickhouse( ch_pool: &ClickhousePool, - bulk_insert_values: &[(String, String)], + bulk_insert_values: &[(String, String, String)], clickhouse_db: &str, clickhouse_table: &str, + clickhouse_table_opt_out: &str, resume_token_store: &RocksDBResumeTokenStore, tenant_name: &str, batch_manager: &mut BatchSizeManager, + app_state: &Arc, ) -> Result<()> { let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); + let full_table_name_opt_out = format!("{}.{}", clickhouse_db, clickhouse_table_opt_out); for (chunk_index, chunk) in bulk_insert_values .chunks(batch_manager.get_current_size()) @@ -475,8 +575,17 @@ async fn insert_into_clickhouse( let mut delay = INITIAL_RETRY_DELAY; loop { - match insert_batch(ch_pool, chunk, &full_table_name).await { + match insert_batch( + ch_pool, + chunk, + &full_table_name, + &full_table_name_opt_out, + &app_state.cached_hashes, + ) + .await + { Ok(_) => { + // println!("chunk---------{:?}", chunk); info!( "Successfully inserted batch {} of {} records", chunk_index + 1, @@ -497,6 +606,7 @@ async fn insert_into_clickhouse( tenant_name, clickhouse_db, clickhouse_table, + clickhouse_table_opt_out, chunk, ) .await @@ -525,38 +635,150 @@ async fn insert_into_clickhouse( Ok(()) } +struct CachedHashSet { + data: Arc>>, + last_updated: Arc>>, +} + +impl CachedHashSet { + async fn new(ch_pool: &ClickhousePool) -> Result { + let data = Arc::new(RwLock::new(HashSet::new())); + let last_updated = Arc::new(RwLock::new(Utc::now())); + let cache = Self { data, last_updated }; + cache.refresh(ch_pool).await?; + Ok(cache) + } + + #[instrument(skip(self, ch_pool))] + async fn refresh(&self, ch_pool: &ClickhousePool) -> Result<()> { + info!("Refreshing cached HashSet"); + let mut client = ch_pool.get_handle().await?; + let query = "SELECT email, hashed_moodle_id FROM default.moodle_ids WHERE (email, version) IN ( SELECT email, MAX(version) AS max_version FROM default.moodle_ids GROUP BY email )"; + let mut cursor = client.query(query).stream(); + let mut new_data = HashSet::new(); + + while let Some(row) = cursor.next().await { + let row = row.context("Failed to fetch row")?; + let hash: String = row.get("hashed_moodle_id")?; + debug!("Inserting hashed value: {}", hash); // Added debug log + new_data.insert(hash); + } + + let mut data = self.data.write().await; + *data = new_data; + let mut last_updated = self.last_updated.write().await; + *last_updated = Utc::now(); + + info!("Cached HashSet refreshed successfully"); + debug!("Total number of hashed values: {}", data.len()); // Added debug log + Ok(()) + } + async fn contains(&self, value: &str) -> bool { + let data = self.data.read().await; + data.contains(value) + } +} + +#[instrument(skip(socket, cached_hashes, ch_pool))] +async fn handle_client( + mut socket: TcpStream, + cached_hashes: Arc>, + ch_pool: Arc, +) -> Result<()> { + let mut buffer = [0; 1024]; + let n = socket.read(&mut buffer).await?; + let command = String::from_utf8_lossy(&buffer[..n]).to_string(); + + match command.trim() { + "invalidate" => { + info!("Received invalidation command"); + 
cached_hashes.write().await.refresh(&ch_pool).await?; + socket.write_all(b"Cache invalidated successfully").await?; + } + _ => { + socket.write_all(b"Unknown command").await?; + } + } + socket.write_all(b"OK\n").await?; + Ok(()) +} + async fn insert_batch( ch_pool: &ClickhousePool, - batch: &[(String, String)], + batch: &[(String, String, String)], full_table_name: &str, + full_table_name_opt_out: &str, + cached_hashes: &Arc>, ) -> Result<()> { let mut client = ch_pool .get_handle() .await .context("Failed to get client from ClickHouse pool")?; - let insert_data: Result> = - futures::future::try_join_all(batch.iter().map(|(record_id, statement)| async move { - let processed_statement = process_statement(statement).await?; - Ok(format!( - "('{}', '{}', now())", - record_id, processed_statement - )) - })) - .await; - - let insert_data = insert_data.context("Failed to process statements")?; - - let insert_query = format!( - "INSERT INTO {} (id, statement, created_at) VALUES {}", - full_table_name, - insert_data.join(" , ") + let mut insert_data = Vec::new(); + let mut insert_data_opt_out = Vec::new(); + + let cached_hashes_guard = cached_hashes.read().await; + debug!( + "Cached HashSet last updated: {:?}", + *cached_hashes_guard.data.read().await ); + let processing_futures = batch.iter().map(|(record_id, statement, hashed_value)| { + let cached_hashes_guard = &cached_hashes_guard; + async move { + let processed_statement = process_statement(statement).await?; + let is_opt_out = cached_hashes_guard.contains(hashed_value).await; + debug!( + "Processed statement: {}, is_opt_out: {}", + processed_statement, is_opt_out + ); + let formatted = format!("('{}', '{}', now())", record_id, processed_statement); + Ok::<_, anyhow::Error>((formatted, is_opt_out)) + } + }); - client - .execute(insert_query.as_str()) - .await - .context("Failed to execute insert query")?; + let results = join_all(processing_futures).await; + + for result in results { + match result { + Ok((formatted, is_opt_out)) => { + if is_opt_out { + insert_data_opt_out.push(formatted); + } else { + insert_data.push(formatted); + } + } + Err(e) => return Err(e), + } + } + + drop(cached_hashes_guard); // Release the read lock + + // Insert into full_table_name + if !insert_data.is_empty() { + let insert_query = format!( + "INSERT INTO {} (id, statement, created_at) VALUES {}", + full_table_name, + insert_data.join(", ") + ); + client + .execute(insert_query.as_str()) + .await + .context("Failed to execute insert query for full_table_name")?; + } + + // Insert into full_table_name_opt_out + if !insert_data_opt_out.is_empty() { + let insert_query_opt_out = format!( + "INSERT INTO {} (id, statement, created_at) VALUES {}", + full_table_name_opt_out, + insert_data_opt_out.join(", ") + ); + client + .execute(insert_query_opt_out.as_str()) + .await + .context("Failed to execute insert query for full_table_name_opt_out")?; + } Ok(()) } @@ -566,15 +788,16 @@ async fn log_failed_batch( tenant_name: &str, clickhouse_db: &str, clickhouse_table: &str, - failed_batch: &[(String, String)], + clickhouse_table_opt_out: &str, + failed_batch: &[(String, String, String)], ) -> Result<()> { let failed_batch_json = serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; resume_token_store.db.put( format!( - "failed_batch:{}:{}:{}", - tenant_name, clickhouse_db, clickhouse_table + "failed_batch:{}:{}:{}:{}", + tenant_name, clickhouse_db, clickhouse_table, clickhouse_table_opt_out ) .as_bytes(), failed_batch_json.as_bytes(), @@ 
-585,7 +808,6 @@ async fn log_failed_batch( async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> { let resume_token_store = &app_state.resume_token_store; - loop { let iter = resume_token_store.db.iterator(rocksdb::IteratorMode::Start); let failed_batch_prefix = b"failed_batch:"; @@ -594,6 +816,7 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> { let (key, value) = item?; if key.starts_with(failed_batch_prefix) { let key_str = String::from_utf8_lossy(&key); + println!("key_str: {}", key_str); - let parts: Vec<&str> = key_str.splitn(4, ':').collect(); - if parts.len() != 4 { + let parts: Vec<&str> = key_str.splitn(5, ':').collect(); + if parts.len() != 5 { error!("Invalid failed batch key format: {}", key_str); @@ -603,8 +826,9 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> { let tenant_name = parts[1]; let clickhouse_db = parts[2]; let clickhouse_table = parts[3]; + let clickhouse_table_opt_out = parts[4]; - let failed_batch: Vec<(String, String)> = + let failed_batch: Vec<(String, String, String)> = serde_json::from_slice(&value).context("Failed to deserialize failed batch")?; let tenant_config = app_state @@ -623,9 +847,11 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> { &failed_batch, clickhouse_db, clickhouse_table, + clickhouse_table_opt_out, resume_token_store, tenant_name, &mut batch_manager, + &app_state, ) .await { @@ -653,6 +879,45 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> { } } +async fn run_socket_server( + cached_hashes: Arc<RwLock<CachedHashSet>>, + ch_pool: Arc<ClickhousePoolType>, + retry_count: Arc<AtomicUsize>, +) -> Result<()> { + loop { + match TcpListener::bind("0.0.0.0:8088").await { + Ok(listener) => { + info!("Socket server listening on 0.0.0.0:8088"); + retry_count.store(0, Ordering::SeqCst); + + while let Ok((socket, _)) = listener.accept().await { + let cached_hashes = Arc::clone(&cached_hashes); + let ch_pool = Arc::clone(&ch_pool); + + tokio::spawn(async move { + if let Err(e) = handle_client(socket, cached_hashes, ch_pool).await { + error!("Error handling client: {:?}", e); + } + }); + } + } + Err(e) => { + error!("Failed to bind socket server: {:?}", e); + let current_retry = retry_count.fetch_add(1, Ordering::SeqCst); + if current_retry >= MAX_RETRY_COUNT { + error!("Socket server failed to start after {} attempts. Shutting down socket server.", MAX_RETRY_COUNT); + return Err(anyhow!("Socket server permanently down")); + } + let backoff_duration = Duration::from_secs(2u64.pow(current_retry as u32)); + error!("Retrying in {:?}...", backoff_duration); + tokio::time::sleep(backoff_duration).await; + } + } + + error!("Socket server stopped unexpectedly.
Restarting..."); + } +} + #[tokio::main] async fn main() -> Result<()> { env_logger::init(); @@ -679,13 +944,18 @@ async fn main() -> Result<()> { let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri.as_str()); clickhouse_pools.push(clickhouse_pool); } - - let resume_token_store = Arc::new(RocksDBResumeTokenStore::new("/app/data/rocksdb")?); + let ch_pool = Arc::new(ClickhousePool::new(config.clickhouse_uri.as_str())); + let cached_hashes = Arc::new(RwLock::new(CachedHashSet::new(&ch_pool).await?)); + let resume_token_store = Arc::new( + RocksDBResumeTokenStore::new("/app/data/rocksdb") + .expect("Failed to create resume token store"), + ); let app_state = Arc::new(AppState { config, clickhouse_pools, resume_token_store, + cached_hashes: Arc::clone(&cached_hashes), }); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); @@ -705,6 +975,19 @@ async fn main() -> Result<()> { } }); + let retry_count = Arc::new(AtomicUsize::new(0)); + let socket_server_handle = tokio::spawn({ + let cached_hashes = Arc::clone(&cached_hashes); + let ch_pool = Arc::clone(&ch_pool); + let retry_count = Arc::clone(&retry_count); + async move { + if let Err(e) = run_socket_server(cached_hashes, ch_pool, retry_count).await { + error!("Socket server encountered a permanent error: {:?}", e); + error!("Socket server will not be restarted."); + } + } + }); + tokio::select! { _ = signal::ctrl_c() => { info!("Received shutdown signal, shutting down gracefully..."); @@ -721,6 +1004,13 @@ async fn main() -> Result<()> { _ = retry_handle => { info!("Retry failed batches loop has completed."); } + _ = socket_server_handle => { + if retry_count.load(Ordering::SeqCst) >= MAX_RETRY_COUNT { + warn!("Socket server has permanently shut down after maximum retry attempts."); + } else { + info!("Socket server has unexpectedly shut down."); + } + } } info!("Shutting down...");