Skip to content

Update subxt #864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jun 4, 2025
433 changes: 226 additions & 207 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ subxt = { default-features = false, features = [
"jsonrpsee",
"native",
"unstable-light-client",
], version = "0.38.0" }
], version = "0.42.1" }
base64 = "0.22.1"
thiserror = "1.0.69"
time = { version = "0.3.36", features = ["formatting"] }
Expand Down
5 changes: 3 additions & 2 deletions block-time/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
io::{Write, stdout},
net::ToSocketAddrs,
};
use subxt::config::Header;
use subxt::config::Hasher;
use tokio::select;

#[derive(Clone, Debug, Parser)]
Expand Down Expand Up @@ -331,7 +331,8 @@ async fn populate_view(

for _ in 0..blocks_to_fetch {
if let Ok(Some(header)) = executor.get_block_head(url, parent_hash).await {
let ts = executor.get_block_timestamp(url, header.hash()).await.unwrap();
let hasher = executor.hasher(url).unwrap();
let ts = executor.get_block_timestamp(url, hasher.hash_of(&header)).await.unwrap();

if prev_ts != 0 {
block_times.push(prev_ts.saturating_sub(ts));
Expand Down
20 changes: 13 additions & 7 deletions essentials/src/api/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::{
polkadot_staging_primitives::CoreState,
},
types::{
AccountId32, BlockNumber, ClaimQueue, H256, Header, InherentData, QueuedKeys, SessionKeys, SubxtHrmpChannel,
Timestamp,
AccountId32, BlockNumber, ClaimQueue, H256, Header, InherentData, PolkadotHasher, QueuedKeys, SessionKeys,
SubxtHrmpChannel, Timestamp,
},
};
use clap::ValueEnum;
Expand Down Expand Up @@ -67,9 +67,14 @@ where
{
client: T,
legacy_rpc_methods: LegacyRpcMethods<PolkadotConfig>,
hasher: PolkadotHasher,
}

impl<T: OnlineClientT<PolkadotConfig>> ApiClient<T> {
pub fn hasher(&self) -> PolkadotHasher {
self.hasher
}

fn storage(&self) -> StorageClient<PolkadotConfig, T> {
self.client.storage()
}
Expand Down Expand Up @@ -329,19 +334,19 @@ impl<T: OnlineClientT<PolkadotConfig>> ApiClient<T> {
maybe_block_number: Option<BlockNumber>,
) -> Result<Option<H256>, subxt::Error> {
let maybe_block_number = maybe_block_number.map(|v| NumberOrHex::Number(v.into()));
self.legacy_rpc_methods.chain_get_block_hash(maybe_block_number).await
Ok(self.legacy_rpc_methods.chain_get_block_hash(maybe_block_number).await?)
}

pub async fn legacy_get_chain_name(&self) -> Result<String, subxt::Error> {
self.legacy_rpc_methods.system_chain().await
Ok(self.legacy_rpc_methods.system_chain().await?)
}

pub async fn stream_best_block_headers(&self) -> Result<HeaderStream, subxt::Error> {
self.client.backend().stream_best_block_headers().await
self.client.backend().stream_best_block_headers(self.hasher()).await
}

pub async fn stream_finalized_block_headers(&self) -> Result<HeaderStream, subxt::Error> {
self.client.backend().stream_finalized_block_headers().await
self.client.backend().stream_finalized_block_headers(self.hasher()).await
}
}

Expand Down Expand Up @@ -372,8 +377,9 @@ pub async fn build_online_client(
},
};
let legacy_rpc_methods = LegacyRpcMethods::<PolkadotConfig>::new(rpc_client);
let hasher = client.hasher();

Ok(ApiClient { client, legacy_rpc_methods })
Ok(ApiClient { client, legacy_rpc_methods, hasher })
}

async fn join_requests<I, T>(fut: I) -> Result<Vec<T>, subxt::Error>
Expand Down
22 changes: 15 additions & 7 deletions essentials/src/api/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
},
types::{
AccountId32, BlockNumber, ClaimQueue, CoreOccupied, H256, Header, InboundOutBoundHrmpChannels, InherentData,
SessionKeys, SubxtHrmpChannel, Timestamp,
PolkadotHasher, SessionKeys, SubxtHrmpChannel, Timestamp,
},
utils::{Retry, RetryOptions},
};
Expand Down Expand Up @@ -257,6 +257,10 @@ impl RequestExecutorBackend {

Ok(response)
}

pub fn hasher(&self) -> PolkadotHasher {
self.client.hasher()
}
}

pub trait RequestExecutorNodes {
Expand All @@ -282,11 +286,11 @@ impl RequestExecutorNodes for &str {
}

#[derive(Clone)]
pub struct RequestExecutor(HashMap<String, PrioritySender<ExecutorMessage>>);
pub struct RequestExecutor(HashMap<String, (PrioritySender<ExecutorMessage>, PolkadotHasher)>);

macro_rules! wrap_backend_call {
($self:expr, $url:expr, $request_ty:ident, $response_ty:ident) => {
if let Some(to_backend) = $self.0.get_mut($url) {
if let Some((to_backend, _)) = $self.0.get_mut($url) {
let (tx, rx) = tokio::sync::oneshot::channel::<Response>();
let request = Request::$request_ty;
to_backend.send(ExecutorMessage::Rpc(tx, Request::$request_ty)).await?;
Expand All @@ -302,7 +306,7 @@ macro_rules! wrap_backend_call {
}
};
($self:expr, $url:expr, $request_ty:ident, $response_ty:ident, $($arg:expr),*) => {
if let Some(to_backend) = $self.0.get_mut($url) {
if let Some((to_backend, _)) = $self.0.get_mut($url) {
let (tx, rx) = tokio::sync::oneshot::channel::<Response>();
let request = Request::$request_ty($($arg),*);
to_backend.send(ExecutorMessage::Rpc(tx, request.clone())).await?;
Expand Down Expand Up @@ -330,8 +334,8 @@ impl RequestExecutor {
let mut clients = HashMap::new();
for node in nodes.unique_nodes() {
let (to_backend, from_frontend) = channel(MAX_MSG_QUEUE_SIZE);
let _ = clients.insert(node.clone(), to_backend);
let mut backend = RequestExecutorBackend::build(retry.clone(), node, api_client_mode).await?;
let mut backend = RequestExecutorBackend::build(retry.clone(), node.clone(), api_client_mode).await?;
let _ = clients.insert(node, (to_backend, backend.hasher()));
let shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
if let Err(e) = backend.run(from_frontend).await {
Expand All @@ -344,9 +348,13 @@ impl RequestExecutor {
Ok(RequestExecutor(clients))
}

pub fn hasher(&self, url: &str) -> Option<PolkadotHasher> {
self.0.get(url).map(|(_, hasher)| *hasher)
}

/// Closes all RPC clients
pub async fn close(&mut self) {
for to_backend in self.0.values_mut() {
for (to_backend, _) in self.0.values_mut() {
let _ = to_backend.send(ExecutorMessage::Close).await;
}
}
Expand Down
22 changes: 15 additions & 7 deletions essentials/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
mod tests {
use super::*;
use crate::{api::api_client::ApiClientMode, init, storage::StorageEntry, types::H256, utils::RetryOptions};
use subxt::config::{Hasher, Header, substrate::BlakeTwo256};
use subxt::config::Hasher;

fn rpc_node_url() -> &'static str {
const RPC_NODE_URL: &str = "wss://rpc.polkadot.io:443";
Expand All @@ -102,9 +102,11 @@ mod tests {

#[tokio::test]
async fn basic_storage_test() {
let api = ApiService::new_with_storage(RecordsStorageConfig { max_blocks: 10 }, request_executor().await);
let executor = request_executor().await;
let hasher = executor.hasher(rpc_node_url()).unwrap();
let api = ApiService::new_with_storage(RecordsStorageConfig { max_blocks: 10 }, executor);
let storage = api.storage();
let key = BlakeTwo256::hash_of(&100);
let key = hasher.hash_of(&100);
storage
.storage_write(key, StorageEntry::new_onchain(1.into(), "some data"))
.await
Expand All @@ -118,10 +120,14 @@ mod tests {
let api =
ApiService::<H256>::new_with_storage(RecordsStorageConfig { max_blocks: 10 }, request_executor().await);
let mut subxt = api.executor();
let hasher = subxt.hasher(rpc_node_url()).unwrap();

let head = subxt.get_block_head(rpc_node_url(), None).await.unwrap().unwrap();
let timestamp = subxt.get_block_timestamp(rpc_node_url(), head.hash()).await.unwrap();
let _block = subxt.get_block_number(rpc_node_url(), Some(head.hash())).await.unwrap();
let timestamp = subxt.get_block_timestamp(rpc_node_url(), hasher.hash_of(&head)).await.unwrap();
let _block = subxt
.get_block_number(rpc_node_url(), Some(hasher.hash_of(&head)))
.await
.unwrap();
assert!(timestamp > 0);
}

Expand All @@ -142,9 +148,10 @@ mod tests {
let api =
ApiService::<H256>::new_with_storage(RecordsStorageConfig { max_blocks: 1 }, request_executor().await);
let mut subxt = api.executor();
let hasher = subxt.hasher(rpc_node_url()).unwrap();

let head = subxt.get_block_head(rpc_node_url(), None).await.unwrap().unwrap();
let cores = subxt.get_occupied_cores(rpc_node_url(), head.hash()).await;
let cores = subxt.get_occupied_cores(rpc_node_url(), hasher.hash_of(&head)).await;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether you can avoid needing to expose the hasher for many of these calls by doing one of the following:

  1. Have a function like get_block_hash rather than (or as well as) get_block_head. In Subxt, blocks have a .hash() method for instance that could return this.
  2. Have functions accept a block header rather than a block hash (so that the hashing can be handled "internally" by Subxt). Perhaps this is difficult because it looks like you pass things through a channel, so you would need owned values, and headers are larger than hashes to be sending around, etc.
  3. Functions could also accept something like a trait IntoBlockHash { fn hash() } and then we have impls for hashes but also blocks or whatever. Maybe this suffers from the same issue as 2 does.


// TODO: fix zombie net instance to return valid cores
assert!(cores.is_err());
Expand All @@ -155,9 +162,10 @@ mod tests {
let api =
ApiService::<H256>::new_with_storage(RecordsStorageConfig { max_blocks: 1 }, request_executor().await);
let mut subxt = api.executor();
let hasher = subxt.hasher(rpc_node_url()).unwrap();

let head = subxt.get_block_head(rpc_node_url(), None).await.unwrap().unwrap();
let groups = subxt.get_backing_groups(rpc_node_url(), head.hash()).await;
let groups = subxt.get_backing_groups(rpc_node_url(), hasher.hash_of(&head)).await;

assert!(groups.is_ok());
}
Expand Down
58 changes: 30 additions & 28 deletions essentials/src/chain_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ use crate::{
},
polkadot_staging_primitives::CandidateDescriptorV2,
},
types::{H256, Header, OnDemandOrder},
types::{H256, Header, OnDemandOrder, PolkadotHash, PolkadotHasher},
};
use color_eyre::{Result, eyre::eyre};
use parity_scale_codec::{Decode, Encode};
use serde::Serialize;
use subxt::{
PolkadotConfig,
config::{Hasher, substrate::BlakeTwo256},
};
use subxt::{PolkadotConfig, config::Hasher};

#[derive(Debug)]
pub enum ChainEvent<T: subxt::Config> {
Expand All @@ -47,9 +44,9 @@ pub enum ChainEvent<T: subxt::Config> {
/// Backing, inclusion, time out for a parachain candidate
CandidateChanged(Box<SubxtCandidateEvent>),
/// On-demand parachain placed its order
OnDemandOrderPlaced(<PolkadotConfig as subxt::Config>::Hash, OnDemandOrder),
OnDemandOrderPlaced(PolkadotHash, OnDemandOrder),
/// Anything undecoded
RawEvent(<PolkadotConfig as subxt::Config>::Hash, subxt::events::EventDetails<T>),
RawEvent(PolkadotHash, subxt::events::EventDetails<T>),
}

#[derive(Debug)]
Expand All @@ -66,9 +63,9 @@ pub enum SubxtCandidateEventType {
#[derive(Debug)]
pub struct SubxtCandidateEvent {
/// Result of candidate receipt hashing
pub candidate_hash: <PolkadotConfig as subxt::Config>::Hash,
pub candidate_hash: PolkadotHash,
/// Full candidate receipt if needed
pub candidate_descriptor: CandidateDescriptorV2<<PolkadotConfig as subxt::Config>::Hash>,
pub candidate_descriptor: CandidateDescriptorV2<PolkadotHash>,
/// The parachain id
pub parachain_id: u32,
/// The event type
Expand All @@ -81,9 +78,9 @@ pub struct SubxtCandidateEvent {
#[derive(Debug, Clone, Encode, Decode)]
pub struct SubxtDispute {
/// Relay chain block where a dispute has taken place
pub relay_parent_block: <PolkadotConfig as subxt::Config>::Hash,
pub relay_parent_block: PolkadotHash,
/// Specific candidate being disputed about
pub candidate_hash: <PolkadotConfig as subxt::Config>::Hash,
pub candidate_hash: PolkadotHash,
}

/// Dispute result as seen by subxt event
Expand All @@ -98,21 +95,22 @@ pub enum SubxtDisputeResult {
TimedOut,
}

pub async fn decode_chain_event<T: subxt::Config>(
block_hash: <PolkadotConfig as subxt::Config>::Hash,
event: subxt::events::EventDetails<T>,
) -> Result<ChainEvent<T>> {
if is_specific_event::<DisputeInitiated, T>(&event) {
let decoded = decode_to_specific_event::<DisputeInitiated, T>(&event)?;
pub async fn decode_chain_event(
block_hash: PolkadotHash,
event: subxt::events::EventDetails<PolkadotConfig>,
hasher: PolkadotHasher,
) -> Result<ChainEvent<PolkadotConfig>> {
if is_specific_event::<DisputeInitiated, PolkadotConfig>(&event) {
let decoded = decode_to_specific_event::<DisputeInitiated, PolkadotConfig>(&event)?;
return Ok(ChainEvent::DisputeInitiated(SubxtDispute {
relay_parent_block: block_hash,
candidate_hash: decoded.0.0,
}))
}

if is_specific_event::<DisputeConcluded, T>(&event) {
if is_specific_event::<DisputeConcluded, PolkadotConfig>(&event) {
use crate::metadata::polkadot::runtime_types::polkadot_runtime_parachains::disputes;
let decoded = decode_to_specific_event::<DisputeConcluded, T>(&event)?;
let decoded = decode_to_specific_event::<DisputeConcluded, PolkadotConfig>(&event)?;
let outcome = match decoded.1 {
disputes::DisputeResult::Valid => SubxtDisputeResult::Valid,
disputes::DisputeResult::Invalid => SubxtDisputeResult::Invalid,
Expand All @@ -123,33 +121,36 @@ pub async fn decode_chain_event<T: subxt::Config>(
))
}

if is_specific_event::<CandidateBacked, T>(&event) {
let decoded = decode_to_specific_event::<CandidateBacked, T>(&event)?;
if is_specific_event::<CandidateBacked, PolkadotConfig>(&event) {
let decoded = decode_to_specific_event::<CandidateBacked, PolkadotConfig>(&event)?;
return Ok(ChainEvent::CandidateChanged(Box::new(create_candidate_event(
decoded.0.commitments_hash,
decoded.0.descriptor,
decoded.2.0,
SubxtCandidateEventType::Backed,
hasher,
))))
}

if is_specific_event::<CandidateIncluded, T>(&event) {
let decoded = decode_to_specific_event::<CandidateIncluded, T>(&event)?;
if is_specific_event::<CandidateIncluded, PolkadotConfig>(&event) {
let decoded = decode_to_specific_event::<CandidateIncluded, PolkadotConfig>(&event)?;
return Ok(ChainEvent::CandidateChanged(Box::new(create_candidate_event(
decoded.0.commitments_hash,
decoded.0.descriptor,
decoded.2.0,
SubxtCandidateEventType::Included,
hasher,
))))
}

if is_specific_event::<CandidateTimedOut, T>(&event) {
let decoded = decode_to_specific_event::<CandidateTimedOut, T>(&event)?;
if is_specific_event::<CandidateTimedOut, PolkadotConfig>(&event) {
let decoded = decode_to_specific_event::<CandidateTimedOut, PolkadotConfig>(&event)?;
return Ok(ChainEvent::CandidateChanged(Box::new(create_candidate_event(
decoded.0.commitments_hash,
decoded.0.descriptor,
decoded.2.0,
SubxtCandidateEventType::TimedOut,
hasher,
))))
}

Expand Down Expand Up @@ -193,12 +194,13 @@ fn decode_to_specific_event<E: subxt::events::StaticEvent, C: subxt::Config>(
}

fn create_candidate_event(
commitments_hash: <PolkadotConfig as subxt::Config>::Hash,
candidate_descriptor: CandidateDescriptorV2<<PolkadotConfig as subxt::Config>::Hash>,
commitments_hash: PolkadotHash,
candidate_descriptor: CandidateDescriptorV2<PolkadotHash>,
core_idx: u32,
event_type: SubxtCandidateEventType,
hasher: PolkadotHasher,
) -> SubxtCandidateEvent {
let candidate_hash = BlakeTwo256::hash_of(&(&candidate_descriptor, commitments_hash));
let candidate_hash = hasher.hash_of(&(&candidate_descriptor, commitments_hash));
let parachain_id = candidate_descriptor.para_id.0;
SubxtCandidateEvent { event_type, candidate_descriptor, parachain_id, candidate_hash, core_idx }
}
Loading
Loading