Skip to content

Commit

Permalink
Merge pull request #9 from isankadn/Isanka-patch-bf420fe
Browse files Browse the repository at this point in the history
na
  • Loading branch information
isankadn authored Mar 21, 2024
2 parents dee38f7 + 5eb96dc commit 6aa9da1
Showing 1 changed file with 0 additions and 42 deletions.
42 changes: 0 additions & 42 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,48 +298,6 @@ async fn insert_into_clickhouse(
}
}

async fn deduplicate_clickhouse_data(
ch_pool: &ClickhousePool,
clickhouse_db: &str,
clickhouse_table: &str,
) -> Result<()> {
let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table);
let mut client = match ch_pool.get_handle().await {
Ok(client) => client,
Err(e) => {
error!("Failed to get client from ClickHouse pool: {}", e);
return Err(e.into());
}
};

let deduplicate_query = format!(
"
CREATE TABLE {table}_dedup AS
SELECT * FROM (
SELECT *, row_number() OVER (PARTITION BY id ORDER BY id) AS row_num
FROM {table}
)
WHERE row_num = 1;
DROP TABLE {table};
RENAME TABLE {table}_dedup TO {table};
",
table = full_table_name
);

match client.execute(deduplicate_query.as_str()).await {
Ok(_) => {
info!("Successfully deduplicated data in ClickHouse");
Ok(())
}
Err(e) => {
error!("Failed to deduplicate data in ClickHouse: {}", e);
Err(e.into())
}
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
Expand Down

0 comments on commit 6aa9da1

Please sign in to comment.