Skip to content

Commit

Permalink
Merge pull request #7 from isankadn/Isanka-patch-e07acf1
Browse files Browse the repository at this point in the history
historical data: fixing tenancy issue
  • Loading branch information
isankadn authored Mar 21, 2024
2 parents 13df94b + 251d64f commit cd09594
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 25 deletions.
98 changes: 76 additions & 22 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct TenantConfig {
clickhouse_table: String,
}

#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
struct AppConfig {
tenants: Vec<TenantConfig>,
encryption_salt: String,
Expand All @@ -41,20 +41,25 @@ struct AppState {
clickhouse_pools: Vec<ClickhousePoolType>,
}

async fn run(app_state: Arc<AppState>) -> Result<(), Box<dyn Error>> {
let tenants = app_state.config.tenants.clone();

let futures = tenants.into_iter().enumerate().map(|(index, tenant)| {
let app_state = app_state.clone();
let tenant_cloned = tenant.clone();
tokio::spawn(async move {
if let Err(e) = process_tenant_historical_data(tenant, app_state, index).await {
error!("Error processing tenant {}: {}", tenant_cloned.name, e);
}
})
});
async fn run(app_state: Arc<AppState>, tenant_name: String) -> Result<(), Box<dyn Error>> {
let tenant = app_state
.config
.tenants
.iter()
.find(|t| t.name == tenant_name)
.ok_or_else(|| anyhow!("Tenant not found in the configuration"))?;
println!("tenant name {:?}", tenant.name);
if let Err(e) = process_tenant_historical_data(tenant.clone(), Arc::clone(&app_state), 0).await
{
error!("Error processing tenant {}: {}", tenant.name, e);
}

futures::future::join_all(futures).await;
let ch_pool = &app_state.clickhouse_pools[0];
if let Err(e) =
deduplicate_clickhouse_data(ch_pool, &tenant.clickhouse_db, &tenant.clickhouse_table).await
{
error!("Error deduplicating data for tenant {}: {}", tenant.name, e);
}

Ok(())
}
Expand Down Expand Up @@ -89,6 +94,7 @@ async fn process_tenant_historical_data(
match mongo_collection.find(None, options).await {
Ok(mut cursor) => {
while let Some(result) = cursor.next().await {
print!("tent info {:?}", &tenant_config.name);
if let Ok(doc) = result {
let record_id = doc.get("_id").and_then(|id| id.as_object_id());
let statement = doc.get("statement").and_then(|s| s.as_document());
Expand Down Expand Up @@ -217,6 +223,48 @@ async fn insert_into_clickhouse(
Err(anyhow!("Max retries exceeded"))
}

/// Removes duplicate rows (same `id`) from `clickhouse_db.clickhouse_table`.
///
/// The table is rebuilt in four steps, each sent as its own query because
/// ClickHouse rejects several `;`-separated statements in a single request:
///
/// 1. create `<table>_dedup` with the same structure and engine as the source,
/// 2. copy one row per `id` into it (`LIMIT 1 BY id`),
/// 3. swap the tables with a single multi-table `RENAME`,
/// 4. drop the renamed original.
///
/// The original table is only dropped after the swap has succeeded, so a
/// failure part-way through leaves the source data in place.
///
/// # Errors
///
/// Returns an error if a pool handle cannot be obtained or if any of the
/// statements fails; the failure is also logged.
///
/// NOTE(review): the database and table names are interpolated verbatim and
/// are assumed to come from trusted configuration. Also confirm that
/// `CREATE TABLE … AS <table>` is acceptable for the engine in use (a
/// replicated engine would reuse the source table's replication path).
async fn deduplicate_clickhouse_data(
    ch_pool: &ClickhousePool,
    clickhouse_db: &str,
    clickhouse_table: &str,
) -> Result<()> {
    let table = format!("{}.{}", clickhouse_db, clickhouse_table);
    let dedup_table = format!("{}_dedup", table);
    let old_table = format!("{}_old", table);

    let mut client = ch_pool.get_handle().await.map_err(|e| {
        error!("Failed to get client from ClickHouse pool: {}", e);
        e
    })?;

    let statements = [
        // Clear leftovers from a previously interrupted run so the steps below
        // do not fail with "table already exists".
        format!("DROP TABLE IF EXISTS {}", dedup_table),
        format!("DROP TABLE IF EXISTS {}", old_table),
        // Copy structure and engine; no helper column ends up in the schema.
        format!("CREATE TABLE {} AS {}", dedup_table, table),
        // Keep one row per id.
        format!(
            "INSERT INTO {} SELECT * FROM {} LIMIT 1 BY id",
            dedup_table, table
        ),
        // Swap first, drop afterwards: the original survives any earlier failure.
        format!(
            "RENAME TABLE {} TO {}, {} TO {}",
            table, old_table, dedup_table, table
        ),
        format!("DROP TABLE {}", old_table),
    ];

    for statement in &statements {
        if let Err(e) = client.execute(statement.as_str()).await {
            error!("Failed to deduplicate data in ClickHouse: {}", e);
            return Err(e.into());
        }
    }

    info!("Successfully deduplicated data in ClickHouse");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
Expand All @@ -233,19 +281,25 @@ async fn main() -> Result<(), Box<dyn Error>> {
};

let config: AppConfig = serde_yaml::from_reader(std::fs::File::open(config_path)?)?;
let tenant_name = env::args()
.nth(1)
.ok_or_else(|| anyhow!("Missing tenant name argument"))?;

let mut clickhouse_pools = Vec::new();
for tenant in &config.tenants {
let clickhouse_pool = ClickhousePool::new(&*tenant.clickhouse_uri);
clickhouse_pools.push(clickhouse_pool);
}
let tenant = config
.tenants
.iter()
.find(|t| t.name == tenant_name)
.ok_or_else(|| anyhow!("Tenant not found in the configuration"))?
.clone();

let clickhouse_pool = ClickhousePool::new(tenant.clickhouse_uri);

let app_state = Arc::new(AppState {
config,
clickhouse_pools,
config: config.clone(),
clickhouse_pools: vec![clickhouse_pool],
});

run(app_state).await?;
run(app_state, tenant_name).await?;

Ok(())
}
51 changes: 48 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ async fn process_tenant_records(
.and_then(|acc| acc.as_document_mut())
{
if let Some(name) = account.get_mut("name") {
let anonymized_name =
anonymize_data(name, &app_state.config.encryption_salt, &tenant_config.name);
let anonymized_name = anonymize_data(
name,
&app_state.config.encryption_salt,
&tenant_config.name,
);
*name = Bson::String(anonymized_name);
info!("<<-- Modified statement: {:?}", statement);
} else {
Expand Down Expand Up @@ -286,7 +289,7 @@ async fn insert_into_clickhouse(
error!("Max retries reached. Giving up.");
return;
} else {
let delay_ms = 1000 * retry_count;
let delay_ms = 1000 * retry_count;
info!("Retrying in {} ms...", delay_ms);
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
Expand All @@ -295,6 +298,48 @@ async fn insert_into_clickhouse(
}
}

/// Removes duplicate rows (same `id`) from `clickhouse_db.clickhouse_table`.
///
/// The table is rebuilt in four steps, each sent as its own query because
/// ClickHouse rejects several `;`-separated statements in a single request:
///
/// 1. create `<table>_dedup` with the same structure and engine as the source,
/// 2. copy one row per `id` into it (`LIMIT 1 BY id`),
/// 3. swap the tables with a single multi-table `RENAME`,
/// 4. drop the renamed original.
///
/// The original table is only dropped after the swap has succeeded, so a
/// failure part-way through leaves the source data in place.
///
/// # Errors
///
/// Returns an error if a pool handle cannot be obtained or if any of the
/// statements fails; the failure is also logged.
///
/// NOTE(review): the database and table names are interpolated verbatim and
/// are assumed to come from trusted configuration. Also confirm that
/// `CREATE TABLE … AS <table>` is acceptable for the engine in use (a
/// replicated engine would reuse the source table's replication path).
async fn deduplicate_clickhouse_data(
    ch_pool: &ClickhousePool,
    clickhouse_db: &str,
    clickhouse_table: &str,
) -> Result<()> {
    let table = format!("{}.{}", clickhouse_db, clickhouse_table);
    let dedup_table = format!("{}_dedup", table);
    let old_table = format!("{}_old", table);

    let mut client = ch_pool.get_handle().await.map_err(|e| {
        error!("Failed to get client from ClickHouse pool: {}", e);
        e
    })?;

    let statements = [
        // Clear leftovers from a previously interrupted run so the steps below
        // do not fail with "table already exists".
        format!("DROP TABLE IF EXISTS {}", dedup_table),
        format!("DROP TABLE IF EXISTS {}", old_table),
        // Copy structure and engine; no helper column ends up in the schema.
        format!("CREATE TABLE {} AS {}", dedup_table, table),
        // Keep one row per id.
        format!(
            "INSERT INTO {} SELECT * FROM {} LIMIT 1 BY id",
            dedup_table, table
        ),
        // Swap first, drop afterwards: the original survives any earlier failure.
        format!(
            "RENAME TABLE {} TO {}, {} TO {}",
            table, old_table, dedup_table, table
        ),
        format!("DROP TABLE {}", old_table),
    ];

    for statement in &statements {
        if let Err(e) = client.execute(statement.as_str()).await {
            error!("Failed to deduplicate data in ClickHouse: {}", e);
            return Err(e.into());
        }
    }

    info!("Successfully deduplicated data in ClickHouse");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
Expand Down

0 comments on commit cd09594

Please sign in to comment.