Merge pull request #29 from isankadn/dev
fixing quotation parsing error
isankadn authored Apr 23, 2024
2 parents 44c2203 + 3a44f7f commit b88324d
Showing 3 changed files with 79 additions and 69 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

104 changes: 65 additions & 39 deletions historical_data/src/main.rs
@@ -14,11 +14,12 @@ use sha2::{Digest, Sha256};

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use chrono::{DateTime, NaiveDate, NaiveDateTime, Timelike, Utc};
use rayon::prelude::*;
use std::{
env,
error::Error,
fmt,
sync::{Arc, Mutex},
};
use tokio::{signal, sync::mpsc};
@@ -59,8 +60,8 @@ struct AppState {
async fn run(
app_state: Arc<AppState>,
tenant_name: String,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
start_date: chrono::NaiveDateTime,
end_date: chrono::NaiveDateTime,
) -> Result<bool, Box<dyn Error>> {
let tenant = app_state
.config
@@ -100,18 +101,19 @@ async fn process_tenant_historical_data(
tenant_config: Arc<TenantConfig>,
app_state: Arc<AppState>,
pool_index: usize,
start_date: NaiveDate,
end_date: NaiveDate,
start_date: NaiveDateTime,
end_date: NaiveDateTime,
) -> Result<()> {
let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri)
.await
.map_err(|e| anyhow!("Failed to connect to MongoDB: {}", e))?;
let mongo_db = mongo_client.database(&tenant_config.mongo_db);
let mongo_collection = mongo_db.collection::<Document>(&tenant_config.mongo_collection);

println!("start_date----------- {}", start_date);
println!("end_date----------- {}", end_date);
let ch_pool = Arc::new(app_state.clickhouse_pools[pool_index].clone());
let start_datetime = DateTime::<Utc>::from_utc(start_date.and_hms(0, 0, 0), Utc);
let end_datetime = DateTime::<Utc>::from_utc(end_date.and_hms(23, 59, 59), Utc);
let start_datetime = DateTime::<Utc>::from_utc(start_date, Utc);
let end_datetime = DateTime::<Utc>::from_utc(end_date, Utc);

let filter = doc! {
"timestamp": {
@@ -164,8 +166,8 @@ async fn process_tenant_historical_data(

if let (Some(record_id), Some(statement)) = (record_id, statement) {
let record_id_str = record_id.to_hex();

let mut statement = statement.to_owned();

if let Some(actor) =
statement.get_mut("actor").and_then(|a| a.as_document_mut())
{
@@ -178,10 +180,6 @@
let anonymized_name = if name_str.contains(':') {
let parts: Vec<&str> =
name_str.split(':').collect();
info!(
"{}",
&bson::Bson::String(parts[1].to_string())
);
if parts.len() == 2 {
anonymize_data(
&bson::Bson::String(parts[1].to_string()),
@@ -198,10 +196,6 @@
} else if name_str.contains('@') {
let parts: Vec<&str> =
name_str.split('@').collect();
info!(
"{}",
&bson::Bson::String(parts[0].to_string())
);
if parts.len() == 2 {
anonymize_data(
&bson::Bson::String(parts[0].to_string()),
@@ -216,10 +210,6 @@
)
}
} else {
info!(
"{}",
&bson::Bson::String(name_str.to_string())
);
anonymize_data(
&bson::Bson::String(name_str.to_string()),
&app_state.config.encryption_salt,
@@ -231,13 +221,13 @@
}
}
}

// info!("Statement: {}", statement);
let statement_str = to_string(&statement).unwrap_or_default();
batch.push((record_id_str, statement_str));
}
}
}

// println!("batch: {:?}", batch);
if let Err(e) = insert_into_clickhouse(
&ch_pool,
&batch,
Expand All @@ -250,7 +240,6 @@ async fn process_tenant_historical_data(
{
error!("Error inserting into ClickHouse: {}", e);
}

batch
}
})
@@ -259,7 +248,17 @@
let handles: Vec<JoinHandle<Vec<_>>> = batches.into_iter().map(tokio::spawn).collect();

for handle in handles {
handle.await.unwrap();
match handle.await {
Ok(processed_records) => {
// The batch was processed successfully and inserted into ClickHouse
// You can perform any additional actions or logging here if needed
// ...
}
Err(e) => {
error!("Error joining batch task: {}", e);
// Handle the error appropriately, such as logging, retrying, or taking corrective action
}
}
}

Ok(())
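Replacing handle.await.unwrap() with a match keeps a panicked or cancelled batch task from aborting the whole run. An equivalent shape using join_all from the futures crate (a sketch, assuming futures is available as a dependency):

    use futures::future::join_all;

    // Each element is a Result<Vec<(String, String)>, JoinError>:
    // the batch on success, the join error if the task panicked.
    for result in join_all(handles).await {
        if let Err(e) = result {
            error!("Error joining batch task: {}", e);
        }
    }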
@@ -303,29 +302,58 @@ async fn insert_into_clickhouse(
let max_retries = 5;
let mut retry_count = 0;
// info!("bulk_insert_values: {:?}", bulk_insert_values);
// println!("bulk_insert_values: {:?}", bulk_insert_values);
while retry_count < max_retries {
let insert_data: Vec<String> = bulk_insert_values
.iter()
.map(|(record_id, statement)| {
let escaped_statement = statement.replace("'", "\\'");
format!("('{}', '{}')", record_id, escaped_statement)
})
.map(
|(record_id, statement)| match serde_json::to_string(statement) {
Ok(serialized_statement) => {
let trimmed_statement = statement
.trim_start_matches('"')
.trim_end_matches('"')
.replace("'", "\\'")
.replace("\\", "\\");
// println!("Inserting record: {}", trimmed_statement);
format!("('{}' , '{}')", record_id, trimmed_statement)
}
Err(e) => {
error!("Failed to serialize JSON: {}", e);
format!("('{}' , '')", record_id)
}
},
)
.collect();

let insert_query = format!(
"INSERT INTO {} (id, statement) VALUES {}",
full_table_name,
insert_data.join(",")
insert_data.join(" , ")
);
// info!("insert_query: {:?}", insert_query);

match client.execute(insert_query.as_str()).await {
Ok(_) => {
info!("Successfully inserted statements into ClickHouse");
return Ok(());
}
Err(e) => {
let err_msg = format!("Failed to insert statements into ClickHouse: {}", e);
error!("{}", err_msg);
error!("err_msg: {}", err_msg);

// Log the specific record causing the issue
let failed_record = bulk_insert_values
.iter()
.find(|(_, statement)| insert_query.contains(statement))
.map(|(_, statement)| statement.clone())
.unwrap_or_else(|| "Record not found".to_string());
error!("Problematic record: {}", failed_record);
let failed_record_id = bulk_insert_values
.iter()
.find(|(id, _)| insert_query.contains(id))
.map(|(id, _)| id.clone())
.unwrap_or_else(|| "Record not found".to_string());
error!("Problematic record_id: {}", failed_record_id);

retry_count += 1;
if retry_count == max_retries {
let err_msg = "Max retries reached for insertion. Logging failed batch.";
@@ -347,7 +375,6 @@ async fn insert_into_clickhouse(
}
}
}

Err(anyhow!("Max retries exceeded"))
}
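The reworked mapping runs each statement through serde_json::to_string as a validity check (its output, serialized_statement, is otherwise discarded), strips any surrounding double quotes, and escapes single quotes for the ClickHouse VALUES literal. One caveat: .replace("\\", "\\") replaces a backslash with itself, so backslashes end up unescaped. A minimal escaping helper for ClickHouse single-quoted string literals might look like this sketch, where backslashes are doubled before quotes are escaped:

    // Hypothetical helper; the escape order matters, or the backslashes
    // introduced for the quotes would themselves get doubled.
    fn escape_ch_literal(s: &str) -> String {
        s.replace('\\', "\\\\").replace('\'', "\\'")
    }

    // The map closure could then build each row as:
    // format!("('{}', '{}')", record_id, escape_ch_literal(statement))

Relatedly, the problematic-record search compares raw values against the escaped query, so insert_query.contains(statement) can fail to match once any escaping has been applied.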

@@ -504,9 +531,9 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {
}
}

fn validate_date(date_str: &str) -> Result<chrono::NaiveDate> {
chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
.map_err(|e| anyhow!("Invalid date format: {}", e))
fn validate_date_time(date_time_str: &str) -> Result<NaiveDateTime> {
NaiveDateTime::parse_from_str(date_time_str, "%Y-%m-%dT%H:%M")
.map_err(|e| anyhow!("Invalid date and time format: {}", e))
}
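The validator now requires an explicit time component. A quick illustration of what the %Y-%m-%dT%H:%M format accepts:

    use chrono::NaiveDateTime;

    // A date plus a time down to the minute parses...
    assert!(NaiveDateTime::parse_from_str("2024-04-23T08:30", "%Y-%m-%dT%H:%M").is_ok());
    // ...while a bare date, which the old validate_date accepted, is rejected.
    assert!(NaiveDateTime::parse_from_str("2024-04-23", "%Y-%m-%dT%H:%M").is_err());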

#[tokio::main]
@@ -523,7 +550,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
return Err("Unsupported environment".into());
}
};

let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;

let tenant_name = env::args()
@@ -538,8 +564,8 @@
.nth(3)
.ok_or_else(|| anyhow!("Missing end date argument"))?;

let start_date = validate_date(&start_date)?;
let end_date = validate_date(&end_date)?;
let start_date = validate_date_time(&start_date)?;
let end_date = validate_date_time(&end_date)?;

if end_date < start_date {
return Err(anyhow!("End date must be greater than or equal to start date").into());
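Accordingly, the second and third command-line arguments must now be date-times rather than bare dates. An invocation might look like this (binary and tenant names are illustrative):

    historical_data tenant1 2024-04-01T00:00 2024-04-23T23:59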
