Merge pull request #36 from isankadn/dev
Dev
isankadn authored Sep 10, 2024
2 parents 3119f32 + 7b7444e commit 2f7c294
Showing 2 changed files with 114 additions and 56 deletions.
historical_data/src/main.rs: 75 changes (49 additions, 26 deletions)
@@ -6,9 +6,7 @@ use config::{Config, File};
use futures::stream::StreamExt;
// use log::{error, info, warn};
use mongodb::{
-bson::{self, doc, Document},
-change_stream::event::{ChangeStreamEvent, ResumeToken},
-options::ChangeStreamOptions,
+bson::{self, doc, DateTime as BsonDateTime, Document},
Client as MongoClient,
};
use regex::Regex;
@@ -20,22 +18,21 @@ use std::collections::HashSet;
use std::path::Path;

use std::{env, sync::Arc, time::Duration, time::Instant};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::net::{TcpListener, TcpStream};

use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};

use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
-use tokio::{signal, sync::oneshot, task};

+use tokio::{signal, sync::oneshot};

lazy_static::lazy_static! {
static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1");
static ref BACKSLASH_REGEX_2: Regex = Regex::new(r"\\(?:\\\\)*").expect("Failed to compile BACKSLASH_REGEX_2");
static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3");
}

-const MAX_BATCH_SIZE: usize = 10000;
+const MAX_BATCH_SIZE: usize = 8000;
const MAX_RETRIES: u32 = 5;
const INITIAL_RETRY_DELAY: u64 = 1000;
const MAX_RETRY_COUNT: usize = 5;
@@ -255,12 +252,18 @@ impl BatchSizeManager {

fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) {
let performance = docs_processed as f64 / time_taken.as_secs_f64();
+let old_size = self.current_size;

if performance > self.performance_threshold {
self.current_size = (self.current_size * 2).min(self.max_size);
} else {
self.current_size = (self.current_size / 2).max(self.min_size);
}

+info!(
+"Batch size adjusted: {} -> {}. Performance: {:.2}, Threshold: {:.2}",
+old_size, self.current_size, performance, self.performance_threshold
+);
}

fn get_current_size(&self) -> usize {
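The hunk above only adds logging around an unchanged policy: throughput above the threshold doubles the batch size up to max_size, anything at or below it halves the size down to min_size. A minimal, self-contained sketch of that policy, with the field and method names taken from the diff and the bounds from the new call site below; the constructor values and main() are our own illustration:

use std::time::Duration;

// Doubling/halving batch-size policy, as in adjust_batch_size above.
struct BatchSizeManager {
    current_size: usize,
    min_size: usize,
    max_size: usize,
    performance_threshold: f64, // docs per second
}

impl BatchSizeManager {
    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) {
        let performance = docs_processed as f64 / time_taken.as_secs_f64();
        if performance > self.performance_threshold {
            self.current_size = (self.current_size * 2).min(self.max_size);
        } else {
            self.current_size = (self.current_size / 2).max(self.min_size);
        }
    }
}

fn main() {
    // Bounds mirror the new defaults below: floor 5000, ceiling MAX_BATCH_SIZE (8000).
    let mut m = BatchSizeManager {
        current_size: 5000,
        min_size: 5000,
        max_size: 8000,
        performance_threshold: 10_000.0,
    };
    m.adjust_batch_size(60_000, Duration::from_secs(5)); // 12,000 docs/s > threshold
    assert_eq!(m.current_size, 8000); // doubled, then capped at max_size
}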
@@ -361,10 +364,11 @@ async fn process_tenant_historical_data(
.await
.context("Failed to create MongoDB cursor")?;

-let mut batch: Vec<(String, String, String)> = Vec::with_capacity(app_state.config.batch_size);
+let mut batch: Vec<(String, String, BsonDateTime, String)> =
+Vec::with_capacity(app_state.config.batch_size);
let mut batch_start_time = Instant::now();
let mut batch_manager =
-BatchSizeManager::new(app_state.config.batch_size, 1, MAX_BATCH_SIZE, 5000.0);
+BatchSizeManager::new(app_state.config.batch_size, 5000, MAX_BATCH_SIZE, 10000.0);

let mut processed_docs = 0;
let mut failed_docs = 0;
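The batch element grows here from a 3-tuple to a 4-tuple that carries the source timestamp through to ClickHouse. A type alias (our own; the diff keeps the bare tuple) makes the shape explicit:

use bson::DateTime as BsonDateTime;

// (record_id, statement, timestamp, hashed_value) — the shape now threaded
// through process_batch, insert_into_clickhouse, log_failed_batch, and insert_batch.
type BatchRecord = (String, String, BsonDateTime, String);

fn main() {
    let rec: BatchRecord = (
        "record-1".to_string(),  // record_id (placeholder)
        "statement".to_string(), // statement body (placeholder)
        BsonDateTime::now(),
        "hash".to_string(),      // hashed_value (placeholder)
    );
    println!("{:?}", rec.2);
}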
@@ -402,7 +406,8 @@ async fn process_tenant_historical_data(
continue;
}
};
println!("timestamp----------: {:?}", timestamp);

// println!("timestamp----------: {:?}", timestamp);
let mut statement = statement.to_owned();
let hashed_value = match anonymize_statement(
&mut statement,
@@ -432,7 +437,7 @@
}
};

-batch.push((record_id, statement_str, hashed_value));
+batch.push((record_id, statement_str, *timestamp, hashed_value));

let should_process = batch.len() >= batch_manager.get_current_size()
|| batch_start_time.elapsed() >= Duration::from_secs(5);
@@ -512,7 +517,7 @@ async fn process_tenant_historical_data(

async fn process_batch(
ch_pool: &ClickhousePool,
-batch: &[(String, String, String)],
+batch: &[(String, String, BsonDateTime, String)],
tenant_config: &TenantConfig,
resume_token_store: &RocksDBResumeTokenStore,
batch_manager: &mut BatchSizeManager,
@@ -604,7 +609,7 @@ async fn process_statement(statement: &str) -> Result<String> {

async fn insert_into_clickhouse(
ch_pool: &ClickhousePool,
-bulk_insert_values: &[(String, String, String)],
+bulk_insert_values: &[(String, String, BsonDateTime, String)],
clickhouse_db: &str,
clickhouse_table: &str,
clickhouse_table_opt_out: &str,
@@ -689,7 +694,7 @@ async fn log_failed_batch(
clickhouse_db: &str,
clickhouse_table: &str,
clickhouse_table_opt_out: &str,
-failed_batch: &[(String, String, String)],
+failed_batch: &[(String, String, BsonDateTime, String)],
) -> Result<()> {
let failed_batch_json =
serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?;
@@ -708,15 +713,18 @@

async fn insert_batch(
ch_pool: &ClickhousePool,
-batch: &[(String, String, String)],
+batch: &[(String, String, BsonDateTime, String)],
full_table_name: &str,
full_table_name_opt_out: &str,
cached_hashes: &Arc<RwLock<CachedHashSet>>,
) -> Result<()> {
-let mut client = ch_pool
-.get_handle()
-.await
-.context("Failed to get client from ClickHouse pool")?;
+let mut client = match ch_pool.get_handle().await {
+Ok(client) => client,
+Err(e) => {
+error!("Failed to get ClickHouse client: {:?}", e);
+return Err(anyhow!("ClickHouse connection error: {:?}", e));
+}
+};

let mut insert_data = Vec::new();
let mut insert_data_opt_out = Vec::new();
@@ -727,16 +735,26 @@
*cached_hashes_guard.last_updated.read().await
);

-for (record_id, statement, hashed_value) in batch {
+for (record_id, statement, timestamp, hashed_value) in batch {
let processed_statement = process_statement(statement).await?;
let is_opt_out = cached_hashes_guard.contains(hashed_value).await;

debug!(
"Processed statement: {}, is_opt_out: {}",
processed_statement, is_opt_out
);
+// let timestamp: DateTime<Utc> = Utc::now();
+// println!("timestamp----------------> {}", timestamp);
+let millis = timestamp.timestamp_millis();
+let chrono_timestamp: DateTime<Utc> = DateTime::from_timestamp_millis(millis).unwrap();

let formatted = format!("('{}', '{}', now())", record_id, processed_statement);
// Format the timestamp for ClickHouse
let formatted_timestamp = chrono_timestamp.format("%Y-%m-%d %H:%M:%S%.3f");

+let formatted = format!(
+"('{}', '{}', now(), '{}')",
+record_id, processed_statement, formatted_timestamp
+);

if is_opt_out {
insert_data_opt_out.push(formatted);
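The new timestamp handling above goes bson::DateTime -> chrono::DateTime<Utc> -> string, so ClickHouse receives a millisecond-precision literal instead of a raw epoch value. A standalone sketch of that step, assuming bson 2.x and chrono 0.4.35 or newer (where DateTime::from_timestamp_millis exists); the helper name is ours:

use bson::DateTime as BsonDateTime;
use chrono::{DateTime, Utc};

// Render a BSON datetime (millisecond precision) in the
// "YYYY-MM-DD hh:mm:ss.mmm" form a ClickHouse DateTime64(3) column accepts.
fn to_clickhouse_timestamp(ts: &BsonDateTime) -> String {
    let millis = ts.timestamp_millis();
    let chrono_ts: DateTime<Utc> =
        DateTime::from_timestamp_millis(millis).expect("millis out of range");
    chrono_ts.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
}

fn main() {
    let ts = BsonDateTime::from_millis(1_725_900_000_000);
    assert_eq!(to_clickhouse_timestamp(&ts), "2024-09-09 16:40:00.000");
}

from_timestamp_millis only returns None for out-of-range millisecond values, which is why the diff's unwrap() is workable; expect() merely documents that assumption.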
@@ -750,7 +768,7 @@
// Insert into full_table_name
if !insert_data.is_empty() {
let insert_query = format!(
"INSERT INTO {} (id, statement, created_at) VALUES {}",
"INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}",
full_table_name,
insert_data.join(", ")
);
@@ -763,7 +781,7 @@
// Insert into full_table_name_opt_out
if !insert_data_opt_out.is_empty() {
let insert_query_opt_out = format!(
"INSERT INTO {} (id, statement, created_at) VALUES {}",
"INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}",
full_table_name_opt_out,
insert_data_opt_out.join(", ")
);
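Both branches now emit the extra timestamp column through one multi-row VALUES list per batch. A sketch of that assembly (the table name is a placeholder; the real ones come from tenant config). Note the diff interpolates each row straight into the SQL string, so statement text is assumed to be escaped upstream by process_statement:

// One pre-formatted "('id', 'statement', now(), 'ts')" row per record,
// joined into a single batched INSERT, as in the two hunks above.
fn build_insert(full_table_name: &str, rows: &[String]) -> String {
    format!(
        "INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}",
        full_table_name,
        rows.join(", ")
    )
}

fn main() {
    let rows = vec![
        "('id-1', 'statement one', now(), '2024-09-09 16:40:00.000')".to_string(),
        "('id-2', 'statement two', now(), '2024-09-09 16:40:01.000')".to_string(),
    ];
    println!("{}", build_insert("default.statements", &rows));
}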
@@ -798,7 +816,7 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {
let clickhouse_table = parts[3];
let clickhouse_table_opt_out = parts[4];

-let failed_batch: Vec<(String, String, String)> =
+let failed_batch: Vec<(String, String, BsonDateTime, String)> =
serde_json::from_slice(&value).context("Failed to deserialize failed batch")?;

let tenant_config = app_state
@@ -810,7 +828,12 @@

if let Some(tenant_config) = tenant_config {
let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri.as_str());
-let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
+let mut batch_manager = BatchSizeManager::new(
+app_state.config.batch_size,
+100,
+MAX_BATCH_SIZE,
+1000.0,
+);

match insert_into_clickhouse(
&ch_pool,
