Skip to content

Commit

Permalink
Merge pull request #12 from isankadn/Isanka-patch-f3cba47
Browse files Browse the repository at this point in the history
na
  • Loading branch information
isankadn authored Mar 24, 2024
2 parents a31cfa7 + fb6d8cc commit ca8fa58
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,32 +237,53 @@ async fn deduplicate_clickhouse_data(
}
};

let deduplicate_query = format!(
"
CREATE TABLE {table}_dedup AS
let create_dedup_table_query = format!(
"CREATE TABLE {table}_dedup AS
SELECT * FROM (
SELECT *, row_number() OVER (PARTITION BY id ORDER BY id) AS row_num
FROM {table}
)
WHERE row_num = 1;
DROP TABLE {table};
RENAME TABLE {table}_dedup TO {table};
",
WHERE row_num = 1;",
table = full_table_name
);

match client.execute(deduplicate_query.as_str()).await {
let drop_table_query = format!("DROP TABLE {}", full_table_name);
let rename_table_query = format!(
"RENAME TABLE {}_dedup TO {}",
full_table_name, full_table_name
);

match client.execute(create_dedup_table_query.as_str()).await {
Ok(_) => {
info!("Successfully deduplicated data in ClickHouse");
Ok(())
info!("Successfully created dedup table in ClickHouse");
}
Err(e) => {
error!("Failed to deduplicate data in ClickHouse: {}", e);
Err(e.into())
error!("Failed to create dedup table in ClickHouse: {}", e);
return Err(e.into());
}
}

match client.execute(drop_table_query.as_str()).await {
Ok(_) => {
info!("Successfully dropped original table in ClickHouse");
}
Err(e) => {
error!("Failed to drop original table in ClickHouse: {}", e);
return Err(e.into());
}
}

match client.execute(rename_table_query.as_str()).await {
Ok(_) => {
info!("Successfully renamed dedup table in ClickHouse");
}
Err(e) => {
error!("Failed to rename dedup table in ClickHouse: {}", e);
return Err(e.into());
}
}

Ok(())
}

#[tokio::main]
Expand Down

0 comments on commit ca8fa58

Please sign in to comment.