diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index bce0762..dbe0c8d 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -6,9 +6,7 @@ use config::{Config, File}; use futures::stream::StreamExt; // use log::{error, info, warn}; use mongodb::{ - bson::{self, doc, Document}, - change_stream::event::{ChangeStreamEvent, ResumeToken}, - options::ChangeStreamOptions, + bson::{self, doc, DateTime as BsonDateTime, Document}, Client as MongoClient, }; use regex::Regex; @@ -20,14 +18,13 @@ use std::collections::HashSet; use std::path::Path; use std::{env, sync::Arc, time::Duration, time::Instant}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; + use tokio::sync::RwLock; use tracing::{debug, error, info, instrument, warn}; use std::fs; -use std::sync::atomic::{AtomicUsize, Ordering}; -use tokio::{signal, sync::oneshot, task}; + +use tokio::{signal, sync::oneshot}; lazy_static::lazy_static! { static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1"); @@ -35,7 +32,7 @@ lazy_static::lazy_static! { static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3"); } -const MAX_BATCH_SIZE: usize = 10000; +const MAX_BATCH_SIZE: usize = 8000; const MAX_RETRIES: u32 = 5; const INITIAL_RETRY_DELAY: u64 = 1000; const MAX_RETRY_COUNT: usize = 5; @@ -255,12 +252,18 @@ impl BatchSizeManager { fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) { let performance = docs_processed as f64 / time_taken.as_secs_f64(); + let old_size = self.current_size; if performance > self.performance_threshold { self.current_size = (self.current_size * 2).min(self.max_size); } else { self.current_size = (self.current_size / 2).max(self.min_size); } + + info!( + "Batch size adjusted: {} -> {}. Performance: {:.2}, Threshold: {:.2}", + old_size, self.current_size, performance, self.performance_threshold + ); } fn get_current_size(&self) -> usize { @@ -361,10 +364,11 @@ async fn process_tenant_historical_data( .await .context("Failed to create MongoDB cursor")?; - let mut batch: Vec<(String, String, String)> = Vec::with_capacity(app_state.config.batch_size); + let mut batch: Vec<(String, String, BsonDateTime, String)> = + Vec::with_capacity(app_state.config.batch_size); let mut batch_start_time = Instant::now(); let mut batch_manager = - BatchSizeManager::new(app_state.config.batch_size, 1, MAX_BATCH_SIZE, 5000.0); + BatchSizeManager::new(app_state.config.batch_size, 5000, MAX_BATCH_SIZE, 10000.0); let mut processed_docs = 0; let mut failed_docs = 0; @@ -402,7 +406,8 @@ async fn process_tenant_historical_data( continue; } }; - println!("timestamp----------: {:?}", timestamp); + + // println!("timestamp----------: {:?}", timestamp); let mut statement = statement.to_owned(); let hashed_value = match anonymize_statement( &mut statement, @@ -432,7 +437,7 @@ async fn process_tenant_historical_data( } }; - batch.push((record_id, statement_str, hashed_value)); + batch.push((record_id, statement_str, *timestamp, hashed_value)); let should_process = batch.len() >= batch_manager.get_current_size() || batch_start_time.elapsed() >= Duration::from_secs(5); @@ -512,7 +517,7 @@ async fn process_tenant_historical_data( async fn process_batch( ch_pool: &ClickhousePool, - batch: &[(String, String, String)], + batch: &[(String, String, BsonDateTime, String)], tenant_config: &TenantConfig, resume_token_store: &RocksDBResumeTokenStore, batch_manager: &mut BatchSizeManager, @@ -604,7 +609,7 @@ async fn process_statement(statement: &str) -> Result { async fn insert_into_clickhouse( ch_pool: &ClickhousePool, - bulk_insert_values: &[(String, String, String)], + bulk_insert_values: &[(String, String, BsonDateTime, String)], clickhouse_db: &str, clickhouse_table: &str, clickhouse_table_opt_out: &str, @@ -689,7 +694,7 @@ async fn log_failed_batch( clickhouse_db: &str, clickhouse_table: &str, clickhouse_table_opt_out: &str, - failed_batch: &[(String, String, String)], + failed_batch: &[(String, String, BsonDateTime, String)], ) -> Result<()> { let failed_batch_json = serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; @@ -708,15 +713,18 @@ async fn log_failed_batch( async fn insert_batch( ch_pool: &ClickhousePool, - batch: &[(String, String, String)], + batch: &[(String, String, BsonDateTime, String)], full_table_name: &str, full_table_name_opt_out: &str, cached_hashes: &Arc>, ) -> Result<()> { - let mut client = ch_pool - .get_handle() - .await - .context("Failed to get client from ClickHouse pool")?; + let mut client = match ch_pool.get_handle().await { + Ok(client) => client, + Err(e) => { + error!("Failed to get ClickHouse client: {:?}", e); + return Err(anyhow!("ClickHouse connection error: {:?}", e)); + } + }; let mut insert_data = Vec::new(); let mut insert_data_opt_out = Vec::new(); @@ -727,7 +735,7 @@ async fn insert_batch( *cached_hashes_guard.last_updated.read().await ); - for (record_id, statement, hashed_value) in batch { + for (record_id, statement, timestamp, hashed_value) in batch { let processed_statement = process_statement(statement).await?; let is_opt_out = cached_hashes_guard.contains(hashed_value).await; @@ -735,8 +743,18 @@ async fn insert_batch( "Processed statement: {}, is_opt_out: {}", processed_statement, is_opt_out ); + // let timestamp: DateTime = Utc::now(); + // println!("timestamp----------------> {}", timestamp); + let millis = timestamp.timestamp_millis(); + let chrono_timestamp: DateTime = DateTime::from_timestamp_millis(millis).unwrap(); - let formatted = format!("('{}', '{}', now())", record_id, processed_statement); + // Format the timestamp for ClickHouse + let formatted_timestamp = chrono_timestamp.format("%Y-%m-%d %H:%M:%S%.3f"); + + let formatted = format!( + "('{}', '{}', now(), '{}')", + record_id, processed_statement, formatted_timestamp + ); if is_opt_out { insert_data_opt_out.push(formatted); @@ -750,7 +768,7 @@ async fn insert_batch( // Insert into full_table_name if !insert_data.is_empty() { let insert_query = format!( - "INSERT INTO {} (id, statement, created_at) VALUES {}", + "INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}", full_table_name, insert_data.join(", ") ); @@ -763,7 +781,7 @@ async fn insert_batch( // Insert into full_table_name_opt_out if !insert_data_opt_out.is_empty() { let insert_query_opt_out = format!( - "INSERT INTO {} (id, statement, created_at) VALUES {}", + "INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}", full_table_name_opt_out, insert_data_opt_out.join(", ") ); @@ -798,7 +816,7 @@ async fn retry_failed_batches(app_state: Arc) -> Result<()> { let clickhouse_table = parts[3]; let clickhouse_table_opt_out = parts[4]; - let failed_batch: Vec<(String, String, String)> = + let failed_batch: Vec<(String, String, BsonDateTime, String)> = serde_json::from_slice(&value).context("Failed to deserialize failed batch")?; let tenant_config = app_state @@ -810,7 +828,12 @@ async fn retry_failed_batches(app_state: Arc) -> Result<()> { if let Some(tenant_config) = tenant_config { let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri.as_str()); - let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); + let mut batch_manager = BatchSizeManager::new( + app_state.config.batch_size, + 100, + MAX_BATCH_SIZE, + 1000.0, + ); match insert_into_clickhouse( &ch_pool, diff --git a/src/main.rs b/src/main.rs index e11495b..8943cbf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ use config::{Config, File}; use futures::stream::StreamExt; // use log::{error, info, warn}; use mongodb::{ - bson::{self, doc, Document}, + bson::{self, doc, DateTime as BsonDateTime, Document}, change_stream::event::{ChangeStreamEvent, ResumeToken}, options::ChangeStreamOptions, Client as MongoClient, @@ -213,12 +213,18 @@ impl BatchSizeManager { fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) { let performance = docs_processed as f64 / time_taken.as_secs_f64(); + let old_size = self.current_size; if performance > self.performance_threshold { self.current_size = (self.current_size * 2).min(self.max_size); } else { self.current_size = (self.current_size / 2).max(self.min_size); } + + info!( + "Batch size adjusted: {} -> {}. Performance: {:.2}, Threshold: {:.2}", + old_size, self.current_size, performance, self.performance_threshold + ); } fn get_current_size(&self) -> usize { @@ -320,7 +326,8 @@ async fn process_tenant_records( } }; - let mut batch: Vec<(String, String, String)> = Vec::with_capacity(app_state.config.batch_size); + let mut batch: Vec<(String, String, BsonDateTime, String)> = + Vec::with_capacity(app_state.config.batch_size); let mut batch_start_time = Instant::now(); info!( "Starting change stream processing batch size: {}", @@ -366,7 +373,16 @@ async fn process_tenant_records( } }; - batch.push((record_id_str, statement_str, hashed_value)); + let timestamp = + match doc.get("timestamp").and_then(|ts| ts.as_datetime()) { + Some(ts) => ts, + None => { + warn!("Document is missing timestamp field, skipping"); + continue; + } + }; + + batch.push((record_id_str, statement_str, *timestamp, hashed_value)); let should_process = batch.len() >= batch_manager.get_current_size() || batch_start_time.elapsed() >= Duration::from_secs(5); @@ -458,7 +474,7 @@ async fn process_tenant_records( async fn process_batch( ch_pool: &ClickhousePoolType, - batch: &[(String, String, String)], + batch: &[(String, String, BsonDateTime, String)], tenant_config: &TenantConfig, resume_token_store: &Arc, batch_manager: &mut BatchSizeManager, @@ -555,7 +571,7 @@ async fn process_statement(statement: &str) -> Result { async fn insert_into_clickhouse( ch_pool: &ClickhousePool, - bulk_insert_values: &[(String, String, String)], + bulk_insert_values: &[(String, String, BsonDateTime, String)], clickhouse_db: &str, clickhouse_table: &str, clickhouse_table_opt_out: &str, @@ -705,16 +721,18 @@ async fn handle_client( async fn insert_batch( ch_pool: &ClickhousePool, - batch: &[(String, String, String)], + batch: &[(String, String, BsonDateTime, String)], full_table_name: &str, full_table_name_opt_out: &str, cached_hashes: &Arc>, ) -> Result<()> { - let mut client = ch_pool - .get_handle() - .await - .context("Failed to get client from ClickHouse pool")?; - + let mut client = match ch_pool.get_handle().await { + Ok(client) => client, + Err(e) => { + error!("Failed to get ClickHouse client: {:?}", e); + return Err(anyhow!("ClickHouse connection error: {:?}", e)); + } + }; let mut insert_data = Vec::new(); let mut insert_data_opt_out = Vec::new(); @@ -723,19 +741,31 @@ async fn insert_batch( "Cached HashSet last updated: {:?}", *cached_hashes_guard.data.read().await ); - let processing_futures = batch.iter().map(|(record_id, statement, hashed_value)| { - let cached_hashes_guard = &cached_hashes_guard; - async move { - let processed_statement = process_statement(statement).await?; - let is_opt_out = cached_hashes_guard.contains(hashed_value).await; - debug!( - "Processed statement: {}, is_opt_out: {}", - processed_statement, is_opt_out - ); - let formatted = format!("('{}', '{}', now())", record_id, processed_statement); - Ok::<_, anyhow::Error>((formatted, is_opt_out)) - } - }); + let processing_futures = batch + .iter() + .map(|(record_id, statement, timestamp, hashed_value)| { + let cached_hashes_guard = &cached_hashes_guard; + async move { + let processed_statement = process_statement(statement).await?; + let is_opt_out = cached_hashes_guard.contains(hashed_value).await; + debug!( + "Processed statement: {}, is_opt_out: {}", + processed_statement, is_opt_out + ); + let millis = timestamp.timestamp_millis(); + let chrono_timestamp: DateTime = + DateTime::from_timestamp_millis(millis).unwrap(); + + // Format the timestamp for ClickHouse + let formatted_timestamp = chrono_timestamp.format("%Y-%m-%d %H:%M:%S%.3f"); + + let formatted = format!( + "('{}', '{}', now(), '{}')", + record_id, processed_statement, formatted_timestamp + ); + Ok::<_, anyhow::Error>((formatted, is_opt_out)) + } + }); let results = join_all(processing_futures).await; @@ -757,7 +787,7 @@ async fn insert_batch( // Insert into full_table_name if !insert_data.is_empty() { let insert_query = format!( - "INSERT INTO {} (id, statement, created_at) VALUES {}", + "INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}", full_table_name, insert_data.join(", ") ); @@ -770,7 +800,7 @@ async fn insert_batch( // Insert into full_table_name_opt_out if !insert_data_opt_out.is_empty() { let insert_query_opt_out = format!( - "INSERT INTO {} (id, statement, created_at) VALUES {}", + "INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}", full_table_name_opt_out, insert_data_opt_out.join(", ") ); @@ -789,7 +819,7 @@ async fn log_failed_batch( clickhouse_db: &str, clickhouse_table: &str, clickhouse_table_opt_out: &str, - failed_batch: &[(String, String, String)], + failed_batch: &[(String, String, BsonDateTime, String)], ) -> Result<()> { let failed_batch_json = serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?; @@ -816,7 +846,7 @@ async fn retry_failed_batches(app_state: Arc) -> Result<()> { let (key, value) = item?; if key.starts_with(failed_batch_prefix) { let key_str = String::from_utf8_lossy(&key); - println!("key_str: {}", key_str); + // println!("key_str: {}", key_str); let parts: Vec<&str> = key_str.splitn(4, ':').collect(); if parts.len() != 4 { error!("Invalid failed batch key format: {}", key_str); @@ -828,7 +858,7 @@ async fn retry_failed_batches(app_state: Arc) -> Result<()> { let clickhouse_table = parts[3]; let clickhouse_table_opt_out = parts[4]; - let failed_batch: Vec<(String, String, String)> = + let failed_batch: Vec<(String, String, BsonDateTime, String)> = serde_json::from_slice(&value).context("Failed to deserialize failed batch")?; let tenant_config = app_state @@ -840,7 +870,12 @@ async fn retry_failed_batches(app_state: Arc) -> Result<()> { if let Some(tenant_config) = tenant_config { let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri.as_str()); - let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0); + let mut batch_manager = BatchSizeManager::new( + app_state.config.batch_size, + 1, + MAX_BATCH_SIZE, + 1000.0, + ); match insert_into_clickhouse( &ch_pool,