
Commit

seems perfectly working fine code
isankadn committed Aug 5, 2024
1 parent a67bb1e commit 7af6f65
Showing 1 changed file with 152 additions and 116 deletions.
268 changes: 152 additions & 116 deletions historical_data/src/main.rs
@@ -1,10 +1,9 @@
 use anyhow::{anyhow, Context, Result};
-use clickhouse_rs::{Client as ClickhouseClient, Pool as ClickhousePool};
-use futures::stream::{self, StreamExt};
+use clickhouse_rs::Pool as ClickhousePool;
+use futures::stream::StreamExt;
 use log::{error, info, warn};
 use mongodb::{
-    bson::{self, doc, Bson, Document},
-    options::FindOptions,
+    bson::{self, doc, Document},
     Client as MongoClient,
 };

@@ -15,21 +14,16 @@ use sha2::{Digest, Sha256};

 use bb8::Pool;
 use bb8_postgres::PostgresConnectionManager;
-use chrono::{DateTime, NaiveDate, NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, NaiveDateTime, Utc};
 
 use std::{
     env,
-    error::Error,
-    fmt,
     sync::{Arc, Mutex},
     time::Duration,
     time::Instant,
 };
-use tokio::{signal, sync::mpsc};
-use tokio::{
-    sync::{broadcast, oneshot},
-    task::JoinHandle,
-};
 
+use tokio::signal;
+use tokio::sync::oneshot;
 type PgPool = Pool<PostgresConnectionManager<tokio_postgres::NoTls>>;
 
 lazy_static::lazy_static! {
@@ -188,7 +182,7 @@ async fn process_tenant_historical_data(
     let total_docs = mongo_collection
         .count_documents(filter.clone(), None)
         .await
-        .context("Failed to count documents in MongoDB")? as usize; // Cast to usize
+        .context("Failed to count documents in MongoDB")? as usize;
     info!("Total documents to process: {}", total_docs);
 
     let mut cursor = mongo_collection
@@ -199,65 +193,103 @@ async fn process_tenant_historical_data(
     let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
     let mut batch = Vec::with_capacity(batch_manager.get_current_size());
     let mut processed_docs = 0;
 
+    let mut failed_docs = 0;
     let start_time = Instant::now();
 
     while let Some(result) = cursor.next().await {
-        let doc = result.context("Failed to get next document from MongoDB cursor")?;
-        let record_id = doc
-            .get("_id")
-            .and_then(|id| id.as_object_id())
-            .ok_or_else(|| anyhow!("Document is missing _id field"))?;
-        let statement = doc
-            .get("statement")
-            .and_then(|s| s.as_document())
-            .ok_or_else(|| anyhow!("Document is missing statement field"))?;
-
-        let record_id_str = record_id.to_hex();
-        let mut statement = statement.to_owned();
-
-        anonymize_statement(
-            &mut statement,
-            &app_state.config.encryption_salt,
-            &tenant_config.name,
-        )?;
-
-        let statement_str =
-            to_string(&statement).context("Failed to serialize statement to JSON")?;
-        batch.push((record_id_str, statement_str));
-
-        if batch.len() >= batch_manager.get_current_size() {
-            let batch_start_time = Instant::now();
-
-            insert_into_clickhouse(
-                &ch_pool,
-                &batch,
-                &tenant_config.clickhouse_db,
-                &tenant_config.clickhouse_table,
-                &app_state.pg_pool,
-                &tenant_config.name,
-                &mut batch_manager, // Pass batch_manager as mutable reference
-            )
-            .await
-            .context("Failed to insert batch into ClickHouse")?;
-
-            let batch_duration = batch_start_time.elapsed();
-            batch_manager.adjust_batch_size(batch.len(), batch_duration);
+        match result {
+            Ok(doc) => {
+                let record_id = match doc.get("_id").and_then(|id| id.as_object_id()) {
+                    Some(id) => id.to_hex(),
+                    None => {
+                        warn!("Document is missing _id field, skipping");
+                        failed_docs += 1;
+                        continue;
+                    }
+                };
+
+                let statement = match doc.get("statement").and_then(|s| s.as_document()) {
+                    Some(s) => s.to_owned(),
+                    None => {
+                        warn!(
+                            "Document {} is missing statement field, skipping",
+                            record_id
+                        );
+                        failed_docs += 1;
+                        continue;
+                    }
+                };
+
+                let mut statement = statement.to_owned();
+
+                if let Err(e) = anonymize_statement(
+                    &mut statement,
+                    &app_state.config.encryption_salt,
+                    &tenant_config.name,
+                ) {
+                    warn!(
+                        "Failed to anonymize statement for document {}: {}",
+                        record_id, e
+                    );
+                    failed_docs += 1;
+                    continue;
+                }

-            processed_docs += batch.len();
-            info!(
-                "Processed {} out of {} documents. Current batch size: {}",
-                processed_docs,
-                total_docs,
-                batch_manager.get_current_size()
-            );
-            batch.clear();
-        }
+                let statement_str = match to_string(&statement) {
+                    Ok(s) => s,
+                    Err(e) => {
+                        warn!(
+                            "Failed to serialize statement to JSON for document {}: {}",
+                            record_id, e
+                        );
+                        failed_docs += 1;
+                        continue;
+                    }
+                };
+
+                batch.push((record_id, statement_str));
+
+                if batch.len() >= batch_manager.get_current_size() {
+                    let batch_start_time = Instant::now();
+
+                    if let Err(e) = insert_into_clickhouse(
+                        &ch_pool,
+                        &batch,
+                        &tenant_config.clickhouse_db,
+                        &tenant_config.clickhouse_table,
+                        &app_state.pg_pool,
+                        &tenant_config.name,
+                        &mut batch_manager,
+                    )
+                    .await
+                    {
+                        error!("Failed to insert batch into ClickHouse: {}", e);
+                        failed_docs += batch.len();
+                    } else {
+                        let batch_duration = batch_start_time.elapsed();
+                        batch_manager.adjust_batch_size(batch.len(), batch_duration);
+
+                        processed_docs += batch.len();
+                        info!(
+                            "Processed {} out of {} documents. Current batch size: {}",
+                            processed_docs,
+                            total_docs,
+                            batch_manager.get_current_size()
+                        );
+                    }
+                    batch.clear();
+                }
+            }
+            Err(e) => {
+                warn!("Error fetching document from cursor: {}", e);
+                failed_docs += 1;
+            }
+        }
     }
 
     // Insert any remaining documents
     if !batch.is_empty() {
-        insert_into_clickhouse(
+        if let Err(e) = insert_into_clickhouse(
             &ch_pool,
             &batch,
             &tenant_config.clickhouse_db,
@@ -267,19 +299,24 @@ async fn process_tenant_historical_data(
             &mut batch_manager,
         )
         .await
-        .context("Failed to insert final batch into ClickHouse")?;
-        processed_docs += batch.len();
+        {
+            error!("Failed to insert final batch into ClickHouse: {}", e);
+            failed_docs += batch.len();
+        } else {
+            processed_docs += batch.len();
+        }
     }

     let total_duration = start_time.elapsed();
     info!(
-        "Completed processing {} documents in {:?}. Final batch size: {}",
+        "Completed processing. Total processed: {}, Total failed: {}, Duration: {:?}, Final batch size: {}",
         processed_docs,
+        failed_docs,
         total_duration,
         batch_manager.get_current_size()
    );
 
-    if processed_docs < total_docs {
+    if processed_docs + failed_docs < total_docs {
         warn!("Some documents were skipped during processing");
     }

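Note: the processing loop above depends on a BatchSizeManager type whose definition sits outside the visible hunks. Below is a minimal sketch consistent with the calls shown (new(initial, min, max, target), get_current_size(), adjust_batch_size(len, duration)); the field names and the doubling/halving policy are assumptions for illustration, not the committed implementation.

use std::time::Duration;

// Hypothetical sketch: grow the batch when throughput beats the target,
// shrink it when throughput falls short, clamped to [min_size, max_size].
struct BatchSizeManager {
    current_size: usize,
    min_size: usize,
    max_size: usize,
    target_docs_per_sec: f64, // e.g. 5000.0 in the call above
}

impl BatchSizeManager {
    fn new(initial: usize, min: usize, max: usize, target_docs_per_sec: f64) -> Self {
        Self {
            current_size: initial,
            min_size: min,
            max_size: max,
            target_docs_per_sec,
        }
    }

    fn get_current_size(&self) -> usize {
        self.current_size
    }

    fn adjust_batch_size(&mut self, processed: usize, duration: Duration) {
        let docs_per_sec = processed as f64 / duration.as_secs_f64().max(f64::EPSILON);
        if docs_per_sec > self.target_docs_per_sec {
            self.current_size = (self.current_size * 2).min(self.max_size);
        } else {
            self.current_size = (self.current_size / 2).max(self.min_size);
        }
    }
}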
@@ -291,14 +328,55 @@ fn anonymize_statement(
     encryption_salt: &str,
     tenant_name: &str,
 ) -> Result<()> {
-    // Implement your anonymization logic here
-    // This is a placeholder implementation
+    // Create a deep copy of the statement
+    let mut statement_copy = statement.clone();
+
+    // Check if all required fields exist
+    if !statement_copy.contains_key("actor")
+        || !statement_copy
+            .get_document("actor")?
+            .contains_key("account")
+        || !statement_copy
+            .get_document("actor")?
+            .get_document("account")?
+            .contains_key("name")
+    {
+        return Err(anyhow!("Statement is missing required fields"));
+    }
+
+    let name = statement_copy
+        .get_document("actor")?
+        .get_document("account")?
+        .get_str("name")?;
+
+    let value_to_hash = if name.contains('@') {
+        name.split('@').next().unwrap_or("")
+    } else if name.contains(':') {
+        name.split(':').last().unwrap_or("")
+    } else {
+        name
+    };
+
+    if value_to_hash.is_empty() {
+        return Err(anyhow!("Empty value to hash for name: {}", name));
+    }
+
     let mut hasher = Sha256::new();
-    hasher.update(format!("{}{}", encryption_salt, tenant_name));
-    hasher.update(statement.to_string().as_bytes());
+    hasher.update(encryption_salt.as_bytes());
+    hasher.update(tenant_name.as_bytes());
+    hasher.update(value_to_hash.as_bytes());
     let result = hasher.finalize();
-    statement.insert("anonymized_hash", hex::encode(result));
-    println!("Anonymized statement---: {}", statement);
+    let hashed_value = hex::encode(result);
+
+    // Update the copy
+    statement_copy
+        .get_document_mut("actor")?
+        .get_document_mut("account")?
+        .insert("name", hashed_value);
+
+    // If we've made it this far without errors, update the original statement
+    *statement = statement_copy;
+    println!("Anonymized statement: {:?}", statement);
     Ok(())
 }
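Usage sketch (not part of the commit): for an xAPI-style statement whose actor.account.name is an email such as alice@example.com, only the local part "alice" is hashed together with the salt and tenant name, so one user maps to one stable pseudonym per tenant. Names with a ':' but no '@' (e.g. urn:user:1234) hash the segment after the last ':'. The salt and tenant values below are placeholders.

use mongodb::bson::doc;

fn anonymize_example() -> anyhow::Result<()> {
    let mut statement = doc! {
        "actor": { "account": { "name": "alice@example.com" } },
        "verb": { "id": "http://adlnet.gov/expapi/verbs/completed" },
    };

    anonymize_statement(&mut statement, "example-salt", "tenant-a")?;

    // actor.account.name has been replaced by a hex-encoded SHA-256 digest.
    let hashed = statement
        .get_document("actor")?
        .get_document("account")?
        .get_str("name")?;
    assert_eq!(hashed.len(), 64); // 32 bytes, hex-encoded
    Ok(())
}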

@@ -435,48 +513,6 @@ async fn insert_batch(
     Ok(())
 }

-async fn deduplicate_clickhouse_data(
-    ch_pool: &ClickhousePool,
-    clickhouse_db: &str,
-    clickhouse_table: &str,
-) -> Result<()> {
-    let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);
-    let mut client = ch_pool
-        .get_handle()
-        .await
-        .context("Failed to get client from ClickHouse pool")?;
-
-    info!("Processing duplicate data...");
-
-    let create_dedup_table_query = format!(
-        "CREATE TABLE {table}_dedup ENGINE = MergeTree() PARTITION BY toYYYYMM(created_at) PRIMARY KEY id ORDER BY (id, created_at) SETTINGS index_granularity = 8192 AS SELECT id, any(statement) AS statement, any(created_at) AS created_at FROM {table} GROUP BY id",
-        table = full_table_name
-    );
-
-    let drop_table_query = format!("DROP TABLE {}", full_table_name);
-    let rename_table_query = format!(
-        "RENAME TABLE {}_dedup TO {}",
-        full_table_name, full_table_name
-    );
-
-    client
-        .execute(create_dedup_table_query.as_str())
-        .await
-        .context("Failed to create dedup table in ClickHouse")?;
-
-    client
-        .execute(drop_table_query.as_str())
-        .await
-        .context("Failed to drop original table in ClickHouse")?;
-
-    client
-        .execute(rename_table_query.as_str())
-        .await
-        .context("Failed to rename dedup table in ClickHouse")?;
-
-    Ok(())
-}
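Aside, not part of the commit: the removed helper deduplicated by materializing a GROUP BY copy and then doing DROP + RENAME, which leaves a brief window with no table at all. A lighter alternative, assuming the target table can be declared with a ReplacingMergeTree engine so ClickHouse collapses rows sharing the same sort key, is to force a merge; the helper below is a sketch under that assumption, reusing the pool API seen above.

// Sketch only: assumes the table was created with
// ENGINE = ReplacingMergeTree ORDER BY id.
async fn force_dedup_merge(
    ch_pool: &ClickhousePool,
    clickhouse_db: &str,
    clickhouse_table: &str,
) -> Result<()> {
    let mut client = ch_pool
        .get_handle()
        .await
        .context("Failed to get client from ClickHouse pool")?;
    // OPTIMIZE ... FINAL merges parts now rather than waiting for background merges.
    let query = format!("OPTIMIZE TABLE {}.{} FINAL", clickhouse_db, clickhouse_table);
    client
        .execute(query.as_str())
        .await
        .context("Failed to run OPTIMIZE TABLE FINAL")?;
    Ok(())
}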

 async fn log_failed_batch(
     pg_pool: &PgPool,
     tenant_name: &str,
