From 867877db8c578a9bf873374a21477e7c83e45a5a Mon Sep 17 00:00:00 2001 From: Isanka Date: Thu, 21 Mar 2024 04:18:12 +0900 Subject: [PATCH] adding historical data --- Cargo.lock | 66 ++++++++++ Cargo.toml | 11 ++ historical_data/Cargo.toml | 27 ++++ historical_data/src/main.rs | 251 ++++++++++++++++++++++++++++++++++++ 4 files changed, 355 insertions(+) create mode 100644 historical_data/Cargo.toml create mode 100644 historical_data/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 5908e88..9658c7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -813,6 +813,31 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "historical_data" +version = "0.1.0" +dependencies = [ + "anyhow", + "bb8", + "bb8-postgres", + "clickhouse-rs", + "config", + "env_logger", + "futures", + "hex", + "log", + "mongodb", + "r2d2", + "r2d2_postgres", + "rayon", + "serde", + "serde_json", + "serde_yaml", + "sha2", + "tokio", + "tokio-postgres", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1079,8 +1104,10 @@ dependencies = [ "mongodb", "r2d2", "r2d2_postgres", + "rayon", "serde", "serde_json", + "serde_yaml", "sha2", "tokio", "tokio-postgres", @@ -1497,6 +1524,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4963ed1bc86e4f3ee217022bd855b297cef07fb9eac5dfa1f788b220b49b3bd" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1771,6 +1818,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "serde_yaml" +version = "0.9.33" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0623d197252096520c6f2a5e1171ee436e5af99a5d7caa2891e55e61950e6d9" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -2210,6 +2270,12 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 01ae7fc..f2a5102 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,4 @@ +workspace = { members = ["historical_data"] } [package] name = "mongo-to-clickhouse" version = "0.1.0" @@ -23,4 +24,14 @@ anyhow = "1.0.81" bb8 = "0.8.3" bb8-postgres = "0.8.1" serde_json = "1.0.114" +serde_yaml = "0.9.33" +rayon = "1.9.0" + +[[bin]] +name = "mongo-to-clickhouse" +path = "src/main.rs" + +[[bin]] +name = "historical_data" +path = "historical_data/src/main.rs" \ No newline at end of file diff --git a/historical_data/Cargo.toml b/historical_data/Cargo.toml new file mode 100644 index 0000000..a4950b1 --- /dev/null +++ b/historical_data/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "historical_data" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clickhouse-rs = "1.1.0-alpha.1" +config = "0.14.0" +mongodb = { version = "2.8.0", features = ["tokio-runtime"] } +r2d2 = "0.8.10" +tokio = { version = "1.36.0", features = ["full"] } +serde = { version = "1.0.197", features = ["derive"] } +tokio-postgres = "0.7.10" +sha2 = "0.10.8" +hex = "0.4.3" +r2d2_postgres = "0.18.1" +futures = "0.3.30" +log = "0.4.21" +env_logger = "0.11.3" 
+anyhow = "1.0.81"
+bb8 = "0.8.3"
+bb8-postgres = "0.8.1"
+serde_json = "1.0.114"
+serde_yaml = "0.9.33"
+rayon = "1.9.0"
diff --git a/historical_data/src/main.rs b/historical_data/src/main.rs
new file mode 100644
index 0000000..e8da472
--- /dev/null
+++ b/historical_data/src/main.rs
@@ -0,0 +1,345 @@
+//! Historical backfill: copies every tenant's statements from MongoDB into
+//! ClickHouse, replacing the actor's account name with a salted hash.
+
+use anyhow::{anyhow, Context, Result};
+use clickhouse_rs::Pool as ClickhousePool;
+use futures::stream::StreamExt;
+use log::{error, info};
+use mongodb::{
+    bson::{doc, Bson, Document},
+    options::FindOptions,
+    Client as MongoClient,
+};
+use serde::Deserialize;
+use serde_json::to_string;
+use sha2::{Digest, Sha256};
+use std::{borrow::Cow, env, error::Error, sync::Arc, time::Duration};
+use tokio::sync::Semaphore;
+
+/// Attempts made per bulk insert before the batch is reported as failed.
+const MAX_INSERT_ATTEMPTS: u64 = 3;
+
+/// Source (MongoDB) and destination (ClickHouse) settings for one tenant.
+#[derive(Deserialize, Clone)]
+struct TenantConfig {
+    name: String,
+    mongo_uri: String,
+    mongo_db: String,
+    mongo_collection: String,
+    clickhouse_uri: String,
+    clickhouse_db: String,
+    clickhouse_table: String,
+}
+
+/// Settings deserialized from the YAML config file chosen in `main`.
+#[derive(Deserialize)]
+struct AppConfig {
+    tenants: Vec<TenantConfig>,
+    /// Mixed into every hash produced by `anonymize_data`.
+    encryption_salt: String,
+    /// Number of documents fetched from MongoDB per `find` call.
+    batch_size: u64,
+    /// Upper bound on tenants processed at the same time.
+    max_concurrency: usize,
+}
+
+type ClickhousePoolType = ClickhousePool;
+
+/// Read-only state shared by all tenant tasks.
+struct AppState {
+    config: AppConfig,
+    /// One pool per tenant, in the same order as `config.tenants`.
+    clickhouse_pools: Vec<ClickhousePoolType>,
+}
+
+/// Backfills all configured tenants, at most `max_concurrency` at a time.
+///
+/// A failure in one tenant is logged and does not stop the others, so this
+/// currently always returns `Ok(())`.
+async fn run(app_state: Arc<AppState>) -> Result<(), Box<dyn Error>> {
+    // Zero permits would park every task forever, so run at least one tenant.
+    let permits = app_state.config.max_concurrency.clamp(1, Semaphore::MAX_PERMITS);
+    let semaphore = Arc::new(Semaphore::new(permits));
+
+    let tasks = app_state
+        .config
+        .tenants
+        .iter()
+        .cloned()
+        .enumerate()
+        .map(|(index, tenant)| {
+            let app_state = Arc::clone(&app_state);
+            let semaphore = Arc::clone(&semaphore);
+            tokio::spawn(async move {
+                let tenant_name = tenant.name.clone();
+                let _permit = match semaphore.acquire_owned().await {
+                    Ok(permit) => permit,
+                    Err(e) => {
+                        error!("Error processing tenant {}: {}", tenant_name, e);
+                        return;
+                    }
+                };
+                if let Err(e) = process_tenant_historical_data(tenant, app_state, index).await {
+                    error!("Error processing tenant {}: {}", tenant_name, e);
+                }
+            })
+        });
+
+    for outcome in futures::future::join_all(tasks).await {
+        // A JoinError means the tenant task panicked or was cancelled.
+        if let Err(e) = outcome {
+            error!("Tenant task did not finish: {}", e);
+        }
+    }
+
+    Ok(())
+}
+
+/// Copies one tenant's collection into ClickHouse, one bulk insert per batch.
+///
+/// Every insert is awaited before this function returns, so no write is still
+/// in flight when the runtime shuts down. Failed batches are logged and
+/// skipped; they do not abort the remaining batches.
+///
+/// # Errors
+///
+/// Fails if `batch_size` is zero or too large for `i64`, if `pool_index` has
+/// no matching pool, or if MongoDB cannot be reached for the initial count.
+async fn process_tenant_historical_data(
+    tenant_config: TenantConfig,
+    app_state: Arc<AppState>,
+    pool_index: usize,
+) -> Result<()> {
+    let batch_size = app_state.config.batch_size;
+    if batch_size == 0 {
+        return Err(anyhow!("batch_size must be greater than zero"));
+    }
+    let batch_limit = i64::try_from(batch_size).context("batch_size does not fit in i64")?;
+    let ch_pool = app_state
+        .clickhouse_pools
+        .get(pool_index)
+        .ok_or_else(|| anyhow!("no ClickHouse pool for tenant index {}", pool_index))?;
+
+    let mongo_client = MongoClient::with_uri_str(&tenant_config.mongo_uri).await?;
+    let mongo_collection = mongo_client
+        .database(&tenant_config.mongo_db)
+        .collection::<Document>(&tenant_config.mongo_collection);
+
+    let total_docs = mongo_collection.estimated_document_count(None).await?;
+    info!(
+        "Total documents in {}: {}",
+        tenant_config.mongo_collection, total_docs
+    );
+
+    // Ceiling division in integers; `batch_size` is non-zero here.
+    let num_batches = total_docs / batch_size + u64::from(total_docs % batch_size != 0);
+
+    for batch_index in 0..num_batches {
+        let options = FindOptions::builder()
+            // skip/limit paging only partitions the data under a fixed order.
+            .sort(doc! { "_id": 1 })
+            .skip(batch_index * batch_size)
+            .limit(batch_limit)
+            .build();
+
+        let mut cursor = match mongo_collection.find(None, options).await {
+            Ok(cursor) => cursor,
+            Err(e) => {
+                error!("Error retrieving documents from MongoDB: {}", e);
+                continue;
+            }
+        };
+
+        let mut rows = Vec::new();
+        while let Some(result) = cursor.next().await {
+            match result {
+                Ok(document) => rows.extend(build_row(
+                    &document,
+                    &app_state.config.encryption_salt,
+                    &tenant_config.name,
+                )),
+                Err(e) => {
+                    error!("Error reading document from MongoDB: {}", e);
+                    break;
+                }
+            }
+        }
+
+        if let Err(e) = insert_into_clickhouse(
+            ch_pool,
+            &rows,
+            &tenant_config.clickhouse_db,
+            &tenant_config.clickhouse_table,
+        )
+        .await
+        {
+            error!("Error inserting into ClickHouse: {}", e);
+        }
+    }
+
+    Ok(())
+}
+
+/// Builds the `(id, statement JSON)` row for one MongoDB document.
+///
+/// `statement.actor.account.name`, when present, is replaced by its
+/// anonymized form. Returns `None` if `_id` is not an ObjectId, if there is no
+/// `statement` sub-document, or if the statement cannot be serialized.
+fn build_row(
+    document: &Document,
+    encryption_salt: &str,
+    tenant_name: &str,
+) -> Option<(String, String)> {
+    let record_id = document.get("_id").and_then(|id| id.as_object_id())?;
+    let mut statement = document
+        .get("statement")
+        .and_then(|s| s.as_document())?
+        .clone();
+
+    let name = statement
+        .get_mut("actor")
+        .and_then(|actor| actor.as_document_mut())
+        .and_then(|actor| actor.get_mut("account"))
+        .and_then(|account| account.as_document_mut())
+        .and_then(|account| account.get_mut("name"));
+    if let Some(name) = name {
+        let anonymized_name = anonymize_data(name, encryption_salt, tenant_name);
+        *name = Bson::String(anonymized_name);
+    }
+
+    match to_string(&statement) {
+        Ok(json) => Some((record_id.to_hex(), json)),
+        Err(e) => {
+            error!("Skipping document {}: {}", record_id.to_hex(), e);
+            None
+        }
+    }
+}
+
+/// Returns the hex SHA-256 of `encryption_salt + tenant_name + value`.
+///
+/// Strings are hashed as-is and integers by their decimal text, so no
+/// supported value is ever returned in the clear. Any other BSON type yields
+/// an empty string, which drops the original value.
+fn anonymize_data(data: &Bson, encryption_salt: &str, tenant_name: &str) -> String {
+    let value: Cow<'_, str> = match data {
+        Bson::String(s) => Cow::Borrowed(s.as_str()),
+        Bson::Int32(i) => Cow::Owned(i.to_string()),
+        Bson::Int64(i) => Cow::Owned(i.to_string()),
+        _ => return String::new(),
+    };
+
+    let mut hasher = Sha256::new();
+    hasher.update(encryption_salt.as_bytes());
+    hasher.update(tenant_name.as_bytes());
+    hasher.update(value.as_bytes());
+    hex::encode(hasher.finalize())
+}
+
+/// Escapes `value` for use inside a single-quoted ClickHouse string literal.
+fn escape_clickhouse_string(value: &str) -> String {
+    // Backslashes go first; otherwise the ones added for quotes get doubled.
+    value.replace('\\', "\\\\").replace('\'', "\\'")
+}
+
+/// Inserts `bulk_insert_values` as `(id, statement)` rows with one statement.
+///
+/// An empty slice is a no-op. A failed insert is retried with a growing
+/// delay, up to `MAX_INSERT_ATTEMPTS` attempts in total.
+///
+/// # Errors
+///
+/// Fails if no pool handle can be obtained or if the last attempt fails.
+async fn insert_into_clickhouse(
+    ch_pool: &ClickhousePool,
+    bulk_insert_values: &[(String, String)],
+    clickhouse_db: &str,
+    clickhouse_table: &str,
+) -> Result<()> {
+    // `VALUES` followed by nothing is a syntax error, so skip empty batches.
+    if bulk_insert_values.is_empty() {
+        return Ok(());
+    }
+
+    // The query does not change between attempts, so build it only once.
+    let rows: Vec<String> = bulk_insert_values
+        .iter()
+        .map(|(record_id, statement)| {
+            format!(
+                "('{}', '{}')",
+                escape_clickhouse_string(record_id),
+                escape_clickhouse_string(statement)
+            )
+        })
+        .collect();
+    let insert_query = format!(
+        "INSERT INTO {}.{} (id, statement) VALUES {}",
+        clickhouse_db,
+        clickhouse_table,
+        rows.join(",")
+    );
+
+    let mut client = match ch_pool.get_handle().await {
+        Ok(client) => client,
+        Err(e) => {
+            error!("Failed to get client from ClickHouse pool: {}", e);
+            return Err(e.into());
+        }
+    };
+
+    let mut attempt = 0;
+    loop {
+        match client.execute(insert_query.as_str()).await {
+            Ok(_) => {
+                info!("Successfully inserted statements into ClickHouse");
+                return Ok(());
+            }
+            Err(e) => {
+                error!("Failed to insert statements into ClickHouse: {}", e);
+                attempt += 1;
+                if attempt >= MAX_INSERT_ATTEMPTS {
+                    error!("Max retries reached. Giving up.");
+                    return Err(e.into());
+                }
+                let delay_ms = 1000 * attempt;
+                info!("Retrying in {} ms...", delay_ms);
+                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
+            }
+        }
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    env_logger::init();
+    info!(target: "main", "Starting up");
+
+    // `ENV` selects the config file; it defaults to the dev config.
+    let env = env::var("ENV").unwrap_or_else(|_| "dev".into());
+    let config_path = match env.as_str() {
+        "dev" => "config-dev.yml",
+        "prod" => "config-prod.yml",
+        _ => {
+            error!("Unsupported environment: {}", env);
+            return Err("Unsupported environment".into());
+        }
+    };
+
+    let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;
+
+    // Pools are indexed by tenant position, matching the index `run` passes on.
+    let clickhouse_pools: Vec<ClickhousePoolType> = config
+        .tenants
+        .iter()
+        .map(|tenant| ClickhousePool::new(tenant.clickhouse_uri.as_str()))
+        .collect();
+
+    let app_state = Arc::new(AppState {
+        config,
+        clickhouse_pools,
+    });
+
+    run(app_state).await?;
+
+    Ok(())
+}