Skip to content

Commit

Permalink
Merge pull request #11 from isankadn/Isanka-patch-7f426b9
Browse files Browse the repository at this point in the history
na
  • Loading branch information
isankadn authored Mar 24, 2024
2 parents 293fb93 + b94d4c2 commit a31cfa7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ name = "mongo-to-clickhouse"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clickhouse-rs = "1.1.0-alpha.1"
Expand Down
12 changes: 8 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{env, error::Error, sync::Arc};
use tokio::task;
use tokio_postgres::{Client as PostgresClient, NoTls};

#[derive(Deserialize, Clone)]
#[derive(Deserialize, Clone, Debug)]
struct TenantConfig {
name: String,
mongo_uri: String,
Expand All @@ -29,7 +29,7 @@ struct TenantConfig {
clickhouse_table: String,
}

#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct AppConfig {
tenants: Vec<TenantConfig>,
postgres_db: String,
Expand All @@ -40,6 +40,7 @@ struct AppConfig {
type PostgresPool = Pool<PostgresConnectionManager<NoTls>>;
type ClickhousePoolType = ClickhousePool;

#[derive(Debug)]
struct AppState {
config: AppConfig,
postgres_pool: PostgresPool,
Expand Down Expand Up @@ -85,22 +86,25 @@ async fn process_tenant_records(

let pg_pool = &app_state.postgres_pool;
let ch_pool = &app_state.clickhouse_pools[pool_index];
println!("pg_pool {:?}", pg_pool);
let pg_conn = pg_pool.get().await?;
println!("pg_conn {:?}", pg_conn);
let row = pg_conn
.query_one(
"SELECT token FROM resume_token WHERE tenant_name = $1 ORDER BY id DESC LIMIT 1",
&[&tenant_config.name],
)
.await
.ok();
println!("row {:?}", row);
let mut options = mongodb::options::ChangeStreamOptions::default();
if let Some(row) = row {
let token_bytes: Vec<u8> = row.get("token");
if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&token_bytes) {
options.resume_after = Some(resume_token);
}
}

println!("app_state {:?}", &app_state.config);
let change_stream_options = ChangeStreamOptions::default();
let mut change_stream = mongo_collection.watch(None, change_stream_options).await?;

Expand Down Expand Up @@ -343,7 +347,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
postgres_pool,
clickhouse_pools,
});

println!("app_state_main {:?}", app_state);
let _ = run(app_state).await;
tokio::signal::ctrl_c()
.await
Expand Down

0 comments on commit a31cfa7

Please sign in to comment.