Merge pull request #18 from isankadn/dev
adding workers
isankadn authored Mar 24, 2024
2 parents 07d655b + d433491 commit e5ebfab
Showing 1 changed file with 48 additions and 22 deletions.
70 changes: 48 additions & 22 deletions historical_data/src/main.rs
@@ -63,7 +63,7 @@ async fn run(
         .ok_or_else(|| anyhow!("Tenant not found in the configuration"))?;
     // println!("tenant name {:?}", tenant.name);
     if let Err(e) = process_tenant_historical_data(
-        tenant.clone(),
+        Arc::new(tenant.clone()),
         Arc::clone(&app_state),
         0,
         start_date,
@@ -85,7 +85,7 @@
 }
 
 async fn process_tenant_historical_data(
-    tenant_config: TenantConfig,
+    tenant_config: Arc<TenantConfig>,
     app_state: Arc<AppState>,
     pool_index: usize,
     start_date: chrono::NaiveDate,
@@ -95,7 +95,7 @@ async fn process_tenant_historical_data(
     let mongo_db = mongo_client.database(&tenant_config.mongo_db);
     let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection);
 
-    let ch_pool = &app_state.clickhouse_pools[pool_index];
+    let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone());
     let start_datetime = DateTime::<Utc>::from_utc(start_date.and_hms(0, 0, 0), Utc);
     let end_datetime = DateTime::<Utc>::from_utc(end_date.and_hms(23, 59, 59), Utc);
 
@@ -114,6 +114,41 @@
     let batch_size = app_state.config.batch_size;
     let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64;
 
+    // One unbounded channel per worker; the senders are kept so that full
+    // batches can be dispatched to the workers round-robin.
+    let num_workers: u64 = 4;
+    let mut senders: Vec<tokio::sync::mpsc::UnboundedSender<Vec<(String, String)>>> = Vec::new();
+    let mut receivers: Vec<tokio::sync::mpsc::UnboundedReceiver<Vec<(String, String)>>> =
+        Vec::new();
+    for _ in 0..num_workers {
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+        senders.push(tx);
+        receivers.push(rx);
+    }
+
+    // Spawn a fixed number of worker tasks, one per receiver.
+    for mut rx in receivers {
+        let tenant_config = Arc::clone(&tenant_config);
+        let ch_pool = Arc::clone(&ch_pool);
+        let pg_pool = Arc::new(app_state.pg_pool.clone());
+        tokio::spawn(async move {
+            while let Some(batch) = rx.recv().await {
+                for (record_id_str, statement_str) in batch {
+                    if let Err(e) = insert_into_clickhouse(
+                        &ch_pool,
+                        &[(record_id_str, statement_str)],
+                        &tenant_config.clickhouse_db,
+                        &tenant_config.clickhouse_table,
+                        &pg_pool,
+                        &tenant_config.name,
+                    )
+                    .await
+                    {
+                        error!("Error inserting into ClickHouse: {}", e);
+                    }
+                }
+            }
+        });
+    }
+
     for batch_index in 0..num_batches {
         let skip = batch_index * batch_size;
         let options = FindOptions::builder()
@@ -123,8 +158,9 @@
 
         match mongo_collection.find(None, options).await {
            Ok(mut cursor) => {
+                // Process documents and send batches to the channel.
+                let mut batch = Vec::with_capacity(batch_size as usize);
                 while let Some(result) = cursor.next().await {
-                    // print!("tent info {:?}", &tenant_config.name);
                    if let Ok(doc) = result {
                        let record_id = doc.get("_id").and_then(|id| id.as_object_id());
                        let statement = doc.get("statement").and_then(|s| s.as_document());
@@ -152,27 +188,17 @@
                             }
 
                             let statement_str = to_string(&statement).unwrap_or_default();
-
-                            let ch_pool = ch_pool.clone();
-                            let tenant_config = tenant_config.clone();
-                            let pg_pool = app_state.pg_pool.clone();
-                            tokio::spawn(async move {
-                                if let Err(e) = insert_into_clickhouse(
-                                    &ch_pool,
-                                    &[(record_id_str, statement_str)],
-                                    &tenant_config.clickhouse_db,
-                                    &tenant_config.clickhouse_table,
-                                    &pg_pool,
-                                    &tenant_config.name,
-                                )
-                                .await
-                                {
-                                    error!("Error inserting into ClickHouse: {}", e);
-                                }
-                            });
+                            batch.push((record_id_str, statement_str));
+                            if batch.len() == batch_size as usize {
+                                // Hand the full batch to a worker, round-robin
+                                // by batch index.
+                                senders[(batch_index % num_workers) as usize]
+                                    .send(batch)
+                                    .expect("worker task has shut down");
+                                batch = Vec::with_capacity(batch_size as usize);
+                            }
                        }
                    }
                }
+                // Flush whatever is left of the final partial batch.
+                if !batch.is_empty() {
+                    senders[(batch_index % num_workers) as usize]
+                        .send(batch)
+                        .expect("worker task has shut down");
+                }
            }
            Err(e) => {
                error!("Error retrieving documents from MongoDB: {}", e);

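For reference, a minimal, self-contained sketch of the pattern this commit introduces: a fixed pool of tokio worker tasks, each draining its own unbounded mpsc channel, with full batches dispatched round-robin by batch index. This is an illustration, not the commit's code: insert_record is a hypothetical stand-in for insert_into_clickhouse (which also takes the ClickHouse pool, Postgres pool, and tenant settings), and the explicit shutdown via drop(senders) plus joining the handles is added here so the example runs to completion. Assumes the tokio crate with the macros and rt-multi-thread features.

use tokio::sync::mpsc;

// Hypothetical stand-in for insert_into_clickhouse; the real function
// also takes connection pools and tenant settings.
async fn insert_record(record_id: &str, statement: &str) {
    println!("inserting {record_id}: {statement}");
}

#[tokio::main]
async fn main() {
    let num_workers: u64 = 4;
    let batch_size = 3usize;

    // One channel per worker; the senders stay behind for dispatch.
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for worker_id in 0..num_workers {
        let (tx, mut rx) = mpsc::unbounded_channel::<Vec<(String, String)>>();
        senders.push(tx);
        handles.push(tokio::spawn(async move {
            // Each worker drains its own queue until all senders are dropped.
            while let Some(batch) = rx.recv().await {
                for (record_id, statement) in batch {
                    insert_record(&record_id, &statement).await;
                }
            }
            println!("worker {worker_id} done");
        }));
    }

    // Producer: accumulate records into fixed-size batches and hand each
    // full batch to a worker, round-robin by batch index.
    let mut batch = Vec::with_capacity(batch_size);
    let mut batch_index: u64 = 0;
    for i in 0..10 {
        batch.push((format!("id-{i}"), format!("stmt-{i}")));
        if batch.len() == batch_size {
            senders[(batch_index % num_workers) as usize]
                .send(std::mem::take(&mut batch))
                .expect("worker exited early");
            batch_index += 1;
        }
    }
    // Flush the final partial batch.
    if !batch.is_empty() {
        senders[(batch_index % num_workers) as usize]
            .send(batch)
            .expect("worker exited early");
    }

    // Dropping the senders closes the channels so the workers can exit.
    drop(senders);
    for h in handles {
        h.await.unwrap();
    }
}

Compared with the previous per-record tokio::spawn, the pool caps concurrent ClickHouse inserts at num_workers instead of spawning one task per document, which keeps task count and connection pressure bounded for large date ranges.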