Skip to content

shim: use ConnectioType #193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sugondat/shim/src/cmd/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ pub async fn run(params: Params) -> anyhow::Result<()> {
async fn connect_rpc(
conn_params: crate::cli::SugondatRpcParams,
) -> anyhow::Result<sugondat_rpc::Client> {
sugondat_rpc::Client::new(conn_params.node_url).await
sugondat_rpc::Client::new(conn_params.node_url, conn_params.no_retry).await
}
6 changes: 3 additions & 3 deletions sugondat/shim/src/cmd/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pass --submit-dev-alice or --submit-private-key=<..> to fix."
);
}
let server = Server::builder().build(listen_on).await?;
let client = connect_client(&params.rpc.node_url).await?;
let client = connect_client(&params.rpc.node_url, params.rpc.no_retry).await?;
let methods = dock::init(dock::Config {
// TODO: whenever there are more docks, the logic of checking if any at least one is enabled
// and other similar stuff should be in CLI.
Expand All @@ -30,7 +30,7 @@ Pass --submit-dev-alice or --submit-private-key=<..> to fix."
Ok(())
}

async fn connect_client(url: &str) -> anyhow::Result<Client> {
let client = Client::new(url.to_string()).await?;
async fn connect_client(url: &str, no_retry: bool) -> anyhow::Result<Client> {
let client = Client::new(url.to_string(), no_retry).await?;
Ok(client)
}
127 changes: 57 additions & 70 deletions sugondat/shim/src/sugondat_rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use crate::key::Keypair;
use anyhow::Context;
use subxt::{config::Header as _, rpc_params, utils::H256};
Expand Down Expand Up @@ -28,7 +26,7 @@ mod conn;
/// This is a thin wrapper that can be cloned cheaply.
#[derive(Clone)]
pub struct Client {
connector: Arc<conn::Connector>,
connection: conn::ConnectionType,
}

impl Client {
Expand All @@ -38,68 +36,53 @@ impl Client {
/// The RPC URL must be a valid URL pointing to a sugondat node. If it's not a malformed URL,
/// returns an error.
#[tracing::instrument(level = Level::DEBUG)]
pub async fn new(rpc_url: String) -> anyhow::Result<Self> {
pub async fn new(rpc_url: String, no_retry: bool) -> anyhow::Result<Self> {
anyhow::ensure!(
url::Url::parse(&rpc_url).is_ok(),
"invalid RPC URL: {}",
rpc_url
);

tracing::info!("connecting to sugondat node: {}", rpc_url);
let rpc_url = Arc::new(rpc_url);
let me = Self {
connector: Arc::new(conn::Connector::new(rpc_url)),
};
me.connector.ensure_connected().await;
Ok(me)
let connection = conn::ConnectionType::new(rpc_url, no_retry).await?;
Ok(Self { connection })
}

/// Blocks until the sugondat node has finalized a block at the given height. Returns
/// the block hash of the block at the given height.
#[tracing::instrument(level = Level::DEBUG, skip(self))]
pub async fn wait_finalized_height(&self, height: u64) -> [u8; 32] {
loop {
let conn = self.connector.ensure_connected().await;
match conn.finalized.wait_until_finalized(self, height).await {
Some(block_hash) => return block_hash,
None => {
// The watcher task has terminated. Reset the connection and retry.
self.connector.reset().await;
}
}
}
self.connection
.run(|conn| async move {
conn.finalized
.wait_until_finalized(self, height)
.await
.ok_or(anyhow::anyhow!("failed to wait for last finalized block"))
})
.await
}

/// Returns the block hash of the block at the given height.
///
/// If there is no block at the given height, returns `None`.
#[tracing::instrument(level = Level::DEBUG, skip(self))]
pub async fn block_hash(&self, height: u64) -> anyhow::Result<Option<H256>> {
loop {
let conn = self.connector.ensure_connected().await;
let block_hash: Option<H256> = match conn
.raw
.request("chain_getBlockHash", rpc_params![height])
.await
{
Ok(it) => it,
Err(err) => {
tracing::error!(?err, "failed to query block hash");
self.connector.reset().await;
continue;
}
};

break match block_hash {
None => Ok(None),
Some(h) if h == H256::zero() => {
// Little known fact: the sugondat node returns a zero block hash if there is no block
// at the given height.
Ok(None)
}
Some(block_hash) => Ok(Some(block_hash)),
};
}
self.connection
.run(|conn| async move {
conn.raw
.request("chain_getBlockHash", rpc_params![height])
.await
.and_then(|block_hash| match block_hash {
None => Ok(Ok(None)),
Some(h) if h == H256::zero() => {
// Little known fact: the sugondat node returns a zero block hash if there is no block
// at the given height.
Ok(Ok(None))
}
Some(block_hash) => Ok(Ok(Some(block_hash))),
})
.map_err(|err| anyhow::anyhow!("failed to query block hash: {}", err))
})
.await
}

/// Returns the header and the body of the block with the given hash, automatically retrying
Expand All @@ -109,32 +92,31 @@ impl Client {
block_hash: Option<[u8; 32]>,
) -> anyhow::Result<(Header, Vec<sugondat_subxt::ExtrinsicDetails>)> {
let block_hash = block_hash.map(H256::from);
loop {
let conn = self.connector.ensure_connected().await;
let res = match block_hash {
Some(h) => conn.subxt.blocks().at(h).await,
None => conn.subxt.blocks().at_latest().await,
};
let err = match res {
Ok(it) => {
let header = it.header();
let body = match it.extrinsics().await {
Ok(it) => it,
Err(err) => {
tracing::error!(?err, "failed to query block");
self.connector.reset().await;
continue;

self.connection
.run(|conn| async move {
let res = match block_hash {
Some(h) => conn.subxt.blocks().at(h).await,
None => conn.subxt.blocks().at_latest().await,
};

match res {
Ok(it) => {
let header = it.header();
let body = match it.extrinsics().await {
Ok(it) => it,
Err(err) => {
return Err(anyhow::anyhow!("failed to query block: {}", err))
}
}
.iter()
.collect::<Result<Vec<_>, _>>()?;
Ok(Ok((header.clone(), body)))
}
.iter()
.collect::<Result<Vec<_>, _>>()?;
return Ok((header.clone(), body));
Err(err) => Err(anyhow::anyhow!("failed to query block: {}", err)),
}
Err(err) => err,
};
tracing::error!(?err, "failed to query block");
self.connector.reset().await;
}
})
.await
}

/// Returns the data of the block identified by the given block hash. If the block is not found
Expand Down Expand Up @@ -170,17 +152,22 @@ impl Client {
namespace: sugondat_nmt::Namespace,
key: Keypair,
) -> anyhow::Result<[u8; 32]> {
let conn = match self.connection {
conn::ConnectionType::Persistent(ref connector) => connector.ensure_connected().await,
conn::ConnectionType::Single(ref conn) => conn.clone(),
};

let extrinsic = sugondat_subxt::sugondat::tx()
.blobs()
.submit_blob(UnvalidatedNamespace(namespace.to_raw_bytes()), blob);

let conn = self.connector.ensure_connected().await;
let signed = conn
.subxt
.tx()
.create_signed(&extrinsic, &key, Default::default())
.await
.with_context(|| format!("failed to validate or sign extrinsic"))?;

let events = signed
.submit_and_watch()
.await
Expand Down