diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs index ef07eb3..e34faa0 100644 --- a/historical_data/src/main.rs +++ b/historical_data/src/main.rs @@ -114,10 +114,8 @@ async fn process_tenant_historical_data( let batch_size = app_state.config.batch_size; let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64; - // Create a channel to send batches of documents let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - // Spawn a fixed number of worker tasks let num_workers = 4; let mut receivers: Vec>> = Vec::new(); @@ -158,7 +156,6 @@ async fn process_tenant_historical_data( match mongo_collection.find(None, options).await { Ok(mut cursor) => { - // Process documents and send batches to the channel let mut batch = Vec::with_capacity(batch_size as usize); while let Some(result) = cursor.next().await { if let Ok(doc) = result { diff --git a/src/main.rs b/src/main.rs index bf51a98..8a01436 100644 --- a/src/main.rs +++ b/src/main.rs @@ -109,7 +109,7 @@ async fn process_tenant_records( let mut change_stream = mongo_collection.watch(None, change_stream_options).await?; while let Some(result) = change_stream.next().await { - println!(">>--- Change event: {:?}", result); + // println!(">>--- Change event: {:?}", result); match result { Ok(change_event) => { if let ChangeStreamEvent { @@ -347,7 +347,7 @@ async fn main() -> Result<(), Box> { postgres_pool, clickhouse_pools, }); - println!("app_state_main {:?}", app_state); + // println!("app_state_main {:?}", app_state); let _ = run(app_state).await; tokio::signal::ctrl_c() .await