From ea089b494316ebc076dfb5054c82fb6d97879485 Mon Sep 17 00:00:00 2001 From: Mateusz Jasiuk Date: Thu, 18 Jul 2024 16:37:12 +0200 Subject: [PATCH 1/6] feat: service comm using redis pubsub --- chain/Cargo.toml | 3 + chain/run.sh | 1 + chain/src/app_state.rs | 2 +- chain/src/config.rs | 5 +- chain/src/main.rs | 149 ++++++++++++++++++++++++++--------- chain/src/services/namada.rs | 132 ++++++++++++++++++------------- docker-compose-db.yml | 2 +- orm/src/schema.rs | 48 ++--------- pos/run.sh | 1 + pos/src/config.rs | 3 + pos/src/main.rs | 16 ++++ pos/src/redis_subscriber.rs | 81 +++++++++++++++++++ shared/Cargo.toml | 2 +- shared/src/events.rs | 119 ++++++++++++++++++++++++++++ shared/src/lib.rs | 1 + 15 files changed, 430 insertions(+), 135 deletions(-) create mode 100644 pos/src/redis_subscriber.rs create mode 100644 shared/src/events.rs diff --git a/chain/Cargo.toml b/chain/Cargo.toml index ccc3d1613..f26f7f2b2 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -33,6 +33,9 @@ diesel.workspace = true orm.workspace = true clap-verbosity-flag.workspace = true futures.workspace = true +futures-util.workspace = true +serde.workspace = true +deadpool-redis = "0.15.1" [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/chain/run.sh b/chain/run.sh index 1068af98d..3833c46b9 100755 --- a/chain/run.sh +++ b/chain/run.sh @@ -1,4 +1,5 @@ . ../.env export TENDERMINT_URL export DATABASE_URL +export QUEUE_URL cargo run -- --initial-query-retry-time 5 diff --git a/chain/src/app_state.rs b/chain/src/app_state.rs index 8179fb34a..1b33a673a 100644 --- a/chain/src/app_state.rs +++ b/chain/src/app_state.rs @@ -9,7 +9,7 @@ pub struct AppState { } impl AppState { - pub fn new(db_url: String) -> anyhow::Result { + pub fn new(db_url: &String) -> anyhow::Result { let max_pool_size = env::var("DATABASE_POOL_SIZE") .unwrap_or_else(|_| 8.to_string()) .parse::() diff --git a/chain/src/config.rs b/chain/src/config.rs index 745dd46f7..9c2729514 100644 --- a/chain/src/config.rs +++ b/chain/src/config.rs @@ -15,7 +15,7 @@ impl Display for CargoEnv { } } -#[derive(clap::Parser)] +#[derive(clap::Parser, Clone)] pub struct AppConfig { #[clap(long, env)] pub tendermint_url: String, @@ -23,6 +23,9 @@ pub struct AppConfig { #[clap(long, env)] pub database_url: String, + #[clap(long, env)] + pub queue_url: String, + #[clap(long, env)] pub initial_query_retry_time: u64, diff --git a/chain/src/main.rs b/chain/src/main.rs index cadadac73..7a74c4358 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -9,7 +9,7 @@ use chain::repository; use chain::services::db::get_pos_crawler_state; use chain::services::namada::{ query_all_balances, query_all_bonds_and_unbonds, query_all_proposals, - query_bonds, query_last_block_height, + query_bonds, query_last_block_height, query_tallies, }; use chain::services::{ db as db_service, namada as namada_service, @@ -19,6 +19,7 @@ use chrono::{NaiveDateTime, Utc}; use clap::Parser; use clap_verbosity_flag::LevelFilter; use deadpool_diesel::postgres::Object; +use namada_sdk::queries::Client; use namada_sdk::time::DateTimeUtc; use orm::migrations::run_migrations; use shared::block::Block; @@ -28,7 +29,11 @@ use shared::crawler::crawl; use shared::crawler_state::ChainCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; use shared::id::Id; +use shared::events::{Messages, PosInitializedMsg, PubSub}; use tendermint_rpc::HttpClient; +use tokio::signal; +use tokio::sync::{mpsc, oneshot}; +use 
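A quick sketch of how the new queue_url setting is consumed, assuming clap's derive and env support as the config module above already uses; the struct mirrors the patch, the demo main is illustrative only:

use clap::Parser;

#[derive(Parser, Clone)]
struct AppConfig {
    #[clap(long, env)]
    tendermint_url: String,
    #[clap(long, env)]
    database_url: String,
    // Populated from --queue-url or the QUEUE_URL exported by run.sh
    #[clap(long, env)]
    queue_url: String,
}

fn main() {
    let config = AppConfig::parse();
    println!("redis at {}", config.queue_url);
}
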
tokio::task::JoinHandle; use tokio::time::sleep; use tracing::Level; use tracing_subscriber::FmtSubscriber; @@ -65,7 +70,8 @@ async fn main() -> Result<(), MainError> { let client = Arc::new(client); - let app_state = AppState::new(config.database_url).into_db_error()?; + let config = Arc::new(config); + let app_state = AppState::new(&config.database_url).into_db_error()?; let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); // Run migrations @@ -74,24 +80,90 @@ async fn main() -> Result<(), MainError> { .context_db_interact_error() .into_db_error()?; - initial_query(&client, &conn, config.initial_query_retry_time).await?; + let (tx, rx) = oneshot::channel(); + let channel = String::from("channel-0"); - let crawler_state = db_service::get_chain_crawler_state(&conn) - .await - .into_db_error()?; + let (events_tx, events_rx) = mpsc::channel::(100); + let pubsub = PubSub::new(&channel, &config.queue_url); - crawl( - move |block_height| { - crawling_fn( - block_height, - client.clone(), - conn.clone(), - checksums.clone(), - ) - }, - crawler_state.last_processed_block, - ) - .await + let open_handle = pubsub.open(rx, events_tx).unwrap(); + + tokio::select! { + _ = message_processor(events_rx, Arc::clone(&client), Arc::clone(&conn), Arc::clone(&config), checksums) => { + tracing::info!("Message processor exited..."); + } + _ = open_handle => { + tracing::info!("PubSub exited..."); + } + _ = must_exit_handle() => { + tracing::info!("Exiting..."); + tx.send(()).unwrap(); + // open_handle.abort(); + // processor_handle.abort(); + } + } + + Ok(()) +} + +async fn message_processor( + mut rx: mpsc::Receiver, + client: Arc, + conn: Arc, + config: Arc, + checksums: Checksums, +) -> anyhow::Result<()> { + tracing::info!("Starting message processor..."); + while let Some(event) = rx.recv().await { + tracing::info!("Received message: {:?}", event); + match event { + Messages::PosInitialized(data) => { + tracing::info!("Received message: {:?}", data); + let client = Arc::clone(&client); + let conn = Arc::clone(&conn); + let checksums = checksums.clone(); + + initial_query( + Arc::clone(&client), + Arc::clone(&conn), + config.initial_query_retry_time, + ) + .await + .context("Initial query error")?; + + let crawler_state = db_service::get_chain_crawler_state(&conn) + .await + .into_db_error()?; + + crawl( + move |block_height| { + crawling_fn( + block_height, + Arc::clone(&client), + Arc::clone(&conn), + checksums.clone(), + ) + }, + crawler_state.last_processed_block, + ) + .await + .context("Crawling error")?; + } + Messages::Test(_) => { + tracing::info!("Received test message"); + } + } + } + + Ok(()) +} + +fn must_exit_handle() -> JoinHandle<()> { + tokio::spawn(async move { + signal::ctrl_c() + .await + .expect("Error receiving interrupt signal"); + }) } async fn crawling_fn( @@ -157,9 +229,10 @@ async fn crawling_fn( .into_rpc_error()?; let addresses = block.addresses_with_balance_change(native_token); - let balances = namada_service::query_balance(&client, &addresses) - .await - .into_rpc_error()?; + let balances = + namada_service::query_balance(Arc::clone(&client), addresses.clone()) + .await + .into_rpc_error()?; tracing::info!("Updating balance for {} addresses...", addresses.len()); let next_governance_proposal_id = @@ -171,7 +244,7 @@ async fn crawling_fn( tracing::info!("Creating {} governance proposals...", proposals.len()); let proposals_with_tally = - namada_service::query_tallies(&client, proposals) + namada_service::query_tallies(Arc::clone(&client), proposals) 
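The shutdown plumbing above, reduced to a self-contained sketch under the assumption that the worker loop polls its oneshot receiver between iterations, which is what PubSub::open does via try_recv:

use tokio::{signal, sync::oneshot, task::JoinHandle};

fn must_exit_handle() -> JoinHandle<()> {
    tokio::spawn(async move {
        signal::ctrl_c()
            .await
            .expect("Error receiving interrupt signal");
    })
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = oneshot::channel::<()>();
    let worker = tokio::spawn(async move {
        loop {
            // Cooperative cancellation point, checked once per iteration
            if rx.try_recv().is_ok() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    });
    tokio::select! {
        _ = worker => {}
        _ = must_exit_handle() => {
            let _ = tx.send(());
        }
    }
}
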
.await .into_rpc_error()?; @@ -179,7 +252,9 @@ async fn crawling_fn( tracing::info!("Creating {} governance votes...", proposals_votes.len()); let addresses = block.bond_addresses(); - let bonds = query_bonds(&client, addresses).await.into_rpc_error()?; + let bonds = query_bonds(Arc::clone(&client), addresses) + .await + .into_rpc_error()?; tracing::info!("Updating bonds for {} addresses", bonds.len()); let bonds_updates = bonds @@ -195,7 +270,7 @@ async fn crawling_fn( .collect::>(); let addresses = block.unbond_addresses(); - let unbonds = namada_service::query_unbonds(&client, addresses) + let unbonds = namada_service::query_unbonds(client, addresses) .await .into_rpc_error()?; tracing::info!("Updating unbonds for {} addresses", unbonds.len()); @@ -277,22 +352,22 @@ async fn crawling_fn( }) }) .await - .context_db_interact_error() - .into_db_error()? - .context("Commit block db transaction error") - .into_db_error() + .unwrap() + .unwrap(); + + Ok(()) } async fn initial_query( - client: &HttpClient, - conn: &Object, + client: Arc, + conn: Arc, initial_query_retry_time: u64, ) -> Result<(), MainError> { tracing::info!("Querying initial data..."); let block_height = - query_last_block_height(client).await.into_rpc_error()?; + query_last_block_height(&client).await.into_rpc_error()?; let mut epoch = - namada_service::get_epoch_at_block_height(client, block_height) + namada_service::get_epoch_at_block_height(&client, block_height) .await .into_rpc_error()?; let first_block_in_epoch = namada_service::get_first_block_in_epoch(client) @@ -301,7 +376,7 @@ async fn initial_query( loop { let pos_crawler_state = - get_pos_crawler_state(conn).await.into_db_error(); + get_pos_crawler_state(&conn).await.into_db_error(); match pos_crawler_state { // >= in case epochs are really short @@ -320,17 +395,17 @@ async fn initial_query( sleep(Duration::from_secs(initial_query_retry_time)).await; } - let balances = query_all_balances(client).await.into_rpc_error()?; + let balances = query_all_balances(&client).await.into_rpc_error()?; tracing::info!("Querying bonds and unbonds..."); - let (bonds, unbonds) = query_all_bonds_and_unbonds(client, None, None) + let (bonds, unbonds) = query_all_bonds_and_unbonds(&client, None, None) .await .into_rpc_error()?; tracing::info!("Querying proposals..."); - let proposals = query_all_proposals(client).await.into_rpc_error()?; + let proposals = query_all_proposals(&client).await.into_rpc_error()?; let proposals_with_tally = - namada_service::query_tallies(client, proposals.clone()) + namada_service::query_tallies(Arc::clone(&client), proposals.clone()) .await .into_rpc_error()?; diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index d4816dbca..02d676dcb 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; use std::str::FromStr; +use std::sync::Arc; use anyhow::{anyhow, Context}; use futures::StreamExt; @@ -67,34 +68,40 @@ pub async fn get_epoch_at_block_height( } pub async fn query_balance( - client: &HttpClient, - balance_changes: &HashSet, + client: Arc, + balance_changes: HashSet, ) -> anyhow::Result { Ok(futures::stream::iter(balance_changes) - .filter_map(|balance_change| async move { - tracing::info!( - "Fetching balance change for {} ...", - balance_change.address - ); - - let owner = - NamadaSdkAddress::from_str(&balance_change.address.to_string()) - .context("Failed to parse owner address") - .ok()?; - let token = - 
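The shape of the query_balance refactor above, as a standalone sketch: each closure handed to the buffered stream receives its own Arc clone, so the pending futures own the client handle for their full lifetime instead of borrowing it. Arc<String> stands in for Arc<HttpClient> and fetch for the RPC call:

use std::sync::Arc;

use futures::StreamExt;

async fn fetch(client: Arc<String>, item: u32) -> String {
    format!("{client}:{item}")
}

#[tokio::main]
async fn main() {
    let client = Arc::new(String::from("http-client"));
    let results: Vec<String> = futures::stream::iter(0..10)
        .map(|i| fetch(Arc::clone(&client), i))
        .buffer_unordered(20) // same bound the patch uses: 20 in flight
        .collect()
        .await;
    println!("{results:?}");
}
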
NamadaSdkAddress::from_str(&balance_change.token.to_string()) - .context("Failed to parse token address") - .ok()?; - - let amount = rpc::get_token_balance(client, &token, &owner) - .await - .unwrap_or_default(); + .filter_map(|balance_change| { + let client = Arc::clone(&client); + async move { + tracing::info!( + "Fetching balance change for {} ...", + balance_change.address + ); + + let owner = NamadaSdkAddress::from_str( + &balance_change.address.to_string(), + ) + .context("Failed to parse owner address") + .ok()?; + let token = NamadaSdkAddress::from_str( + &balance_change.token.to_string(), + ) + .context("Failed to parse token address") + .ok()?; + + let client: &HttpClient = &client; + let amount = rpc::get_token_balance(client, &token, &owner) + .await + .unwrap_or_default(); - Some(Balance { - owner: Id::from(owner), - token: Id::from(token), - amount: Amount::from(amount), - }) + Some(Balance { + owner: Id::from(owner), + token: Id::from(token), + amount: Amount::from(amount), + }) + } }) .map(futures::future::ready) .buffer_unordered(20) @@ -327,20 +334,23 @@ pub async fn query_next_governance_id( } pub async fn query_bonds( - client: &HttpClient, + client: Arc, addresses: HashSet, -) -> anyhow::Result)>> { +) -> anyhow::Result { let nested_bonds = futures::stream::iter(addresses) - .filter_map(|BondAddresses { source, target }| async move { - // TODO: if this is too slow do not use query_all_bonds_and_unbonds - let (bonds_res, _) = query_all_bonds_and_unbonds( - client, - Some(source.clone()), - Some(target.clone()), - ) - .await - .context("Failed to query all bonds and unbonds") - .ok()?; + .filter_map(|BondAddresses { source, target }| { + let client = Arc::clone(&client); + async move { + let client: &HttpClient = &client; + // TODO: if this is too slow do not use query_all_bonds_and_unbonds + let (bonds, _) = query_all_bonds_and_unbonds( + client, + Some(source), + Some(target), + ) + .await + .context("Failed to query all bonds and unbonds") + .ok()?; let bonds = if !bonds_res.is_empty() { bonds_res @@ -351,7 +361,8 @@ pub async fn query_bonds( vec![(source, target, None)] }; - Some(bonds) + Some(bonds) + } }) .map(futures::future::ready) .buffer_unordered(20) @@ -364,7 +375,7 @@ pub async fn query_bonds( } pub async fn query_unbonds( - client: &HttpClient, + client: Arc, addresses: HashSet, ) -> anyhow::Result { let nested_unbonds = futures::stream::iter(addresses) @@ -374,7 +385,9 @@ pub async fn query_unbonds( let validator = NamadaSdkAddress::from_str(&validator.to_string()) .expect("Failed to parse validator address"); + let client = Arc::clone(&client); async move { + let client: &HttpClient = &client; let unbonds = RPC .vp() .pos() @@ -468,6 +481,7 @@ pub async fn is_steward( ) -> anyhow::Result { let address = NamadaSdkAddress::from_str(&address.to_string()) .context("Failed to parse address")?; + let client: &HttpClient = &client; let is_steward = rpc::is_steward(client, &address).await; @@ -475,16 +489,21 @@ pub async fn is_steward( } pub async fn query_tallies( - client: &HttpClient, + client: Arc, proposals: Vec, ) -> anyhow::Result> { let proposals = futures::stream::iter(proposals) - .filter_map(|proposal| async move { - let is_steward = is_steward(client, &proposal.author).await.ok()?; + .filter_map(|proposal| { + let client = Arc::clone(&client); + async move { + let proposal = proposal.clone(); + let is_steward = + is_steward(&client, &proposal.author).await.ok()?; - let tally_type = TallyType::from(&proposal.r#type, is_steward); + let tally_type = 
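The repeated `let client: &HttpClient = &client;` lines above lean on deref coercion to turn the owned Arc back into the plain reference the SDK functions expect; a minimal illustration with a stand-in type:

use std::sync::Arc;

struct HttpClient;

fn call(_c: &HttpClient) {}

fn main() {
    let client = Arc::new(HttpClient);
    // Deref coercion: &Arc<HttpClient> coerces to &HttpClient
    let client_ref: &HttpClient = &client;
    call(client_ref);
}
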
TallyType::from(&proposal.r#type, is_steward); - Some((proposal, tally_type)) + Some((proposal, tally_type)) + } }) .map(futures::future::ready) .buffer_unordered(20) @@ -495,12 +514,16 @@ pub async fn query_tallies( } pub async fn query_all_votes( - client: &HttpClient, + client: Arc, proposals_ids: Vec, -) -> anyhow::Result> { - let votes: Vec> = - futures::stream::iter(proposals_ids) - .filter_map(|proposal_id| async move { +) -> anyhow::Result> { + let votes = futures::stream::iter(proposals_ids) + .filter_map(|proposal_id| { + let client = Arc::clone(&client); + async move { + let proposal_id = proposal_id.clone(); + let client: &HttpClient = &client; + let votes = rpc::query_proposal_votes(client, proposal_id) .await .ok()?; @@ -515,11 +538,12 @@ pub async fn query_all_votes( .collect::>(); Some(votes) - }) - .map(futures::future::ready) - .buffer_unordered(20) - .collect::>() - .await; + } + }) + .map(futures::future::ready) + .buffer_unordered(20) + .collect::>() + .await; anyhow::Ok(votes.iter().flatten().cloned().collect()) } diff --git a/docker-compose-db.yml b/docker-compose-db.yml index 27a49e979..f7e42c8ee 100644 --- a/docker-compose-db.yml +++ b/docker-compose-db.yml @@ -17,7 +17,7 @@ services: dragonfly: image: docker.dragonflydb.io/dragonflydb/dragonfly - command: --logtostderr --cache_mode=true --port 6379 -dbnum 1 --maxmemory=2gb + command: --logtostderr --cache_mode=true --port 6379 -dbnum 1 --maxmemory=4gb ulimits: memlock: -1 ports: diff --git a/orm/src/schema.rs b/orm/src/schema.rs index ba632225a..1b64c3386 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -1,67 +1,35 @@ // @generated automatically by Diesel CLI. pub mod sql_types { - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "crawler_name"))] pub struct CrawlerName; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "governance_kind"))] pub struct GovernanceKind; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "governance_result"))] pub struct GovernanceResult; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "governance_tally_type"))] pub struct GovernanceTallyType; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "transaction_kind"))] pub struct TransactionKind; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "transaction_result"))] pub struct TransactionResult; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "validator_state"))] 
pub struct ValidatorState; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "vote_kind"))] pub struct VoteKind; } diff --git a/pos/run.sh b/pos/run.sh index df0434ec7..75e42d543 100755 --- a/pos/run.sh +++ b/pos/run.sh @@ -1,4 +1,5 @@ . ../.env export TENDERMINT_URL export DATABASE_URL +export QUEUE_URL cargo run diff --git a/pos/src/config.rs b/pos/src/config.rs index 6e14e0018..21d4b9859 100644 --- a/pos/src/config.rs +++ b/pos/src/config.rs @@ -23,6 +23,9 @@ pub struct AppConfig { #[clap(long, env)] pub database_url: String, + #[clap(long, env)] + pub queue_url: String, + #[command(flatten)] pub verbosity: Verbosity, } diff --git a/pos/src/main.rs b/pos/src/main.rs index acae4ad4d..9552c0eb3 100644 --- a/pos/src/main.rs +++ b/pos/src/main.rs @@ -1,5 +1,6 @@ use std::convert::identity; use std::sync::Arc; +use std::time::Duration; use chrono::{NaiveDateTime, Utc}; use clap::Parser; @@ -16,7 +17,9 @@ use pos::services::namada as namada_service; use shared::crawler; use shared::crawler_state::{CrawlerName, EpochCrawlerState}; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::events::{Messages, PosInitializedMsg, PubSub}; use tendermint_rpc::HttpClient; +use tokio::time::sleep; use tracing::Level; use tracing_subscriber::FmtSubscriber; @@ -55,6 +58,19 @@ async fn main() -> Result<(), MainError> { .await .into_rpc_error()?; + let channel = String::from("channel-0"); + let pubsub = PubSub::new(&channel, &config.queue_url); + + async { + sleep(Duration::from_secs(5)).await; + let asd = Messages::PosInitialized(PosInitializedMsg { + data: "Hello, world!".to_string(), + }); + + pubsub.publish_message(asd).unwrap(); + } + .await; + crawler::crawl( move |epoch| crawling_fn(epoch, conn.clone(), client.clone()), next_epoch, diff --git a/pos/src/redis_subscriber.rs b/pos/src/redis_subscriber.rs new file mode 100644 index 000000000..5c9b3b12d --- /dev/null +++ b/pos/src/redis_subscriber.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use clap::Parser; +use deadpool_redis::redis::{self, Commands}; +use tokio::{ + sync::{watch, Mutex}, + task::{self, JoinHandle}, +}; + +use crate::config::AppConfig; + +pub fn subscribe(channel: String) -> anyhow::Result> { + let (_tx, rx) = watch::channel(false); + + let handle = tokio::spawn(async move { + let config = AppConfig::parse(); + let conn = redis::Client::open(config.queue_url) + .expect("failed") + .get_connection() + .expect("failed"); + let conn_arc = Arc::new(Mutex::new(conn)); + + tracing::info!("Subscribed to channel: {}", channel); + + let _rx = rx.clone(); + loop { + let conn = conn_arc.clone(); + let channel = channel.clone(); + + tracing::info!("Waiting for message"); + let result = task::spawn_blocking(|| async move { + tracing::info!("Blocking call started"); + let mut conn = conn.lock().await; + + let mut pubsub = conn.as_pubsub(); + pubsub.subscribe(channel.clone()).unwrap(); + + pubsub + .set_read_timeout(Some(std::time::Duration::from_secs(5))) + .unwrap(); + + let msg = pubsub.get_message().unwrap(); + tracing::info!("Received message: {:?}", msg); + + pubsub.unsubscribe(channel).unwrap(); + + msg + }); + + match result.await { + Ok(res) => { + tracing::info!( + "Blocking call result: {:?}", + res.await.get_payload::() + ) + } + Err(e) => println!("Blocking call failed: {}", e), + } + + // Check if we should stop + if *rx.borrow() { + 
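// Note: `rx` is a tokio::sync::watch::Receiver<bool>, so `*rx.borrow()` just
// reads the latest published value without consuming it, a cheap cooperative
// cancellation check. Since the `_tx` half created above is dropped as soon
// as subscribe() returns, the flag can never actually flip to true in this
// draft; the later patches switch to a oneshot channel instead.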
break; + } + } + }); + + Ok(handle) +} + +pub fn publish_message(channel: String, message: String) -> anyhow::Result<()> { + let config = AppConfig::parse(); + let mut conn = redis::Client::open(config.queue_url) + .expect("failed") + .get_connection() + .expect("failed"); + + conn.publish(channel, message.clone())?; + tracing::info!("Published message: {}", message); + + Ok(()) +} diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 96e457161..c630157fe 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -36,4 +36,4 @@ tokio-retry.workspace = true tokio.workspace = true tracing.workspace = true fake.workspace = true -rand.workspace = true \ No newline at end of file +rand.workspace = true diff --git a/shared/src/events.rs b/shared/src/events.rs new file mode 100644 index 000000000..e85473853 --- /dev/null +++ b/shared/src/events.rs @@ -0,0 +1,119 @@ +use std::fmt::Debug; + +use deadpool_redis::redis::{self, Commands}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; + +use serde::{Deserialize, Serialize}; + +pub struct PubSub { + channel: String, + url: String, +} + +impl PubSub { + pub fn new(channel: &String, url: &String) -> Self { + Self { + channel: channel.clone(), + url: url.clone(), + } + } + + pub fn open( + &self, + mut rx: oneshot::Receiver<()>, + tx_events: mpsc::Sender, + ) -> anyhow::Result> + where + M: Message + core::marker::Send + 'static, + { + let url = self.url.clone(); + let channel = self.channel.clone(); + + let handle = tokio::spawn(async move { + let mut conn = redis::Client::open(url) + .expect("failed") + .get_connection() + .expect("failed"); + let mut pubsub = conn.as_pubsub(); + pubsub + // Timemout does not matter as we make infinite loop + .set_read_timeout(Some(std::time::Duration::from_secs(1))) + .unwrap(); + // TODO: do we need to unsubscribe? 
+ pubsub.subscribe(channel).unwrap(); + + loop { + if rx.try_recv().is_ok() { + break; + } + + let msg = pubsub.get_message(); + match msg { + Ok(msg) => { + let msg = Self::parse_msg( + msg.get_payload::().unwrap(), + ) + .unwrap(); + + tx_events.send(msg).await.unwrap(); + } + Err(_e) => { + // TODO: handle error somehow + continue; + } + } + } + }); + + Ok(handle) + } + + pub fn publish_message(&self, message: M) -> anyhow::Result<()> + where + M: Message, + { + tracing::info!("Publishing message: {:?}", message); + let mut conn = redis::Client::open(self.url.clone()) + .expect("failed") + .get_connection() + .expect("failed"); + + let asd = serde_json::to_string(&message).unwrap(); + + conn.publish(&self.channel, asd)?; + + Ok(()) + } + + fn parse_msg(msg: String) -> anyhow::Result + where + M: Message, + { + let obj: M = serde_json::from_str(&msg)?; + + Ok(obj) + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct PosInitializedMsg { + pub data: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct TestMsg { + pub data: String, +} + +pub trait Message: Serialize + for<'a> Deserialize<'a> + Debug {} + +#[derive(Serialize, Deserialize, Debug)] +pub enum Messages { + PosInitialized(PosInitializedMsg), + Test(TestMsg), +} + +impl Message for Messages {} diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 512168955..6fd6c42af 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -6,6 +6,7 @@ pub mod checksums; pub mod crawler; pub mod crawler_state; pub mod error; +pub mod events; pub mod gas; pub mod genesis; pub mod header; From 9d68917c84c8d3b0ef68a0283d1473326462bad5 Mon Sep 17 00:00:00 2001 From: Mateusz Jasiuk Date: Fri, 19 Jul 2024 13:59:12 +0200 Subject: [PATCH 2/6] WIP: send messages between chain and pos - locking connection is a bad idea --- chain/src/main.rs | 63 +++++++++++++------- pos/Cargo.toml | 1 + pos/src/main.rs | 117 +++++++++++++++++++++++++++++++------ shared/src/events.rs | 136 +++++++++++++++++-------------------------- 4 files changed, 193 insertions(+), 124 deletions(-) diff --git a/chain/src/main.rs b/chain/src/main.rs index 7a74c4358..3ff9741e4 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -7,9 +7,10 @@ use chain::app_state::AppState; use chain::config::AppConfig; use chain::repository; use chain::services::db::get_pos_crawler_state; +// TODO remove imports use chain::services::namada::{ query_all_balances, query_all_bonds_and_unbonds, query_all_proposals, - query_bonds, query_last_block_height, query_tallies, + query_bonds, query_last_block_height, }; use chain::services::{ db as db_service, namada as namada_service, @@ -19,7 +20,7 @@ use chrono::{NaiveDateTime, Utc}; use clap::Parser; use clap_verbosity_flag::LevelFilter; use deadpool_diesel::postgres::Object; -use namada_sdk::queries::Client; +use deadpool_redis::redis; use namada_sdk::time::DateTimeUtc; use orm::migrations::run_migrations; use shared::block::Block; @@ -28,11 +29,11 @@ use shared::checksums::Checksums; use shared::crawler::crawl; use shared::crawler_state::ChainCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::events::{self, TestMsg}; use shared::id::Id; -use shared::events::{Messages, PosInitializedMsg, PubSub}; use tendermint_rpc::HttpClient; use tokio::signal; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; use tokio::time::sleep; use tracing::Level; @@ -41,10 +42,9 @@ use tracing_subscriber::FmtSubscriber; #[tokio::main] async fn 
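Worth pinning down the wire format the Messages enum above produces, since both services must agree on it: serde_json uses externally tagged enum encoding by default. A quick round-trip check, with the types copied from shared/src/events.rs:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
pub struct PosInitializedMsg {
    pub data: String,
}

#[derive(Serialize, Deserialize, Debug)]
pub enum Messages {
    PosInitialized(PosInitializedMsg),
}

fn main() {
    let msg = Messages::PosInitialized(PosInitializedMsg { data: "x".into() });
    let wire = serde_json::to_string(&msg).unwrap();
    // The variant name becomes the outer JSON key
    assert_eq!(wire, r#"{"PosInitialized":{"data":"x"}}"#);
    let back: Messages = serde_json::from_str(&wire).unwrap();
    println!("{back:?}");
}
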
main() -> Result<(), MainError> { let config = AppConfig::parse(); - let client = HttpClient::new(config.tendermint_url.as_str()).unwrap(); - let mut checksums = Checksums::default(); + for code_path in Checksums::code_paths() { let code = namada_service::query_tx_code_hash(&client, &code_path) .await @@ -62,6 +62,7 @@ async fn main() -> Result<(), MainError> { LevelFilter::Debug => Some(Level::DEBUG), LevelFilter::Trace => Some(Level::TRACE), }; + if let Some(log_level) = log_level { let subscriber = FmtSubscriber::builder().with_max_level(log_level).finish(); @@ -69,10 +70,15 @@ async fn main() -> Result<(), MainError> { } let client = Arc::new(client); - let config = Arc::new(config); let app_state = AppState::new(&config.database_url).into_db_error()?; let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); + let redis_conn = Arc::new(Mutex::new( + redis::Client::open(config.queue_url.clone()) + .expect("failed") + .get_connection() + .expect("failed"), + )); // Run migrations run_migrations(&conn) @@ -81,25 +87,42 @@ async fn main() -> Result<(), MainError> { .into_db_error()?; let (tx, rx) = oneshot::channel(); - let channel = String::from("channel-0"); + let (events_tx, events_rx) = mpsc::channel::(100); - let (events_tx, events_rx) = mpsc::channel::(100); - let pubsub = PubSub::new(&channel, &config.queue_url); + let pos_subscriber = tokio::spawn({ + let redis_conn = Arc::clone(&redis_conn); + async move { + let mut redis_conn = redis_conn.lock().await; + let pubsub = redis_conn.as_pubsub(); - let open_handle = pubsub.open(rx, events_tx).unwrap(); + events::subscribe(rx, events_tx, pubsub, "chain_channel") + .await + .unwrap(); + } + }); + + { + let mut redis_conn = redis_conn.lock().await; + events::publish( + &mut redis_conn, + "pos_channel", + events::Messages::ChainReady(TestMsg { + data: String::from(""), + }), + ) + .unwrap(); + } tokio::select! 
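// (Channel topology in this revision: chain listens on "chain_channel" and
// opens the handshake by publishing ChainReady to "pos_channel"; pos mirrors
// that, subscribing to "pos_channel" and answering with PosInitialized on
// "chain_channel". Each side only ever publishes to the other's channel.)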
{ + _ = pos_subscriber => { + tracing::info!("Subscriber exited..."); + } _ = message_processor(events_rx, Arc::clone(&client), Arc::clone(&conn), Arc::clone(&config), checksums) => { tracing::info!("Message processor exited..."); } - _ = open_handle => { - tracing::info!("PubSub exited..."); - } _ = must_exit_handle() => { tracing::info!("Exiting..."); tx.send(()).unwrap(); - // open_handle.abort(); - // processor_handle.abort(); } } @@ -107,7 +130,7 @@ async fn main() -> Result<(), MainError> { } async fn message_processor( - mut rx: mpsc::Receiver, + mut rx: mpsc::Receiver, client: Arc, conn: Arc, config: Arc, @@ -117,7 +140,7 @@ async fn message_processor( while let Some(event) = rx.recv().await { tracing::info!("Received message: {:?}", event); match event { - Messages::PosInitialized(data) => { + events::Messages::PosInitialized(data) => { tracing::info!("Received message: {:?}", data); let client = Arc::clone(&client); let conn = Arc::clone(&conn); @@ -149,9 +172,7 @@ async fn message_processor( .await .context("Crawling error")?; } - Messages::Test(_) => { - tracing::info!("Received test message"); - } + _ => {} } } diff --git a/pos/Cargo.toml b/pos/Cargo.toml index cc032cea3..f8f5f636a 100644 --- a/pos/Cargo.toml +++ b/pos/Cargo.toml @@ -29,6 +29,7 @@ diesel.workspace = true diesel_migrations.workspace = true orm.workspace = true clap-verbosity-flag.workspace = true +deadpool-redis = "0.15.1" [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/pos/src/main.rs b/pos/src/main.rs index 9552c0eb3..cdcbf7c43 100644 --- a/pos/src/main.rs +++ b/pos/src/main.rs @@ -6,6 +6,7 @@ use chrono::{NaiveDateTime, Utc}; use clap::Parser; use clap_verbosity_flag::LevelFilter; use deadpool_diesel::postgres::Object; +use deadpool_redis::redis; use namada_sdk::time::DateTimeUtc; use orm::crawler_state::EpochStateInsertDb; use orm::migrations::run_migrations; @@ -17,8 +18,11 @@ use pos::services::namada as namada_service; use shared::crawler; use shared::crawler_state::{CrawlerName, EpochCrawlerState}; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; -use shared::events::{Messages, PosInitializedMsg, PubSub}; +use shared::events::{self, Messages, PosInitializedMsg}; use tendermint_rpc::HttpClient; +use tokio::signal; +use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::task::JoinHandle; use tokio::time::sleep; use tracing::Level; use tracing_subscriber::FmtSubscriber; @@ -46,6 +50,12 @@ async fn main() -> Result<(), MainError> { let app_state = AppState::new(config.database_url).into_db_error()?; let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); + let redis_conn = Arc::new(Mutex::new( + redis::Client::open(config.queue_url.clone()) + .expect("failed") + .get_connection() + .expect("failed"), + )); // Run migrations run_migrations(&conn) @@ -53,29 +63,98 @@ async fn main() -> Result<(), MainError> { .context_db_interact_error() .into_db_error()?; - // We always start from the current epoch - let next_epoch = namada_service::get_current_epoch(&client.clone()) - .await - .into_rpc_error()?; + let (tx, rx) = oneshot::channel(); + let (events_tx, events_rx) = mpsc::channel::(100); + + let pos_subscriber = tokio::spawn({ + let redis_conn = Arc::clone(&redis_conn); - let channel = String::from("channel-0"); - let pubsub = PubSub::new(&channel, &config.queue_url); + async move { + let mut redis_conn = redis_conn.lock().await; + let pubsub = redis_conn.as_pubsub(); - async { - 
sleep(Duration::from_secs(5)).await; - let asd = Messages::PosInitialized(PosInitializedMsg { - data: "Hello, world!".to_string(), - }); + events::subscribe(rx, events_tx, pubsub, "pos_channel") + .await + .unwrap(); + } + }); - pubsub.publish_message(asd).unwrap(); + tokio::select! { + _ = pos_subscriber => { + tracing::info!("Subscriber exited..."); + } + _ = message_processor(events_rx, client, conn, redis_conn) => { + tracing::info!("Message processor exited..."); + } + _ = must_exit_handle() => { + tracing::info!("Exiting..."); + tx.send(()).unwrap(); + } } - .await; - crawler::crawl( - move |epoch| crawling_fn(epoch, conn.clone(), client.clone()), - next_epoch, - ) - .await + Ok(()) +} + +fn must_exit_handle() -> JoinHandle<()> { + tokio::spawn(async move { + signal::ctrl_c() + .await + .expect("Error receiving interrupt signal"); + }) +} + +async fn message_processor( + mut rx: mpsc::Receiver, + client: Arc, + conn: Arc, + redis_conn: Arc>, +) -> anyhow::Result<()> { + tracing::info!("Starting message processor..."); + while let Some(event) = rx.recv().await { + tracing::info!("Received message: {:?}", event); + match event { + events::Messages::ChainReady(_) => { + tracing::info!("Chain is ready to process..."); + let client = Arc::clone(&client); + let conn = Arc::clone(&conn); + + tracing::info!("Starting crawler..."); + // We always start from the current epoch + let next_epoch = + namada_service::get_current_epoch(&client.clone()) + .await + .into_rpc_error()?; + + { + tracing::info!("Next epoch to process: {}", next_epoch); + let mut redis_conn = redis_conn.lock().await; + tracing::info!("Publishing PosInitialized message..."); + // TODO: we should wait for first crawl iteration to finish + events::publish( + &mut redis_conn, + "chain_channel", + Messages::PosInitialized(PosInitializedMsg { + data: String::from(""), + }), + ) + .unwrap(); + } + tracing::info!("Published PosInitialized message 222..."); + + crawler::crawl( + move |epoch| { + crawling_fn(epoch, conn.clone(), client.clone()) + }, + next_epoch, + ) + .await + .expect("failed"); + } + _ => {} + } + } + + Ok(()) } async fn crawling_fn( diff --git a/shared/src/events.rs b/shared/src/events.rs index e85473853..179838a70 100644 --- a/shared/src/events.rs +++ b/shared/src/events.rs @@ -1,101 +1,69 @@ -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use deadpool_redis::redis::{self, Commands}; -use tokio::{ - sync::{mpsc, oneshot}, - task::JoinHandle, -}; +use tokio::sync::{mpsc, oneshot, Mutex}; use serde::{Deserialize, Serialize}; -pub struct PubSub { - channel: String, - url: String, +fn parse_msg(msg: String) -> anyhow::Result +where + M: Message, +{ + let obj: M = serde_json::from_str(&msg)?; + + Ok(obj) } -impl PubSub { - pub fn new(channel: &String, url: &String) -> Self { - Self { - channel: channel.clone(), - url: url.clone(), +pub async fn subscribe<'a>( + mut rx: oneshot::Receiver<()>, + tx_events: mpsc::Sender, + mut pubsub: redis::PubSub<'a>, + channel: &str, +) -> anyhow::Result<()> { + pubsub + // Timemout does not matter as we make infinite loop + .set_read_timeout(Some(std::time::Duration::from_secs(1))) + .unwrap(); + // TODO: do we need to unsubscribe? 
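// (This borrow is the crux of the "locking connection is a bad idea" note in
// the commit message: redis::PubSub<'a> borrows the underlying Connection, so
// the tokio Mutex guard taken in main() has to stay held for as long as this
// subscribe loop runs, and nothing else can touch that connection meanwhile.)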
+ pubsub.subscribe(channel).unwrap(); + + loop { + if rx.try_recv().is_ok() { + break; } - } - pub fn open( - &self, - mut rx: oneshot::Receiver<()>, - tx_events: mpsc::Sender, - ) -> anyhow::Result> - where - M: Message + core::marker::Send + 'static, - { - let url = self.url.clone(); - let channel = self.channel.clone(); - - let handle = tokio::spawn(async move { - let mut conn = redis::Client::open(url) - .expect("failed") - .get_connection() - .expect("failed"); - let mut pubsub = conn.as_pubsub(); - pubsub - // Timemout does not matter as we make infinite loop - .set_read_timeout(Some(std::time::Duration::from_secs(1))) - .unwrap(); - // TODO: do we need to unsubscribe? - pubsub.subscribe(channel).unwrap(); - - loop { - if rx.try_recv().is_ok() { - break; - } - - let msg = pubsub.get_message(); - match msg { - Ok(msg) => { - let msg = Self::parse_msg( - msg.get_payload::().unwrap(), - ) - .unwrap(); - - tx_events.send(msg).await.unwrap(); - } - Err(_e) => { - // TODO: handle error somehow - continue; - } - } - } - }); + let msg = pubsub.get_message(); + match msg { + Ok(msg) => { + let msg = + // TODO: unwraps + parse_msg(msg.get_payload::().unwrap()).unwrap(); - Ok(handle) + tx_events.send(msg).await.unwrap(); + } + Err(_e) => { + // TODO: handle error somehow + continue; + } + } } - pub fn publish_message(&self, message: M) -> anyhow::Result<()> - where - M: Message, - { - tracing::info!("Publishing message: {:?}", message); - let mut conn = redis::Client::open(self.url.clone()) - .expect("failed") - .get_connection() - .expect("failed"); - - let asd = serde_json::to_string(&message).unwrap(); + Ok(()) +} - conn.publish(&self.channel, asd)?; +pub fn publish( + conn: &mut redis::Connection, + channel: &str, + message: M, +) -> anyhow::Result<()> +where + M: Message, +{ + let message = serde_json::to_string(&message).unwrap(); - Ok(()) - } + conn.publish(&channel, message)?; - fn parse_msg(msg: String) -> anyhow::Result - where - M: Message, - { - let obj: M = serde_json::from_str(&msg)?; - - Ok(obj) - } + Ok(()) } #[derive(Serialize, Deserialize, Debug)] @@ -113,7 +81,7 @@ pub trait Message: Serialize + for<'a> Deserialize<'a> + Debug {} #[derive(Serialize, Deserialize, Debug)] pub enum Messages { PosInitialized(PosInitializedMsg), - Test(TestMsg), + ChainReady(TestMsg), } impl Message for Messages {} From e45fbe3f72188a2743d0e75c2d2f9eb9ec1c6cb5 Mon Sep 17 00:00:00 2001 From: Mateusz Jasiuk Date: Fri, 26 Jul 2024 09:28:43 +0200 Subject: [PATCH 3/6] wip: comms with streams --- pos/Cargo.toml | 2 + pos/src/app_state.rs | 21 +++++- pos/src/main.rs | 66 ++++++++++------- shared/src/event_store.rs | 148 ++++++++++++++++++++++++++++++++++++++ shared/src/lib.rs | 1 + 5 files changed, 210 insertions(+), 28 deletions(-) create mode 100644 shared/src/event_store.rs diff --git a/pos/Cargo.toml b/pos/Cargo.toml index f8f5f636a..d9b0717f3 100644 --- a/pos/Cargo.toml +++ b/pos/Cargo.toml @@ -30,6 +30,8 @@ diesel_migrations.workspace = true orm.workspace = true clap-verbosity-flag.workspace = true deadpool-redis = "0.15.1" +redis = {version = "0.25.0", features = ["streams"]} +serde_json.workspace = true [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/pos/src/app_state.rs b/pos/src/app_state.rs index 8179fb34a..bc855e13e 100644 --- a/pos/src/app_state.rs +++ b/pos/src/app_state.rs @@ -2,14 +2,16 @@ use std::env; use anyhow::Context; use deadpool_diesel::postgres::{Object, Pool as DbPool}; +use deadpool_redis::{Config, Pool as RedisPool, 
Runtime}; #[derive(Clone)] pub struct AppState { db: DbPool, + redis: RedisPool, } impl AppState { - pub fn new(db_url: String) -> anyhow::Result { + pub fn new(db_url: String, redis_url: String) -> anyhow::Result { let max_pool_size = env::var("DATABASE_POOL_SIZE") .unwrap_or_else(|_| 8.to_string()) .parse::() @@ -23,7 +25,13 @@ impl AppState { .build() .context("Failed to build Postgres db pool")?; - Ok(Self { db: pool }) + let cfg = Config::from_url(redis_url); + let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); + + Ok(Self { + db: pool, + redis: redis_pool, + }) } pub async fn get_db_connection(&self) -> anyhow::Result { @@ -32,4 +40,13 @@ impl AppState { .await .context("Failed to get db connection handle from deadpool") } + + pub async fn get_redis_connection( + &self, + ) -> anyhow::Result { + self.redis + .get() + .await + .context("Failed to get redis connection handle from deadpool") + } } diff --git a/pos/src/main.rs b/pos/src/main.rs index cdcbf7c43..150ba9f5e 100644 --- a/pos/src/main.rs +++ b/pos/src/main.rs @@ -1,12 +1,11 @@ use std::convert::identity; use std::sync::Arc; -use std::time::Duration; use chrono::{NaiveDateTime, Utc}; use clap::Parser; use clap_verbosity_flag::LevelFilter; use deadpool_diesel::postgres::Object; -use deadpool_redis::redis; +use deadpool_redis::redis::{self, RedisResult}; use namada_sdk::time::DateTimeUtc; use orm::crawler_state::EpochStateInsertDb; use orm::migrations::run_migrations; @@ -15,9 +14,16 @@ use pos::app_state::AppState; use pos::config::AppConfig; use pos::repository::{self}; use pos::services::namada as namada_service; +use redis::aio::MultiplexedConnection; +use redis::streams::{StreamRangeReply, StreamReadOptions}; +use redis::AsyncCommands; +use redis::{streams::StreamReadReply, Cmd}; use shared::crawler; use shared::crawler_state::{CrawlerName, EpochCrawlerState}; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::event_store::{ + publish, subscribe, Event, PosInitializedEventV1, SupportedEvents, +}; use shared::events::{self, Messages, PosInitializedMsg}; use tendermint_rpc::HttpClient; use tokio::signal; @@ -48,14 +54,11 @@ async fn main() -> Result<(), MainError> { let client = Arc::new(HttpClient::new(config.tendermint_url.as_str()).unwrap()); - let app_state = AppState::new(config.database_url).into_db_error()?; + let app_state = Arc::new( + AppState::new(config.database_url, config.queue_url).into_db_error()?, + ); let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); - let redis_conn = Arc::new(Mutex::new( - redis::Client::open(config.queue_url.clone()) - .expect("failed") - .get_connection() - .expect("failed"), - )); + let (tx, mut rx) = oneshot::channel::<()>(); // Run migrations run_migrations(&conn) @@ -63,33 +66,44 @@ async fn main() -> Result<(), MainError> { .context_db_interact_error() .into_db_error()?; - let (tx, rx) = oneshot::channel(); - let (events_tx, events_rx) = mpsc::channel::(100); + let (events_tx, mut events_rx) = mpsc::channel::(100); - let pos_subscriber = tokio::spawn({ - let redis_conn = Arc::clone(&redis_conn); + let mut redis_conn = + app_state.get_redis_connection().await.into_db_error()?; - async move { - let mut redis_conn = redis_conn.lock().await; - let pubsub = redis_conn.as_pubsub(); + let last_processed_id: String = redis_conn + .get("last_processed_id") + .await + .context_db_interact_error() + .into_db_error()?; - events::subscribe(rx, events_tx, pubsub, "pos_channel") - .await - .unwrap(); + 
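The consumption pattern this patch moves to, reduced to a sketch; it assumes the redis crate with the "streams" feature, and the stream key (event_store), field name (event) and checkpoint key mirror the patch while the rest is illustrative:

use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::AsyncCommands;

async fn consume(
    conn: &mut redis::aio::MultiplexedConnection,
) -> redis::RedisResult<()> {
    // Resume from the stored checkpoint, or from the start of the stream.
    let mut last_id: String = conn
        .get("last_processed_id")
        .await
        .unwrap_or_else(|_| "0".to_string());
    let opts = StreamReadOptions::default().count(1).block(0);
    loop {
        let reply: Option<StreamReadReply> = conn
            .xread_options(&["event_store"], &[&last_id], &opts)
            .await?;
        if let Some(reply) = reply {
            for key in reply.keys {
                for entry in key.ids {
                    // ... deserialize entry.map["event"] and dispatch it ...
                    let _: () =
                        conn.set("last_processed_id", &entry.id).await?;
                    last_id = entry.id;
                }
            }
        }
    }
}
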
tracing::info!("Last processed id: {}", last_processed_id); + + let subscriber = + tokio::spawn(subscribe(redis_conn, last_processed_id, events_tx, rx)); + + let handler = tokio::spawn(async move { + while let Some(event) = events_rx.recv().await { + tracing::info!("Received event: {}", event); } }); + let redis_conn = app_state.get_redis_connection().await.into_db_error()?; + publish(redis_conn, PosInitializedEventV1) + .await + .into_db_error()?; + tokio::select! { - _ = pos_subscriber => { - tracing::info!("Subscriber exited..."); - } - _ = message_processor(events_rx, client, conn, redis_conn) => { - tracing::info!("Message processor exited..."); - } _ = must_exit_handle() => { - tracing::info!("Exiting..."); + tracing::info!("Received interrupt signal, shutting down..."); tx.send(()).unwrap(); } + _ = handler => { + tracing::info!("Handler finished..."); + } + _ = subscriber => { + tracing::info!("Subscriber finished..."); + } } Ok(()) diff --git a/shared/src/event_store.rs b/shared/src/event_store.rs new file mode 100644 index 000000000..bdb34cc71 --- /dev/null +++ b/shared/src/event_store.rs @@ -0,0 +1,148 @@ +use deadpool_redis::{ + redis::{ + streams::{StreamReadOptions, StreamReadReply}, + AsyncCommands, Value, + }, + Connection, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, oneshot}; + +const EVENT_STORE: &str = "event_store"; +const EVENT: &str = "event"; + +pub async fn subscribe( + mut redis_conn: Connection, + last_processed_id: String, + tx: mpsc::Sender, + mut exit_rx: oneshot::Receiver<()>, +) -> anyhow::Result<()> { + let opts = StreamReadOptions::default().count(1).block(0); + let mut last_processed_id = last_processed_id; + + loop { + let result: Option = redis_conn + .xread_options(&[EVENT_STORE], &[&last_processed_id], &opts) + .await?; + + if let Some(reply) = result { + for stream_key in reply.keys { + for stream_id in stream_key.ids { + let event = + stream_id.map.get(EVENT).expect("event key not found"); + + match event { + Value::Data(data) => { + let event_str = std::str::from_utf8(data) + .expect("event is not valid utf8"); + + let event = T::from_stored(event_str); + + redis_conn + .set("last_processed_id", stream_id.id.clone()) + .await?; + last_processed_id = stream_id.id; + + tx.send(event).await?; + } + _ => {} + } + } + } + } + + if exit_rx.try_recv().is_ok() { + break; + } + } + + Ok(()) +} + +pub async fn publish( + mut redis_conn: Connection, + event: T, +) -> anyhow::Result<()> +where + T: Event, +{ + let stored_event = event.to_stored(); + redis_conn.xadd(EVENT_STORE, "*", &[stored_event]).await?; + + Ok(()) +} + +// Define a macro to create event types with optional fields +macro_rules! 
define_event { + // Case with fields + ($name:ident, $($field_name:ident: $field_type:ty),*) => { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct $name { + $(pub $field_name: $field_type),* + } + + impl Event for $name { + fn from_stored(value: &str) -> Self { + serde_json::from_str(value).unwrap() + } + } + }; + + + // Case without fields + ($name:ident) => { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct $name; + + impl Event for $name { + fn to_stored(&self) -> (String, String) { + let key = self.name().to_string(); + let value = String::new(); // Return an empty string for structs without fields + (key, value) + } + + fn from_stored(value: &str) -> Self { + serde_json::from_str(value).unwrap() + } + } + }; +} + +pub trait Event: Clone + Serialize + Send + Sync { + fn name(&self) -> &'static str { + // Get the type name at runtime and strip the module path + let full_name = std::any::type_name::(); + // Find the last occurrence of "::" and return the substring after it + match full_name.rsplit("::").next() { + Some(name) => name, + None => full_name, + } + } + + // Default implementation of payload + fn payload(&self) -> Self { + self.clone() + } + + fn to_stored(&self) -> (String, String) { + let key = self.name().to_string(); + let value = serde_json::to_string(&self).unwrap(); + + (key, value) + } + + // Factory method to create an event from stored data + fn from_stored(value: &str) -> Self + where + Self: Sized; +} + +define_event!(PosInitializedEventV1); +define_event!(ChainInitializedEventV1); + +trait Support + +pub enum SupportedEvents { + PosInitializedEventV1(PosInitializedEventV1), + ChainInitializedEventV1(ChainInitializedEventV1), +} diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 6fd6c42af..945914c55 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -6,6 +6,7 @@ pub mod checksums; pub mod crawler; pub mod crawler_state; pub mod error; +pub mod event_store; pub mod events; pub mod gas; pub mod genesis; From 85cc8afdf571dc5c5eafb7c0aa371b7f34a163c3 Mon Sep 17 00:00:00 2001 From: Mateusz Jasiuk Date: Mon, 29 Jul 2024 14:44:17 +0200 Subject: [PATCH 4/6] poc: comms with redis streams --- chain/Cargo.toml | 1 + chain/src/app_state.rs | 21 +++++++- chain/src/main.rs | 103 +++++++++++++++++++++----------------- pos/src/main.rs | 87 ++++++++++++++++---------------- shared/src/event_store.rs | 66 +++++++++++------------- 5 files changed, 150 insertions(+), 128 deletions(-) diff --git a/chain/Cargo.toml b/chain/Cargo.toml index f26f7f2b2..2608c33b0 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -36,6 +36,7 @@ futures.workspace = true futures-util.workspace = true serde.workspace = true deadpool-redis = "0.15.1" +redis = {version = "0.25.0", features = ["streams"]} [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/chain/src/app_state.rs b/chain/src/app_state.rs index 1b33a673a..bc855e13e 100644 --- a/chain/src/app_state.rs +++ b/chain/src/app_state.rs @@ -2,14 +2,16 @@ use std::env; use anyhow::Context; use deadpool_diesel::postgres::{Object, Pool as DbPool}; +use deadpool_redis::{Config, Pool as RedisPool, Runtime}; #[derive(Clone)] pub struct AppState { db: DbPool, + redis: RedisPool, } impl AppState { - pub fn new(db_url: &String) -> anyhow::Result { + pub fn new(db_url: String, redis_url: String) -> anyhow::Result { let max_pool_size = env::var("DATABASE_POOL_SIZE") .unwrap_or_else(|_| 8.to_string()) .parse::() @@ -23,7 +25,13 @@ impl AppState { .build() .context("Failed 
to build Postgres db pool")?; - Ok(Self { db: pool }) + let cfg = Config::from_url(redis_url); + let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); + + Ok(Self { + db: pool, + redis: redis_pool, + }) } pub async fn get_db_connection(&self) -> anyhow::Result { @@ -32,4 +40,13 @@ impl AppState { .await .context("Failed to get db connection handle from deadpool") } + + pub async fn get_redis_connection( + &self, + ) -> anyhow::Result { + self.redis + .get() + .await + .context("Failed to get redis connection handle from deadpool") + } } diff --git a/chain/src/main.rs b/chain/src/main.rs index 3ff9741e4..de77e3322 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -20,7 +20,7 @@ use chrono::{NaiveDateTime, Utc}; use clap::Parser; use clap_verbosity_flag::LevelFilter; use deadpool_diesel::postgres::Object; -use deadpool_redis::redis; +use deadpool_redis::redis::{self, AsyncCommands}; use namada_sdk::time::DateTimeUtc; use orm::migrations::run_migrations; use shared::block::Block; @@ -29,6 +29,9 @@ use shared::checksums::Checksums; use shared::crawler::crawl; use shared::crawler_state::ChainCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::event_store::{ + publish, subscribe, ChainInitializedEventV1, PosEvents, Test, +}; use shared::events::{self, TestMsg}; use shared::id::Id; use tendermint_rpc::HttpClient; @@ -70,15 +73,10 @@ async fn main() -> Result<(), MainError> { } let client = Arc::new(client); - let config = Arc::new(config); - let app_state = AppState::new(&config.database_url).into_db_error()?; - let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); - let redis_conn = Arc::new(Mutex::new( - redis::Client::open(config.queue_url.clone()) - .expect("failed") - .get_connection() - .expect("failed"), - )); + let app_state = Arc::new( + AppState::new(config.database_url, config.queue_url).into_db_error()?, + ); + let conn = app_state.get_db_connection().await.into_db_error()?; // Run migrations run_migrations(&conn) @@ -87,69 +85,78 @@ async fn main() -> Result<(), MainError> { .into_db_error()?; let (tx, rx) = oneshot::channel(); - let (events_tx, events_rx) = mpsc::channel::(100); + let (events_tx, events_rx) = mpsc::channel::(100); - let pos_subscriber = tokio::spawn({ - let redis_conn = Arc::clone(&redis_conn); - async move { - let mut redis_conn = redis_conn.lock().await; - let pubsub = redis_conn.as_pubsub(); + let mut redis_conn = + app_state.get_redis_connection().await.into_db_error()?; - events::subscribe(rx, events_tx, pubsub, "chain_channel") - .await - .unwrap(); - } - }); - - { - let mut redis_conn = redis_conn.lock().await; - events::publish( - &mut redis_conn, - "pos_channel", - events::Messages::ChainReady(TestMsg { - data: String::from(""), - }), - ) - .unwrap(); - } + let last_processed_id: String = redis_conn + .get("chain_last_processed_id") + .await + .ok() + .unwrap_or("0".to_string()); + + let subscriber = tokio::spawn(subscribe( + redis_conn, + ("chain_last_processed_id".to_string(), last_processed_id), + events_tx, + rx, + )); + + let handler = tokio::spawn(message_processor( + events_rx, + Arc::clone(&client), + Arc::clone(&app_state), + config.initial_query_retry_time, + checksums.clone(), + )); + + let redis_conn = app_state.get_redis_connection().await.into_db_error()?; + publish( + redis_conn, + PosEvents::ChainInitializedEventV1(ChainInitializedEventV1), + ) + .await + .into_db_error()?; tokio::select! 
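// (Direction flip relative to the pub/sub revision: the chain service now
// announces itself first by publishing ChainInitializedEventV1 to the shared
// event_store stream, and the select below waits for pos to answer with
// PosInitializedEventV1 before kicking off the initial query and crawler.)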
{ - _ = pos_subscriber => { - tracing::info!("Subscriber exited..."); - } - _ = message_processor(events_rx, Arc::clone(&client), Arc::clone(&conn), Arc::clone(&config), checksums) => { - tracing::info!("Message processor exited..."); - } _ = must_exit_handle() => { - tracing::info!("Exiting..."); + tracing::info!("Received interrupt signal, shutting down..."); tx.send(()).unwrap(); } + _ = handler => { + tracing::info!("Handler finished..."); + } + _ = subscriber => { + tracing::info!("Subscriber finished..."); + } } Ok(()) } async fn message_processor( - mut rx: mpsc::Receiver, + mut rx: mpsc::Receiver, client: Arc, - conn: Arc, - config: Arc, + app_state: Arc, + initial_query_retry_time: u64, checksums: Checksums, ) -> anyhow::Result<()> { tracing::info!("Starting message processor..."); while let Some(event) = rx.recv().await { tracing::info!("Received message: {:?}", event); match event { - events::Messages::PosInitialized(data) => { + PosEvents::PosInitializedEventV1(data) => { tracing::info!("Received message: {:?}", data); let client = Arc::clone(&client); - let conn = Arc::clone(&conn); + let conn = Arc::new(app_state.get_db_connection().await?); + let redis_conn = app_state.get_redis_connection().await?; let checksums = checksums.clone(); initial_query( Arc::clone(&client), Arc::clone(&conn), - config.initial_query_retry_time, + initial_query_retry_time, ) .await .context("Initial query error")?; @@ -158,6 +165,10 @@ async fn message_processor( .await .into_db_error()?; + publish(redis_conn, PosEvents::Test(Test)) + .await + .into_db_error()?; + crawl( move |block_height| { crawling_fn( diff --git a/pos/src/main.rs b/pos/src/main.rs index 150ba9f5e..00b42aa1f 100644 --- a/pos/src/main.rs +++ b/pos/src/main.rs @@ -22,7 +22,8 @@ use shared::crawler; use shared::crawler_state::{CrawlerName, EpochCrawlerState}; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; use shared::event_store::{ - publish, subscribe, Event, PosInitializedEventV1, SupportedEvents, + publish, subscribe, Event, PosEvents, PosInitializedEventV1, + SupportedEvents, }; use shared::events::{self, Messages, PosInitializedMsg}; use tendermint_rpc::HttpClient; @@ -57,8 +58,8 @@ async fn main() -> Result<(), MainError> { let app_state = Arc::new( AppState::new(config.database_url, config.queue_url).into_db_error()?, ); - let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); - let (tx, mut rx) = oneshot::channel::<()>(); + let conn = app_state.get_db_connection().await.into_db_error()?; + let (tx, rx) = oneshot::channel::<()>(); // Run migrations run_migrations(&conn) @@ -66,32 +67,37 @@ async fn main() -> Result<(), MainError> { .context_db_interact_error() .into_db_error()?; - let (events_tx, mut events_rx) = mpsc::channel::(100); + let (events_tx, events_rx) = mpsc::channel::(100); let mut redis_conn = app_state.get_redis_connection().await.into_db_error()?; let last_processed_id: String = redis_conn - .get("last_processed_id") + .get("pos_last_processed_id") .await - .context_db_interact_error() - .into_db_error()?; - - tracing::info!("Last processed id: {}", last_processed_id); - - let subscriber = - tokio::spawn(subscribe(redis_conn, last_processed_id, events_tx, rx)); - - let handler = tokio::spawn(async move { - while let Some(event) = events_rx.recv().await { - tracing::info!("Received event: {}", event); - } - }); - - let redis_conn = app_state.get_redis_connection().await.into_db_error()?; - publish(redis_conn, PosInitializedEventV1) - .await - 
.into_db_error()?; + .ok() + .unwrap_or("0".to_string()); + + let subscriber = tokio::spawn(subscribe( + redis_conn, + ("pos_last_processed_id".to_string(), last_processed_id), + events_tx, + rx, + )); + + let handler = tokio::spawn(message_processor( + events_rx, + Arc::clone(&client), + Arc::clone(&app_state), + )); + + // let redis_conn = app_state.get_redis_connection().await.into_db_error()?; + // publish( + // redis_conn, + // PosEvents::PosInitializedEventV1(PosInitializedEventV1), + // ) + // .await + // .into_db_error()?; tokio::select! { _ = must_exit_handle() => { @@ -118,19 +124,19 @@ fn must_exit_handle() -> JoinHandle<()> { } async fn message_processor( - mut rx: mpsc::Receiver, + mut rx: mpsc::Receiver, client: Arc, - conn: Arc, - redis_conn: Arc>, + app_state: Arc, ) -> anyhow::Result<()> { tracing::info!("Starting message processor..."); while let Some(event) = rx.recv().await { tracing::info!("Received message: {:?}", event); match event { - events::Messages::ChainReady(_) => { + PosEvents::ChainInitializedEventV1(_) => { tracing::info!("Chain is ready to process..."); let client = Arc::clone(&client); - let conn = Arc::clone(&conn); + let conn = Arc::new(app_state.get_db_connection().await?); + let redis_conn = app_state.get_redis_connection().await?; tracing::info!("Starting crawler..."); // We always start from the current epoch @@ -140,33 +146,28 @@ async fn message_processor( .into_rpc_error()?; { - tracing::info!("Next epoch to process: {}", next_epoch); - let mut redis_conn = redis_conn.lock().await; - tracing::info!("Publishing PosInitialized message..."); // TODO: we should wait for first crawl iteration to finish - events::publish( - &mut redis_conn, - "chain_channel", - Messages::PosInitialized(PosInitializedMsg { - data: String::from(""), - }), + publish( + redis_conn, + PosEvents::PosInitializedEventV1(PosInitializedEventV1), ) - .unwrap(); + .await?; } - tracing::info!("Published PosInitialized message 222..."); - crawler::crawl( + tokio::spawn(crawler::crawl( move |epoch| { crawling_fn(epoch, conn.clone(), client.clone()) }, next_epoch, - ) - .await - .expect("failed"); + )); + } + PosEvents::Test(_) => { + tracing::info!("!!!!!Test message received...!!!!!"); } _ => {} } } + tracing::info!("Message processor finished..."); Ok(()) } diff --git a/shared/src/event_store.rs b/shared/src/event_store.rs index bdb34cc71..82c355ade 100644 --- a/shared/src/event_store.rs +++ b/shared/src/event_store.rs @@ -13,12 +13,15 @@ const EVENT: &str = "event"; pub async fn subscribe( mut redis_conn: Connection, - last_processed_id: String, + (last_processed_key, last_processed_val): (String, String), tx: mpsc::Sender, mut exit_rx: oneshot::Receiver<()>, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> +where + T: SupportedEvents, +{ let opts = StreamReadOptions::default().count(1).block(0); - let mut last_processed_id = last_processed_id; + let mut last_processed_id = last_processed_val.clone(); loop { let result: Option = redis_conn @@ -27,7 +30,8 @@ pub async fn subscribe( if let Some(reply) = result { for stream_key in reply.keys { - for stream_id in stream_key.ids { + for stream_id in stream_key.clone().ids { + tracing::info!("Processing event: {:?}", stream_key); let event = stream_id.map.get(EVENT).expect("event key not found"); @@ -39,11 +43,11 @@ pub async fn subscribe( let event = T::from_stored(event_str); redis_conn - .set("last_processed_id", stream_id.id.clone()) + .set(&last_processed_key, stream_id.id.clone()) .await?; last_processed_id = stream_id.id; - 
diff --git a/shared/src/event_store.rs b/shared/src/event_store.rs
index bdb34cc71..82c355ade 100644
--- a/shared/src/event_store.rs
+++ b/shared/src/event_store.rs
@@ -13,12 +13,15 @@ const EVENT: &str = "event";
 
 pub async fn subscribe<T>(
     mut redis_conn: Connection,
-    last_processed_id: String,
+    (last_processed_key, last_processed_val): (String, String),
     tx: mpsc::Sender<T>,
     mut exit_rx: oneshot::Receiver<()>,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<()>
+where
+    T: SupportedEvents,
+{
     let opts = StreamReadOptions::default().count(1).block(0);
-    let mut last_processed_id = last_processed_id;
+    let mut last_processed_id = last_processed_val.clone();
 
     loop {
         let result: Option<StreamReadReply> = redis_conn
             .xread_options(&[EVENT_STORE], &[&last_processed_id], &opts)
             .await?;
 
         if let Some(reply) = result {
             for stream_key in reply.keys {
-                for stream_id in stream_key.ids {
+                for stream_id in stream_key.clone().ids {
+                    tracing::info!("Processing event: {:?}", stream_key);
                     let event =
                         stream_id.map.get(EVENT).expect("event key not found");
 
                     match event {
                         Value::Data(data) => {
                             let event_str = std::str::from_utf8(data)
                                 .expect("event is not valid utf8");
 
                             let event = T::from_stored(event_str);
 
                             redis_conn
-                                .set("last_processed_id", stream_id.id.clone())
+                                .set(&last_processed_key, stream_id.id.clone())
                                 .await?;
                             last_processed_id = stream_id.id;
 
-                            tx.send(event).await?;
+                            tx.send(event).await.expect("send failed");
                         }
                         _ => {}
                     }
                 }
@@ -64,7 +68,7 @@ pub async fn publish(
     event: T,
 ) -> anyhow::Result<()>
 where
-    T: Event,
+    T: SupportedEvents,
 {
     let stored_event = event.to_stored();
     redis_conn.xadd(EVENT_STORE, "*", &[stored_event]).await?;
@@ -80,12 +84,6 @@ macro_rules! define_event {
         pub struct $name {
             $(pub $field_name: $field_type),*
         }
-
-        impl Event for $name {
-            fn from_stored(value: &str) -> Self {
-                serde_json::from_str(value).unwrap()
-            }
-        }
     };
@@ -94,17 +92,7 @@
         #[derive(Debug, Clone, Serialize, Deserialize)]
         pub struct $name;
 
-        impl Event for $name {
-            fn to_stored(&self) -> (String, String) {
-                let key = self.name().to_string();
-                let value = String::new(); // Return an empty string for structs without fields
-                (key, value)
-            }
-
-            fn from_stored(value: &str) -> Self {
-                serde_json::from_str(value).unwrap()
-            }
-        }
+        impl Event for $name {}
     };
 }
@@ -123,26 +111,30 @@ pub trait Event: Clone + Serialize + Send + Sync {
     fn payload(&self) -> Self {
         self.clone()
     }
+}
+
+define_event!(PosInitializedEventV1);
+define_event!(ChainInitializedEventV1);
+define_event!(Test);
+
+pub trait SupportedEvents: for<'a> Deserialize<'a> + Serialize {
+    fn from_stored(value: &str) -> Self {
+        serde_json::from_str(value).unwrap()
+    }
 
     fn to_stored(&self) -> (String, String) {
-        let key = self.name().to_string();
         let value = serde_json::to_string(&self).unwrap();
-        (key, value)
+        ("event".to_string(), value)
     }
-
-    // Factory method to create an event from stored data
-    fn from_stored(value: &str) -> Self
-    where
-        Self: Sized;
 }
 
-define_event!(PosInitializedEventV1);
-define_event!(ChainInitializedEventV1);
-
-trait Support
-
-pub enum SupportedEvents {
+// TODO: move this to POS module
+#[derive(Serialize, Deserialize, Debug)]
+pub enum PosEvents {
     PosInitializedEventV1(PosInitializedEventV1),
     ChainInitializedEventV1(ChainInitializedEventV1),
+    Test(Test),
 }
+
+impl SupportedEvents for PosEvents {}
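// Sketch of the on-wire shape the SupportedEvents default methods above
// produce: serde_json tags each enum variant by name, so the stream entry's
// "event" field carries e.g. {"PosInitializedEventV1":null}. Round-tripping
// through from_stored recovers the variant.
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PosInitializedEventV1;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum PosEvents {
    PosInitializedEventV1(PosInitializedEventV1),
}

fn main() {
    let event = PosEvents::PosInitializedEventV1(PosInitializedEventV1);

    let stored = serde_json::to_string(&event).unwrap();
    assert_eq!(stored, r#"{"PosInitializedEventV1":null}"#);

    let restored: PosEvents = serde_json::from_str(&stored).unwrap();
    assert_eq!(restored, event);
}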
From 88ff90e02a81833492cde76141004d0484a90783 Mon Sep 17 00:00:00 2001
From: Mateusz Jasiuk
Date: Thu, 5 Sep 2024 11:06:45 +0200
Subject: [PATCH 5/6] fix: after rebase

---
 chain/src/services/namada.rs | 24 +++++-----
 pos/src/main.rs              |  1 -
 shared/Cargo.toml            |  9 ++--
 shared/src/events.rs         | 87 ------------------------------
 shared/src/lib.rs            |  1 -
 5 files changed, 18 insertions(+), 104 deletions(-)
 delete mode 100644 shared/src/events.rs

diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs
index 02d676dcb..3d0c2775a 100644
--- a/chain/src/services/namada.rs
+++ b/chain/src/services/namada.rs
@@ -343,7 +343,7 @@ pub async fn query_bonds(
         async move {
             let client: &HttpClient = &client;
             // TODO: if this is too slow do not use query_all_bonds_and_unbonds
-            let (bonds, _) = query_all_bonds_and_unbonds(
+            let (bonds_res, _) = query_all_bonds_and_unbonds(
                 client,
                 Some(source),
                 Some(target),
             )
             .await
             .context("Failed to query all bonds and unbonds")
             .ok()?;
 
-            let bonds = if !bonds_res.is_empty() {
-                bonds_res
-                    .into_iter()
-                    .map(|bond| (source.clone(), target.clone(), Some(bond)))
-                    .collect::<Vec<_>>()
-            } else {
-                vec![(source, target, None)]
-            };
+            let bonds = if !bonds_res.is_empty() {
+                bonds_res
+                    .into_iter()
+                    .map(|bond| {
+                        (source.clone(), target.clone(), Some(bond))
+                    })
+                    .collect::<Vec<_>>()
+            } else {
+                vec![(source, target, None)]
+            };
 
             Some(bonds)
         }
@@ -369,7 +371,7 @@
     .collect::<Vec<_>>()
     .await;
 
-    let bonds = nested_bonds.iter().flatten().cloned().collect();
+    let bonds = nested_bonds.iter().flatten().collect();
 
     anyhow::Ok(bonds)
 }
@@ -516,7 +518,7 @@ pub async fn query_tallies(
 pub async fn query_all_votes(
     client: Arc<HttpClient>,
     proposals_ids: Vec<u64>,
-) -> anyhow::Result<Vec<GovernanceVote>> {
+) -> anyhow::Result<HashSet<GovernanceVote>> {
     let votes = futures::stream::iter(proposals_ids)
         .filter_map(|proposal_id| {
             let client = Arc::clone(&client);
diff --git a/pos/src/main.rs b/pos/src/main.rs
index 00b42aa1f..3908c6def 100644
--- a/pos/src/main.rs
+++ b/pos/src/main.rs
@@ -25,7 +25,6 @@ use shared::event_store::{
     publish, subscribe, Event, PosEvents, PosInitializedEventV1,
     SupportedEvents,
 };
-use shared::events::{self, Messages, PosInitializedMsg};
 use tendermint_rpc::HttpClient;
 use tokio::signal;
 use tokio::sync::{mpsc, oneshot, Mutex};
diff --git a/shared/Cargo.toml b/shared/Cargo.toml
index c630157fe..37456c8c6 100644
--- a/shared/Cargo.toml
+++ b/shared/Cargo.toml
@@ -15,17 +15,20 @@ path = "src/lib.rs"
 [dependencies]
 anyhow.workspace = true
 async-stream.workspace = true
-bimap.workspace = true
 bigdecimal.workspace = true
+bimap.workspace = true
+deadpool-redis = "0.15.1"
+fake.workspace = true
 futures-core.workspace = true
 futures-util.workspace = true
 futures.workspace = true
 namada_core.workspace = true
 namada_governance.workspace = true
-namada_proof_of_stake.workspace = true
 namada_ibc.workspace = true
+namada_proof_of_stake.workspace = true
 namada_sdk.workspace = true
 namada_tx.workspace = true
+rand.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 subtle-encoding.workspace = true
@@ -35,5 +38,3 @@ thiserror.workspace = true
 tokio-retry.workspace = true
 tokio.workspace = true
 tracing.workspace = true
-fake.workspace = true
-rand.workspace = true
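// Sketch of the pool setup behind the new deadpool-redis 0.15 dependency:
// build one pool from the queue URL at startup, then check out connections
// per task. The URL and key below are placeholders, not values from this
// patch series.
use deadpool_redis::redis::AsyncCommands;
use deadpool_redis::{Config, Runtime};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cfg = Config::from_url("redis://127.0.0.1:6379");
    let pool = cfg.create_pool(Some(Runtime::Tokio1))?;

    let mut conn = pool.get().await?;
    conn.set::<_, _, ()>("example_key", "example_value").await?;

    Ok(())
}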
diff --git a/shared/src/events.rs b/shared/src/events.rs
deleted file mode 100644
index 179838a70..000000000
--- a/shared/src/events.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-use std::{fmt::Debug, sync::Arc};
-
-use deadpool_redis::redis::{self, Commands};
-use tokio::sync::{mpsc, oneshot, Mutex};
-
-use serde::{Deserialize, Serialize};
-
-fn parse_msg<M>(msg: String) -> anyhow::Result<M>
-where
-    M: Message,
-{
-    let obj: M = serde_json::from_str(&msg)?;
-
-    Ok(obj)
-}
-
-pub async fn subscribe<'a>(
-    mut rx: oneshot::Receiver<()>,
-    tx_events: mpsc::Sender<Messages>,
-    mut pubsub: redis::PubSub<'a>,
-    channel: &str,
-) -> anyhow::Result<()> {
-    pubsub
-        // Timeout does not matter as we make infinite loop
-        .set_read_timeout(Some(std::time::Duration::from_secs(1)))
-        .unwrap();
-    // TODO: do we need to unsubscribe?
-    pubsub.subscribe(channel).unwrap();
-
-    loop {
-        if rx.try_recv().is_ok() {
-            break;
-        }
-
-        let msg = pubsub.get_message();
-        match msg {
-            Ok(msg) => {
-                let msg =
-                    // TODO: unwraps
-                    parse_msg(msg.get_payload::<String>().unwrap()).unwrap();
-
-                tx_events.send(msg).await.unwrap();
-            }
-            Err(_e) => {
-                // TODO: handle error somehow
-                continue;
-            }
-        }
-    }
-
-    Ok(())
-}
-
-pub fn publish<M>(
-    conn: &mut redis::Connection,
-    channel: &str,
-    message: M,
-) -> anyhow::Result<()>
-where
-    M: Message,
-{
-    let message = serde_json::to_string(&message).unwrap();
-
-    conn.publish(&channel, message)?;
-
-    Ok(())
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct PosInitializedMsg {
-    pub data: String,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct TestMsg {
-    pub data: String,
-}
-
-pub trait Message: Serialize + for<'a> Deserialize<'a> + Debug {}
-
-#[derive(Serialize, Deserialize, Debug)]
-pub enum Messages {
-    PosInitialized(PosInitializedMsg),
-    ChainReady(TestMsg),
-}
-
-impl Message for Messages {}
diff --git a/shared/src/lib.rs b/shared/src/lib.rs
index 945914c55..a02eb66bb 100644
--- a/shared/src/lib.rs
+++ b/shared/src/lib.rs
@@ -7,7 +7,6 @@ pub mod crawler;
 pub mod crawler_state;
 pub mod error;
 pub mod event_store;
-pub mod events;
 pub mod gas;
 pub mod genesis;
 pub mod header;
From b814a8d980023e319c82e11f0691da22be3cc96e Mon Sep 17 00:00:00 2001
From: Mateusz Jasiuk
Date: Thu, 5 Sep 2024 15:22:33 +0200
Subject: [PATCH 6/6] feat: poc events send to webserver

---
 chain/src/main.rs              | 56 ++++++++++++++-----------
 chain/src/services/namada.rs   | 47 ++++++++++++++++++++---
 pos/src/main.rs                | 19 ++--------
 shared/src/event_store.rs      | 25 ++++++------
 webserver/Cargo.toml           |  4 +-
 webserver/src/app.rs           | 20 ++++++++--
 webserver/src/handler/chain.rs | 69 +++++++++++++++++++++-------------
 webserver/src/main.rs          | 33 +++++++++++++++-
 webserver/src/state/common.rs  | 14 ++++++-
 9 files changed, 198 insertions(+), 89 deletions(-)

diff --git a/chain/src/main.rs b/chain/src/main.rs
index de77e3322..f0f47f72f 100644
--- a/chain/src/main.rs
+++ b/chain/src/main.rs
@@ -20,9 +20,10 @@ use chrono::{NaiveDateTime, Utc};
 use clap::Parser;
 use clap_verbosity_flag::LevelFilter;
 use deadpool_diesel::postgres::Object;
-use deadpool_redis::redis::{self, AsyncCommands};
+use deadpool_redis::redis::AsyncCommands;
 use namada_sdk::time::DateTimeUtc;
 use orm::migrations::run_migrations;
+use redis::aio::MultiplexedConnection;
 use shared::block::Block;
 use shared::block_result::BlockResult;
 use shared::checksums::Checksums;
@@ -30,13 +31,12 @@ use shared::crawler::crawl;
 use shared::crawler_state::ChainCrawlerState;
 use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
 use shared::event_store::{
-    publish, subscribe, ChainInitializedEventV1, PosEvents, Test,
+    publish, subscribe, ChainInitializedEventV1, ChainProcessed, PosEvents,
 };
-use shared::events::{self, TestMsg};
 use shared::id::Id;
 use tendermint_rpc::HttpClient;
 use tokio::signal;
-use tokio::sync::{mpsc, oneshot, Mutex};
+use tokio::sync::{mpsc, oneshot};
 use tokio::task::JoinHandle;
 use tokio::time::sleep;
 use tracing::Level;
@@ -112,6 +112,7 @@ async fn main() -> Result<(), MainError> {
     ));
 
     let redis_conn = app_state.get_redis_connection().await.into_db_error()?;
+
     publish(
         redis_conn,
         PosEvents::ChainInitializedEventV1(ChainInitializedEventV1),
     )
@@ -142,40 +143,35 @@ async fn message_processor(
     initial_query_retry_time: u64,
     checksums: Checksums,
 ) -> anyhow::Result<()> {
-    tracing::info!("Starting message processor...");
     while let Some(event) = rx.recv().await {
-        tracing::info!("Received message: {:?}", event);
+        let app_state = Arc::clone(&app_state);
+
         match event {
-            PosEvents::PosInitializedEventV1(data) => {
-                tracing::info!("Received message: {:?}", data);
+            PosEvents::PosInitializedEventV1(_data) => {
                 let client = Arc::clone(&client);
-                let conn = Arc::new(app_state.get_db_connection().await?);
-                let redis_conn = app_state.get_redis_connection().await?;
                 let checksums = checksums.clone();
+                let conn = app_state.get_db_connection().await?;
 
                 initial_query(
                     Arc::clone(&client),
-                    Arc::clone(&conn),
+                    conn,
                     initial_query_retry_time,
                 )
                 .await
                 .context("Initial query error")?;
 
+                let conn = app_state.get_db_connection().await?;
                 let crawler_state = db_service::get_chain_crawler_state(&conn)
                     .await
                     .into_db_error()?;
 
-                publish(redis_conn, PosEvents::Test(Test))
-                    .await
-                    .into_db_error()?;
-
                 crawl(
                     move |block_height| {
                         crawling_fn(
                             block_height,
                             Arc::clone(&client),
-                            Arc::clone(&conn),
                             checksums.clone(),
+                            app_state.clone(),
                         )
                     },
                     crawler_state.last_processed_block,
                 )
                 .await
                 .context("Crawling error")?;
             }
+            PosEvents::Test(_) => {
+                tracing::info!("Received test event");
+            }
             _ => {}
         }
     }
 
+    tracing::info!("Message processor finished...");
     Ok(())
 }
@@ -201,11 +201,13 @@ fn must_exit_handle() -> JoinHandle<()> {
 }
 
 async fn crawling_fn(
     block_height: u32,
     client: Arc<HttpClient>,
-    conn: Arc<Object>,
     checksums: Checksums,
+    app_state: Arc<AppState>,
 ) -> Result<(), MainError> {
     let should_process = can_process(block_height, client.clone()).await?;
 
+    let conn = app_state.get_db_connection().await.into_db_error()?;
+
     if !should_process {
         let timestamp = Utc::now().naive_utc();
         update_crawler_timestamp(&conn, timestamp).await?;
@@ -387,12 +389,23 @@ async fn crawling_fn(
         .unwrap()
         .unwrap();
 
+    let redis_conn = app_state.get_redis_connection().await.into_db_error()?;
+
+    publish(
+        redis_conn,
+        PosEvents::ChainProcessed(ChainProcessed {
+            block: block_height,
+        }),
+    )
+    .await
+    .into_db_error()?;
+
     Ok(())
 }
 
 async fn initial_query(
     client: Arc<HttpClient>,
-    conn: Arc<Object>,
+    conn: Object,
     initial_query_retry_time: u64,
 ) -> Result<(), MainError> {
     tracing::info!("Querying initial data...");
@@ -402,9 +415,10 @@ async fn initial_query(
         namada_service::get_epoch_at_block_height(&client, block_height)
             .await
             .into_rpc_error()?;
-    let first_block_in_epoch = namada_service::get_first_block_in_epoch(client)
-        .await
-        .into_rpc_error()?;
+    let first_block_in_epoch =
+        namada_service::get_first_block_in_epoch(&client)
+            .await
+            .into_rpc_error()?;
 
     loop {
         let pos_crawler_state =
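// Sketch of the append path used by `publish` above: the event is serialized
// once and XADDed to the shared stream with an auto-generated id ("*"), so
// consumers can resume by entry id. The "event_store"/"event" names are
// assumptions matching the EVENT_STORE/EVENT constants earlier in the series.
use deadpool_redis::redis::AsyncCommands;

async fn append_event(
    conn: &mut deadpool_redis::Connection,
    payload_json: &str,
) -> anyhow::Result<()> {
    conn.xadd::<_, _, _, _, ()>(
        "event_store",              // stream key (assumed EVENT_STORE value)
        "*",                        // let Redis assign the entry id
        &[("event", payload_json)], // single field, like SupportedEvents::to_stored
    )
    .await?;
    Ok(())
}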
diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs
index 3d0c2775a..24cf74afa 100644
--- a/chain/src/services/namada.rs
+++ b/chain/src/services/namada.rs
@@ -333,10 +333,47 @@ pub async fn query_next_governance_id(
     .context("Failed to deserialize proposal id")
 }
 
+pub async fn query_bonds_old(
+    client: &HttpClient,
+    addresses: HashSet<BondAddresses>,
+) -> anyhow::Result<Vec<(Id, Id, Option<Bond>)>> {
+    let nested_bonds = futures::stream::iter(addresses)
+        .filter_map(|BondAddresses { source, target }| async move {
+            // TODO: if this is too slow do not use query_all_bonds_and_unbonds
+            let (bonds_res, _) = query_all_bonds_and_unbonds(
+                client,
+                Some(source.clone()),
+                Some(target.clone()),
+            )
+            .await
+            .context("Failed to query all bonds and unbonds")
+            .ok()?;
+
+            let bonds = if !bonds_res.is_empty() {
+                bonds_res
+                    .into_iter()
+                    .map(|bond| (source.clone(), target.clone(), Some(bond)))
+                    .collect::<Vec<_>>()
+            } else {
+                vec![(source, target, None)]
+            };
+
+            Some(bonds)
+        })
+        .map(futures::future::ready)
+        .buffer_unordered(20)
+        .collect::<Vec<_>>()
+        .await;
+
+    let bonds = nested_bonds.iter().flatten().cloned().collect();
+
+    anyhow::Ok(bonds)
+}
+
 pub async fn query_bonds(
     client: Arc<HttpClient>,
     addresses: HashSet<BondAddresses>,
-) -> anyhow::Result<Bonds> {
+) -> anyhow::Result<Vec<(Id, Id, Option<Bond>)>> {
     let nested_bonds = futures::stream::iter(addresses)
         .filter_map(|BondAddresses { source, target }| {
             let client = Arc::clone(&client);
@@ -345,8 +382,8 @@ pub async fn query_bonds(
                 // TODO: if this is too slow do not use query_all_bonds_and_unbonds
                 let (bonds_res, _) = query_all_bonds_and_unbonds(
                     client,
-                    Some(source),
-                    Some(target),
+                    Some(source.clone()),
+                    Some(target.clone()),
                 )
                 .await
                 .context("Failed to query all bonds and unbonds")
                 .ok()?;
@@ -371,7 +408,7 @@ pub async fn query_bonds(
     .collect::<Vec<_>>()
     .await;
 
-    let bonds = nested_bonds.iter().flatten().collect();
+    let bonds = nested_bonds.iter().flatten().cloned().collect();
 
     anyhow::Ok(bonds)
 }
@@ -483,7 +520,6 @@ pub async fn is_steward(
 ) -> anyhow::Result<bool> {
     let address = NamadaSdkAddress::from_str(&address.to_string())
         .context("Failed to parse address")?;
-    let client: &HttpClient = &client;
 
     let is_steward = rpc::is_steward(client, &address).await;
 
@@ -523,7 +559,6 @@ pub async fn query_all_votes(
         .filter_map(|proposal_id| {
             let client = Arc::clone(&client);
             async move {
-                let proposal_id = proposal_id.clone();
                 let client: &HttpClient = &client;
 
                 let votes = rpc::query_proposal_votes(client, proposal_id)
diff --git a/pos/src/main.rs b/pos/src/main.rs
index 3908c6def..b6e577d64 100644
--- a/pos/src/main.rs
+++ b/pos/src/main.rs
@@ -5,7 +5,7 @@ use chrono::{NaiveDateTime, Utc};
 use clap::Parser;
 use clap_verbosity_flag::LevelFilter;
 use deadpool_diesel::postgres::Object;
-use deadpool_redis::redis::{self, RedisResult};
+use deadpool_redis::redis::{self};
 use namada_sdk::time::DateTimeUtc;
 use orm::crawler_state::EpochStateInsertDb;
 use orm::migrations::run_migrations;
@@ -14,22 +14,17 @@ use pos::app_state::AppState;
 use pos::config::AppConfig;
 use pos::repository::{self};
 use pos::services::namada as namada_service;
-use redis::aio::MultiplexedConnection;
-use redis::streams::{StreamRangeReply, StreamReadOptions};
 use redis::AsyncCommands;
-use redis::{streams::StreamReadReply, Cmd};
 use shared::crawler;
 use shared::crawler_state::{CrawlerName, EpochCrawlerState};
 use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
 use shared::event_store::{
-    publish, subscribe, Event, PosEvents, PosInitializedEventV1,
-    SupportedEvents,
+    publish, subscribe, PosEvents, PosInitializedEventV1,
 };
 use tendermint_rpc::HttpClient;
 use tokio::signal;
-use tokio::sync::{mpsc, oneshot, Mutex};
+use tokio::sync::{mpsc, oneshot};
 use tokio::task::JoinHandle;
-use tokio::time::sleep;
 use tracing::Level;
 use tracing_subscriber::FmtSubscriber;
@@ -90,14 +85,6 @@ async fn main() -> Result<(), MainError> {
         Arc::clone(&app_state),
     ));
 
-    // let redis_conn = app_state.get_redis_connection().await.into_db_error()?;
-    // publish(
-    //     redis_conn,
-    //     PosEvents::PosInitializedEventV1(PosInitializedEventV1),
-    // )
-    // .await
-    // .into_db_error()?;
-
     tokio::select! {
         _ = must_exit_handle() => {
             tracing::info!("Received interrupt signal, shutting down...");
diff --git a/shared/src/event_store.rs b/shared/src/event_store.rs
index 82c355ade..7f43554e7 100644
--- a/shared/src/event_store.rs
+++ b/shared/src/event_store.rs
@@ -35,21 +35,18 @@ where
                     let event =
                         stream_id.map.get(EVENT).expect("event key not found");
 
-                    match event {
-                        Value::Data(data) => {
-                            let event_str = std::str::from_utf8(data)
-                                .expect("event is not valid utf8");
+                    if let Value::Data(data) = event {
+                        let event_str = std::str::from_utf8(data)
+                            .expect("event is not valid utf8");
 
-                            let event = T::from_stored(event_str);
+                        let event = T::from_stored(event_str);
 
-                            redis_conn
-                                .set(&last_processed_key, stream_id.id.clone())
-                                .await?;
-                            last_processed_id = stream_id.id;
+                        redis_conn
+                            .set(&last_processed_key, stream_id.id.clone())
+                            .await?;
+                        last_processed_id = stream_id.id;
 
-                            tx.send(event).await.expect("send failed");
-                        }
-                        _ => {}
+                        tx.send(event).await.expect("send failed");
                     }
                 }
             }
@@ -115,6 +112,7 @@ pub trait Event: Clone + Serialize + Send + Sync {
 
 define_event!(PosInitializedEventV1);
 define_event!(ChainInitializedEventV1);
+define_event!(ChainProcessed, block: u32);
 define_event!(Test);
 
 pub trait SupportedEvents: for<'a> Deserialize<'a> + Serialize {
@@ -130,10 +128,11 @@ pub trait SupportedEvents: for<'a> Deserialize<'a> + Serialize {
 }
 
 // TODO: move this to POS module
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Clone, Serialize, Deserialize, Debug)]
 pub enum PosEvents {
     PosInitializedEventV1(PosInitializedEventV1),
     ChainInitializedEventV1(ChainInitializedEventV1),
+    ChainProcessed(ChainProcessed),
     Test(Test),
 }
diff --git a/webserver/Cargo.toml b/webserver/Cargo.toml
index e0acd0473..5bc5a76f0 100644
--- a/webserver/Cargo.toml
+++ b/webserver/Cargo.toml
@@ -44,11 +44,13 @@ tokio-stream.workspace = true
 namada_core.workspace = true
 namada_sdk.workspace = true
 namada_parameters.workspace = true
-deadpool-redis = "0.13.0"
+deadpool-redis = "0.15.1"
+redis = {version = "0.25.0", features = ["streams"]}
 bigdecimal.workspace = true
 shared.workspace = true
 strum.workspace = true
 strum_macros.workspace = true
+async-stream = "0.3.5"
 
 [build-dependencies]
 vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] }
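// Sketch of what the new `define_event!(ChainProcessed, block: u32)` line
// expands to, per the macro's field-carrying arm shown earlier in the series:
// a plain serde-derived struct whose fields become the event payload.
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChainProcessed {
    pub block: u32,
}

fn main() {
    let event = ChainProcessed { block: 42 };
    // Wrapped in the PosEvents enum this becomes
    // {"ChainProcessed":{"block":42}} on the stream.
    assert_eq!(
        serde_json::to_string(&event).unwrap(),
        r#"{"block":42}"#
    );
}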
diff --git a/webserver/src/app.rs b/webserver/src/app.rs
index 378f5b948..caaffbfbd 100644
--- a/webserver/src/app.rs
+++ b/webserver/src/app.rs
@@ -1,14 +1,19 @@
 use std::net::{Ipv4Addr, SocketAddr};
+use std::sync::Arc;
 use std::time::Duration;
 
 use axum::error_handling::HandleErrorLayer;
 use axum::http::{HeaderValue, StatusCode};
 use axum::response::IntoResponse;
 use axum::routing::get;
+use axum::Extension;
 use axum::{BoxError, Json, Router};
 use lazy_static::lazy_static;
 use namada_sdk::tendermint_rpc::HttpClient;
 use serde_json::json;
+use shared::event_store::PosEvents;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::Mutex;
 use tower::buffer::BufferLayer;
 use tower::limit::RateLimitLayer;
 use tower::ServiceBuilder;
@@ -33,16 +38,25 @@ lazy_static! {
 pub struct ApplicationServer;
 
 impl ApplicationServer {
-    pub async fn serve(config: AppConfig) -> anyhow::Result<()> {
+    pub async fn serve(
+        config: AppConfig,
+        events_rx: Receiver<PosEvents>,
+    ) -> anyhow::Result<()> {
         let db_url = config.database_url.clone();
         let cache_url = config.cache_url.clone();
 
         let app_state = AppState::new(db_url, cache_url);
         let client = HttpClient::new(config.tendermint_url.as_str()).unwrap();
 
+        // let events_stream =
+        //     tokio_stream::wrappers::ReceiverStream::new(events_rx);
         let routes = {
-            let common_state =
-                CommonState::new(client, config.clone(), app_state.clone());
+            let common_state = CommonState::new(
+                client,
+                config.clone(),
+                app_state.clone(),
+                Arc::new(Mutex::new(events_rx)),
+            );
 
             Router::new()
                 .route("/pos/validator", get(pos_handlers::get_validators))
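// Sketch of the design choice above: axum clones its state for every request,
// but an mpsc::Receiver has exactly one consumer, so the receiver is wrapped
// in Arc<Mutex<_>> and whichever handler holds the lock drains the events.
// Types are simplified (u32 instead of PosEvents).
use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};

#[derive(Clone)]
struct SharedEvents {
    events_rx: Arc<Mutex<mpsc::Receiver<u32>>>,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(8);
    let state = SharedEvents {
        events_rx: Arc::new(Mutex::new(rx)),
    };

    tx.send(7).await.unwrap();

    // A cloned state (as axum would hand to a handler) sees the same queue.
    let cloned = state.clone();
    let mut rx = cloned.events_rx.lock().await;
    assert_eq!(rx.recv().await, Some(7));
}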
diff --git a/webserver/src/handler/chain.rs b/webserver/src/handler/chain.rs
index 8373d80fa..440af5d65 100644
--- a/webserver/src/handler/chain.rs
+++ b/webserver/src/handler/chain.rs
@@ -1,5 +1,6 @@
 use std::convert::Infallible;
-use std::time::Duration;
+use std::ops::{Deref, DerefMut};
+use std::pin::Pin;
 
 use axum::extract::State;
 use axum::http::HeaderMap;
@@ -24,34 +25,50 @@ struct ChainStatusEvent {
 pub async fn chain_status(
     State(state): State<CommonState>,
 ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
-    let stream = tokio_stream::wrappers::IntervalStream::new(
-        tokio::time::interval(Duration::from_secs(3)),
-    )
-    .then(move |_| {
-        let state = state.clone();
-
-        async move {
-            let height = state
-                .chain_service
-                .find_last_processed_block()
-                .await
-                .expect("Failed to get last processed block");
-
-            let epoch = state
-                .chain_service
-                .find_last_processed_epoch()
-                .await
-                .expect("Failed to get last processed epoch");
-
+    // let stream = state.events_rx.map(|event| {
+    //     let event =
+    //         serde_json::to_string(&event).expect("Failed to serialize event");
+
+    //     Ok(Event::default().data(event))
+    // });
+    // let stream = tokio_stream::wrappers::IntervalStream::new(
+    //     tokio::time::interval(Duration::from_secs(3)),
+    // )
+    // .then(move |_| {
+    //     let state = state.clone();
+
+    //     async move {
+    //         let height = state
+    //             .chain_service
+    //             .find_last_processed_block()
+    //             .await
+    //             .expect("Failed to get last processed block");
+
+    //         let epoch = state
+    //             .chain_service
+    //             .find_last_processed_epoch()
+    //             .await
+    //             .expect("Failed to get last processed epoch");
+
+    //         let event =
+    //             serde_json::to_string(&ChainStatusEvent { height, epoch })
+    //                 .expect("Failed to serialize event");
+
+    //         Ok(Event::default().data(event))
+    //     }
+    // });
+    // Convert the channels to a `Stream`.
+    let rx1 = async_stream::stream! {
+        let mut events_rx = state.events_rx.lock().await;
+
+        while let Some(event) = events_rx.recv().await {
             let event =
-                serde_json::to_string(&ChainStatusEvent { height, epoch })
-                    .expect("Failed to serialize event");
-
-            Ok(Event::default().data(event))
+                serde_json::to_string(&event).expect("Failed to serialize event");
+            yield Ok(Event::default().data(event));
         }
-    });
+    };
 
-    Sse::new(stream).keep_alive(KeepAlive::default())
+    Sse::new(rx1).keep_alive(KeepAlive::default())
 }
 
 pub async fn get_parameters(
diff --git a/webserver/src/main.rs b/webserver/src/main.rs
index 19dad7ffb..11ad9597a 100644
--- a/webserver/src/main.rs
+++ b/webserver/src/main.rs
@@ -1,5 +1,9 @@
 use anyhow::Context;
 use clap::Parser;
+use deadpool_redis::redis::AsyncCommands;
+use deadpool_redis::{Config, Runtime};
+use shared::event_store::{subscribe, PosEvents};
+use tokio::sync::{mpsc, oneshot};
 use webserver::app::ApplicationServer;
 use webserver::config::AppConfig;
 
@@ -11,9 +15,34 @@ async fn main() -> anyhow::Result<()> {
         .with_max_level(tracing::Level::INFO)
         .init();
 
-    ApplicationServer::serve(config)
+    let (tx, rx) = oneshot::channel();
+    let (events_tx, events_rx) = mpsc::channel::<PosEvents>(100);
+    let cfg = Config::from_url("redis://redis@0.0.0.0:6379");
+    let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap();
+    let mut redis_conn = redis_pool.get().await.unwrap();
+
+    let last_processed_id: String = redis_conn
+        .get("webserver_last_processed_id")
         .await
-        .context("could not initialize application routes")?;
+        .ok()
+        .unwrap_or("0".to_string());
+
+    let var_name =
+        ("webserver_last_processed_id".to_string(), last_processed_id);
+    let subscriber =
+        tokio::spawn(subscribe(redis_conn, var_name, events_tx, rx));
+
+    let server = ApplicationServer::serve(config, events_rx);
+
+    tokio::select! {
+        _ = server => {
+            tracing::info!("Received interrupt signal, shutting down...");
+            tx.send(()).unwrap();
+        }
+        _ = subscriber => {
+            tracing::info!("Subscriber finished...");
+        }
+    }
 
     Ok(())
 }
diff --git a/webserver/src/state/common.rs b/webserver/src/state/common.rs
index f50fafb06..779bfeda5 100644
--- a/webserver/src/state/common.rs
+++ b/webserver/src/state/common.rs
@@ -1,4 +1,9 @@
+use std::sync::Arc;
+
 use namada_sdk::tendermint_rpc::HttpClient;
+use shared::event_store::PosEvents;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::Mutex;
 
 use crate::appstate::AppState;
 use crate::config::AppConfig;
@@ -23,10 +28,16 @@ pub struct CommonState {
     pub crawler_state_service: CrawlerStateService,
     pub client: HttpClient,
     pub config: AppConfig,
+    pub events_rx: Arc<Mutex<Receiver<PosEvents>>>,
 }
 
 impl CommonState {
-    pub fn new(client: HttpClient, config: AppConfig, data: AppState) -> Self {
+    pub fn new(
+        client: HttpClient,
+        config: AppConfig,
+        data: AppState,
+        events_rx: Arc<Mutex<Receiver<PosEvents>>>,
+    ) -> Self {
         Self {
             pos_service: PosService::new(data.clone()),
             gov_service: GovernanceService::new(data.clone()),
@@ -38,6 +49,7 @@ impl CommonState {
             crawler_state_service: CrawlerStateService::new(data.clone()),
             client,
             config,
+            events_rx,
         }
     }
 }
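// End-to-end sketch of the SSE relay assembled in the last three files:
// events arriving on the channel are serialized and yielded as
// text/event-stream frames, with the receiver locked for the lifetime of the
// stream. Assumes axum 0.6-style imports to match the webserver crate;
// PosEvents is replaced by a serializable placeholder.
use std::convert::Infallible;
use std::sync::Arc;

use axum::response::sse::{Event, KeepAlive, Sse};
use futures::Stream;
use tokio::sync::{mpsc, Mutex};

fn chain_status_stream(
    events_rx: Arc<Mutex<mpsc::Receiver<u32>>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = async_stream::stream! {
        // Hold the lock so this client is the stream's single consumer.
        let mut events_rx = events_rx.lock().await;

        while let Some(event) = events_rx.recv().await {
            let payload = serde_json::to_string(&event)
                .expect("Failed to serialize event");
            yield Ok(Event::default().data(payload));
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}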