diff --git a/sugondat/shim/src/cmd/query/mod.rs b/sugondat/shim/src/cmd/query/mod.rs index d17b2afc..7b8980e8 100644 --- a/sugondat/shim/src/cmd/query/mod.rs +++ b/sugondat/shim/src/cmd/query/mod.rs @@ -19,5 +19,5 @@ pub async fn run(params: Params) -> anyhow::Result<()> { async fn connect_rpc( conn_params: crate::cli::SugondatRpcParams, ) -> anyhow::Result { - sugondat_rpc::Client::new(conn_params.node_url).await + sugondat_rpc::Client::new(conn_params.node_url, conn_params.no_retry).await } diff --git a/sugondat/shim/src/cmd/serve.rs b/sugondat/shim/src/cmd/serve.rs index 83dfb639..8f4e6452 100644 --- a/sugondat/shim/src/cmd/serve.rs +++ b/sugondat/shim/src/cmd/serve.rs @@ -16,7 +16,7 @@ Pass --submit-dev-alice or --submit-private-key=<..> to fix." ); } let server = Server::builder().build(listen_on).await?; - let client = connect_client(¶ms.rpc.node_url).await?; + let client = connect_client(¶ms.rpc.node_url, params.rpc.no_retry).await?; let methods = dock::init(dock::Config { // TODO: whenever there are more docks, the logic of checking if any at least one is enabled // and other similar stuff should be in CLI. @@ -30,7 +30,7 @@ Pass --submit-dev-alice or --submit-private-key=<..> to fix." Ok(()) } -async fn connect_client(url: &str) -> anyhow::Result { - let client = Client::new(url.to_string()).await?; +async fn connect_client(url: &str, no_retry: bool) -> anyhow::Result { + let client = Client::new(url.to_string(), no_retry).await?; Ok(client) } diff --git a/sugondat/shim/src/sugondat_rpc/mod.rs b/sugondat/shim/src/sugondat_rpc/mod.rs index 08f4c7eb..90fb4935 100644 --- a/sugondat/shim/src/sugondat_rpc/mod.rs +++ b/sugondat/shim/src/sugondat_rpc/mod.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use crate::key::Keypair; use anyhow::Context; use subxt::{config::Header as _, rpc_params, utils::H256}; @@ -28,7 +26,7 @@ mod conn; /// This is a thin wrapper that can be cloned cheaply. #[derive(Clone)] pub struct Client { - connector: Arc, + connection: conn::ConnectionType, } impl Client { @@ -38,36 +36,29 @@ impl Client { /// The RPC URL must be a valid URL pointing to a sugondat node. If it's not a malformed URL, /// returns an error. #[tracing::instrument(level = Level::DEBUG)] - pub async fn new(rpc_url: String) -> anyhow::Result { + pub async fn new(rpc_url: String, no_retry: bool) -> anyhow::Result { anyhow::ensure!( url::Url::parse(&rpc_url).is_ok(), "invalid RPC URL: {}", rpc_url ); - tracing::info!("connecting to sugondat node: {}", rpc_url); - let rpc_url = Arc::new(rpc_url); - let me = Self { - connector: Arc::new(conn::Connector::new(rpc_url)), - }; - me.connector.ensure_connected().await; - Ok(me) + let connection = conn::ConnectionType::new(rpc_url, no_retry).await?; + Ok(Self { connection }) } /// Blocks until the sugondat node has finalized a block at the given height. Returns /// the block hash of the block at the given height. #[tracing::instrument(level = Level::DEBUG, skip(self))] pub async fn wait_finalized_height(&self, height: u64) -> [u8; 32] { - loop { - let conn = self.connector.ensure_connected().await; - match conn.finalized.wait_until_finalized(self, height).await { - Some(block_hash) => return block_hash, - None => { - // The watcher task has terminated. Reset the connection and retry. - self.connector.reset().await; - } - } - } + self.connection + .run(|conn| async move { + conn.finalized + .wait_until_finalized(self, height) + .await + .ok_or(anyhow::anyhow!("failed to wait for last finalized block")) + }) + .await } /// Returns the block hash of the block at the given height. @@ -75,31 +66,23 @@ impl Client { /// If there is no block at the given height, returns `None`. #[tracing::instrument(level = Level::DEBUG, skip(self))] pub async fn block_hash(&self, height: u64) -> anyhow::Result> { - loop { - let conn = self.connector.ensure_connected().await; - let block_hash: Option = match conn - .raw - .request("chain_getBlockHash", rpc_params![height]) - .await - { - Ok(it) => it, - Err(err) => { - tracing::error!(?err, "failed to query block hash"); - self.connector.reset().await; - continue; - } - }; - - break match block_hash { - None => Ok(None), - Some(h) if h == H256::zero() => { - // Little known fact: the sugondat node returns a zero block hash if there is no block - // at the given height. - Ok(None) - } - Some(block_hash) => Ok(Some(block_hash)), - }; - } + self.connection + .run(|conn| async move { + conn.raw + .request("chain_getBlockHash", rpc_params![height]) + .await + .and_then(|block_hash| match block_hash { + None => Ok(Ok(None)), + Some(h) if h == H256::zero() => { + // Little known fact: the sugondat node returns a zero block hash if there is no block + // at the given height. + Ok(Ok(None)) + } + Some(block_hash) => Ok(Ok(Some(block_hash))), + }) + .map_err(|err| anyhow::anyhow!("failed to query block hash: {}", err)) + }) + .await } /// Returns the header and the body of the block with the given hash, automatically retrying @@ -109,32 +92,31 @@ impl Client { block_hash: Option<[u8; 32]>, ) -> anyhow::Result<(Header, Vec)> { let block_hash = block_hash.map(H256::from); - loop { - let conn = self.connector.ensure_connected().await; - let res = match block_hash { - Some(h) => conn.subxt.blocks().at(h).await, - None => conn.subxt.blocks().at_latest().await, - }; - let err = match res { - Ok(it) => { - let header = it.header(); - let body = match it.extrinsics().await { - Ok(it) => it, - Err(err) => { - tracing::error!(?err, "failed to query block"); - self.connector.reset().await; - continue; + + self.connection + .run(|conn| async move { + let res = match block_hash { + Some(h) => conn.subxt.blocks().at(h).await, + None => conn.subxt.blocks().at_latest().await, + }; + + match res { + Ok(it) => { + let header = it.header(); + let body = match it.extrinsics().await { + Ok(it) => it, + Err(err) => { + return Err(anyhow::anyhow!("failed to query block: {}", err)) + } } + .iter() + .collect::, _>>()?; + Ok(Ok((header.clone(), body))) } - .iter() - .collect::, _>>()?; - return Ok((header.clone(), body)); + Err(err) => Err(anyhow::anyhow!("failed to query block: {}", err)), } - Err(err) => err, - }; - tracing::error!(?err, "failed to query block"); - self.connector.reset().await; - } + }) + .await } /// Returns the data of the block identified by the given block hash. If the block is not found @@ -170,17 +152,22 @@ impl Client { namespace: sugondat_nmt::Namespace, key: Keypair, ) -> anyhow::Result<[u8; 32]> { + let conn = match self.connection { + conn::ConnectionType::Persistent(ref connector) => connector.ensure_connected().await, + conn::ConnectionType::Single(ref conn) => conn.clone(), + }; + let extrinsic = sugondat_subxt::sugondat::tx() .blobs() .submit_blob(UnvalidatedNamespace(namespace.to_raw_bytes()), blob); - let conn = self.connector.ensure_connected().await; let signed = conn .subxt .tx() .create_signed(&extrinsic, &key, Default::default()) .await .with_context(|| format!("failed to validate or sign extrinsic"))?; + let events = signed .submit_and_watch() .await