Skip to content

Commit

Permalink
Merge pull request #20 from isankadn/dev
Browse files Browse the repository at this point in the history
some fixes
  • Loading branch information
isankadn authored Mar 24, 2024
2 parents f69ff45 + ba93a3d commit 6b8a2a6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
33 changes: 25 additions & 8 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use std::{env, error::Error, sync::Arc};
use tokio::sync::Semaphore;
use tokio::signal;
use tokio::sync::oneshot;

type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

Expand Down Expand Up @@ -91,7 +92,9 @@ async fn process_tenant_historical_data(
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?;
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri)
.await
.map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?;
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection);

Expand Down Expand Up @@ -235,8 +238,9 @@ async fn insert_into_clickhouse(
let mut client = match ch_pool.get_handle().await {
Ok(client) => client,
Err(e) => {
error!("Failed to get client from ClickHouse pool: {}", e);
return Err(e.into());
let err_msg = format!("Failed to get client from ClickHouse pool: {}", e);
error!("{}", err_msg);
return Err(anyhow!(err_msg));
}
};

Expand Down Expand Up @@ -264,10 +268,12 @@ async fn insert_into_clickhouse(
return Ok(());
}
Err(e) => {
error!("Failed to insert statements into ClickHouse: {}", e);
let err_msg = format!("Failed to insert statements into ClickHouse: {}", e);
error!("{}", err_msg);
retry_count += 1;
if retry_count == max_retries {
error!("Max retries reached for insertion. Logging failed batch.");
let err_msg = "Max retries reached for insertion. Logging failed batch.";
error!("{}", err_msg);
log_failed_batch(
pg_pool,
tenant_name,
Expand Down Expand Up @@ -302,7 +308,7 @@ async fn deduplicate_clickhouse_data(
return Err(e.into());
}
};

info!("processing duplicate data...");
let create_dedup_table_query = format!(
"CREATE TABLE {table}_dedup
ENGINE = MergeTree()
Expand Down Expand Up @@ -511,10 +517,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
clickhouse_pools: vec![clickhouse_pool],
pg_pool,
});
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();

run(app_state.clone(), tenant_name, start_date, end_date).await?;
let app_state_clone = app_state.clone();
tokio::spawn(async move {
run(app_state_clone, tenant_name, start_date, end_date)
.await
.unwrap();
});

tokio::spawn(retry_failed_batches(app_state));

signal::ctrl_c().await?;
info!("Received shutdown signal, shutting down gracefully...");

shutdown_tx.send(()).unwrap();

Ok(())
}
12 changes: 6 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,34 +77,34 @@ async fn process_tenant_records(
pool_index: usize,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?;
println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri);
// println!("<<-- mongo_uri {:?}", &tenant_config.mongo_uri);
// println!("<<-- mongo_client {:?}", mongo_client);
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
println!("<<-- mongo_db: {:?}", mongo_db);
// println!("<<-- mongo_db: {:?}", mongo_db);
let mongo_collection: mongodb::Collection<Document> =
mongo_db.collection(&tenant_config.mongo_collection);

let pg_pool = &app_state.postgres_pool;
let ch_pool = &app_state.clickhouse_pools[pool_index];
println!("pg_pool {:?}", pg_pool);
// println!("pg_pool {:?}", pg_pool);
let pg_conn = pg_pool.get().await?;
println!("pg_conn {:?}", pg_conn);
// println!("pg_conn {:?}", pg_conn);
let row = pg_conn
.query_one(
"SELECT token FROM resume_token WHERE tenant_name = $1 ORDER BY id DESC LIMIT 1",
&[&tenant_config.name],
)
.await
.ok();
println!("row {:?}", row);
// println!("row {:?}", row);
let mut options = mongodb::options::ChangeStreamOptions::default();
if let Some(row) = row {
let token_bytes: Vec<u8> = row.get("token");
if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&token_bytes) {
options.resume_after = Some(resume_token);
}
}
println!("app_state {:?}", &app_state.config);
// println!("app_state {:?}", &app_state.config);
let change_stream_options = ChangeStreamOptions::default();
let mut change_stream = mongo_collection.watch(None, change_stream_options).await?;

Expand Down

0 comments on commit 6b8a2a6

Please sign in to comment.