diff --git a/Cargo.toml b/Cargo.toml index f2a5102..8b213a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,6 @@ name = "mongo-to-clickhouse" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] clickhouse-rs = "1.1.0-alpha.1" diff --git a/src/main.rs b/src/main.rs index 780fe5b..bf51a98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ use std::{env, error::Error, sync::Arc}; use tokio::task; use tokio_postgres::{Client as PostgresClient, NoTls}; -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Debug)] struct TenantConfig { name: String, mongo_uri: String, @@ -29,7 +29,7 @@ struct TenantConfig { clickhouse_table: String, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] struct AppConfig { tenants: Vec, postgres_db: String, @@ -40,6 +40,7 @@ struct AppConfig { type PostgresPool = Pool>; type ClickhousePoolType = ClickhousePool; +#[derive(Debug)] struct AppState { config: AppConfig, postgres_pool: PostgresPool, @@ -85,7 +86,9 @@ async fn process_tenant_records( let pg_pool = &app_state.postgres_pool; let ch_pool = &app_state.clickhouse_pools[pool_index]; + println!("pg_pool {:?}", pg_pool); let pg_conn = pg_pool.get().await?; + println!("pg_conn {:?}", pg_conn); let row = pg_conn .query_one( "SELECT token FROM resume_token WHERE tenant_name = $1 ORDER BY id DESC LIMIT 1", @@ -93,6 +96,7 @@ async fn process_tenant_records( ) .await .ok(); + println!("row {:?}", row); let mut options = mongodb::options::ChangeStreamOptions::default(); if let Some(row) = row { let token_bytes: Vec = row.get("token"); @@ -100,7 +104,7 @@ async fn process_tenant_records( options.resume_after = Some(resume_token); } } - + println!("app_state {:?}", &app_state.config); let change_stream_options = ChangeStreamOptions::default(); let mut change_stream = mongo_collection.watch(None, change_stream_options).await?; @@ -343,7 +347,7 @@ async fn main() -> Result<(), Box> { postgres_pool, clickhouse_pools, }); - + println!("app_state_main {:?}", app_state); let _ = run(app_state).await; tokio::signal::ctrl_c() .await