diff --git a/src/main.rs b/src/main.rs index 8943cbf..01958d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -286,107 +286,185 @@ async fn process_tenant_records( pool_index: usize, ) -> Result<()> { println!("mongo_uri: {}", tenant_config.mongo_uri); - let mongo_client = match connect_to_mongo(&tenant_config.mongo_uri).await { - Ok(client) => client, - Err(e) => { - error!( - "Failed to connect to MongoDB for tenant {}: {}", - tenant_config.name, e - ); - return Err(e.into()); - } - }; - - let mongo_db = mongo_client.database(&tenant_config.mongo_db); - let mongo_collection: mongodb::Collection = - mongo_db.collection(&tenant_config.mongo_collection); - let ch_pool = &app_state.clickhouse_pools[pool_index]; let resume_token_store = &app_state.resume_token_store; - let resume_token = resume_token_store.get_resume_token(&tenant_config.name)?; - info!("Resuming change stream for tenant: {}", tenant_config.name); - info!("Resume token: {:?}", resume_token); + loop { + let mongo_client = match connect_to_mongo(&tenant_config.mongo_uri).await { + Ok(client) => client, + Err(e) => { + error!( + "Failed to connect to MongoDB for tenant {}: {}", + tenant_config.name, e + ); + tokio::time::sleep(Duration::from_secs(60)).await; + continue; + } + }; + + let mongo_db = mongo_client.database(&tenant_config.mongo_db); + let mongo_collection: mongodb::Collection = + mongo_db.collection(&tenant_config.mongo_collection); - let mut options = ChangeStreamOptions::default(); - if let Some(token_bytes) = resume_token { - if let Ok(resume_token) = bson::from_slice::(&token_bytes) { - options.resume_after = Some(resume_token); - } - } + let resume_token = resume_token_store.get_resume_token(&tenant_config.name)?; + info!("Resuming change stream for tenant: {}", tenant_config.name); + info!("Resume token: {:?}", resume_token); - let mut change_stream = match mongo_collection.watch(None, options).await { - Ok(stream) => stream, - Err(e) => { - error!( - "Failed to create change stream for tenant {}: {}", - tenant_config.name, e - ); - return Err(e.into()); + let mut options = ChangeStreamOptions::default(); + if let Some(token_bytes) = resume_token { + if let Ok(resume_token) = bson::from_slice::(&token_bytes) { + options.resume_after = Some(resume_token); + } } - }; - - let mut batch: Vec<(String, String, BsonDateTime, String)> = - Vec::with_capacity(app_state.config.batch_size); - let mut batch_start_time = Instant::now(); - info!( - "Starting change stream processing batch size: {}", - batch.capacity() - ); - let mut batch_manager = - BatchSizeManager::new(app_state.config.batch_size, 1, MAX_BATCH_SIZE, 5000.0); - while let Some(result) = change_stream.next().await { - info!("Processing change event"); - match result { - Ok(change_event) => { - if let ChangeStreamEvent { - full_document: Some(doc), - .. - } = change_event - { - let record_id = doc.get("_id").and_then(|id| id.as_object_id()); - let statement = doc.get("statement").and_then(|s| s.as_document()); - - match (record_id, statement) { - (Some(record_id), Some(statement)) => { - let record_id_str = record_id.to_hex(); - let mut statement = statement.to_owned(); - // println!("statement------->: {:?}", statement); - let hashed_value = match anonymize_statement( - &mut statement, - &app_state.config.encryption_salt, - &tenant_config.name, - ) { - Ok(hashed) => hashed, - Err(e) => { - warn!("Failed to anonymize statement: {}", e); - continue; - } - }; + let mut change_stream = match mongo_collection.watch(None, options.clone()).await { + Ok(stream) => stream, + Err(e) => { + if let mongodb::error::ErrorKind::Command(ref cmd_err) = *e.kind { + if cmd_err.code == 280 { + warn!( + "Resume token not found for tenant {}. Clearing resume token and retrying.", + tenant_config.name + ); + resume_token_store.set_resume_token(&tenant_config.name, &[])?; + options.resume_after = None; + continue; + } + } + error!( + "Failed to create change stream for tenant {}: {}", + tenant_config.name, e + ); + tokio::time::sleep(Duration::from_secs(60)).await; + continue; + } + }; - let statement_str = match to_string(&statement) { - Ok(s) => s, - Err(e) => { - error!("Failed to convert statement to string: {}", e); - continue; - } - }; + let mut batch: Vec<(String, String, BsonDateTime, String)> = + Vec::with_capacity(app_state.config.batch_size); + let mut batch_start_time = Instant::now(); + info!( + "Starting change stream processing batch size: {}", + batch.capacity() + ); + let mut batch_manager = + BatchSizeManager::new(app_state.config.batch_size, 1, MAX_BATCH_SIZE, 5000.0); + + while let Some(result) = change_stream.next().await { + info!("Processing change event"); + match result { + Ok(change_event) => { + if let ChangeStreamEvent { + full_document: Some(doc), + .. + } = change_event + { + let record_id = doc.get("_id").and_then(|id| id.as_object_id()); + let statement = doc.get("statement").and_then(|s| s.as_document()); + + match (record_id, statement) { + (Some(record_id), Some(statement)) => { + let record_id_str = record_id.to_hex(); + let mut statement = statement.to_owned(); + + let hashed_value = match anonymize_statement( + &mut statement, + &app_state.config.encryption_salt, + &tenant_config.name, + ) { + Ok(hashed) => hashed, + Err(e) => { + warn!("Failed to anonymize statement: {}", e); + continue; + } + }; - let timestamp = - match doc.get("timestamp").and_then(|ts| ts.as_datetime()) { - Some(ts) => ts, - None => { - warn!("Document is missing timestamp field, skipping"); + let statement_str = match to_string(&statement) { + Ok(s) => s, + Err(e) => { + error!("Failed to convert statement to string: {}", e); continue; } }; - batch.push((record_id_str, statement_str, *timestamp, hashed_value)); - let should_process = batch.len() >= batch_manager.get_current_size() - || batch_start_time.elapsed() >= Duration::from_secs(5); + let timestamp = + match doc.get("timestamp").and_then(|ts| ts.as_datetime()) { + Some(ts) => ts, + None => { + warn!("Document is missing timestamp field, skipping"); + continue; + } + }; + + batch.push(( + record_id_str, + statement_str, + *timestamp, + hashed_value, + )); + let should_process = batch.len() + >= batch_manager.get_current_size() + || batch_start_time.elapsed() >= Duration::from_secs(5); + + if should_process { + let batch_duration = batch_start_time.elapsed(); + if let Err(e) = process_batch( + ch_pool, + &batch, + &tenant_config, + resume_token_store, + &mut batch_manager, + batch_duration, + &app_state, + ) + .await + { + error!("Failed to process batch: {}", e); + continue; + } + + // Update resume token after successful batch processing + if let Some(resume_token) = change_stream.resume_token() { + let token_bytes = bson::to_vec(&resume_token)?; + let tenant_name = tenant_config.name.clone(); + + if let Err(e) = resume_token_store + .set_resume_token(&tenant_name, &token_bytes) + { + error!( + "Failed to update resume token in RocksDB: {}", + e + ); + } + } + + batch.clear(); + batch_start_time = Instant::now(); + } + } + (None, Some(_)) => { + warn!("Missing '_id' field in the document"); + } + (Some(_), None) => { + warn!("Missing 'statement' field in the document"); + } + (None, None) => { + warn!("Missing both '_id' and 'statement' fields in the document"); + } + } + } + } + Err(e) => { + if let mongodb::error::ErrorKind::Command(ref cmd_err) = *e.kind { + if cmd_err.code == 280 { + warn!( + "Change stream error for tenant {}: Resume token not found. Clearing resume token and restarting stream.", + tenant_config.name + ); + resume_token_store.set_resume_token(&tenant_config.name, &[])?; - if should_process { + // Process any remaining batch before restarting + if !batch.is_empty() { let batch_duration = batch_start_time.elapsed(); if let Err(e) = process_batch( ch_pool, @@ -399,77 +477,101 @@ async fn process_tenant_records( ) .await { - error!("Failed to process batch: {}", e); - // Don't update resume token if batch processing failed - continue; - } - - // Update resume token only after successful batch processing - if let Some(resume_token) = change_stream.resume_token() { - let token_bytes = bson::to_vec(&resume_token)?; - let tenant_name = tenant_config.name.clone(); - - if let Err(e) = resume_token_store - .set_resume_token(&tenant_name, &token_bytes) - { - error!("Failed to update resume token in RocksDB: {}", e); + error!("Failed to process remaining batch: {}", e); + } else { + // Update resume token after successful batch processing + if let Some(resume_token) = change_stream.resume_token() { + let token_bytes = bson::to_vec(&resume_token)?; + let tenant_name = tenant_config.name.clone(); + + if let Err(e) = resume_token_store + .set_resume_token(&tenant_name, &token_bytes) + { + error!( + "Failed to update resume token in RocksDB: {}", + e + ); + } } } - batch.clear(); - batch_start_time = Instant::now(); } + + break; // Break to restart the change stream } - (None, Some(_)) => { - warn!("Missing '_id' field in the document"); - } - (Some(_), None) => { - warn!("Missing 'statement' field in the document"); - } - (None, None) => { - warn!("Missing both '_id' and 'statement' fields in the document"); + } + error!( + "Change stream error for tenant {}: {}. Restarting stream...", + tenant_config.name, e + ); + + // Process any remaining batch before restarting + if !batch.is_empty() { + let batch_duration = batch_start_time.elapsed(); + if let Err(e) = process_batch( + ch_pool, + &batch, + &tenant_config, + resume_token_store, + &mut batch_manager, + batch_duration, + &app_state, + ) + .await + { + error!("Failed to process remaining batch: {}", e); + } else { + // Update resume token after successful batch processing + if let Some(resume_token) = change_stream.resume_token() { + let token_bytes = bson::to_vec(&resume_token)?; + let tenant_name = tenant_config.name.clone(); + + if let Err(e) = + resume_token_store.set_resume_token(&tenant_name, &token_bytes) + { + error!("Failed to update resume token in RocksDB: {}", e); + } + } } + batch.clear(); } + + tokio::time::sleep(Duration::from_secs(60)).await; + break; // Break to restart the change stream } } - Err(e) => { - error!( - "Change stream error for tenant {}: {}. Restarting stream...", - tenant_config.name, e - ); - return Err(e.into()); - } } - } - if !batch.is_empty() { - if let Err(e) = insert_into_clickhouse( - ch_pool, - &batch, - &tenant_config.clickhouse_db, - &tenant_config.clickhouse_table, - &tenant_config.clickhouse_table_opt_out, - resume_token_store, - &tenant_config.name, - &mut batch_manager, - &app_state, - ) - .await - { - error!("Failed to insert final batch into ClickHouse: {}", e); - } else { - // Update resume token after successful processing of the final batch - if let Some(resume_token) = change_stream.resume_token() { - let token_bytes = bson::to_vec(&resume_token)?; - let tenant_name = tenant_config.name.clone(); + // After the while loop ends, process any remaining batch + if !batch.is_empty() { + let batch_duration = batch_start_time.elapsed(); + if let Err(e) = process_batch( + ch_pool, + &batch, + &tenant_config, + resume_token_store, + &mut batch_manager, + batch_duration, + &app_state, + ) + .await + { + error!("Failed to process final batch: {}", e); + } else { + // Update resume token after successful batch processing + if let Some(resume_token) = change_stream.resume_token() { + let token_bytes = bson::to_vec(&resume_token)?; + let tenant_name = tenant_config.name.clone(); - if let Err(e) = resume_token_store.set_resume_token(&tenant_name, &token_bytes) { - error!("Failed to update final resume token in RocksDB: {}", e); + if let Err(e) = resume_token_store.set_resume_token(&tenant_name, &token_bytes) + { + error!("Failed to update resume token in RocksDB: {}", e); + } } } + batch.clear(); } } - Ok(()) } async fn process_batch(