diff --git a/Cargo.lock b/Cargo.lock index 138b92205bf..98c2f9d2616 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6557,6 +6557,7 @@ dependencies = [ "futures", "halo2_proofs", "hex", + "indexmap", "insta", "itertools", "jubjub", diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index a2a413a8930..9baef5b8399 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -235,15 +235,7 @@ where let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND); // block verification - - let list = CheckpointList::new(network); - - let max_checkpoint_height = if config.checkpoint_sync { - list.max_height() - } else { - list.min_height_in_range(network.mandatory_checkpoint_height()..) - .expect("hardcoded checkpoint list extends past canopy activation") - }; + let (list, max_checkpoint_height) = init_checkpoint_list(config, network); let tip = match state_service .ready() @@ -275,3 +267,20 @@ where max_checkpoint_height, ) } + +/// Parses the checkpoint list for `network` and `config`. +/// Returns the checkpoint list and maximum checkpoint height. +pub fn init_checkpoint_list(config: Config, network: Network) -> (CheckpointList, Height) { + // TODO: Zebra parses the checkpoint list twice at startup. + // Instead, cache the checkpoint list for each `network`. + let list = CheckpointList::new(network); + + let max_checkpoint_height = if config.checkpoint_sync { + list.max_height() + } else { + list.min_height_in_range(network.mandatory_checkpoint_height()..) + .expect("hardcoded checkpoint list extends past canopy activation") + }; + + (list, max_checkpoint_height) +} diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index d62abfe59fb..8816032716c 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -15,6 +15,7 @@ dirs = "4.0.0" displaydoc = "0.2.3" futures = "0.3.24" hex = "0.4.3" +indexmap = "1.9.1" itertools = "0.10.5" lazy_static = "1.4.0" metrics = "0.20.1" diff --git a/zebra-state/src/error.rs b/zebra-state/src/error.rs index 4a08edb1786..e8e5f95ad90 100644 --- a/zebra-state/src/error.rs +++ b/zebra-state/src/error.rs @@ -51,6 +51,10 @@ pub struct CommitBlockError(#[from] ValidateContextError); #[non_exhaustive] #[allow(missing_docs)] pub enum ValidateContextError { + #[error("block parent not found in any chain")] + #[non_exhaustive] + NotReadyToBeCommitted, + #[error("block height {candidate_height:?} is lower than the current finalized height {finalized_tip_height:?}")] #[non_exhaustive] OrphanedBlock { diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index b3ee4d392dd..be1df4d7673 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -19,7 +19,7 @@ use std::{ convert, future::Future, pin::Pin, - sync::{Arc, Mutex}, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -44,6 +44,7 @@ use crate::{ MAX_LEGACY_CHAIN_BLOCKS, }, service::{ + block_iter::any_ancestor_blocks, chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip}, finalized_state::{FinalizedState, ZebraDb}, non_finalized_state::NonFinalizedState, @@ -51,8 +52,8 @@ use crate::{ queued_blocks::QueuedBlocks, watch_receiver::WatchReceiver, }, - BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, ReadRequest, - ReadResponse, Request, Response, ValidateContextError, + BoxError, CloneError, Config, FinalizedBlock, PreparedBlock, ReadRequest, ReadResponse, + Request, Response, }; pub mod block_iter; @@ -61,8 +62,8 @@ pub mod watch_receiver; pub(crate) 
mod check; -mod finalized_state; -mod non_finalized_state; +pub(crate) mod finalized_state; +pub(crate) mod non_finalized_state; mod pending_utxos; mod queued_blocks; pub(crate) mod read; @@ -76,7 +77,7 @@ mod tests; pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation}; -use self::queued_blocks::{QueuedFinalized, QueuedNonFinalized}; +use self::queued_blocks::{QueuedFinalized, QueuedNonFinalized, SentHashes}; /// A read-write service for Zebra's cached blockchain state. /// @@ -103,6 +104,13 @@ pub(crate) struct StateService { /// The configured Zcash network. network: Network, + /// The height that we start storing UTXOs from finalized blocks. + /// + /// This height should be lower than the last few checkpoints, + /// so the full verifier can verify UTXO spends from those blocks, + /// even if they haven't been committed to the finalized state yet. + full_verifier_utxo_lookahead: block::Height, + // Queued Blocks // /// Queued blocks for the [`NonFinalizedState`] that arrived out of order. @@ -115,23 +123,8 @@ pub(crate) struct StateService { /// Indexed by their parent block hash. queued_finalized_blocks: HashMap, - // Exclusively Writeable State - // - /// The non-finalized chain state, including its in-memory chain forks. - // - // TODO: get rid of this struct member, and just let the block write task own the NonFinalizedState. - mem: NonFinalizedState, - - /// The finalized chain state, including its on-disk database. - // - // TODO: get rid of this struct member, and just let the ReadStateService - // and block write task share ownership of the database. - pub(crate) disk: FinalizedState, - /// A channel to send blocks to the `block_write_task`, /// so they can be written to the [`NonFinalizedState`]. - // - // TODO: actually send blocks on this channel non_finalized_block_write_sender: Option>, @@ -156,7 +149,11 @@ pub(crate) struct StateService { // - turn this into an IndexMap containing recent non-finalized block hashes and heights // (they are all potential tips) // - remove block hashes once their heights are strictly less than the finalized tip - last_block_hash_sent: block::Hash, + last_sent_finalized_block_hash: block::Hash, + + /// A set of non-finalized block hashes that have been sent to the block write task. + /// Hashes of blocks below the finalized tip height are periodically pruned. + sent_non_finalized_block_hashes: SentHashes, /// If an invalid block is sent on `finalized_block_write_sender` /// or `non_finalized_block_write_sender`, @@ -175,17 +172,6 @@ pub(crate) struct StateService { // Updating Concurrently Readable State // - /// A sender channel used to update the current best chain tip for - /// [`LatestChainTip`] and [`ChainTipChange`]. - // - // TODO: remove this copy of the chain tip sender, and get rid of the mutex in the block write task - chain_tip_sender: Arc>, - - /// A sender channel used to update the recent non-finalized state for the [`ReadStateService`]. - non_finalized_state_sender: watch::Sender, - - // Concurrently Readable State - // /// A cloneable [`ReadStateService`], used to answer concurrent read requests. /// /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`. 
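The new `full_verifier_utxo_lookahead` field and `SentHashes` set exist so that, near the final checkpoint, the full verifier can still look up UTXOs created by finalized blocks that are queued in the write channel but not yet committed to disk. Roughly, the lookahead height is the final checkpoint height minus the checkpoint verifier's concurrency limit, and any queued finalized block at or above that height has its hash and UTXOs tracked. A minimal, self-contained sketch of that calculation, using a simplified `Height` stand-in rather than `zebra_chain::block::Height` and assumed function names, not the zebra-state API:

// Sketch only: simplified stand-ins, not the zebra-state types.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Height(u32);

/// Work out the height from which finalized-block UTXOs must stay available to
/// the full verifier: a few blocks below the final checkpoint, where "a few" is
/// the checkpoint verifier's concurrency limit.
fn full_verifier_utxo_lookahead(
    max_checkpoint_height: Height,
    checkpoint_verify_concurrency_limit: u32,
) -> Height {
    Height(
        max_checkpoint_height
            .0
            .checked_sub(checkpoint_verify_concurrency_limit)
            .expect("unexpected negative height"),
    )
}

/// Mirrors the intent of `StateService::is_close_to_final_checkpoint`: queued
/// finalized blocks at or above the lookahead height keep their UTXOs visible
/// while they are still in the block write channel.
fn is_close_to_final_checkpoint(lookahead: Height, queued_height: Height) -> bool {
    queued_height >= lookahead
}

fn main() {
    let lookahead = full_verifier_utxo_lookahead(Height(1_900_000), 1_000);
    assert!(is_close_to_final_checkpoint(lookahead, Height(1_899_500)));
    assert!(!is_close_to_final_checkpoint(lookahead, Height(1_000_000)));
}

In the actual service, the check is `StateService::is_close_to_final_checkpoint`, and the tracked hashes and UTXOs live in `sent_non_finalized_block_hashes`, which the `AwaitUtxo` handler consults before falling back to the database.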
@@ -257,7 +243,12 @@ impl Drop for StateService { std::mem::drop(self.finalized_block_write_sender.take()); std::mem::drop(self.non_finalized_block_write_sender.take()); - self.clear_finalized_block_queue("dropping the state: dropped unused queued block"); + self.clear_finalized_block_queue( + "dropping the state: dropped unused queued finalized block", + ); + self.clear_non_finalized_block_queue( + "dropping the state: dropped unused queued non-finalized block", + ); // Then drop self.read_service, which checks the block write task for panics, // and tries to shut down the database. @@ -298,12 +289,18 @@ impl Drop for ReadStateService { impl StateService { const PRUNE_INTERVAL: Duration = Duration::from_secs(30); - /// Create a new read-write state service. + /// Creates a new state service for the state `config` and `network`. + /// + /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit` + /// to work out when it is near the final checkpoint. + /// /// Returns the read-write and read-only state services, /// and read-only watch channels for its best chain tip. pub fn new( config: Config, network: Network, + max_checkpoint_height: block::Height, + checkpoint_verify_concurrency_limit: usize, ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) { let timer = CodeTimer::start(); @@ -321,10 +318,12 @@ impl StateService { let timer = CodeTimer::start(); let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(initial_tip, network); - let chain_tip_sender = Arc::new(Mutex::new(chain_tip_sender)); let non_finalized_state = NonFinalizedState::new(network); + let (non_finalized_state_sender, non_finalized_state_receiver) = + watch::channel(NonFinalizedState::new(finalized_state.network())); + // Security: The number of blocks in these channels is limited by // the syncer and inbound lookahead limits. 
let (non_finalized_block_write_sender, non_finalized_block_write_receiver) = @@ -335,40 +334,47 @@ impl StateService { tokio::sync::mpsc::unbounded_channel(); let finalized_state_for_writing = finalized_state.clone(); - let chain_tip_sender_for_writing = chain_tip_sender.clone(); let block_write_task = std::thread::spawn(move || { write::write_blocks_from_channels( finalized_block_write_receiver, non_finalized_block_write_receiver, finalized_state_for_writing, + non_finalized_state, invalid_block_reset_sender, - chain_tip_sender_for_writing, + chain_tip_sender, + non_finalized_state_sender, ) }); let block_write_task = Arc::new(block_write_task); - let (read_service, non_finalized_state_sender) = - ReadStateService::new(&finalized_state, block_write_task); + let read_service = ReadStateService::new( + &finalized_state, + block_write_task, + non_finalized_state_receiver, + ); + + let full_verifier_utxo_lookahead = max_checkpoint_height + - i32::try_from(checkpoint_verify_concurrency_limit).expect("fits in i32"); + let full_verifier_utxo_lookahead = + full_verifier_utxo_lookahead.expect("unexpected negative height"); let queued_non_finalized_blocks = QueuedBlocks::default(); let pending_utxos = PendingUtxos::default(); - let last_block_hash_sent = finalized_state.db.finalized_tip_hash(); + let last_sent_finalized_block_hash = finalized_state.db.finalized_tip_hash(); let state = Self { network, + full_verifier_utxo_lookahead, queued_non_finalized_blocks, queued_finalized_blocks: HashMap::new(), - mem: non_finalized_state, - disk: finalized_state, non_finalized_block_write_sender: Some(non_finalized_block_write_sender), finalized_block_write_sender: Some(finalized_block_write_sender), - last_block_hash_sent, + last_sent_finalized_block_hash, + sent_non_finalized_block_hashes: SentHashes::default(), invalid_block_reset_receiver, pending_utxos, last_prune: Instant::now(), - chain_tip_sender, - non_finalized_state_sender, read_service: read_service.clone(), max_queued_finalized_height: f64::NAN, }; @@ -384,7 +390,11 @@ impl StateService { if let Err(error) = check::legacy_chain( nu5_activation_height, - state.any_ancestor_blocks(tip.1), + any_ancestor_blocks( + &state.read_service.latest_non_finalized_state(), + &state.read_service.db, + tip.1, + ), state.network, MAX_LEGACY_CHAIN_BLOCKS, ) { @@ -421,6 +431,13 @@ impl StateService { let queued_prev_hash = finalized.block.header.previous_block_hash; let queued_height = finalized.height; + // If we're close to the final checkpoint, make the block's UTXOs available for + // full verification of non-finalized blocks, even when it is in the channel. + if self.is_close_to_final_checkpoint(queued_height) { + self.sent_non_finalized_block_hashes + .add_finalized(&finalized) + } + let (rsp_tx, rsp_rx) = oneshot::channel(); let queued = (finalized, rsp_tx); @@ -497,7 +514,7 @@ impl StateService { // If a block failed, we need to start again from a valid tip. match self.invalid_block_reset_receiver.try_recv() { - Ok(reset_tip_hash) => self.last_block_hash_sent = reset_tip_hash, + Ok(reset_tip_hash) => self.last_sent_finalized_block_hash = reset_tip_hash, Err(TryRecvError::Disconnected) => { info!("Block commit task closed the block reset channel. 
Is Zebra shutting down?"); return; @@ -508,9 +525,9 @@ impl StateService { while let Some(queued_block) = self .queued_finalized_blocks - .remove(&self.last_block_hash_sent) + .remove(&self.last_sent_finalized_block_hash) { - self.last_block_hash_sent = queued_block.0.hash; + self.last_sent_finalized_block_hash = queued_block.0.hash; // If we've finished sending finalized blocks, ignore any repeated blocks. // (Blocks can be repeated after a syncer reset.) @@ -550,6 +567,23 @@ impl StateService { std::mem::drop(finalized); } + /// Drops all queued non-finalized blocks, and sends an error on their result channels. + fn clear_non_finalized_block_queue(&mut self, error: impl Into + Clone) { + for (_hash, queued) in self.queued_non_finalized_blocks.drain() { + Self::send_non_finalized_block_error(queued, error.clone()); + } + } + + /// Send an error on a `QueuedNonFinalized` block's result channel, and drop the block + fn send_non_finalized_block_error(queued: QueuedNonFinalized, error: impl Into) { + let (finalized, rsp_tx) = queued; + + // The block sender might have already given up on this block, + // so ignore any channel send errors. + let _ = rsp_tx.send(Err(error.into())); + std::mem::drop(finalized); + } + /// Queue a non finalized block for verification and check if any queued /// blocks are ready to be verified and committed to the state. /// @@ -565,11 +599,20 @@ impl StateService { tracing::debug!(block = %prepared.block, "queueing block for contextual verification"); let parent_hash = prepared.block.header.previous_block_hash; - if self.mem.any_chain_contains(&prepared.hash) - || self.read_service.db.hash(prepared.height).is_some() + if self + .sent_non_finalized_block_hashes + .contains(&prepared.hash) { let (rsp_tx, rsp_rx) = oneshot::channel(); - let _ = rsp_tx.send(Err("block is already committed to the state".into())); + let _ = rsp_tx.send(Err("block already sent to be committed to the state".into())); + return rsp_rx; + } + + if self.read_service.db.contains_height(prepared.height) { + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Err( + "block height is already committed to the finalized state".into(), + )); return rsp_rx; } @@ -601,8 +644,8 @@ impl StateService { if self.finalized_block_write_sender.is_some() && self .queued_non_finalized_blocks - .has_queued_children(self.last_block_hash_sent) - && self.read_service.db.finalized_tip_hash() == self.last_block_hash_sent + .has_queued_children(self.last_sent_finalized_block_hash) + && self.read_service.db.finalized_tip_hash() == self.last_sent_finalized_block_hash { // Tell the block write task to stop committing finalized blocks, // and move on to committing non-finalized blocks. 
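The hunk below replaces the old in-service `process_queued`/`validate_and_commit` path: instead of validating children itself, the service drains queued non-finalized blocks whose parent has just been sent and forwards them to the block write task. The core pattern is `dequeue_children(parent_hash)` driven by a worklist of newly sent hashes. A minimal sketch of that drain loop, with assumed names and simplified types (plain hashes and a `HashMap` keyed by parent hash) rather than the real `QueuedBlocks` type:

// Sketch only: assumed names and simplified types, not the zebra-state API.
use std::collections::HashMap;

type Hash = [u8; 32];

#[derive(Clone, Debug)]
struct QueuedBlock {
    hash: Hash,
    parent_hash: Hash,
}

/// Drain `queued` starting from `new_parent`: repeatedly take a parent hash
/// from the worklist and send every queued block whose parent matches it,
/// then treat each sent block as a potential parent for later queued blocks.
fn drain_ready_blocks(
    queued: &mut HashMap<Hash, Vec<QueuedBlock>>,
    new_parent: Hash,
) -> Vec<QueuedBlock> {
    let mut sent = Vec::new();
    let mut new_parents = vec![new_parent];

    while let Some(parent_hash) = new_parents.pop() {
        for child in queued.remove(&parent_hash).unwrap_or_default() {
            new_parents.push(child.hash);
            sent.push(child);
        }
    }

    sent
}

fn main() {
    let genesis = [0; 32];
    let a = [1; 32];
    let b = [2; 32];

    let mut queued = HashMap::new();
    queued.insert(genesis, vec![QueuedBlock { hash: a, parent_hash: genesis }]);
    queued.insert(a, vec![QueuedBlock { hash: b, parent_hash: a }]);

    let sent = drain_ready_blocks(&mut queued, genesis);
    assert_eq!(sent.len(), 2);
}

In the real `send_ready_non_finalized_queued`, each dequeued block is also recorded in `sent_non_finalized_block_hashes` and sent on `non_finalized_block_write_sender`; a send error means the block write task has exited, so the remaining queue is cleared with an error.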
@@ -623,201 +666,93 @@ impl StateService { return rsp_rx; } - // TODO: move this code into the state block commit task: - // - process_queued()'s validate_and_commit() call becomes a send to the block commit channel - // - run validate_and_commit() in the state block commit task - // - run all the rest of the code in this function in the state block commit task - // - move all that code to the inner service - self.process_queued(parent_hash); - - while self.mem.best_chain_len() > crate::constants::MAX_BLOCK_REORG_HEIGHT { - tracing::trace!("finalizing block past the reorg limit"); - let finalized_with_trees = self.mem.finalize(); - self.disk - .commit_finalized_direct(finalized_with_trees, "best non-finalized chain root") - .expect( - "expected that errors would not occur when writing to disk or updating note commitment and history trees", - ); - } - - let finalized_tip_height = self.read_service.db.finalized_tip_height().expect( - "Finalized state must have at least one block before committing non-finalized state", - ); - self.queued_non_finalized_blocks - .prune_by_height(finalized_tip_height); - - let tip_block_height = self.update_latest_chain_channels(); - - // update metrics using the best non-finalized tip - if let Some(tip_block_height) = tip_block_height { - metrics::gauge!( - "state.full_verifier.committed.block.height", - tip_block_height.0 as f64, - ); + // Wait until block commit task is ready to write non-finalized blocks before dequeuing them + if self.finalized_block_write_sender.is_none() { + self.send_ready_non_finalized_queued(parent_hash); - // This height gauge is updated for both fully verified and checkpoint blocks. - // These updates can't conflict, because the state makes sure that blocks - // are committed in order. - metrics::gauge!( - "zcash.chain.verified.block.height", - tip_block_height.0 as f64, + let finalized_tip_height = self.read_service.db.finalized_tip_height().expect( + "Finalized state must have at least one block before committing non-finalized state", ); - } - - tracing::trace!("finished processing queued block"); - rsp_rx - } - - /// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender` - /// channels with the latest non-finalized [`ChainTipBlock`] and - /// [`Chain`][1]. - /// - /// Returns the latest non-finalized chain tip height, or `None` if the - /// non-finalized state is empty. - /// - /// [1]: non_finalized_state::Chain - // - // TODO: remove this clippy allow when we remove self.chain_tip_sender - #[allow(clippy::unwrap_in_result)] - #[instrument(level = "debug", skip(self))] - fn update_latest_chain_channels(&mut self) -> Option { - let best_chain = self.mem.best_chain(); - let tip_block = best_chain - .and_then(|chain| chain.tip_block()) - .cloned() - .map(ChainTipBlock::from); - let tip_block_height = tip_block.as_ref().map(|block| block.height); - - // If the final receiver was just dropped, ignore the error. - let _ = self.non_finalized_state_sender.send(self.mem.clone()); - self.chain_tip_sender - .lock() - .expect("unexpected panic in block commit task or state") - .set_best_non_finalized_tip(tip_block); + self.queued_non_finalized_blocks + .prune_by_height(finalized_tip_height); - tip_block_height - } - - /// Run contextual validation on the prepared block and add it to the - /// non-finalized state if it is contextually valid. 
- #[tracing::instrument(level = "debug", skip(self, prepared))] - fn validate_and_commit(&mut self, prepared: PreparedBlock) -> Result<(), CommitBlockError> { - self.check_contextual_validity(&prepared)?; - let parent_hash = prepared.block.header.previous_block_hash; - - if self.disk.db.finalized_tip_hash() == parent_hash { - self.mem.commit_new_chain(prepared, &self.disk.db)?; - } else { - self.mem.commit_block(prepared, &self.disk.db)?; + self.sent_non_finalized_block_hashes + .prune_by_height(finalized_tip_height); } - Ok(()) + rsp_rx } /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks. fn can_fork_chain_at(&self, hash: &block::Hash) -> bool { - self.mem.any_chain_contains(hash) || &self.read_service.db.finalized_tip_hash() == hash + self.sent_non_finalized_block_hashes.contains(hash) + || &self.read_service.db.finalized_tip_hash() == hash + } + + /// Returns `true` if `queued_height` is near the final checkpoint. + /// + /// The non-finalized block verifier needs access to UTXOs from finalized blocks + /// near the final checkpoint, so that it can verify blocks that spend those UTXOs. + /// + /// If it doesn't have the required UTXOs, some blocks will time out, + /// but succeed after a syncer restart. + fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool { + queued_height >= self.full_verifier_utxo_lookahead } - /// Attempt to validate and commit all queued blocks whose parents have - /// recently arrived starting from `new_parent`, in breadth-first ordering. + /// Sends all queued blocks whose parents have recently arrived starting from `new_parent` + /// in breadth-first ordering to the block write task which will attempt to validate and commit them #[tracing::instrument(level = "debug", skip(self, new_parent))] - fn process_queued(&mut self, new_parent: block::Hash) { - let mut new_parents: Vec<(block::Hash, Result<(), CloneError>)> = - vec![(new_parent, Ok(()))]; + fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) { + use tokio::sync::mpsc::error::SendError; + if let Some(non_finalized_block_write_sender) = &self.non_finalized_block_write_sender { + let mut new_parents: Vec = vec![new_parent]; + + while let Some(parent_hash) = new_parents.pop() { + let queued_children = self + .queued_non_finalized_blocks + .dequeue_children(parent_hash); + + for queued_child in queued_children { + let (PreparedBlock { hash, .. }, _) = queued_child; + + self.sent_non_finalized_block_hashes.add(&queued_child.0); + let send_result = non_finalized_block_write_sender.send(queued_child); + + if let Err(SendError(queued)) = send_result { + // If Zebra is shutting down, drop blocks and return an error. + Self::send_non_finalized_block_error( + queued, + "block commit task exited. Is Zebra shutting down?", + ); - while let Some((parent_hash, parent_result)) = new_parents.pop() { - let queued_children = self - .queued_non_finalized_blocks - .dequeue_children(parent_hash); + self.clear_non_finalized_block_queue( + "block commit task exited. Is Zebra shutting down?", + ); - for (child, rsp_tx) in queued_children { - let child_hash = child.hash; - let result; + return; + }; - // If the block is invalid, reject any descendant blocks. - // - // At this point, we know that the block and all its descendants - // are invalid, because we checked all the consensus rules before - // committing the block to the non-finalized state. 
- // (These checks also bind the transaction data to the block - // header, using the transaction merkle tree and authorizing data - // commitment.) - if let Err(ref parent_error) = parent_result { - tracing::trace!( - ?child_hash, - ?parent_error, - "rejecting queued child due to parent error" - ); - result = Err(parent_error.clone()); - } else { - tracing::trace!(?child_hash, "validating queued child"); - result = self.validate_and_commit(child).map_err(CloneError::from); - if result.is_ok() { - // Update the metrics if semantic and contextual validation passes - metrics::counter!("state.full_verifier.committed.block.count", 1); - metrics::counter!("zcash.chain.verified.block.total", 1); - } + new_parents.push(hash); } - - let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from)); - new_parents.push((child_hash, result)); } - } - } - - /// Check that the prepared block is contextually valid for the configured - /// network, based on the committed finalized and non-finalized state. - /// - /// Note: some additional contextual validity checks are performed by the - /// non-finalized [`Chain`](non_finalized_state::Chain). - fn check_contextual_validity( - &mut self, - prepared: &PreparedBlock, - ) -> Result<(), ValidateContextError> { - let relevant_chain = self.any_ancestor_blocks(prepared.block.header.previous_block_hash); - - // Security: check proof of work before any other checks - check::block_is_valid_for_recent_chain( - prepared, - self.network, - self.disk.db.finalized_tip_height(), - relevant_chain, - )?; - check::nullifier::no_duplicates_in_finalized_chain(prepared, &self.disk.db)?; - - Ok(()) + self.sent_non_finalized_block_hashes.finish_batch(); + }; } /// Return the tip of the current best chain. pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> { - self.mem.best_tip().or_else(|| self.read_service.db.tip()) + read::best_tip( + &self.read_service.latest_non_finalized_state(), + &self.read_service.db, + ) } - /// Return the height for the block at `hash` in any chain. - pub fn any_height_by_hash(&self, hash: block::Hash) -> Option { - self.mem - .any_height_by_hash(hash) - .or_else(|| self.read_service.db.height(hash)) - } - - /// Return an iterator over the relevant chain of the block identified by - /// `hash`, in order from the largest height to the genesis block. - /// - /// The block identified by `hash` is included in the chain of blocks yielded - /// by the iterator. `hash` can come from any chain. - pub fn any_ancestor_blocks(&self, hash: block::Hash) -> block_iter::Iter<'_> { - block_iter::Iter { - service: self, - state: block_iter::IterState::NonFinalized(hash), - } - } - - /// Assert some assumptions about the prepared `block` before it is validated. + /// Assert some assumptions about the prepared `block` before it is queued. 
fn assert_block_can_be_validated(&self, block: &PreparedBlock) { - // required by validate_and_commit, moved here to make testing easier + // required by CommitBlock call assert!( block.height > self.network.mandatory_checkpoint_height(), "invalid non-finalized block height: the canopy checkpoint is mandatory, pre-canopy \ @@ -836,10 +771,8 @@ impl ReadStateService { pub(crate) fn new( finalized_state: &FinalizedState, block_write_task: Arc>, - ) -> (Self, watch::Sender) { - let (non_finalized_state_sender, non_finalized_state_receiver) = - watch::channel(NonFinalizedState::new(finalized_state.network())); - + non_finalized_state_receiver: watch::Receiver, + ) -> Self { let read_service = Self { network: finalized_state.network(), db: finalized_state.db.clone(), @@ -849,7 +782,12 @@ impl ReadStateService { tracing::info!("created new read-only state service"); - (read_service, non_finalized_state_sender) + read_service + } + + /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver` + fn latest_non_finalized_state(&self) -> NonFinalizedState { + self.non_finalized_state_receiver.cloned_watch_data() } } @@ -1017,6 +955,16 @@ impl Service for StateService { return response_fut; } + // Check the sent non-finalized blocks + if let Some(utxo) = self.sent_non_finalized_block_hashes.utxo(&outpoint) { + self.pending_utxos.respond(&outpoint, utxo); + + // We're finished, the returned future gets the UTXO from the respond() channel. + timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized"); + + return response_fut; + } + // We ignore any UTXOs in FinalizedState.queued_finalized_blocks, // because it is only used during checkpoint verification. // @@ -1567,6 +1515,9 @@ impl Service for ReadStateService { /// /// Each `network` has its own separate on-disk database. /// +/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit` +/// to work out when it is near the final checkpoint. +/// /// To share access to the state, wrap the returned service in a `Buffer`, /// or clone the returned [`ReadStateService`]. /// @@ -1576,6 +1527,8 @@ impl Service for ReadStateService { pub fn init( config: Config, network: Network, + max_checkpoint_height: block::Height, + checkpoint_verify_concurrency_limit: usize, ) -> ( BoxService, ReadStateService, @@ -1583,7 +1536,12 @@ pub fn init( ChainTipChange, ) { let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) = - StateService::new(config, network); + StateService::new( + config, + network, + max_checkpoint_height, + checkpoint_verify_concurrency_limit, + ); ( BoxService::new(state_service), @@ -1599,13 +1557,22 @@ pub fn init( pub fn spawn_init( config: Config, network: Network, + max_checkpoint_height: block::Height, + checkpoint_verify_concurrency_limit: usize, ) -> tokio::task::JoinHandle<( BoxService, ReadStateService, LatestChainTip, ChainTipChange, )> { - tokio::task::spawn_blocking(move || init(config, network)) + tokio::task::spawn_blocking(move || { + init( + config, + network, + max_checkpoint_height, + checkpoint_verify_concurrency_limit, + ) + }) } /// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot. @@ -1615,7 +1582,10 @@ pub fn spawn_init( /// See also [`init`]. 
#[cfg(any(test, feature = "proptest-impl"))] pub fn init_test(network: Network) -> Buffer, Request> { - let (state_service, _, _, _) = StateService::new(Config::ephemeral(), network); + // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit + // if we ever need to test final checkpoint sent UTXO queries + let (state_service, _, _, _) = + StateService::new(Config::ephemeral(), network, block::Height::MAX, 0); Buffer::new(BoxService::new(state_service), 1) } @@ -1633,8 +1603,10 @@ pub fn init_test_services( LatestChainTip, ChainTipChange, ) { + // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit + // if we ever need to test final checkpoint sent UTXO queries let (state_service, read_state_service, latest_chain_tip, chain_tip_change) = - StateService::new(Config::ephemeral(), network); + StateService::new(Config::ephemeral(), network, block::Height::MAX, 0); let state_service = Buffer::new(BoxService::new(state_service), 1); diff --git a/zebra-state/src/service/arbitrary.rs b/zebra-state/src/service/arbitrary.rs index 7b0d6195f72..4eafb92b532 100644 --- a/zebra-state/src/service/arbitrary.rs +++ b/zebra-state/src/service/arbitrary.rs @@ -13,7 +13,7 @@ use tokio::time::timeout; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use zebra_chain::{ - block::Block, + block::{Block, Height}, fmt::{humantime_seconds, SummaryDebug}, history_tree::HistoryTree, parameters::{Network, NetworkUpgrade}, @@ -201,8 +201,10 @@ pub async fn populated_state( .into_iter() .map(|block| Request::CommitFinalizedBlock(block.into())); + // TODO: write a test that checks the finalized to non-finalized transition with UTXOs, + // and set max_checkpoint_height and checkpoint_verify_concurrency_limit correctly. let (state, read_state, latest_chain_tip, mut chain_tip_change) = - StateService::new(Config::ephemeral(), network); + StateService::new(Config::ephemeral(), network, Height::MAX, 0); let mut state = Buffer::new(BoxService::new(state), 1); let mut responses = FuturesUnordered::new(); diff --git a/zebra-state/src/service/block_iter.rs b/zebra-state/src/service/block_iter.rs index 90ea29d2b38..9fc9bc8ff3c 100644 --- a/zebra-state/src/service/block_iter.rs +++ b/zebra-state/src/service/block_iter.rs @@ -4,14 +4,17 @@ use std::sync::Arc; use zebra_chain::block::{self, Block}; -use crate::{service::StateService, HashOrHeight}; +use crate::{service::non_finalized_state::NonFinalizedState, HashOrHeight}; + +use super::finalized_state::ZebraDb; /// Iterator for state blocks. /// /// Starts at any block in any non-finalized or finalized chain, /// and iterates in reverse height order. (Towards the genesis block.) 
pub(crate) struct Iter<'a> { - pub(super) service: &'a StateService, + pub(super) non_finalized_state: &'a NonFinalizedState, + pub(super) db: &'a ZebraDb, pub(super) state: IterState, } @@ -23,14 +26,18 @@ pub(super) enum IterState { impl Iter<'_> { fn next_non_finalized_block(&mut self) -> Option> { - let Iter { service, state } = self; + let Iter { + non_finalized_state, + db: _, + state, + } = self; let hash = match state { IterState::NonFinalized(hash) => *hash, IterState::Finalized(_) | IterState::Finished => unreachable!(), }; - if let Some(block) = service.mem.any_block_by_hash(hash) { + if let Some(block) = non_finalized_state.any_block_by_hash(hash) { let hash = block.header.previous_block_hash; self.state = IterState::NonFinalized(hash); Some(block) @@ -41,7 +48,11 @@ impl Iter<'_> { #[allow(clippy::unwrap_in_result)] fn next_finalized_block(&mut self) -> Option> { - let Iter { service, state } = self; + let Iter { + non_finalized_state: _, + db, + state, + } = self; let hash_or_height: HashOrHeight = match *state { IterState::Finalized(height) => height.into(), @@ -49,7 +60,7 @@ impl Iter<'_> { IterState::Finished => unreachable!(), }; - if let Some(block) = service.read_service.db.block(hash_or_height) { + if let Some(block) = db.block(hash_or_height) { let height = block .coinbase_height() .expect("valid blocks have a coinbase height"); @@ -66,6 +77,13 @@ impl Iter<'_> { None } } + + /// Return the height for the block at `hash` in any chain. + fn any_height_by_hash(&self, hash: block::Hash) -> Option { + self.non_finalized_state + .any_height_by_hash(hash) + .or_else(|| self.db.height(hash)) + } } impl Iterator for Iter<'_> { @@ -93,7 +111,6 @@ impl ExactSizeIterator for Iter<'_> { fn len(&self) -> usize { match self.state { IterState::NonFinalized(hash) => self - .service .any_height_by_hash(hash) .map(|height| (height.0 + 1) as _) .unwrap_or(0), @@ -102,3 +119,20 @@ impl ExactSizeIterator for Iter<'_> { } } } + +/// Return an iterator over the relevant chain of the block identified by +/// `hash`, in order from the largest height to the genesis block. +/// +/// The block identified by `hash` is included in the chain of blocks yielded +/// by the iterator. `hash` can come from any chain. +pub(crate) fn any_ancestor_blocks<'a>( + non_finalized_state: &'a NonFinalizedState, + db: &'a ZebraDb, + hash: block::Hash, +) -> Iter<'a> { + Iter { + non_finalized_state, + db, + state: IterState::NonFinalized(hash), + } +} diff --git a/zebra-state/src/service/check.rs b/zebra-state/src/service/check.rs index 3372b588524..e540941f33d 100644 --- a/zebra-state/src/service/check.rs +++ b/zebra-state/src/service/check.rs @@ -12,11 +12,21 @@ use zebra_chain::{ work::difficulty::CompactDifficulty, }; -use crate::{BoxError, PreparedBlock, ValidateContextError}; +use crate::{ + service::{ + block_iter::any_ancestor_blocks, finalized_state::FinalizedState, + non_finalized_state::NonFinalizedState, + }, + BoxError, PreparedBlock, ValidateContextError, +}; // use self as check use super::check; +// These types are used in doc links +#[allow(unused_imports)] +use crate::service::non_finalized_state::Chain; + pub(crate) mod anchors; pub(crate) mod difficulty; pub(crate) mod nullifier; @@ -353,3 +363,31 @@ where Ok(()) } + +/// Perform initial contextual validity checks for the configured network, +/// based on the committed finalized and non-finalized state. +/// +/// Additional contextual validity checks are performed by the non-finalized [`Chain`]. 
+pub(crate) fn initial_contextual_validity( + finalized_state: &FinalizedState, + non_finalized_state: &NonFinalizedState, + prepared: &PreparedBlock, +) -> Result<(), ValidateContextError> { + let relevant_chain = any_ancestor_blocks( + non_finalized_state, + &finalized_state.db, + prepared.block.header.previous_block_hash, + ); + + // Security: check proof of work before any other checks + check::block_is_valid_for_recent_chain( + prepared, + finalized_state.network(), + finalized_state.db.finalized_tip_height(), + relevant_chain, + )?; + + check::nullifier::no_duplicates_in_finalized_chain(prepared, &finalized_state.db)?; + + Ok(()) +} diff --git a/zebra-state/src/service/check/tests/anchors.rs b/zebra-state/src/service/check/tests/anchors.rs index 34b8815936a..d8ac3c0d963 100644 --- a/zebra-state/src/service/check/tests/anchors.rs +++ b/zebra-state/src/service/check/tests/anchors.rs @@ -13,6 +13,7 @@ use zebra_chain::{ use crate::{ arbitrary::Prepare, + service::write::validate_and_commit_non_finalized, tests::setup::{new_state_with_mainnet_genesis, transaction_v4_from_coinbase}, PreparedBlock, }; @@ -25,7 +26,7 @@ use crate::{ fn check_sprout_anchors() { let _init_guard = zebra_test::init(); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Bootstrap a block at height == 1. let block_1 = zebra_test::vectors::BLOCK_MAINNET_1_BYTES @@ -42,7 +43,10 @@ fn check_sprout_anchors() { // Validate and commit [`block_1`]. This will add an anchor referencing the // empty note commitment tree to the state. - assert!(state.validate_and_commit(block_1).is_ok()); + assert!( + validate_and_commit_non_finalized(&finalized_state, &mut non_finalized_state, block_1) + .is_ok() + ); // Bootstrap a block at height == 2 that references the Sprout note commitment tree state // from [`block_1`]. @@ -60,7 +64,10 @@ fn check_sprout_anchors() { let block_2 = prepare_sprout_block(block_2, block_396); // Validate and commit [`block_2`]. This will also check the anchors. 
- assert_eq!(state.validate_and_commit(block_2), Ok(())); + assert_eq!( + validate_and_commit_non_finalized(&finalized_state, &mut non_finalized_state, block_2), + Ok(()) + ); } fn prepare_sprout_block(mut block_to_prepare: Block, reference_block: Block) -> PreparedBlock { @@ -135,7 +142,7 @@ fn prepare_sprout_block(mut block_to_prepare: Block, reference_block: Block) -> fn check_sapling_anchors() { let _init_guard = zebra_test::init(); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Bootstrap a block at height == 1 that has the first Sapling note commitments let mut block1 = zebra_test::vectors::BLOCK_MAINNET_1_BYTES @@ -181,7 +188,10 @@ fn check_sapling_anchors() { }); let block1 = Arc::new(block1).prepare(); - assert!(state.validate_and_commit(block1).is_ok()); + assert!( + validate_and_commit_non_finalized(&finalized_state, &mut non_finalized_state, block1) + .is_ok() + ); // Bootstrap a block at height == 2 that references the Sapling note commitment tree state // from earlier block @@ -228,5 +238,8 @@ fn check_sapling_anchors() { }); let block2 = Arc::new(block2).prepare(); - assert_eq!(state.validate_and_commit(block2), Ok(())); + assert_eq!( + validate_and_commit_non_finalized(&finalized_state, &mut non_finalized_state, block2), + Ok(()) + ); } diff --git a/zebra-state/src/service/check/tests/nullifier.rs b/zebra-state/src/service/check/tests/nullifier.rs index 89a4f4fa490..a6229eec61e 100644 --- a/zebra-state/src/service/check/tests/nullifier.rs +++ b/zebra-state/src/service/check/tests/nullifier.rs @@ -19,6 +19,7 @@ use zebra_chain::{ use crate::{ arbitrary::Prepare, + service::{read, write::validate_and_commit_non_finalized}, tests::setup::{new_state_with_mainnet_genesis, transaction_v4_from_coinbase}, FinalizedBlock, ValidateContextError::{ @@ -72,50 +73,50 @@ proptest! 
{ block1.transactions.push(transaction.into()); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (mut finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); // randomly choose to commit the block to the finalized or non-finalized state if use_finalized_state { let block1 = FinalizedBlock::from(Arc::new(block1)); - let commit_result = state.disk.commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); // the block was committed - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); // the non-finalized state didn't change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); // the finalized state has the nullifiers - prop_assert!(state - .disk + prop_assert!(finalized_state .contains_sprout_nullifier(&expected_nullifiers[0])); - prop_assert!(state - .disk + prop_assert!(finalized_state .contains_sprout_nullifier(&expected_nullifiers[1])); } else { let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, + block1.clone() + ); // the block was committed prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); // the block data is in the non-finalized state - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_mem)); // the non-finalized state has the nullifiers - prop_assert_eq!(state.mem.chain_set.len(), 1); - prop_assert!(state - .mem + prop_assert_eq!(non_finalized_state.chain_set.len(), 1); + prop_assert!(non_finalized_state .best_contains_sprout_nullifier(&expected_nullifiers[0])); - prop_assert!(state - .mem + prop_assert!(non_finalized_state .best_contains_sprout_nullifier(&expected_nullifiers[1])); } } @@ -144,15 +145,17 @@ proptest! { block1.transactions.push(transaction.into()); - let (mut state, genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); // if the random proptest data produces other errors, // we might need to just check `is_err()` here @@ -165,8 +168,8 @@ proptest! 
{ .into()) ); // block was rejected - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } /// Make sure duplicate sprout nullifiers are rejected by state contextual validation, @@ -201,15 +204,17 @@ proptest! { block1.transactions.push(transaction.into()); - let (mut state, genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); prop_assert_eq!( commit_result, @@ -219,8 +224,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } /// Make sure duplicate sprout nullifiers are rejected by state contextual validation, @@ -258,15 +263,17 @@ proptest! { .transactions .extend([transaction1.into(), transaction2.into()]); - let (mut state, genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); prop_assert_eq!( commit_result, @@ -276,8 +283,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } /// Make sure duplicate sprout nullifiers are rejected by state contextual validation, @@ -320,51 +327,51 @@ proptest! 
{ block1.transactions.push(transaction1.into()); block2.transactions.push(transaction2.into()); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (mut finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); - state.disk.populate_with_anchors(&block2); + finalized_state.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block2); - let mut previous_mem = state.mem.clone(); + let mut previous_mem = non_finalized_state.clone(); let block1_hash; // randomly choose to commit the next block to the finalized or non-finalized state if duplicate_in_finalized_state { let block1 = FinalizedBlock::from(Arc::new(block1)); - let commit_result = state.disk.commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state - .disk + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(finalized_state .contains_sprout_nullifier(&expected_nullifiers[0])); - prop_assert!(state - .disk + prop_assert!(finalized_state .contains_sprout_nullifier(&expected_nullifiers[1])); block1_hash = block1.hash; } else { let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1.clone()); prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state - .mem + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state .best_contains_sprout_nullifier(&expected_nullifiers[0])); - prop_assert!(state - .mem + prop_assert!(non_finalized_state .best_contains_sprout_nullifier(&expected_nullifiers[1])); block1_hash = block1.hash; - previous_mem = state.mem.clone(); + previous_mem = non_finalized_state.clone(); } let block2 = Arc::new(block2).prepare(); - let commit_result = state.validate_and_commit(block2); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block2); prop_assert_eq!( commit_result, @@ -374,8 +381,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(1), block1_hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(1), block1_hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } // sapling @@ -406,31 +413,32 @@ proptest! 
{ block1.transactions.push(transaction.into()); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (mut finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); // randomly choose to commit the block to the finalized or non-finalized state if use_finalized_state { let block1 = FinalizedBlock::from(Arc::new(block1)); - let commit_result = state.disk.commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state.disk.contains_sapling_nullifier(&expected_nullifier)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(finalized_state.contains_sapling_nullifier(&expected_nullifier)); } else { let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1.clone()); prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state - .mem + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state .best_contains_sapling_nullifier(&expected_nullifier)); } } @@ -462,15 +470,17 @@ proptest! { block1.transactions.push(transaction.into()); - let (mut state, genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); prop_assert_eq!( commit_result, @@ -480,8 +490,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } /// Make sure duplicate sapling nullifiers are rejected by state contextual validation, @@ -514,15 +524,17 @@ proptest! 
{ .transactions .extend([transaction1.into(), transaction2.into()]); - let (mut state, genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); prop_assert_eq!( commit_result, @@ -532,8 +544,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } /// Make sure duplicate sapling nullifiers are rejected by state contextual validation, @@ -570,43 +582,47 @@ proptest! { block1.transactions.push(transaction1.into()); block2.transactions.push(transaction2.into()); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (mut finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); - state.disk.populate_with_anchors(&block2); + finalized_state.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block2); - let mut previous_mem = state.mem.clone(); + let mut previous_mem = non_finalized_state.clone(); let block1_hash; // randomly choose to commit the next block to the finalized or non-finalized state if duplicate_in_finalized_state { let block1 = FinalizedBlock::from(Arc::new(block1)); - let commit_result = state.disk.commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state.disk.contains_sapling_nullifier(&duplicate_nullifier)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(finalized_state.contains_sapling_nullifier(&duplicate_nullifier)); block1_hash = block1.hash; } else { let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1.clone()); prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state - .mem + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state + .best_contains_sapling_nullifier(&duplicate_nullifier)); block1_hash = block1.hash; - previous_mem = state.mem.clone(); + previous_mem = non_finalized_state.clone(); } let block2 = Arc::new(block2).prepare(); - let commit_result = 
state.validate_and_commit(block2); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block2); prop_assert_eq!( commit_result, @@ -616,8 +632,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(1), block1_hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(1), block1_hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } // orchard @@ -650,31 +666,33 @@ proptest! { block1.transactions.push(transaction.into()); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (mut finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); // randomly choose to commit the block to the finalized or non-finalized state if use_finalized_state { let block1 = FinalizedBlock::from(Arc::new(block1)); - let commit_result = state.disk.commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state.disk.contains_orchard_nullifier(&expected_nullifier)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(finalized_state.contains_orchard_nullifier(&expected_nullifier)); } else { let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1.clone()); prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state - .mem + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state + .best_contains_orchard_nullifier(&expected_nullifier)); } } @@ -706,15 +724,17 @@ proptest! { block1.transactions.push(transaction.into()); - let (mut state, genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); prop_assert_eq!( commit_result, @@ -724,8 +744,8 @@ proptest! 
{ } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } /// Make sure duplicate orchard nullifiers are rejected by state contextual validation, @@ -762,15 +782,17 @@ proptest! { .transactions .extend([transaction1.into(), transaction2.into()]); - let (mut state, genesis) = new_state_with_mainnet_genesis(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block1); - let previous_mem = state.mem.clone(); + let previous_mem = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); prop_assert_eq!( commit_result, @@ -780,8 +802,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } /// Make sure duplicate orchard nullifiers are rejected by state contextual validation, @@ -822,43 +844,46 @@ proptest! { block1.transactions.push(transaction1.into()); block2.transactions.push(transaction2.into()); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); + let (mut finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); // Allows anchor checks to pass - state.disk.populate_with_anchors(&block1); - state.disk.populate_with_anchors(&block2); + finalized_state.populate_with_anchors(&block1); + finalized_state.populate_with_anchors(&block2); - let mut previous_mem = state.mem.clone(); + let mut previous_mem = non_finalized_state.clone(); let block1_hash; // randomly choose to commit the next block to the finalized or non-finalized state if duplicate_in_finalized_state { let block1 = FinalizedBlock::from(Arc::new(block1)); - let commit_result = state.disk.commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state.disk.contains_orchard_nullifier(&duplicate_nullifier)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(finalized_state.contains_orchard_nullifier(&duplicate_nullifier)); block1_hash = block1.hash; } else { let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1.clone()); prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); - prop_assert!(state - .mem + 
prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state .best_contains_orchard_nullifier(&duplicate_nullifier)); block1_hash = block1.hash; - previous_mem = state.mem.clone(); + previous_mem = non_finalized_state.clone(); } let block2 = Arc::new(block2).prepare(); - let commit_result = state.validate_and_commit(block2); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block2); prop_assert_eq!( commit_result, @@ -868,8 +893,8 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(1), block1_hash)), state.best_tip()); - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert_eq!(Some((Height(1), block1_hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_mem)); } } diff --git a/zebra-state/src/service/check/tests/utxo.rs b/zebra-state/src/service/check/tests/utxo.rs index 6ab272bc371..efb70c66504 100644 --- a/zebra-state/src/service/check/tests/utxo.rs +++ b/zebra-state/src/service/check/tests/utxo.rs @@ -16,8 +16,10 @@ use zebra_chain::{ use crate::{ arbitrary::Prepare, constants::MIN_TRANSPARENT_COINBASE_MATURITY, - service::check, - service::StateService, + service::{ + check, finalized_state::FinalizedState, non_finalized_state::NonFinalizedState, read, + write::validate_and_commit_non_finalized, + }, tests::setup::{new_state_with_mainnet_genesis, transaction_v4_from_coinbase}, FinalizedBlock, ValidateContextError::{ @@ -170,40 +172,41 @@ proptest! { .transactions .extend([output_transaction.into(), spend_transaction.into()]); - let (mut state, _genesis) = new_state_with_mainnet_genesis(); - let previous_mem = state.mem.clone(); + let (mut finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis(); + let previous_non_finalized_state = non_finalized_state.clone(); // randomly choose to commit the block to the finalized or non-finalized state if use_finalized_state { let block1 = FinalizedBlock::from(Arc::new(block1)); - let commit_result = state.disk.commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); // the block was committed - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); // the non-finalized state didn't change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the finalized state added then spent the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); // the non-finalized state does not have the UTXO - prop_assert!(state.mem.any_utxo(&expected_outpoint).is_none()); + prop_assert!(non_finalized_state.any_utxo(&expected_outpoint).is_none()); } else { let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1.clone()); // the block was committed prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + 
prop_assert_eq!(Some((Height(1), block1.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); // the block data is in the non-finalized state - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the non-finalized state has created and spent the UTXO - prop_assert_eq!(state.mem.chain_set.len(), 1); - let chain = state - .mem + prop_assert_eq!(non_finalized_state.chain_set.len(), 1); + let chain = non_finalized_state .chain_set .iter() .next() @@ -213,7 +216,7 @@ proptest! { prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } } @@ -239,9 +242,9 @@ proptest! { .expect("block should deserialize"); let TestState { - mut state, block1, .. + mut finalized_state, mut non_finalized_state, block1, .. } = new_state_with_mainnet_transparent_data([], [], [output.0.clone()], use_finalized_state_output); - let previous_mem = state.mem.clone(); + let previous_non_finalized_state = non_finalized_state.clone(); let expected_outpoint = transparent::OutPoint { hash: block1.transactions[1].hash(), @@ -262,32 +265,33 @@ proptest! { if use_finalized_state_spend { let block2 = FinalizedBlock::from(Arc::new(block2)); - let commit_result = state.disk.commit_finalized_direct(block2.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block2.clone().into(), "test"); // the block was committed - prop_assert_eq!(Some((Height(2), block2.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(2), block2.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); // the non-finalized state didn't change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the finalized state has spent the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } else { let block2 = Arc::new(block2).prepare(); - let commit_result = state.validate_and_commit(block2.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block2.clone()); // the block was committed prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(2), block2.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(2), block2.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); // the block data is in the non-finalized state - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the UTXO is spent - prop_assert_eq!(state.mem.chain_set.len(), 1); - let chain = state - .mem + prop_assert_eq!(non_finalized_state.chain_set.len(), 1); + let chain = non_finalized_state .chain_set .iter() .next() @@ -299,14 +303,14 @@ proptest! 
{ prop_assert!(!chain.created_utxos.contains_key(&expected_outpoint)); prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); // the finalized state has the UTXO, but it will get deleted on commit - prop_assert!(state.disk.utxo(&expected_outpoint).is_some()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_some()); } else { // the chain has spent its own UTXO prop_assert!(!chain.unspent_utxos().contains_key(&expected_outpoint)); prop_assert!(chain.created_utxos.contains_key(&expected_outpoint)); prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } } } @@ -348,11 +352,13 @@ proptest! { .transactions .extend([output_transaction.into(), spend_transaction.into()]); - let (mut state, genesis) = new_state_with_mainnet_genesis(); - let previous_mem = state.mem.clone(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); + let previous_non_finalized_state = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); // the block was rejected prop_assert_eq!( @@ -363,13 +369,13 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); // the non-finalized state did not change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } /// Make sure a duplicate transparent spend, by two inputs in the same transaction, @@ -389,9 +395,9 @@ proptest! { .expect("block should deserialize"); let TestState { - mut state, block1, .. + finalized_state, mut non_finalized_state, block1, .. } = new_state_with_mainnet_transparent_data([], [], [output.0.clone()], use_finalized_state_output); - let previous_mem = state.mem.clone(); + let previous_non_finalized_state = non_finalized_state.clone(); let expected_outpoint = transparent::OutPoint { hash: block1.transactions[1].hash(), @@ -412,7 +418,9 @@ proptest! { block2.transactions.push(spend_transaction.into()); let block2 = Arc::new(block2).prepare(); - let commit_result = state.validate_and_commit(block2); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block2); // the block was rejected prop_assert_eq!( @@ -423,19 +431,18 @@ proptest! 
{ } .into()) ); - prop_assert_eq!(Some((Height(1), block1.hash())), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash())), read::best_tip(&non_finalized_state, &finalized_state.db)); // the non-finalized state did not change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); if use_finalized_state_output { // the finalized state has the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_some()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_some()); // the non-finalized state has no chains (so it can't have the UTXO) - prop_assert!(state.mem.chain_set.iter().next().is_none()); + prop_assert!(non_finalized_state.chain_set.iter().next().is_none()); } else { - let chain = state - .mem + let chain = non_finalized_state .chain_set .iter() .next() @@ -443,7 +450,7 @@ proptest! { // the non-finalized state has the UTXO prop_assert!(chain.unspent_utxos().contains_key(&expected_outpoint)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } } @@ -465,9 +472,9 @@ proptest! { .expect("block should deserialize"); let TestState { - mut state, block1, .. + finalized_state, mut non_finalized_state, block1, .. } = new_state_with_mainnet_transparent_data([], [], [output.0.clone()], use_finalized_state_output); - let previous_mem = state.mem.clone(); + let previous_non_finalized_state = non_finalized_state.clone(); let expected_outpoint = transparent::OutPoint { hash: block1.transactions[1].hash(), @@ -495,7 +502,9 @@ proptest! { .extend([spend_transaction1.into(), spend_transaction2.into()]); let block2 = Arc::new(block2).prepare(); - let commit_result = state.validate_and_commit(block2); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block2); // the block was rejected prop_assert_eq!( @@ -506,19 +515,18 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(1), block1.hash())), state.best_tip()); + prop_assert_eq!(Some((Height(1), block1.hash())), read::best_tip(&non_finalized_state, &finalized_state.db)); // the non-finalized state did not change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); if use_finalized_state_output { // the finalized state has the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_some()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_some()); // the non-finalized state has no chains (so it can't have the UTXO) - prop_assert!(state.mem.chain_set.iter().next().is_none()); + prop_assert!(non_finalized_state.chain_set.iter().next().is_none()); } else { - let chain = state - .mem + let chain = non_finalized_state .chain_set .iter() .next() @@ -526,7 +534,7 @@ proptest! { // the non-finalized state has the UTXO prop_assert!(chain.unspent_utxos().contains_key(&expected_outpoint)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } } @@ -558,9 +566,9 @@ proptest! { .expect("block should deserialize"); let TestState { - mut state, block1, .. + mut finalized_state, mut non_finalized_state, block1, .. 
} = new_state_with_mainnet_transparent_data([], [], [output.0.clone()], use_finalized_state_output); - let mut previous_mem = state.mem.clone(); + let mut previous_non_finalized_state = non_finalized_state.clone(); let expected_outpoint = transparent::OutPoint { hash: block1.transactions[1].hash(), @@ -591,33 +599,34 @@ proptest! { if use_finalized_state_spend { let block2 = FinalizedBlock::from(block2.clone()); - let commit_result = state.disk.commit_finalized_direct(block2.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block2.clone().into(), "test"); // the block was committed - prop_assert_eq!(Some((Height(2), block2.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(2), block2.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); prop_assert!(commit_result.is_ok()); // the non-finalized state didn't change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the finalized state has spent the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); // the non-finalized state does not have the UTXO - prop_assert!(state.mem.any_utxo(&expected_outpoint).is_none()); + prop_assert!(non_finalized_state.any_utxo(&expected_outpoint).is_none()); } else { let block2 = block2.clone().prepare(); - let commit_result = state.validate_and_commit(block2.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block2.clone()); // the block was committed prop_assert_eq!(commit_result, Ok(())); - prop_assert_eq!(Some((Height(2), block2.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(2), block2.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); // the block data is in the non-finalized state - prop_assert!(!state.mem.eq_internal_state(&previous_mem)); + prop_assert!(!non_finalized_state.eq_internal_state(&previous_non_finalized_state)); - prop_assert_eq!(state.mem.chain_set.len(), 1); - let chain = state - .mem + prop_assert_eq!(non_finalized_state.chain_set.len(), 1); + let chain = non_finalized_state .chain_set .iter() .next() @@ -625,7 +634,7 @@ proptest! { if use_finalized_state_output { // the finalized state has the unspent UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_some()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_some()); // the non-finalized state has spent the UTXO prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); } else { @@ -634,14 +643,16 @@ proptest! { prop_assert!(chain.created_utxos.contains_key(&expected_outpoint)); prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } - previous_mem = state.mem.clone(); + previous_non_finalized_state = non_finalized_state.clone(); } let block3 = Arc::new(block3).prepare(); - let commit_result = state.validate_and_commit(block3); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block3); // the block was rejected if use_finalized_state_spend { @@ -663,23 +674,23 @@ proptest! 
{ .into()) ); } - prop_assert_eq!(Some((Height(2), block2.hash())), state.best_tip()); + prop_assert_eq!(Some((Height(2), block2.hash())), read::best_tip(&non_finalized_state, &finalized_state.db)); // the non-finalized state did not change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // Since the non-finalized state has not changed, we don't need to check it again if use_finalized_state_spend { // the finalized state has spent the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } else if use_finalized_state_output { // the finalized state has the unspent UTXO // but the non-finalized state has spent it - prop_assert!(state.disk.utxo(&expected_outpoint).is_some()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_some()); } else { // the non-finalized state has created and spent the UTXO // and the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } } @@ -709,11 +720,13 @@ proptest! { block1.transactions.push(spend_transaction.into()); - let (mut state, genesis) = new_state_with_mainnet_genesis(); - let previous_mem = state.mem.clone(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); + let previous_non_finalized_state = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); // the block was rejected prop_assert_eq!( @@ -724,13 +737,13 @@ proptest! { } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); // the non-finalized state did not change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } /// Make sure transparent output spends are rejected by state contextual validation, @@ -772,11 +785,13 @@ proptest! { .transactions .extend([spend_transaction.into(), output_transaction.into()]); - let (mut state, genesis) = new_state_with_mainnet_genesis(); - let previous_mem = state.mem.clone(); + let (finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); + let previous_non_finalized_state = non_finalized_state.clone(); let block1 = Arc::new(block1).prepare(); - let commit_result = state.validate_and_commit(block1); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, block1); // the block was rejected prop_assert_eq!( @@ -786,20 +801,23 @@ proptest! 
{ } .into()) ); - prop_assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); + prop_assert_eq!(Some((Height(0), genesis.hash)), read::best_tip(&non_finalized_state, &finalized_state.db)); // the non-finalized state did not change - prop_assert!(state.mem.eq_internal_state(&previous_mem)); + prop_assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); // the finalized state does not have the UTXO - prop_assert!(state.disk.utxo(&expected_outpoint).is_none()); + prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } } /// State associated with transparent UTXO tests. struct TestState { - /// The pre-populated state service. - state: StateService, + /// The pre-populated finalized state. + finalized_state: FinalizedState, + + /// The pre-populated non-finalized state. + non_finalized_state: NonFinalizedState, /// The genesis block that has already been committed to the `state` service's /// finalized state. @@ -818,8 +836,8 @@ fn new_state_with_mainnet_transparent_data( outputs: impl IntoIterator, use_finalized_state: bool, ) -> TestState { - let (mut state, genesis) = new_state_with_mainnet_genesis(); - let previous_mem = state.mem.clone(); + let (mut finalized_state, mut non_finalized_state, genesis) = new_state_with_mainnet_genesis(); + let previous_non_finalized_state = non_finalized_state.clone(); let mut block1 = zebra_test::vectors::BLOCK_MAINNET_1_BYTES .zcash_deserialize_into::() @@ -846,26 +864,31 @@ fn new_state_with_mainnet_transparent_data( if use_finalized_state { let block1 = FinalizedBlock::from(block1.clone()); - let commit_result = state - .disk - .commit_finalized_direct(block1.clone().into(), "test"); + let commit_result = finalized_state.commit_finalized_direct(block1.clone().into(), "test"); // the block was committed - assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + assert_eq!( + Some((Height(1), block1.hash)), + read::best_tip(&non_finalized_state, &finalized_state.db) + ); assert!(commit_result.is_ok()); // the non-finalized state didn't change - assert!(state.mem.eq_internal_state(&previous_mem)); + assert!(non_finalized_state.eq_internal_state(&previous_non_finalized_state)); for expected_outpoint in expected_outpoints { // the finalized state has the UTXOs - assert!(state.disk.utxo(&expected_outpoint).is_some()); + assert!(finalized_state.utxo(&expected_outpoint).is_some()); // the non-finalized state does not have the UTXOs - assert!(state.mem.any_utxo(&expected_outpoint).is_none()); + assert!(non_finalized_state.any_utxo(&expected_outpoint).is_none()); } } else { let block1 = block1.clone().prepare(); - let commit_result = state.validate_and_commit(block1.clone()); + let commit_result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, + block1.clone(), + ); // the block was committed assert_eq!( @@ -877,17 +900,19 @@ fn new_state_with_mainnet_transparent_data( block1.block.transactions[0], block1.block.transactions[1], ); - assert_eq!(Some((Height(1), block1.hash)), state.best_tip()); + assert_eq!( + Some((Height(1), block1.hash)), + read::best_tip(&non_finalized_state, &finalized_state.db) + ); // the block data is in the non-finalized state - assert!(!state.mem.eq_internal_state(&previous_mem)); + assert!(!non_finalized_state.eq_internal_state(&previous_non_finalized_state)); - assert_eq!(state.mem.chain_set.len(), 1); + assert_eq!(non_finalized_state.chain_set.len(), 1); for expected_outpoint in expected_outpoints { // the non-finalized state has the unspent UTXOs - 
assert!(state - .mem + assert!(non_finalized_state .chain_set .iter() .next() @@ -895,12 +920,13 @@ fn new_state_with_mainnet_transparent_data( .unspent_utxos() .contains_key(&expected_outpoint)); // the finalized state does not have the UTXOs - assert!(state.disk.utxo(&expected_outpoint).is_none()); + assert!(finalized_state.utxo(&expected_outpoint).is_none()); } } TestState { - state, + finalized_state, + non_finalized_state, genesis, block1, } diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 515074b1909..4bde4debae6 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -25,7 +25,7 @@ use zebra_chain::{block, parameters::Network}; use crate::{ request::FinalizedWithTrees, service::{check, QueuedFinalized}, - BoxError, Config, FinalizedBlock, + BoxError, CloneError, Config, FinalizedBlock, }; mod disk_db; @@ -171,15 +171,13 @@ impl FinalizedState { ); }; - // Some io errors can't be cloned, so we format them instead. - let owned_result = result - .as_ref() - .map(|_hash| finalized) - .map_err(|error| format!("{:?}", error).into()); + // Make the error cloneable, so we can send it to the block verify future, + // and the block write task. + let result = result.map_err(CloneError::from); - let _ = rsp_tx.send(result); + let _ = rsp_tx.send(result.clone().map_err(BoxError::from)); - owned_result + result.map(|_hash| finalized).map_err(BoxError::from) } /// Immediately commit a `finalized` block to the finalized state. diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index cc707f51770..b6affaf1610 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -67,6 +67,14 @@ impl ZebraDb { self.db.zs_last_key_value(&hash_by_height) } + /// Returns `true` if `height` is present in the finalized state. + #[allow(clippy::unwrap_in_result)] + pub fn contains_height(&self, height: block::Height) -> bool { + let hash_by_height = self.db.cf_handle("hash_by_height").unwrap(); + + self.db.zs_contains(&hash_by_height, &height) + } + /// Returns the finalized hash for a given `block::Height` if it is present. #[allow(clippy::unwrap_in_result)] pub fn hash(&self, height: block::Height) -> Option { diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index fe887799304..0c9b23a5170 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -322,6 +322,7 @@ impl NonFinalizedState { /// Returns `true` if `hash` is contained in the non-finalized portion of any /// known chain. + #[allow(dead_code)] pub fn any_chain_contains(&self, hash: &block::Hash) -> bool { self.chain_set .iter() @@ -377,6 +378,7 @@ impl NonFinalizedState { } /// Returns the tip of the best chain. + #[allow(dead_code)] pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> { let best_chain = self.best_chain()?; let height = best_chain.non_finalized_tip_height(); @@ -478,9 +480,8 @@ impl NonFinalizedState { ) .transpose() }) - .expect( - "commit_block is only called with blocks that are ready to be committed", - )?; + .transpose()? 
+ .ok_or(ValidateContextError::NotReadyToBeCommitted)?; Ok(Arc::new(fork_chain)) } diff --git a/zebra-state/src/service/queued_blocks.rs b/zebra-state/src/service/queued_blocks.rs index 4cfa12d89ec..35cd3bbcd04 100644 --- a/zebra-state/src/service/queued_blocks.rs +++ b/zebra-state/src/service/queued_blocks.rs @@ -1,7 +1,7 @@ //! Queued blocks that are awaiting their parent block for verification. use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{hash_map::Drain, BTreeMap, HashMap, HashSet, VecDeque}, mem, }; @@ -52,26 +52,26 @@ impl QueuedBlocks { let new_height = new.0.height; let parent_hash = new.0.block.header.previous_block_hash; + if self.blocks.contains_key(&new_hash) { + // Skip queueing the block and return early if the hash is not unique + return; + } + // Track known UTXOs in queued blocks. for (outpoint, ordered_utxo) in new.0.new_outputs.iter() { self.known_utxos .insert(*outpoint, ordered_utxo.utxo.clone()); } - let replaced = self.blocks.insert(new_hash, new); - assert!(replaced.is_none(), "hashes must be unique"); - let inserted = self - .by_height + self.blocks.insert(new_hash, new); + self.by_height .entry(new_height) .or_default() .insert(new_hash); - assert!(inserted, "hashes must be unique"); - let inserted = self - .by_parent + self.by_parent .entry(parent_hash) .or_default() .insert(new_hash); - assert!(inserted, "hashes must be unique"); tracing::trace!(%parent_hash, queued = %self.blocks.len(), "queued block"); self.update_metrics(); @@ -196,4 +196,129 @@ impl QueuedBlocks { pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option { self.known_utxos.get(outpoint).cloned() } + + /// Clears known_utxos, by_parent, and by_height, then drains blocks. + /// Returns all key-value pairs of blocks as an iterator + pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedNonFinalized> { + self.known_utxos.clear(); + self.known_utxos.shrink_to_fit(); + self.by_parent.clear(); + self.by_parent.shrink_to_fit(); + self.by_height.clear(); + + self.blocks.drain() + } +} + +#[derive(Debug, Default)] +pub(crate) struct SentHashes { + /// A list of previously sent block batches, each batch is in increasing height order. + /// We use this list to efficiently prune outdated hashes that are at or below the finalized tip. + bufs: Vec>, + + /// The list of blocks sent in the current batch, in increasing height order. + curr_buf: VecDeque<(block::Hash, block::Height)>, + + /// Stores a set of hashes that have been sent to the block write task but + /// may not be in the finalized state yet. + sent: HashMap>, + + /// Known UTXOs. + known_utxos: HashMap, +} + +impl SentHashes { + /// Stores the `block`'s hash, height, and UTXOs, so they can be used to check if a block or UTXO + /// is available in the state. + /// + /// Assumes that blocks are added in the order of their height between `finish_batch` calls + /// for efficient pruning. + pub fn add(&mut self, block: &PreparedBlock) { + // Track known UTXOs in sent blocks. + let outpoints = block + .new_outputs + .iter() + .map(|(outpoint, ordered_utxo)| { + self.known_utxos + .insert(*outpoint, ordered_utxo.utxo.clone()); + outpoint + }) + .cloned() + .collect(); + + self.curr_buf.push_back((block.hash, block.height)); + self.sent.insert(block.hash, outpoints); + } + + /// Stores the finalized `block`'s hash, height, and UTXOs, so they can be used to check if a + /// block or UTXO is available in the state. 
+ /// + /// Used for finalized blocks close to the final checkpoint, so non-finalized blocks can look up + /// their UTXOs. + /// + /// For more details see `add()`. + pub fn add_finalized(&mut self, block: &FinalizedBlock) { + // Track known UTXOs in sent blocks. + let outpoints = block + .new_outputs + .iter() + .map(|(outpoint, utxo)| { + self.known_utxos.insert(*outpoint, utxo.clone()); + outpoint + }) + .cloned() + .collect(); + + self.curr_buf.push_back((block.hash, block.height)); + self.sent.insert(block.hash, outpoints); + } + + /// Try to look up this UTXO in any sent block. + #[instrument(skip(self))] + pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option { + self.known_utxos.get(outpoint).cloned() + } + + /// Finishes the current block batch, and stores it for efficient pruning. + pub fn finish_batch(&mut self) { + if !self.curr_buf.is_empty() { + self.bufs.push(std::mem::take(&mut self.curr_buf)); + } + } + + /// Prunes sent blocks at or below `height_bound`. + /// + /// Finishes the batch if `finish_batch()` hasn't been called already. + /// + /// Assumes that blocks will be added in order of their heights between each `finish_batch()` call, + /// so that blocks can be efficiently and reliably removed by height. + pub fn prune_by_height(&mut self, height_bound: block::Height) { + self.finish_batch(); + + // Iterates over each buf in `sent_bufs`, removing sent blocks until reaching + // the first block with a height above the `height_bound`. + self.bufs.retain_mut(|buf| { + while let Some((hash, height)) = buf.pop_front() { + if height > height_bound { + buf.push_front((hash, height)); + return true; + } else if let Some(expired_outpoints) = self.sent.remove(&hash) { + // TODO: only remove UTXOs if there are no queued blocks with that UTXO + // (known_utxos is best-effort, so this is ok for now) + for outpoint in expired_outpoints.iter() { + self.known_utxos.remove(outpoint); + } + } + } + + false + }); + + self.sent.shrink_to_fit(); + } + + /// Returns true if SentHashes contains the `hash` + pub fn contains(&self, hash: &block::Hash) -> bool { + self.sent.contains_key(hash) + } } diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index d1f7f4c0a81..f39e2c543a3 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -29,7 +29,7 @@ pub use address::{ }; pub use block::{any_utxo, block, block_header, transaction, transaction_hashes_for_block, utxo}; pub use find::{ - block_locator, chain_contains_hash, depth, find_chain_hashes, find_chain_headers, + best_tip, block_locator, chain_contains_hash, depth, find_chain_hashes, find_chain_headers, hash_by_height, height_by_hash, tip, tip_height, }; pub use tree::{orchard_tree, sapling_tree}; diff --git a/zebra-state/src/service/read/find.rs b/zebra-state/src/service/read/find.rs index 50c7bf7d1a4..c78271335b9 100644 --- a/zebra-state/src/service/read/find.rs +++ b/zebra-state/src/service/read/find.rs @@ -21,12 +21,24 @@ use zebra_chain::block::{self, Height}; use crate::{ constants, - service::{finalized_state::ZebraDb, non_finalized_state::Chain, read::block::block_header}, + service::{ + finalized_state::ZebraDb, + non_finalized_state::{Chain, NonFinalizedState}, + read::block::block_header, + }, }; #[cfg(test)] mod tests; +/// Returns the tip of the best chain in the non-finalized or finalized state. 
+pub fn best_tip( + non_finalized_state: &NonFinalizedState, + db: &ZebraDb, +) -> Option<(block::Height, block::Hash)> { + tip(non_finalized_state.best_chain(), db) +} + /// Returns the tip of `chain`. /// If there is no chain, returns the tip of `db`. pub fn tip(chain: Option, db: &ZebraDb) -> Option<(Height, block::Hash)> diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index fbb7b23632c..4723e9b6856 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -7,7 +7,7 @@ use std::{env, sync::Arc, time::Duration}; use tower::{buffer::Buffer, util::BoxService}; use zebra_chain::{ - block::{self, Block, CountedHeader}, + block::{self, Block, CountedHeader, Height}, chain_tip::ChainTip, fmt::SummaryDebug, parameters::{Network, NetworkUpgrade}, @@ -400,11 +400,12 @@ proptest! { ) { let _init_guard = zebra_test::init(); - let (mut state_service, _, _, _) = StateService::new(Config::ephemeral(), network); + // We're waiting to verify each block here, so we don't need the maximum checkpoint height. + let (mut state_service, _, _, _) = StateService::new(Config::ephemeral(), network, Height::MAX, 0); - prop_assert_eq!(state_service.disk.finalized_value_pool(), ValueBalance::zero()); + prop_assert_eq!(state_service.read_service.db.finalized_value_pool(), ValueBalance::zero()); prop_assert_eq!( - state_service.mem.best_chain().map(|chain| chain.chain_value_pools).unwrap_or_else(ValueBalance::zero), + state_service.read_service.latest_non_finalized_state().best_chain().map(|chain| chain.chain_value_pools).unwrap_or_else(ValueBalance::zero), ValueBalance::zero() ); @@ -429,7 +430,7 @@ proptest! { prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result); prop_assert_eq!( - state_service.disk.finalized_value_pool(), + state_service.read_service.db.finalized_value_pool(), expected_finalized_value_pool.clone()?.constrain()? ); @@ -438,7 +439,7 @@ proptest! { let transparent_value = ValueBalance::from_transparent_amount(transparent_value); expected_transparent_pool = (expected_transparent_pool + transparent_value).unwrap(); prop_assert_eq!( - state_service.disk.finalized_value_pool(), + state_service.read_service.db.finalized_value_pool(), expected_transparent_pool ); } @@ -455,7 +456,7 @@ proptest! { prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result); prop_assert_eq!( - state_service.mem.best_chain().unwrap().chain_value_pools, + state_service.read_service.latest_non_finalized_state().best_chain().unwrap().chain_value_pools, expected_non_finalized_value_pool.clone()?.constrain()? ); @@ -464,7 +465,7 @@ proptest! { let transparent_value = ValueBalance::from_transparent_amount(transparent_value); expected_transparent_pool = (expected_transparent_pool + transparent_value).unwrap(); prop_assert_eq!( - state_service.mem.best_chain().unwrap().chain_value_pools, + state_service.read_service.latest_non_finalized_state().best_chain().unwrap().chain_value_pools, expected_transparent_pool ); } @@ -491,7 +492,8 @@ proptest! { ) { let _init_guard = zebra_test::init(); - let (mut state_service, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network); + // We're waiting to verify each block here, so we don't need the maximum checkpoint height. 
+ let (mut state_service, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network, Height::MAX, 0); prop_assert_eq!(latest_chain_tip.best_tip_height(), None); prop_assert_eq!(chain_tip_change.last_tip_change(), None); diff --git a/zebra-state/src/service/watch_receiver.rs b/zebra-state/src/service/watch_receiver.rs index a3cb4826477..c52ca560503 100644 --- a/zebra-state/src/service/watch_receiver.rs +++ b/zebra-state/src/service/watch_receiver.rs @@ -82,17 +82,19 @@ where // Without this change, an eager reader can repeatedly block the channel writer. // This seems to happen easily in RPC & ReadStateService futures. // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.) - let cloned_data = { - let borrow_guard = self.receiver.borrow(); - let cloned_data = borrow_guard.clone(); - std::mem::drop(borrow_guard); - - cloned_data - }; + let cloned_data = self.cloned_watch_data(); f(cloned_data) } + /// Returns a clone of the watch data in the channel. + /// This helps avoid deadlocks. + /// + /// See `with_watch_data()` for details. + pub fn cloned_watch_data(&self) -> T { + self.receiver.borrow().clone() + } + /// Calls [`watch::Receiver::changed`] and returns the result. pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> { self.receiver.changed().await diff --git a/zebra-state/src/service/write.rs b/zebra-state/src/service/write.rs index f3aa6e86f12..5ba92218431 100644 --- a/zebra-state/src/service/write.rs +++ b/zebra-state/src/service/write.rs @@ -1,33 +1,110 @@ //! Writing blocks to the finalized and non-finalized states. -use std::sync::{Arc, Mutex}; +use indexmap::IndexMap; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + watch, +}; use zebra_chain::block::{self, Height}; +use crate::{ + constants::MAX_BLOCK_REORG_HEIGHT, + service::{ + check, + finalized_state::FinalizedState, + non_finalized_state::NonFinalizedState, + queued_blocks::{QueuedFinalized, QueuedNonFinalized}, + BoxError, ChainTipBlock, ChainTipSender, CloneError, + }, + CommitBlockError, PreparedBlock, +}; + +// These types are used in doc links +#[allow(unused_imports)] use crate::service::{ - finalized_state::FinalizedState, - queued_blocks::{QueuedFinalized, QueuedNonFinalized}, - ChainTipBlock, ChainTipSender, + chain_tip::{ChainTipChange, LatestChainTip}, + non_finalized_state::Chain, }; -/// Reads blocks from the channels, writes them to the `finalized_state`, -/// and updates the `chain_tip_sender`. +/// The maximum size of the parent error map. +/// +/// We allow enough space for multiple concurrent chain forks with errors. +const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2; + +/// Run contextual validation on the prepared block and add it to the +/// non-finalized state if it is contextually valid. 
+#[tracing::instrument(level = "debug", skip(prepared), fields(height = ?prepared.height, hash = %prepared.hash))] +pub(crate) fn validate_and_commit_non_finalized( + finalized_state: &FinalizedState, + non_finalized_state: &mut NonFinalizedState, + prepared: PreparedBlock, +) -> Result<(), CommitBlockError> { + check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?; + let parent_hash = prepared.block.header.previous_block_hash; + + if finalized_state.db.finalized_tip_hash() == parent_hash { + non_finalized_state.commit_new_chain(prepared, &finalized_state.db)?; + } else { + non_finalized_state.commit_block(prepared, &finalized_state.db)?; + } + + Ok(()) +} + +/// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender` +/// channels with the latest non-finalized [`ChainTipBlock`] and +/// [`Chain`]. +/// +/// Returns the latest non-finalized chain tip height. +/// +/// # Panics /// -/// TODO: pass the non-finalized state and associated update channel to this function +/// If the `non_finalized_state` is empty. +#[instrument(level = "debug", skip(chain_tip_sender, non_finalized_state_sender))] +fn update_latest_chain_channels( + non_finalized_state: &NonFinalizedState, + chain_tip_sender: &mut ChainTipSender, + non_finalized_state_sender: &watch::Sender, +) -> block::Height { + let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels"); + + let tip_block = best_chain + .tip_block() + .expect("unexpected empty chain: must commit at least one block before updating channels") + .clone(); + let tip_block = ChainTipBlock::from(tip_block); + + let tip_block_height = tip_block.height; + + // If the final receiver was just dropped, ignore the error. + let _ = non_finalized_state_sender.send(non_finalized_state.clone()); + + chain_tip_sender.set_best_non_finalized_tip(tip_block); + + tip_block_height +} + +/// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`, +/// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and +/// `non_finalized_state_sender`. +// TODO: make the task an object +#[allow(clippy::too_many_arguments)] #[instrument(skip( finalized_block_write_receiver, non_finalized_block_write_receiver, invalid_block_reset_sender, - chain_tip_sender + chain_tip_sender, + non_finalized_state_sender, ))] pub fn write_blocks_from_channels( - mut finalized_block_write_receiver: tokio::sync::mpsc::UnboundedReceiver, - mut non_finalized_block_write_receiver: tokio::sync::mpsc::UnboundedReceiver< - QueuedNonFinalized, - >, + mut finalized_block_write_receiver: UnboundedReceiver, + mut non_finalized_block_write_receiver: UnboundedReceiver, mut finalized_state: FinalizedState, - invalid_block_reset_sender: tokio::sync::mpsc::UnboundedSender, - chain_tip_sender: Arc>, + mut non_finalized_state: NonFinalizedState, + invalid_block_reset_sender: UnboundedSender, + mut chain_tip_sender: ChainTipSender, + non_finalized_state_sender: watch::Sender, ) { // Write all the finalized blocks sent by the state, // until the state closes the finalized block channel's sender. 
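Editor's note: the `write.rs` hunk above and the test hunks earlier in this diff all drive the state through the same new calling pattern. Below is a condensed, illustrative sketch of that pattern, assuming it lives inside a `zebra-state` unit test module (so the `pub(crate)` items, the `Prepare` trait, `finalized_state.db`, and the `zebra_test` block vectors are in scope); the test name is hypothetical and the block is not part of the patch itself.

```rust
// Illustrative only: mirrors the test pattern used throughout this diff.
use std::sync::Arc;

use zebra_chain::{block::Block, serialization::ZcashDeserializeInto};

use crate::{
    arbitrary::Prepare,
    service::{read, write::validate_and_commit_non_finalized},
    tests::setup::new_state_with_mainnet_genesis,
};

#[test]
fn commit_block_1_via_the_split_state() {
    // The setup helper now returns the finalized and non-finalized state halves
    // instead of a whole `StateService`.
    let (finalized_state, mut non_finalized_state, _genesis) = new_state_with_mainnet_genesis();

    let block1 = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
        .zcash_deserialize_into::<Block>()
        .expect("block should deserialize");
    let block1 = Arc::new(block1).prepare();
    let block1_hash = block1.hash;

    // Contextual validation and the non-finalized commit are now a free function,
    // shared by the tests and the block write task.
    validate_and_commit_non_finalized(&finalized_state, &mut non_finalized_state, block1)
        .expect("mainnet block 1 should be contextually valid on top of genesis");

    // The best tip is read through the new `read::best_tip` helper,
    // which checks the non-finalized state first, then the finalized database.
    assert_eq!(
        Some((zebra_chain::block::Height(1), block1_hash)),
        read::best_tip(&non_finalized_state, &finalized_state.db),
    );
}
```

This is the same sequence the block write task performs for each queued child block: contextual checks, a non-finalized commit, then a tip update via the chain tip and non-finalized state channels.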
@@ -70,12 +147,7 @@ pub fn write_blocks_from_channels( Ok(finalized) => { let tip_block = ChainTipBlock::from(finalized); - // TODO: update the chain tip sender with non-finalized blocks in this function, - // and get rid of the mutex - chain_tip_sender - .lock() - .expect("unexpected panic in block commit task or state") - .set_finalized_tip(tip_block); + chain_tip_sender.set_finalized_tip(tip_block); } Err(error) => { let finalized_tip = finalized_state.db.tip(); @@ -108,31 +180,108 @@ pub fn write_blocks_from_channels( return; } - // Write all the finalized blocks sent by the state, until Zebra shuts down. - while let Some(_block) = non_finalized_block_write_receiver.blocking_recv() { - if invalid_block_reset_sender.is_closed() { - info!("StateService closed the block reset channel. Is Zebra shutting down?"); - return; + // Save any errors to propagate down to queued child blocks + let mut parent_error_map: IndexMap = IndexMap::new(); + + while let Some((queued_child, rsp_tx)) = non_finalized_block_write_receiver.blocking_recv() { + let child_hash = queued_child.hash; + let parent_hash = queued_child.block.header.previous_block_hash; + let parent_error = parent_error_map.get(&parent_hash); + + let result; + + // If the parent block was marked as rejected, also reject all its children. + // + // At this point, we know that all the block's descendants + // are invalid, because we checked all the consensus rules before + // committing the failing ancestor block to the non-finalized state. + if let Some(parent_error) = parent_error { + tracing::trace!( + ?child_hash, + ?parent_error, + "rejecting queued child due to parent error" + ); + result = Err(parent_error.clone()); + } else { + tracing::trace!(?child_hash, "validating queued child"); + result = validate_and_commit_non_finalized( + &finalized_state, + &mut non_finalized_state, + queued_child, + ) + .map_err(CloneError::from); } - // TODO: - // - read from the channel - // - commit blocks to the non-finalized state - // - if there are any ready, commit blocks to the finalized state - // - handle errors by sending a reset with all the block hashes in the non-finalized state, and the finalized tip - // - update the chain tip sender and cached non-finalized state - error!("handle non-finalized block writes here"); - } + // TODO: fix the test timing bugs that require the result to be sent + // after `update_latest_chain_channels()`, + // and send the result on rsp_tx here + + if let Err(ref error) = result { + // Update the caller with the error. + let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from)); + + // If the block is invalid, mark any descendant blocks as rejected. + parent_error_map.insert(child_hash, error.clone()); + + // Make sure the error map doesn't get too big. + if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT { + // We only add one hash at a time, so we only need to remove one extra here. + parent_error_map.shift_remove_index(0); + } + + // Skip the things we only need to do for successfully committed blocks + continue; + } + + // Committing blocks to the finalized state keeps the same chain, + // so we can update the chain seen by the rest of the application now. 
+ // + // TODO: if this causes state request errors due to chain conflicts, + // fix the `service::read` bugs, + // or do the channel update after the finalized state commit + let tip_block_height = update_latest_chain_channels( + &non_finalized_state, + &mut chain_tip_sender, + &non_finalized_state_sender, + ); + + // Update the caller with the result. + let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from)); + + while non_finalized_state.best_chain_len() > MAX_BLOCK_REORG_HEIGHT { + tracing::trace!("finalizing block past the reorg limit"); + let finalized_with_trees = non_finalized_state.finalize(); + finalized_state + .commit_finalized_direct(finalized_with_trees, "best non-finalized chain root") + .expect( + "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state", + ); + } - // We're finished receiving non-finalized blocks from the state. - // - // TODO: - // - make the task an object, and do this in the drop impl? - // - does the drop order matter here? - non_finalized_block_write_receiver.close(); - std::mem::drop(non_finalized_block_write_receiver); + // Update the metrics if semantic and contextual validation passes + // + // TODO: split this out into a function? + metrics::counter!("state.full_verifier.committed.block.count", 1); + metrics::counter!("zcash.chain.verified.block.total", 1); + + metrics::gauge!( + "state.full_verifier.committed.block.height", + tip_block_height.0 as f64, + ); + + // This height gauge is updated for both fully verified and checkpoint blocks. + // These updates can't conflict, because this block write task makes sure that blocks + // are committed in order. + metrics::gauge!( + "zcash.chain.verified.block.height", + tip_block_height.0 as f64, + ); + + tracing::trace!("finished processing queued block"); + } - // We're done writing to the finalized state, so we can force it to shut down. + // We're finished receiving non-finalized blocks from the state, and + // done writing to the finalized state, so we can force it to shut down. finalized_state.db.shutdown(true); std::mem::drop(finalized_state); } diff --git a/zebra-state/src/tests/setup.rs b/zebra-state/src/tests/setup.rs index 11e2f2a26c3..899866379e3 100644 --- a/zebra-state/src/tests/setup.rs +++ b/zebra-state/src/tests/setup.rs @@ -15,7 +15,9 @@ use zebra_chain::{ }; use crate::{ - service::{check, StateService}, + service::{ + check, finalized_state::FinalizedState, non_finalized_state::NonFinalizedState, read, + }, Config, FinalizedBlock, }; @@ -81,24 +83,34 @@ pub(crate) fn partial_nu5_chain_strategy( /// Return a new `StateService` containing the mainnet genesis block. /// Also returns the finalized genesis block itself. 
-pub(crate) fn new_state_with_mainnet_genesis() -> (StateService, FinalizedBlock) { +pub(crate) fn new_state_with_mainnet_genesis() -> (FinalizedState, NonFinalizedState, FinalizedBlock) +{ let genesis = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES .zcash_deserialize_into::>() .expect("block should deserialize"); - let (mut state, _, _, _) = StateService::new(Config::ephemeral(), Mainnet); + let config = Config::ephemeral(); + let network = Mainnet; + + let mut finalized_state = FinalizedState::new(&config, network); + let non_finalized_state = NonFinalizedState::new(network); - assert_eq!(None, state.best_tip()); + assert_eq!( + None, + read::best_tip(&non_finalized_state, &finalized_state.db) + ); let genesis = FinalizedBlock::from(genesis); - state - .disk + finalized_state .commit_finalized_direct(genesis.clone().into(), "test") .expect("unexpected invalid genesis block test vector"); - assert_eq!(Some((Height(0), genesis.hash)), state.best_tip()); + assert_eq!( + Some((Height(0), genesis.hash)), + read::best_tip(&non_finalized_state, &finalized_state.db) + ); - (state, genesis) + (finalized_state, non_finalized_state, genesis) } /// Return a `Transaction::V4` with the coinbase data from `coinbase`. diff --git a/zebra-state/tests/basic.rs b/zebra-state/tests/basic.rs index d7f0f150b97..9aebfeb775e 100644 --- a/zebra-state/tests/basic.rs +++ b/zebra-state/tests/basic.rs @@ -5,7 +5,11 @@ use std::sync::Arc; use color_eyre::eyre::Report; use once_cell::sync::Lazy; -use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserialize}; +use zebra_chain::{ + block::{Block, Height}, + parameters::Network, + serialization::ZcashDeserialize, +}; use zebra_test::transcript::{ExpectedTranscriptError, Transcript}; use zebra_state::*; @@ -73,7 +77,8 @@ async fn check_transcripts(network: Network) -> Result<(), Report> { Network::Mainnet => mainnet_transcript, Network::Testnet => testnet_transcript, } { - let (service, _, _, _) = zebra_state::init(Config::ephemeral(), network); + // We're not verifying UTXOs here. + let (service, _, _, _) = zebra_state::init(Config::ephemeral(), network, Height::MAX, 0); let transcript = Transcript::from(transcript_data.iter().cloned()); /// SPANDOC: check the on disk service against the transcript transcript.check(service).await?; diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs index ee87c68c46a..de51dce0d88 100644 --- a/zebrad/src/commands/copy_state.rs +++ b/zebrad/src/commands/copy_state.rs @@ -111,13 +111,16 @@ impl CopyStateCmd { ); let source_start_time = Instant::now(); + + // We're not verifying UTXOs here, so we don't need the maximum checkpoint height. + // // TODO: use ReadStateService for the source? let ( mut source_state, _source_read_only_state_service, _source_latest_chain_tip, _source_chain_tip_change, - ) = old_zs::spawn_init(source_config.clone(), network).await?; + ) = old_zs::spawn_init(source_config.clone(), network, Height::MAX, 0).await?; let elapsed = source_start_time.elapsed(); info!(?elapsed, "finished initializing source state service"); @@ -128,6 +131,8 @@ impl CopyStateCmd { ); let target_start_time = Instant::now(); + // We're not verifying UTXOs here, so we don't need the maximum checkpoint height. + // // TODO: call Options::PrepareForBulkLoad() // See "What's the fastest way to load data into RocksDB?" 
in // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ @@ -136,7 +141,7 @@ impl CopyStateCmd { _target_read_only_state_service, _target_latest_chain_tip, _target_chain_tip_change, - ) = new_zs::spawn_init(target_config.clone(), network).await?; + ) = new_zs::spawn_init(target_config.clone(), network, Height::MAX, 0).await?; let elapsed = target_start_time.elapsed(); info!(?elapsed, "finished initializing target state service"); diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 455397598c8..3169ede3fbf 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -81,7 +81,7 @@ use crate::{ components::{ inbound::{self, InboundSetupData}, mempool::{self, Mempool}, - sync::{self, show_block_chain_progress}, + sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER}, tokio::{RuntimeRun, TokioComponent}, ChainSync, Inbound, }, @@ -103,10 +103,22 @@ impl StartCmd { info!(?config); info!("initializing node state"); - info!("opening database, this may take a couple minutes"); + let (_, max_checkpoint_height) = zebra_consensus::chain::init_checkpoint_list( + config.consensus.clone(), + config.network.network, + ); + + info!("opening database, this may take a few minutes"); let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) = - zebra_state::spawn_init(config.state.clone(), config.network.network).await?; + zebra_state::spawn_init( + config.state.clone(), + config.network.network, + max_checkpoint_height, + config.sync.checkpoint_verify_concurrency_limit + * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1), + ) + .await?; let state = ServiceBuilder::new() .buffer(Self::state_buffer_bound()) diff --git a/zebrad/src/commands/tip_height.rs b/zebrad/src/commands/tip_height.rs index 733f82e848c..bc045175d4c 100644 --- a/zebrad/src/commands/tip_height.rs +++ b/zebrad/src/commands/tip_height.rs @@ -8,7 +8,11 @@ use std::path::PathBuf; use abscissa_core::{Command, Options, Runnable}; use color_eyre::eyre::{eyre, Result}; -use zebra_chain::{block, chain_tip::ChainTip, parameters::Network}; +use zebra_chain::{ + block::{self, Height}, + chain_tip::ChainTip, + parameters::Network, +}; use zebra_state::LatestChainTip; use crate::prelude::app_config; @@ -56,8 +60,9 @@ impl TipHeightCmd { config.cache_dir = cache_dir; } + // UTXO verification isn't used here: we're not updating the state. let (_state_service, _read_state_service, latest_chain_tip, _chain_tip_change) = - zebra_state::init(config, self.network); + zebra_state::init(config, self.network, Height::MAX, 0); latest_chain_tip } diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index 8962ecad4e4..47e1a1bab4e 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -16,7 +16,7 @@ use tracing::Span; use zebra_chain::{ amount::Amount, - block::Block, + block::{Block, Height}, fmt::humantime_seconds, parameters::Network::{self, *}, serialization::ZcashDeserializeInto, @@ -764,8 +764,10 @@ async fn setup( ); let address_book = Arc::new(std::sync::Mutex::new(address_book)); let (sync_status, mut recent_syncs) = SyncStatus::new(); + + // UTXO verification doesn't matter for these tests. 
let (state, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = - zebra_state::init(state_config.clone(), network); + zebra_state::init(state_config.clone(), network, Height::MAX, 0); let mut state_service = ServiceBuilder::new().buffer(1).service(state); diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index 3e3cb6bed21..977cef2e1d4 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -13,7 +13,7 @@ use tower::{ }; use zebra_chain::{ - block::{self, Block}, + block::{self, Block, Height}, parameters::Network, serialization::ZcashDeserializeInto, transaction::{AuthDigest, Hash as TxHash, Transaction, UnminedTx, UnminedTxId, WtxId}, @@ -642,9 +642,10 @@ async fn setup( .service(inbound_service); // State + // UTXO verification doesn't matter for these tests. let state_config = StateConfig::ephemeral(); let (state_service, _read_only_state_service, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config, network); + zebra_state::init(state_config, network, Height::MAX, 0); let state_service = ServiceBuilder::new().buffer(10).service(state_service); // Network diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 29b16868ead..6eefa02611d 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -796,9 +796,10 @@ async fn setup( ) { let peer_set = MockService::build().for_unit_tests(); + // UTXO verification doesn't matter here. let state_config = StateConfig::ephemeral(); let (state, _read_only_state_service, latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config, network); + zebra_state::init(state_config, network, Height::MAX, 0); let state_service = ServiceBuilder::new().buffer(1).service(state); let tx_verifier = MockService::build().for_unit_tests(); diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index f3c0a4b91a3..4bb8f9ff97d 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -39,6 +39,7 @@ mod tests; use downloads::{AlwaysHedge, Downloads}; +pub use downloads::VERIFICATION_PIPELINE_SCALING_MULTIPLIER; pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError}; pub use progress::show_block_chain_progress; pub use recent_sync_lengths::RecentSyncLengths; @@ -81,10 +82,8 @@ pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPO /// The default for the user-specified lookahead limit. /// /// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details. -/// -/// TODO: increase to `MAX_CHECKPOINT_HEIGHT_GAP * 5`, after we implement orchard batching pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = - zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3; + zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 2; /// A lower bound on the user-specified concurrency limit. /// @@ -123,7 +122,9 @@ pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); /// /// If this timeout is set too low, the syncer will sometimes get stuck in a /// failure loop. -pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); +/// +/// We set the timeout so that it requires under 1 Mbps bandwidth for a full 2 MB block. +pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20); /// Controls how long we wait for a block verify request to complete. 
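The bandwidth figure in the new `BLOCK_DOWNLOAD_TIMEOUT` comment checks out: a full 2 MB block is 16 megabits, and 16 Mbit over 20 seconds is 0.8 Mbps, which is under 1 Mbps. A tiny illustrative sketch of the same arithmetic (the constants here are local to the example, not the real ones from sync.rs):

    /// Downloading a full 2 MB block within the 20 second timeout
    /// needs less than 1 Mbps of bandwidth.
    const FULL_BLOCK_BYTES: u64 = 2_000_000;
    const BLOCK_DOWNLOAD_TIMEOUT_SECS: u64 = 20;

    fn main() {
        // 2_000_000 bytes * 8 bits = 16_000_000 bits; 16_000_000 / 20 s = 800_000 bits/s.
        let required_bits_per_second = (FULL_BLOCK_BYTES * 8) / BLOCK_DOWNLOAD_TIMEOUT_SECS;
        assert!(required_bits_per_second < 1_000_000); // 0.8 Mbps < 1 Mbps
    }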
/// @@ -152,8 +153,20 @@ pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); /// If this timeout is set too low, the syncer will sometimes get stuck in a /// failure loop. /// -/// TODO: reduce to `6 * 60`, after we implement orchard batching? -pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(10 * 60); +/// We've observed spurious 15 minute timeouts when a lot of blocks are being committed to +/// the state, so we allow double that time here. +pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(30 * 60); + +/// A shorter timeout used for the first few blocks after the final checkpoint. +/// +/// This is a workaround for bug #5125, where the first fully validated blocks +/// after the final checkpoint fail with a timeout, due to a UTXO race condition. +const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(5 * 60); + +/// The number of blocks after the final checkpoint that get the shorter timeout. +/// +/// We've only seen this error on the first few blocks after the final checkpoint. +const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT: i32 = 100; /// Controls how long we wait to restart syncing after finishing a sync run. /// @@ -386,6 +399,7 @@ where checkpoint_verify_concurrency_limit, full_verify_concurrency_limit, ), + max_checkpoint_height, )), state, latest_chain_tip, diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 4c65ae92aa9..f50b391225c 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -2,20 +2,20 @@ use std::{ collections::HashMap, - convert::TryFrom, + convert::{self, TryFrom}, pin::Pin, sync::Arc, task::{Context, Poll}, }; use futures::{ - future::TryFutureExt, + future::{FutureExt, TryFutureExt}, ready, stream::{FuturesUnordered, Stream}, }; use pin_project::pin_project; use thiserror::Error; -use tokio::{sync::oneshot, task::JoinHandle}; +use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; use tower::{hedge, Service, ServiceExt}; use tracing_futures::Instrument; @@ -26,6 +26,10 @@ use zebra_chain::{ use zebra_network as zn; use zebra_state as zs; +use crate::components::sync::{ + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT, +}; + type BoxError = Box; /// A multiplier used to calculate the extra number of blocks we allow in the @@ -44,7 +48,7 @@ type BoxError = Box; /// the rest of the capacity is reserved for the other queues. /// There is no reserved capacity for the syncer queue: /// if the other queues stay full, the syncer will eventually time out and reset. -const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2; +pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 3; #[derive(Copy, Clone, Debug)] pub(super) struct AlwaysHedge; @@ -166,6 +170,9 @@ where /// The configured lookahead limit, after applying the minimum limit. lookahead_limit: usize, + /// The largest block height for the checkpoint verifier, based on the current config. + max_checkpoint_height: Height, + // Internal downloads state /// A list of pending block download and verify tasks. #[pin] @@ -238,18 +245,28 @@ where ZSTip: ChainTip + Clone + Send + 'static, { /// Initialize a new download stream with the provided `network` and - /// `verifier` services. Uses the `latest_chain_tip` and `lookahead_limit` - /// to drop blocks that are too far ahead of the current state tip. + /// `verifier` services. 
+ /// + /// Uses the `latest_chain_tip` and `lookahead_limit` to drop blocks + /// that are too far ahead of the current state tip. + /// Uses `max_checkpoint_height` to work around a known block timeout (#5125). /// /// The [`Downloads`] stream is agnostic to the network policy, so retry and /// timeout limits should be applied to the `network` service passed into /// this constructor. - pub fn new(network: ZN, verifier: ZV, latest_chain_tip: ZSTip, lookahead_limit: usize) -> Self { + pub fn new( + network: ZN, + verifier: ZV, + latest_chain_tip: ZSTip, + lookahead_limit: usize, + max_checkpoint_height: Height, + ) -> Self { Self { network, verifier, latest_chain_tip, lookahead_limit, + max_checkpoint_height, pending: FuturesUnordered::new(), cancel_handles: HashMap::new(), } @@ -290,6 +307,7 @@ where let mut verifier = self.verifier.clone(); let latest_chain_tip = self.latest_chain_tip.clone(); let lookahead_limit = self.lookahead_limit; + let max_checkpoint_height = self.max_checkpoint_height; let task = tokio::spawn( async move { @@ -418,9 +436,17 @@ where }; // Verify the block. - let rsp = verifier + let mut rsp = verifier .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })? - .call(block); + .call(block).boxed(); + + // Add a shorter timeout to workaround a known bug (#5125) + let short_timeout_max = (max_checkpoint_height + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT).expect("checkpoint block height is in valid range"); + if block_height >= max_checkpoint_height && block_height <= short_timeout_max { + rsp = timeout(FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, rsp) + .map_err(|timeout| format!("initial fully verified block timed out: retrying: {:?}", timeout).into()) + .map(|nested_result| nested_result.and_then(convert::identity)).boxed(); + } let verification = tokio::select! { biased; diff --git a/zebrad/src/components/tracing/endpoint.rs b/zebrad/src/components/tracing/endpoint.rs index 831cf26307c..c661e9ca886 100644 --- a/zebrad/src/components/tracing/endpoint.rs +++ b/zebrad/src/components/tracing/endpoint.rs @@ -37,7 +37,7 @@ async fn read_filter(req: Request) -> Result { impl TracingEndpoint { /// Create the component. pub fn new(config: &ZebradConfig) -> Result { - if !cfg!(feature = "filter-reload") { + if config.tracing.endpoint_addr.is_some() && !cfg!(feature = "filter-reload") { warn!(addr = ?config.tracing.endpoint_addr, "unable to activate configured tracing filter endpoint: \ enable the 'filter-reload' feature when compiling zebrad", diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 51ec860c13f..6a15c56f0c5 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -126,7 +126,7 @@ use color_eyre::{ }; use zebra_chain::{ - block, + block::{self, Height}, parameters::Network::{self, *}, }; use zebra_network::constants::PORT_IN_USE_ERROR; @@ -329,7 +329,9 @@ async fn db_init_outside_future_executor() -> Result<()> { let start = Instant::now(); - let db_init_handle = zebra_state::spawn_init(config.state.clone(), config.network.network); + // This test doesn't need UTXOs to be verified efficiently, because it uses an empty state. 
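In the downloads.rs hunk above, the verifier future gets an extra `tokio::time::timeout` only for blocks between the final checkpoint and `FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT` blocks past it, and the doubly nested result is flattened with `convert::identity`. A standalone sketch of that nesting-and-flattening pattern, using a placeholder future and simplified types rather than the real verifier service:

    use std::{convert, time::Duration};

    use futures::{FutureExt, TryFutureExt};
    use tokio::time::timeout;

    type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

    #[tokio::main]
    async fn main() {
        // A stand-in for the block verification response future.
        let verify = async { Ok::<u32, BoxError>(42) };

        // `timeout` yields `Result<Result<u32, BoxError>, Elapsed>`; mapping the
        // elapsed error into a `BoxError` and flattening the nested result with
        // `convert::identity` restores a plain `Result<u32, BoxError>`.
        let verification = timeout(Duration::from_secs(5 * 60), verify)
            .map_err(|elapsed| -> BoxError {
                format!("initial fully verified block timed out: retrying: {elapsed:?}").into()
            })
            .map(|nested_result| nested_result.and_then(convert::identity))
            .await;

        assert_eq!(verification.unwrap(), 42);
    }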
+ let db_init_handle = + zebra_state::spawn_init(config.state.clone(), config.network.network, Height::MAX, 0); // it's faster to panic if it takes longer than expected, since the executor // will wait indefinitely for blocking operation to finish once started @@ -661,13 +663,16 @@ fn last_config_is_stored() -> Result<()> { fs::read_to_string(generated_config_path).expect("Should have been able to read the file"); // We need to replace the cache dir path as stored configs has a dummy `cache_dir` string there. - let processed_generated_content = generated_content.replace( - zebra_state::Config::default() - .cache_dir - .to_str() - .expect("a valid cache dir"), - "cache_dir", - ); + let processed_generated_content = generated_content + .replace( + zebra_state::Config::default() + .cache_dir + .to_str() + .expect("a valid cache dir"), + "cache_dir", + ) + .trim() + .to_string(); // Loop all the stored configs for config_file in configs_dir() @@ -677,17 +682,23 @@ fn last_config_is_stored() -> Result<()> { { // Read stored config let stored_content = fs::read_to_string(config_file_full_path(config_file.path())) - .expect("Should have been able to read the file"); + .expect("Should have been able to read the file") + .trim() + .to_string(); // If any stored config is equal to the generated then we are good. if stored_content.eq(&processed_generated_content) { return Ok(()); } } + Err(eyre!( "latest zebrad config is not being tested for compatibility.\n\ - Run `zebrad generate -o zebrad/tests/common/configs/.toml`\n\ - and commit the latest config to Zebra's git repository" + Run:\n\ + zebrad generate |\n\ + sed \"s/cache_dir = '.*'/cache_dir = 'cache_dir'/\" >\n\ + zebrad/tests/common/configs/.toml\n\ + and commit the latest config to Zebra's git repository" )) } diff --git a/zebrad/tests/common/cached_state.rs b/zebrad/tests/common/cached_state.rs index a2e97dbd32d..d758a7fffcb 100644 --- a/zebrad/tests/common/cached_state.rs +++ b/zebrad/tests/common/cached_state.rs @@ -12,7 +12,11 @@ use tempfile::TempDir; use tokio::fs; use tower::{util::BoxService, Service}; -use zebra_chain::{block, chain_tip::ChainTip, parameters::Network}; +use zebra_chain::{ + block::{self, Height}, + chain_tip::ChainTip, + parameters::Network, +}; use zebra_state::{ChainTipChange, LatestChainTip}; use crate::common::config::testdir; @@ -44,7 +48,8 @@ pub async fn start_state_service_with_cache_dir( ..zebra_state::Config::default() }; - Ok(zebra_state::init(config, network)) + // These tests don't need UTXOs to be verified efficiently, because they use cached states. + Ok(zebra_state::init(config, network, Height::MAX, 0)) } /// Loads the chain tip height from the state stored in a specified directory. diff --git a/zebrad/tests/common/configs/v1.0.0-rc.0.toml b/zebrad/tests/common/configs/v1.0.0-rc.0.toml new file mode 100644 index 00000000000..72f8c6bc693 --- /dev/null +++ b/zebrad/tests/common/configs/v1.0.0-rc.0.toml @@ -0,0 +1,70 @@ +# Default configuration for zebrad. +# +# This file can be used as a skeleton for custom configs. +# +# Unspecified fields use default values. Optional fields are Some(field) if the +# field is present and None if it is absent. +# +# This file is generated as an example using zebrad's current defaults. +# You should set only the config options you want to keep, and delete the rest. +# Only a subset of fields are present in the skeleton, since optional values +# whose default is None are omitted. 
+# +# The config format (including a complete list of sections and fields) is +# documented here: +# https://doc.zebra.zfnd.org/zebrad/config/struct.ZebradConfig.html +# +# zebrad attempts to load configs in the following order: +# +# 1. The -c flag on the command line, e.g., `zebrad -c myconfig.toml start`; +# 2. The file `zebrad.toml` in the users's preference directory (platform-dependent); +# 3. The default config. + +[consensus] +checkpoint_sync = true +debug_skip_parameter_preload = false + +[mempool] +eviction_memory_time = '1h' +tx_cost_limit = 80000000 + +[metrics] + +[network] +crawl_new_peer_interval = '1m 1s' +initial_mainnet_peers = [ + 'dnsseed.z.cash:8233', + 'dnsseed.str4d.xyz:8233', + 'mainnet.seeder.zfnd.org:8233', + 'mainnet.is.yolo.money:8233', +] +initial_testnet_peers = [ + 'dnsseed.testnet.z.cash:18233', + 'testnet.seeder.zfnd.org:18233', + 'testnet.is.yolo.money:18233', +] +listen_addr = '0.0.0.0:8233' +network = 'Mainnet' +peerset_initial_target_size = 25 + +[rpc] +debug_force_finished_sync = false +parallel_cpu_threads = 1 + +[state] +cache_dir = 'cache_dir' +delete_old_database = true +ephemeral = false + +[sync] +checkpoint_verify_concurrency_limit = 800 +download_concurrency_limit = 50 +full_verify_concurrency_limit = 20 +parallel_cpu_threads = 0 + +[tracing] +buffer_limit = 128000 +force_use_color = false +use_color = true +use_journald = false + diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 18165a20139..8b6d4356fb4 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -186,7 +186,7 @@ pub async fn run() -> Result<()> { assert_eq!(response, expected_response); } - // The timing of verification logs are unrealiable, so we've disabled this check for now. + // The timing of verification logs are unreliable, so we've disabled this check for now. // // TODO: when lightwalletd starts returning transactions again: // re-enable this check, find a better way to check, or delete this commented-out check
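For reference, the `checkpoint_verify_concurrency_limit = 800` value in the generated config above is the new default (`MAX_CHECKPOINT_HEIGHT_GAP * 2` per the sync.rs hunk), and the start.rs hunk earlier in this diff multiplies it by `VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1` before passing it to `zebra_state::spawn_init`. A small sketch of that arithmetic under those defaults (the constant and variable names are local to the example; the meaning of the resulting argument is as described in the hunks above):

    // Default values shown in this diff: the generated config has
    // checkpoint_verify_concurrency_limit = 800, and downloads.rs sets
    // VERIFICATION_PIPELINE_SCALING_MULTIPLIER = 3.
    const CHECKPOINT_VERIFY_CONCURRENCY_LIMIT: usize = 800;
    const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 3;

    fn main() {
        // Mirrors the expression passed to `zebra_state::spawn_init` in `StartCmd::start`.
        let state_concurrency_hint =
            CHECKPOINT_VERIFY_CONCURRENCY_LIMIT * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1);

        assert_eq!(state_concurrency_hint, 3_200);
    }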