Skip to content

Commit

Permalink
Merge pull request #17 from isankadn/dev
Browse files Browse the repository at this point in the history
adding date range for hitorical data and readme file
  • Loading branch information
isankadn authored Mar 24, 2024
2 parents 5da3f4f + 23b0372 commit 07d655b
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 7 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde_json = "1.0.114"
serde_yaml = "0.9.33"
rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"


[[bin]]
Expand Down
5 changes: 5 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@


Inside "mongo-to-clickhouse" container run this.

./historical_data saikyo 2020-01-01 2021-12-31
1 change: 1 addition & 0 deletions historical_data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ serde_json = "1.0.114"
serde_yaml = "0.9.33"
rayon = "1.9.0"
sqlx = "0.7.4"
chrono = "0.4.35"
56 changes: 49 additions & 7 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clickhouse_rs::{Client as ClickhouseClient, Pool as ClickhousePool};
use futures::stream::{self, StreamExt};
use log::{error, info};
use mongodb::{
bson::{self, Bson, Document},
bson::{self, doc, Bson, Document},
options::FindOptions,
Client as MongoClient,
};
Expand All @@ -13,11 +13,11 @@ use serde::Deserialize;
use serde_json::to_string;
use sha2::{Digest, Sha256};

use std::{env, error::Error, sync::Arc};
use tokio::sync::Semaphore;

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use std::{env, error::Error, sync::Arc};
use tokio::sync::Semaphore;

type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;

Expand Down Expand Up @@ -49,15 +49,27 @@ struct AppState {
pg_pool: PgPool,
}

async fn run(app_state: Arc<AppState>, tenant_name: String) -> Result<(), Box<dyn Error>> {
async fn run(
app_state: Arc<AppState>,
tenant_name: String,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
) -> Result<(), Box<dyn Error>> {
let tenant = app_state
.config
.tenants
.iter()
.find(|t| t.name == tenant_name)
.ok_or_else(|| anyhow!("Tenant not found in the configuration"))?;
// println!("tenant name {:?}", tenant.name);
if let Err(e) = process_tenant_historical_data(tenant.clone(), Arc::clone(&app_state), 0).await
if let Err(e) = process_tenant_historical_data(
tenant.clone(),
Arc::clone(&app_state),
0,
start_date,
end_date,
)
.await
{
error!("Error processing tenant {}: {}", tenant.name, e);
}
Expand All @@ -76,13 +88,23 @@ async fn process_tenant_historical_data(
tenant_config: TenantConfig,
app_state: Arc<AppState>,
pool_index: usize,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?;
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection);

let ch_pool = &app_state.clickhouse_pools[pool_index];
let start_datetime = DateTime::<Utc>::from_utc(start_date.and_hms(0, 0, 0), Utc);
let end_datetime = DateTime::<Utc>::from_utc(end_date.and_hms(23, 59, 59), Utc);

let filter = doc! {
"timestamp": {
"$gte": bson::DateTime::from_millis(start_datetime.timestamp_millis()),
"$lte": bson::DateTime::from_millis(end_datetime.timestamp_millis()),
}
};
let total_docs = mongo_collection.estimated_document_count(None).await?;
info!(
"Total documents in {}: {}",
Expand Down Expand Up @@ -406,6 +428,11 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {
}
}

fn validate_date(date_str: &str) -> Result<chrono::NaiveDate> {
chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
.map_err(|e| anyhow!("Invalid date format: {}", e))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
Expand All @@ -426,6 +453,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
.nth(1)
.ok_or_else(|| anyhow!("Missing tenant name argument"))?;

let start_date = env::args()
.nth(2)
.ok_or_else(|| anyhow!("Missing start date argument"))?;

let end_date = env::args()
.nth(3)
.ok_or_else(|| anyhow!("Missing end date argument"))?;

let start_date = validate_date(&start_date)?;
let end_date = validate_date(&end_date)?;

if end_date < start_date {
return Err(anyhow!("End date must be greater than or equal to start date").into());
}

let tenant = config
.tenants
.iter()
Expand All @@ -447,7 +489,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
pg_pool,
});

run(app_state.clone(), tenant_name).await?;
run(app_state.clone(), tenant_name, start_date, end_date).await?;

tokio::spawn(retry_failed_batches(app_state));

Expand Down

0 comments on commit 07d655b

Please sign in to comment.