diff --git a/src/main.rs b/src/main.rs index ca37d4d..780fe5b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -298,48 +298,6 @@ async fn insert_into_clickhouse( } } -async fn deduplicate_clickhouse_data( - ch_pool: &ClickhousePool, - clickhouse_db: &str, - clickhouse_table: &str, -) -> Result<()> { - let full_table_name = format!("{}.{}", clickhouse_db, clickhouse_table); - let mut client = match ch_pool.get_handle().await { - Ok(client) => client, - Err(e) => { - error!("Failed to get client from ClickHouse pool: {}", e); - return Err(e.into()); - } - }; - - let deduplicate_query = format!( - " - CREATE TABLE {table}_dedup AS - SELECT * FROM ( - SELECT *, row_number() OVER (PARTITION BY id ORDER BY id) AS row_num - FROM {table} - ) - WHERE row_num = 1; - - DROP TABLE {table}; - - RENAME TABLE {table}_dedup TO {table}; - ", - table = full_table_name - ); - - match client.execute(deduplicate_query.as_str()).await { - Ok(_) => { - info!("Successfully deduplicated data in ClickHouse"); - Ok(()) - } - Err(e) => { - error!("Failed to deduplicate data in ClickHouse: {}", e); - Err(e.into()) - } - } -} - #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init();