Dev #36

Merged 2 commits on Sep 10, 2024
75 changes: 49 additions & 26 deletions historical_data/src/main.rs
@@ -6,9 +6,7 @@ use config::{Config, File};
use futures::stream::StreamExt;
// use log::{error, info, warn};
use mongodb::{
bson::{self, doc, Document},
change_stream::event::{ChangeStreamEvent, ResumeToken},
options::ChangeStreamOptions,
bson::{self, doc, DateTime as BsonDateTime, Document},
Client as MongoClient,
};
use regex::Regex;
@@ -20,22 +18,21 @@ use std::collections::HashSet;
use std::path::Path;

use std::{env, sync::Arc, time::Duration, time::Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};

use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::{signal, sync::oneshot, task};

use tokio::{signal, sync::oneshot};

lazy_static::lazy_static! {
static ref BACKSLASH_REGEX_1: Regex = Regex::new(r"\\{2}").expect("Failed to compile BACKSLASH_REGEX_1");
static ref BACKSLASH_REGEX_2: Regex = Regex::new(r"\\(?:\\\\)*").expect("Failed to compile BACKSLASH_REGEX_2");
static ref BACKSLASH_REGEX_3: Regex = Regex::new(r"\\{4,}").expect("Failed to compile BACKSLASH_REGEX_3");
}

const MAX_BATCH_SIZE: usize = 10000;
const MAX_BATCH_SIZE: usize = 8000;
const MAX_RETRIES: u32 = 5;
const INITIAL_RETRY_DELAY: u64 = 1000;
const MAX_RETRY_COUNT: usize = 5;
@@ -255,12 +252,18 @@ impl BatchSizeManager {

fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) {
let performance = docs_processed as f64 / time_taken.as_secs_f64();
let old_size = self.current_size;

if performance > self.performance_threshold {
self.current_size = (self.current_size * 2).min(self.max_size);
} else {
self.current_size = (self.current_size / 2).max(self.min_size);
}

info!(
"Batch size adjusted: {} -> {}. Performance: {:.2}, Threshold: {:.2}",
old_size, self.current_size, performance, self.performance_threshold
);
}

fn get_current_size(&self) -> usize {
@@ -361,10 +364,11 @@ async fn process_tenant_historical_data(
.await
.context("Failed to create MongoDB cursor")?;

let mut batch: Vec<(String, String, String)> = Vec::with_capacity(app_state.config.batch_size);
let mut batch: Vec<(String, String, BsonDateTime, String)> =
Vec::with_capacity(app_state.config.batch_size);
let mut batch_start_time = Instant::now();
let mut batch_manager =
BatchSizeManager::new(app_state.config.batch_size, 1, MAX_BATCH_SIZE, 5000.0);
BatchSizeManager::new(app_state.config.batch_size, 5000, MAX_BATCH_SIZE, 10000.0);

let mut processed_docs = 0;
let mut failed_docs = 0;
@@ -402,7 +406,8 @@ async fn process_tenant_historical_data(
continue;
}
};
println!("timestamp----------: {:?}", timestamp);

// println!("timestamp----------: {:?}", timestamp);
let mut statement = statement.to_owned();
let hashed_value = match anonymize_statement(
&mut statement,
@@ -432,7 +437,7 @@ async fn process_tenant_historical_data(
}
};

batch.push((record_id, statement_str, hashed_value));
batch.push((record_id, statement_str, *timestamp, hashed_value));

let should_process = batch.len() >= batch_manager.get_current_size()
|| batch_start_time.elapsed() >= Duration::from_secs(5);
@@ -512,7 +517,7 @@ async fn process_tenant_historical_data(

async fn process_batch(
ch_pool: &ClickhousePool,
batch: &[(String, String, String)],
batch: &[(String, String, BsonDateTime, String)],
tenant_config: &TenantConfig,
resume_token_store: &RocksDBResumeTokenStore,
batch_manager: &mut BatchSizeManager,
@@ -604,7 +609,7 @@ async fn process_statement(statement: &str) -> Result<String> {

async fn insert_into_clickhouse(
ch_pool: &ClickhousePool,
bulk_insert_values: &[(String, String, String)],
bulk_insert_values: &[(String, String, BsonDateTime, String)],
clickhouse_db: &str,
clickhouse_table: &str,
clickhouse_table_opt_out: &str,
@@ -689,7 +694,7 @@ async fn log_failed_batch(
clickhouse_db: &str,
clickhouse_table: &str,
clickhouse_table_opt_out: &str,
failed_batch: &[(String, String, String)],
failed_batch: &[(String, String, BsonDateTime, String)],
) -> Result<()> {
let failed_batch_json =
serde_json::to_string(failed_batch).context("Failed to serialize failed batch to JSON")?;
@@ -708,15 +713,18 @@

async fn insert_batch(
ch_pool: &ClickhousePool,
batch: &[(String, String, String)],
batch: &[(String, String, BsonDateTime, String)],
full_table_name: &str,
full_table_name_opt_out: &str,
cached_hashes: &Arc<RwLock<CachedHashSet>>,
) -> Result<()> {
let mut client = ch_pool
.get_handle()
.await
.context("Failed to get client from ClickHouse pool")?;
let mut client = match ch_pool.get_handle().await {
Ok(client) => client,
Err(e) => {
error!("Failed to get ClickHouse client: {:?}", e);
return Err(anyhow!("ClickHouse connection error: {:?}", e));
}
};

let mut insert_data = Vec::new();
let mut insert_data_opt_out = Vec::new();
@@ -727,16 +735,26 @@
*cached_hashes_guard.last_updated.read().await
);

for (record_id, statement, hashed_value) in batch {
for (record_id, statement, timestamp, hashed_value) in batch {
let processed_statement = process_statement(statement).await?;
let is_opt_out = cached_hashes_guard.contains(hashed_value).await;

debug!(
"Processed statement: {}, is_opt_out: {}",
processed_statement, is_opt_out
);
// let timestamp: DateTime<Utc> = Utc::now();
// println!("timestamp----------------> {}", timestamp);
let millis = timestamp.timestamp_millis();
let chrono_timestamp: DateTime<Utc> = DateTime::from_timestamp_millis(millis).unwrap();

let formatted = format!("('{}', '{}', now())", record_id, processed_statement);
// Format the timestamp for ClickHouse
let formatted_timestamp = chrono_timestamp.format("%Y-%m-%d %H:%M:%S%.3f");

let formatted = format!(
"('{}', '{}', now(), '{}')",
record_id, processed_statement, formatted_timestamp
);

if is_opt_out {
insert_data_opt_out.push(formatted);
@@ -750,7 +768,7 @@
// Insert into full_table_name
if !insert_data.is_empty() {
let insert_query = format!(
"INSERT INTO {} (id, statement, created_at) VALUES {}",
"INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}",
full_table_name,
insert_data.join(", ")
);
@@ -763,7 +781,7 @@
// Insert into full_table_name_opt_out
if !insert_data_opt_out.is_empty() {
let insert_query_opt_out = format!(
"INSERT INTO {} (id, statement, created_at) VALUES {}",
"INSERT INTO {} (id, statement, created_at, timestamp) VALUES {}",
full_table_name_opt_out,
insert_data_opt_out.join(", ")
);
@@ -798,7 +816,7 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {
let clickhouse_table = parts[3];
let clickhouse_table_opt_out = parts[4];

let failed_batch: Vec<(String, String, String)> =
let failed_batch: Vec<(String, String, BsonDateTime, String)> =
serde_json::from_slice(&value).context("Failed to deserialize failed batch")?;

let tenant_config = app_state
@@ -810,7 +828,12 @@ async fn retry_failed_batches(app_state: Arc<AppState>) -> Result<()> {

if let Some(tenant_config) = tenant_config {
let ch_pool = ClickhousePool::new(tenant_config.clickhouse_uri.as_str());
let mut batch_manager = BatchSizeManager::new(10000, 1000, 100000, 5000.0);
let mut batch_manager = BatchSizeManager::new(
app_state.config.batch_size,
100,
MAX_BATCH_SIZE,
1000.0,
);

match insert_into_clickhouse(
&ch_pool,
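
For reference, a minimal standalone sketch of the BSON-to-ClickHouse timestamp handling this diff adds inline in `insert_batch`; the helper name and the `Option`-based handling of out-of-range values are illustrative, not part of the PR, and the target column type is not shown in this diff:

```rust
// Sketch only: mirrors the conversion done inline in insert_batch above.
// Assumes the mongodb (bson) and chrono crates already used in this file.
use chrono::{DateTime, Utc};
use mongodb::bson::DateTime as BsonDateTime;

fn format_for_clickhouse(ts: &BsonDateTime) -> Option<String> {
    // bson::DateTime stores milliseconds since the Unix epoch.
    let millis = ts.timestamp_millis();
    // from_timestamp_millis returns None for out-of-range values; the PR unwraps instead.
    let utc: DateTime<Utc> = DateTime::from_timestamp_millis(millis)?;
    // ClickHouse parses 'YYYY-MM-DD hh:mm:ss.mmm' strings for sub-second datetime columns.
    Some(utc.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
}
```

Each batch row then becomes a VALUES tuple of the form `('id', 'statement', now(), '2024-09-10 12:34:56.789')`, matching the widened column list `(id, statement, created_at, timestamp)` in both INSERT statements.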
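
The adaptive batching touched above follows a simple double-or-halve rule; a self-contained sketch, with the struct fields assumed from context (they are not all visible in this diff):

```rust
use std::time::Duration;
use tracing::info;

// Sketch of the throughput-based sizing in adjust_batch_size; field layout assumed.
struct BatchSizeManager {
    current_size: usize,
    min_size: usize,
    max_size: usize,
    performance_threshold: f64, // documents per second
}

impl BatchSizeManager {
    fn adjust_batch_size(&mut self, docs_processed: usize, time_taken: Duration) {
        let docs_per_sec = docs_processed as f64 / time_taken.as_secs_f64();
        let old_size = self.current_size;
        if docs_per_sec > self.performance_threshold {
            // Keeping up: double the batch, capped at max_size.
            self.current_size = (self.current_size * 2).min(self.max_size);
        } else {
            // Falling behind: halve the batch, floored at min_size.
            self.current_size = (self.current_size / 2).max(self.min_size);
        }
        info!(
            "Batch size adjusted: {} -> {}. Performance: {:.2}, Threshold: {:.2}",
            old_size, self.current_size, docs_per_sec, self.performance_threshold
        );
    }
}
```

Assuming the constructor's (initial, min, max, threshold) argument order, the historical run now starts from `config.batch_size` bounded by [5000, 8000] with a 10000 docs/s threshold, while the retry path uses a much lower floor (100) and threshold (1000 docs/s).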