Merge pull request #23 from isankadn/dev
Dev to Main | working copy using Rayon
isankadn authored Mar 24, 2024
2 parents 48eed91 + eea25d1 commit 8c77f06
Showing 1 changed file with 57 additions and 61 deletions.
historical_data/src/main.rs: 118 changes (57 additions, 61 deletions)
@@ -8,18 +8,20 @@ use mongodb::{
Client as MongoClient,
};
use rayon::prelude::*;
use rayon::prelude::*;
use serde::Deserialize;
use serde_json::to_string;
use sha2::{Digest, Sha256};

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use rayon::prelude::*;
use std::{env, error::Error, sync::Arc};
use tokio::signal;
use tokio::sync::oneshot;

use tokio::{signal, sync::mpsc};
use tokio::{
sync::{broadcast, oneshot},
task::JoinHandle,
};
type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

#[derive(Deserialize, Clone)]
@@ -38,7 +40,7 @@ struct AppConfig {
tenants: Vec<TenantConfig>,
encryption_salt: String,
batch_size: u64,
max_concurrency: usize,
number_of_workers: usize,
pg_database_url: String,
}

@@ -89,8 +91,8 @@ async fn process_tenant_historical_data(
tenant_config: Arc<TenantConfig>,
app_state: Arc<AppState>,
pool_index: usize,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
start_date: NaiveDate,
end_date: NaiveDate,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri)
.await
@@ -119,49 +121,32 @@ let batch_size = app_state.config.batch_size;
let batch_size = app_state.config.batch_size;
let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64;

let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

let num_workers = 4;
let mut receivers: Vec<tokio::sync::mpsc::UnboundedReceiver<Vec<(String, String)>>> =
Vec::new();
for _ in 0..num_workers {
let (_, rx) = tokio::sync::mpsc::unbounded_channel();
receivers.push(rx);
}
for mut rx in receivers {
let tenant_config = Arc::clone(&tenant_config);
let ch_pool = Arc::clone(&ch_pool);
let pg_pool = Arc::new(app_state.pg_pool.clone());
tokio::spawn(async move {
while let Some(batch) = rx.recv().await {
for (record_id_str, statement_str) in batch {
if let Err(e) = insert_into_clickhouse(
&ch_pool,
&[(record_id_str, statement_str)],
&tenant_config.clickhouse_db,
&tenant_config.clickhouse_table,
&pg_pool,
&tenant_config.name,
)
.await
{
error!("Error inserting into ClickHouse: {}", e);
let batches: Vec<_> = (0..num_batches)
.into_par_iter()
.map(|batch_index| {
let tenant_config = Arc::clone(&tenant_config);
let ch_pool = Arc::clone(&ch_pool);
let pg_pool = app_state.pg_pool.clone();
let mongo_collection = mongo_collection.clone();
let filter = filter.clone();
let app_state = Arc::clone(&app_state);

async move {
let skip = batch_index * batch_size;
let options = FindOptions::builder()
.skip(skip)
.limit(batch_size as i64)
.projection(doc! { "_id": 1, "statement": 1 })
.build();

let mut cursor = match mongo_collection.find(filter, options).await {
Ok(cursor) => cursor,
Err(e) => {
error!("Error retrieving documents from MongoDB: {}", e);
return Vec::new();
}
}
}
});
}

for batch_index in 0..num_batches {
let skip = batch_index * batch_size;
let options = FindOptions::builder()
.skip(skip)
.limit(batch_size as i64)
.projection(doc! { "_id": 1, "statement": 1 })
.build();
};

match mongo_collection.find(filter.clone(), options).await {
Ok(mut cursor) => {
let mut batch = Vec::with_capacity(batch_size as usize);
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
@@ -192,21 +177,32 @@ async fn process_tenant_historical_data(

let statement_str = to_string(&statement).unwrap_or_default();
batch.push((record_id_str, statement_str));
if batch.len() == batch_size as usize {
sender.send(batch).unwrap();
batch = Vec::with_capacity(batch_size as usize);
}
}
}
}
if !batch.is_empty() {
sender.send(batch).unwrap();

if let Err(e) = insert_into_clickhouse(
&ch_pool,
&batch,
&tenant_config.clickhouse_db,
&tenant_config.clickhouse_table,
&pg_pool,
&tenant_config.name,
)
.await
{
error!("Error inserting into ClickHouse: {}", e);
}

batch
}
Err(e) => {
error!("Error retrieving documents from MongoDB: {}", e);
}
}
})
.collect();

let handles: Vec<JoinHandle<Vec<_>>> = batches.into_iter().map(tokio::spawn).collect();

for handle in handles {
handle.await.unwrap();
}

Ok(())
@@ -249,7 +245,7 @@ async fn insert_into_clickhouse(

let max_retries = 5;
let mut retry_count = 0;

// info!("bulk_insert_values: {:?}", bulk_insert_values);
while retry_count < max_retries {
let insert_data: Vec<String> = bulk_insert_values
.iter()
@@ -264,7 +260,7 @@
full_table_name,
insert_data.join(",")
);

// info!("insert_query: {:?}", insert_query);
match client.execute(insert_query.as_str()).await {
Ok(_) => {
info!("Successfully inserted statements into ClickHouse");
@@ -534,7 +530,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if let Err(e) = handle.await {
error!("retry_failed_batches task failed: {}", e);
} else {
info!("Processing finished, exiting...");
info!("Retry finished!");
}

signal::ctrl_c().await?;
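For readers skimming the diff: the commit replaces the unbounded-channel worker loop with one future per batch, built via Rayon's into_par_iter and fanned out onto the Tokio runtime with tokio::spawn, then awaited through the collected JoinHandles. The sketch below shows that fan-out shape in isolation, under stated assumptions: process_batch, the batch counts, and the printed output are hypothetical placeholders, not code from this repository.

    use rayon::prelude::*;
    use tokio::task::JoinHandle;

    // Hypothetical stand-in for the real per-batch work (fetching a page of
    // MongoDB documents and inserting it into ClickHouse); not from the repo.
    async fn process_batch(batch_index: u64, batch_size: u64) -> u64 {
        let skip = batch_index * batch_size;
        // ... fetch `batch_size` documents starting at `skip`, transform, insert ...
        skip
    }

    #[tokio::main]
    async fn main() {
        let batch_size: u64 = 100;
        let num_batches: u64 = 8;

        // Build one future per batch index. `into_par_iter` mirrors the diff,
        // though it only parallelizes constructing the futures; the actual
        // concurrency comes from `tokio::spawn` below.
        let batches: Vec<_> = (0..num_batches)
            .into_par_iter()
            .map(|batch_index| process_batch(batch_index, batch_size))
            .collect();

        // Fan the futures out onto the Tokio runtime and wait for all of them.
        let handles: Vec<JoinHandle<u64>> =
            batches.into_iter().map(tokio::spawn).collect();
        for handle in handles {
            match handle.await {
                Ok(skip) => println!("batch starting at offset {} finished", skip),
                Err(e) => eprintln!("batch task failed: {}", e),
            }
        }
    }

Note that Rayon here only parallelizes building the futures; the database work itself runs concurrently on Tokio tasks once spawned.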
