diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index f7092282547..18c0ee4ea50 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -15,6 +15,7 @@ //! chain tip changes. use std::{ + collections::HashMap, convert, future::Future, pin::Pin, @@ -41,8 +42,9 @@ use crate::{ service::{ chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip}, finalized_state::{FinalizedState, ZebraDb}, - non_finalized_state::{NonFinalizedState, QueuedBlocks}, + non_finalized_state::NonFinalizedState, pending_utxos::PendingUtxos, + queued_blocks::QueuedBlocks, watch_receiver::WatchReceiver, }, BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, ReadRequest, @@ -58,6 +60,7 @@ pub(crate) mod check; mod finalized_state; mod non_finalized_state; mod pending_utxos; +mod queued_blocks; pub(crate) mod read; #[cfg(any(test, feature = "proptest-impl"))] @@ -68,10 +71,7 @@ mod tests; pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation}; -pub type QueuedFinalized = ( - FinalizedBlock, - oneshot::Sender>, -); +use self::queued_blocks::QueuedFinalized; /// A read-write service for Zebra's cached blockchain state. /// @@ -98,19 +98,30 @@ pub(crate) struct StateService { /// The configured Zcash network. network: Network, - // Exclusively Writeable State + // Queued Blocks // - /// The finalized chain state, including its on-disk database. - pub(crate) disk: FinalizedState, + /// Queued blocks for the [`NonFinalizedState`] that arrived out of order. + /// These blocks are awaiting their parent blocks before they can do contextual verification. + queued_non_finalized_blocks: QueuedBlocks, + + /// Queued blocks for the [`FinalizedState`] that arrived out of order. + /// These blocks are awaiting their parent blocks before they can do contextual verification. + /// + /// Indexed by their parent block hash. + queued_finalized_blocks: HashMap, + // Exclusively Writeable State + // /// The non-finalized chain state, including its in-memory chain forks. + // + // TODO: get rid of this struct member, and just let the block write task own the NonFinalizedState. mem: NonFinalizedState, - // Queued Blocks + /// The finalized chain state, including its on-disk database. // - /// Blocks for the [`NonFinalizedState`], which are awaiting their parent blocks - /// before they can do contextual verification. - queued_blocks: QueuedBlocks, + // TODO: get rid of this struct member, and just let the ReadStateService + // and block write task share ownership of the database. + pub(crate) disk: FinalizedState, // Pending UTXO Request Tracking // @@ -133,6 +144,14 @@ pub(crate) struct StateService { /// /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`. read_service: ReadStateService, + + // Metrics + // + /// A metric tracking the maximum height that's currently in `queued_finalized_blocks` + /// + /// Set to `f64::NAN` if `queued_finalized_blocks` is empty, because grafana shows NaNs + /// as a break in the graph. + max_queued_height: f64, } /// A read-only service for accessing Zebra's cached blockchain state. @@ -155,6 +174,12 @@ pub struct ReadStateService { // Shared Concurrently Readable State // + /// A watch channel for a recent [`NonFinalizedState`]. + /// + /// This state is only updated between requests, + /// so it might include some block data that is also on `disk`. + non_finalized_state_receiver: WatchReceiver, + /// The shared inner on-disk database for the finalized state. /// /// RocksDB allows reads and writes via a shared reference, @@ -163,12 +188,6 @@ pub struct ReadStateService { /// This chain is updated concurrently with requests, /// so it might include some block data that is also in `best_mem`. db: ZebraDb, - - /// A watch channel for a recent [`NonFinalizedState`]. - /// - /// This state is only updated between requests, - /// so it might include some block data that is also on `disk`. - non_finalized_state_receiver: WatchReceiver, } impl StateService { @@ -182,6 +201,7 @@ impl StateService { network: Network, ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) { let timer = CodeTimer::start(); + let disk = FinalizedState::new(&config, network); timer.finish(module_path!(), line!(), "opening finalized state database"); @@ -201,19 +221,21 @@ impl StateService { let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk); - let queued_blocks = QueuedBlocks::default(); + let queued_non_finalized_blocks = QueuedBlocks::default(); let pending_utxos = PendingUtxos::default(); let state = Self { network, - disk, + queued_non_finalized_blocks, + queued_finalized_blocks: HashMap::new(), mem, - queued_blocks, + disk, pending_utxos, last_prune: Instant::now(), chain_tip_sender, non_finalized_state_sender, read_service: read_service.clone(), + max_queued_height: f64::NAN, }; timer.finish(module_path!(), line!(), "initializing state service"); @@ -262,8 +284,7 @@ impl StateService { // - run the set_finalized_tip() in this function in the state block commit task // - move all that code to the inner service let tip_block = self - .disk - .queue_and_commit_finalized((finalized, rsp_tx)) + .drain_queue_and_commit_finalized((finalized, rsp_tx)) .map(ChainTipBlock::from); self.chain_tip_sender.set_finalized_tip(tip_block); @@ -271,6 +292,56 @@ impl StateService { rsp_rx } + /// Queue a finalized block to be committed to the state. + /// + /// After queueing a finalized block, this method checks whether the newly + /// queued block (and any of its descendants) can be committed to the state. + /// + /// Returns the highest finalized tip block committed from the queue, + /// or `None` if no blocks were committed in this call. + /// (Use `tip_block` to get the finalized tip, regardless of when it was committed.) + pub fn drain_queue_and_commit_finalized( + &mut self, + queued: QueuedFinalized, + ) -> Option { + let mut highest_queue_commit = None; + + let prev_hash = queued.0.block.header.previous_block_hash; + let height = queued.0.height; + self.queued_finalized_blocks.insert(prev_hash, queued); + + while let Some(queued_block) = self + .queued_finalized_blocks + .remove(&self.disk.db().finalized_tip_hash()) + { + if let Ok(finalized) = self.disk.commit_finalized(queued_block) { + highest_queue_commit = Some(finalized); + } else { + // the last block in the queue failed, so we can't commit the next block + break; + } + } + + if self.queued_finalized_blocks.is_empty() { + self.max_queued_height = f64::NAN; + } else if self.max_queued_height.is_nan() || self.max_queued_height < height.0 as f64 { + // if there are still blocks in the queue, then either: + // - the new block was lower than the old maximum, and there was a gap before it, + // so the maximum is still the same (and we skip this code), or + // - the new block is higher than the old maximum, and there is at least one gap + // between the finalized tip and the new maximum + self.max_queued_height = height.0 as f64; + } + + metrics::gauge!("state.checkpoint.queued.max.height", self.max_queued_height); + metrics::gauge!( + "state.checkpoint.queued.block.count", + self.queued_finalized_blocks.len() as f64, + ); + + highest_queue_commit + } + /// Queue a non finalized block for verification and check if any queued /// blocks are ready to be verified and committed to the state. /// @@ -297,7 +368,9 @@ impl StateService { // Request::CommitBlock contract: a request to commit a block which has // been queued but not yet committed to the state fails the older // request and replaces it with the newer request. - let rsp_rx = if let Some((_, old_rsp_tx)) = self.queued_blocks.get_mut(&prepared.hash) { + let rsp_rx = if let Some((_, old_rsp_tx)) = + self.queued_non_finalized_blocks.get_mut(&prepared.hash) + { tracing::debug!("replacing older queued request with new request"); let (mut rsp_tx, rsp_rx) = oneshot::channel(); std::mem::swap(old_rsp_tx, &mut rsp_tx); @@ -305,7 +378,7 @@ impl StateService { rsp_rx } else { let (rsp_tx, rsp_rx) = oneshot::channel(); - self.queued_blocks.queue((prepared, rsp_tx)); + self.queued_non_finalized_blocks.queue((prepared, rsp_tx)); rsp_rx }; @@ -337,7 +410,8 @@ impl StateService { let finalized_tip_height = self.disk.db().finalized_tip_height().expect( "Finalized state must have at least one block before committing non-finalized state", ); - self.queued_blocks.prune_by_height(finalized_tip_height); + self.queued_non_finalized_blocks + .prune_by_height(finalized_tip_height); let tip_block_height = self.update_latest_chain_channels(); @@ -415,7 +489,9 @@ impl StateService { vec![(new_parent, Ok(()))]; while let Some((parent_hash, parent_result)) = new_parents.pop() { - let queued_children = self.queued_blocks.dequeue_children(parent_hash); + let queued_children = self + .queued_non_finalized_blocks + .dequeue_children(parent_hash); for (child, rsp_tx) in queued_children { let child_hash = child.hash; @@ -572,7 +648,7 @@ impl Service for StateService { #[instrument(name = "state", skip(self, req))] fn call(&mut self, req: Request) -> Self::Future { match req { - // Uses queued_blocks and pending_utxos in the StateService + // Uses queued_non_finalized_blocks and pending_utxos in the StateService // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb. Request::CommitBlock(prepared) => { metrics::counter!( @@ -624,8 +700,8 @@ impl Service for StateService { .boxed() } - // Uses queued_by_prev_hash in the FinalizedState and pending_utxos in the StateService. - // Accesses shared writeable state in the StateService, FinalizedState, and ZebraDb. + // Uses queued_finalized_blocks and pending_utxos in the StateService. + // Accesses shared writeable state in the StateService and ZebraDb. Request::CommitFinalizedBlock(finalized) => { metrics::counter!( "state.requests", @@ -679,7 +755,7 @@ impl Service for StateService { .boxed() } - // Uses pending_utxos and queued_blocks in the StateService. + // Uses pending_utxos and queued_non_finalized_blocks in the StateService. // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService. Request::AwaitUtxo(outpoint) => { metrics::counter!( @@ -700,7 +776,7 @@ impl Service for StateService { // Check the non-finalized block queue outside the returned future, // so we can access mutable state fields. - if let Some(utxo) = self.queued_blocks.utxo(&outpoint) { + if let Some(utxo) = self.queued_non_finalized_blocks.utxo(&outpoint) { self.pending_utxos.respond(&outpoint, utxo); // We're finished, the returned future gets the UTXO from the respond() channel. @@ -709,7 +785,7 @@ impl Service for StateService { return response_fut; } - // We ignore any UTXOs in FinalizedState.queued_by_prev_hash, + // We ignore any UTXOs in FinalizedState.queued_finalized_blocks, // because it is only used during checkpoint verification. // // This creates a rare race condition, but it doesn't seem to happen much in practice. diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 611c7fcfa0b..5b6cae48911 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -16,7 +16,6 @@ //! be incremented each time the database format (column, serialization, etc) changes. use std::{ - collections::HashMap, io::{stderr, stdout, Write}, path::Path, sync::Arc, @@ -45,39 +44,47 @@ pub use disk_format::{OutputIndex, OutputLocation, TransactionLocation}; pub(super) use zebra_db::ZebraDb; /// The finalized part of the chain state, stored in the db. -#[derive(Debug)] +/// +/// `rocksdb` allows concurrent writes through a shared reference, +/// so finalized state instances are cloneable. When the final clone is dropped, +/// the database is closed. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct FinalizedState { - /// The underlying database. - db: ZebraDb, - - /// Queued blocks that arrived out of order, indexed by their parent block hash. - queued_by_prev_hash: HashMap, - - /// A metric tracking the maximum height that's currently in `queued_by_prev_hash` - /// - /// Set to `f64::NAN` if `queued_by_prev_hash` is empty, because grafana shows NaNs - /// as a break in the graph. - max_queued_height: f64, + // Configuration + // + // This configuration cannot be modified after the database is initialized, + // because some clones would have different values. + // + /// The configured network. + network: Network, /// The configured stop height. /// /// Commit blocks to the finalized state up to this height, then exit Zebra. debug_stop_at_height: Option, - /// The configured network. - network: Network, + // Owned State + // + // Everything contained in this state must be shared by all clones, or read-only. + // + /// The underlying database. + /// + /// `rocksdb` allows reads and writes via a shared reference, + /// so this database object can be freely cloned. + /// The last instance that is dropped will close the underlying database. + db: ZebraDb, } impl FinalizedState { + /// Returns an on-disk database instance for `config` and `network`. + /// If there is no existing database, creates a new database on disk. pub fn new(config: &Config, network: Network) -> Self { let db = ZebraDb::new(config, network); let new_state = Self { - queued_by_prev_hash: HashMap::new(), - max_queued_height: f64::NAN, - db, - debug_stop_at_height: config.debug_stop_at_height.map(block::Height), network, + debug_stop_at_height: config.debug_stop_at_height.map(block::Height), + db, }; // TODO: move debug_stop_at_height into a task in the start command (#3442) @@ -137,63 +144,14 @@ impl FinalizedState { &self.db } - /// Queue a finalized block to be committed to the state. - /// - /// After queueing a finalized block, this method checks whether the newly - /// queued block (and any of its descendants) can be committed to the state. - /// - /// Returns the highest finalized tip block committed from the queue, - /// or `None` if no blocks were committed in this call. - /// (Use `tip_block` to get the finalized tip, regardless of when it was committed.) - pub fn queue_and_commit_finalized( - &mut self, - queued: QueuedFinalized, - ) -> Option { - let mut highest_queue_commit = None; - - let prev_hash = queued.0.block.header.previous_block_hash; - let height = queued.0.height; - self.queued_by_prev_hash.insert(prev_hash, queued); - - while let Some(queued_block) = self - .queued_by_prev_hash - .remove(&self.db.finalized_tip_hash()) - { - if let Ok(finalized) = self.commit_finalized(queued_block) { - highest_queue_commit = Some(finalized); - } else { - // the last block in the queue failed, so we can't commit the next block - break; - } - } - - if self.queued_by_prev_hash.is_empty() { - self.max_queued_height = f64::NAN; - } else if self.max_queued_height.is_nan() || self.max_queued_height < height.0 as f64 { - // if there are still blocks in the queue, then either: - // - the new block was lower than the old maximum, and there was a gap before it, - // so the maximum is still the same (and we skip this code), or - // - the new block is higher than the old maximum, and there is at least one gap - // between the finalized tip and the new maximum - self.max_queued_height = height.0 as f64; - } - - metrics::gauge!("state.checkpoint.queued.max.height", self.max_queued_height); - metrics::gauge!( - "state.checkpoint.queued.block.count", - self.queued_by_prev_hash.len() as f64, - ); - - highest_queue_commit - } - /// Commit a finalized block to the state. /// /// It's the caller's responsibility to ensure that blocks are committed in - /// order. This function is called by [`Self::queue_and_commit_finalized`], - /// which ensures order. It is intentionally not exposed as part of the - /// public API of the [`FinalizedState`]. - fn commit_finalized(&mut self, queued_block: QueuedFinalized) -> Result { + /// order. + pub fn commit_finalized( + &mut self, + queued_block: QueuedFinalized, + ) -> Result { let (finalized, rsp_tx) = queued_block; let result = self.commit_finalized_direct(finalized.clone().into(), "CommitFinalized request"); diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index db7c628ed24..d183b4159be 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -34,6 +34,10 @@ pub type DB = rocksdb::DBWithThreadMode; /// Wrapper struct to ensure low-level database access goes through the correct API. /// +/// `rocksdb` allows concurrent writes through a shared reference, +/// so database instances are cloneable. When the final clone is dropped, +/// the database is closed. +/// /// # Correctness /// /// Reading transactions from the database using RocksDB iterators causes hangs. @@ -48,6 +52,20 @@ pub type DB = rocksdb::DBWithThreadMode; /// (Or it might be fixed by future RocksDB upgrades.) #[derive(Clone, Debug)] pub struct DiskDb { + // Configuration + // + // This configuration cannot be modified after the database is initialized, + // because some clones would have different values. + // + /// The configured temporary database setting. + /// + /// If true, the database files are deleted on drop. + ephemeral: bool, + + // Owned State + // + // Everything contained in this state must be shared by all clones, or read-only. + // /// The shared inner RocksDB database. /// /// RocksDB allows reads and writes via a shared reference. @@ -58,11 +76,6 @@ pub struct DiskDb { /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode, /// only [`Drop`] requires exclusive access. db: Arc, - - /// The configured temporary database setting. - /// - /// If true, the database files are deleted on drop. - ephemeral: bool, } /// Wrapper struct to ensure low-level database writes go through the correct API. @@ -434,8 +447,8 @@ impl DiskDb { info!("Opened Zebra state cache at {}", path.display()); let db = DiskDb { - db: Arc::new(db), ephemeral: config.ephemeral, + db: Arc::new(db), }; db.assert_default_cf_is_empty(); diff --git a/zebra-state/src/service/finalized_state/zebra_db.rs b/zebra-state/src/service/finalized_state/zebra_db.rs index 60bb11389c2..9789e50f577 100644 --- a/zebra-state/src/service/finalized_state/zebra_db.rs +++ b/zebra-state/src/service/finalized_state/zebra_db.rs @@ -28,10 +28,17 @@ pub mod transparent; pub mod arbitrary; /// Wrapper struct to ensure high-level typed database access goes through the correct API. -#[derive(Clone, Debug)] +/// +/// `rocksdb` allows concurrent writes through a shared reference, +/// so database instances are cloneable. When the final clone is dropped, +/// the database is closed. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct ZebraDb { + // Owned State + // + // Everything contained in this state must be shared by all clones, or read-only. + // /// The inner low-level database wrapper for the RocksDB database. - /// This wrapper can be cloned and shared. db: DiskDb, } diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index dfc16440434..0e34e23414a 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -23,13 +23,10 @@ use crate::{ }; mod chain; -mod queued_blocks; #[cfg(test)] mod tests; -pub use queued_blocks::QueuedBlocks; - pub(crate) use chain::Chain; /// The state of the chains in memory, including queued blocks. diff --git a/zebra-state/src/service/non_finalized_state/queued_blocks.rs b/zebra-state/src/service/queued_blocks.rs similarity index 54% rename from zebra-state/src/service/non_finalized_state/queued_blocks.rs rename to zebra-state/src/service/queued_blocks.rs index 292103a3bc3..ac0270c9276 100644 --- a/zebra-state/src/service/non_finalized_state/queued_blocks.rs +++ b/zebra-state/src/service/queued_blocks.rs @@ -10,10 +10,19 @@ use tracing::instrument; use zebra_chain::{block, transparent}; -use crate::{BoxError, PreparedBlock}; +use crate::{BoxError, FinalizedBlock, PreparedBlock}; + +#[cfg(test)] +mod tests; + +/// A queued finalized block, and its corresponding [`Result`] channel. +pub type QueuedFinalized = ( + FinalizedBlock, + oneshot::Sender>, +); /// A queued non-finalized block, and its corresponding [`Result`] channel. -pub type QueuedBlock = ( +pub type QueuedNonFinalized = ( PreparedBlock, oneshot::Sender>, ); @@ -22,7 +31,7 @@ pub type QueuedBlock = ( #[derive(Debug, Default)] pub struct QueuedBlocks { /// Blocks awaiting their parent blocks for contextual verification. - blocks: HashMap, + blocks: HashMap, /// Hashes from `queued_blocks`, indexed by parent hash. by_parent: HashMap>, /// Hashes from `queued_blocks`, indexed by block height. @@ -38,7 +47,7 @@ impl QueuedBlocks { /// /// - if a block with the same `block::Hash` has already been queued. #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))] - pub fn queue(&mut self, new: QueuedBlock) { + pub fn queue(&mut self, new: QueuedNonFinalized) { let new_hash = new.0.hash; let new_height = new.0.height; let parent_hash = new.0.block.header.previous_block_hash; @@ -71,7 +80,7 @@ impl QueuedBlocks { /// Dequeue and return all blocks that were waiting for the arrival of /// `parent`. #[instrument(skip(self), fields(%parent_hash))] - pub fn dequeue_children(&mut self, parent_hash: block::Hash) -> Vec { + pub fn dequeue_children(&mut self, parent_hash: block::Hash) -> Vec { let queued_children = self .by_parent .remove(&parent_hash) @@ -161,7 +170,7 @@ impl QueuedBlocks { } /// Return the queued block if it has already been registered - pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedBlock> { + pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedNonFinalized> { self.blocks.get_mut(hash) } @@ -182,142 +191,3 @@ impl QueuedBlocks { self.known_utxos.get(outpoint).cloned() } } - -// TODO: move these tests into their own `tests/vectors.rs` module -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use tokio::sync::oneshot; - use zebra_chain::{block::Block, serialization::ZcashDeserializeInto}; - use zebra_test::prelude::*; - - use crate::{arbitrary::Prepare, tests::FakeChainHelper}; - - use super::*; - - // Quick helper trait for making queued blocks with throw away channels - trait IntoQueued { - fn into_queued(self) -> QueuedBlock; - } - - impl IntoQueued for Arc { - fn into_queued(self) -> QueuedBlock { - let (rsp_tx, _) = oneshot::channel(); - (self.prepare(), rsp_tx) - } - } - - #[test] - fn dequeue_gives_right_children() -> Result<()> { - let _init_guard = zebra_test::init(); - - let block1: Arc = - zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?; - let child1: Arc = - zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?; - let child2 = block1.make_fake_child(); - - let parent = block1.header.previous_block_hash; - - let mut queue = QueuedBlocks::default(); - // Empty to start - assert_eq!(0, queue.blocks.len()); - assert_eq!(0, queue.by_parent.len()); - assert_eq!(0, queue.by_height.len()); - assert_eq!(0, queue.known_utxos.len()); - - // Inserting the first block gives us 1 in each table, and some UTXOs - queue.queue(block1.clone().into_queued()); - assert_eq!(1, queue.blocks.len()); - assert_eq!(1, queue.by_parent.len()); - assert_eq!(1, queue.by_height.len()); - assert_eq!(2, queue.known_utxos.len()); - - // The second gives us another in each table because its a child of the first, - // and a lot of UTXOs - queue.queue(child1.clone().into_queued()); - assert_eq!(2, queue.blocks.len()); - assert_eq!(2, queue.by_parent.len()); - assert_eq!(2, queue.by_height.len()); - assert_eq!(632, queue.known_utxos.len()); - - // The 3rd only increments blocks, because it is also a child of the - // first block, so for the second and third tables it gets added to the - // existing HashSet value - queue.queue(child2.clone().into_queued()); - assert_eq!(3, queue.blocks.len()); - assert_eq!(2, queue.by_parent.len()); - assert_eq!(2, queue.by_height.len()); - assert_eq!(634, queue.known_utxos.len()); - - // Dequeueing the first block removes 1 block from each list - let children = queue.dequeue_children(parent); - assert_eq!(1, children.len()); - assert_eq!(block1, children[0].0.block); - assert_eq!(2, queue.blocks.len()); - assert_eq!(1, queue.by_parent.len()); - assert_eq!(1, queue.by_height.len()); - assert_eq!(632, queue.known_utxos.len()); - - // Dequeueing the children of the first block removes both of the other - // blocks, and empties all lists - let parent = children[0].0.block.hash(); - let children = queue.dequeue_children(parent); - assert_eq!(2, children.len()); - assert!(children - .iter() - .any(|(block, _)| block.hash == child1.hash())); - assert!(children - .iter() - .any(|(block, _)| block.hash == child2.hash())); - assert_eq!(0, queue.blocks.len()); - assert_eq!(0, queue.by_parent.len()); - assert_eq!(0, queue.by_height.len()); - assert_eq!(0, queue.known_utxos.len()); - - Ok(()) - } - - #[test] - fn prune_removes_right_children() -> Result<()> { - let _init_guard = zebra_test::init(); - - let block1: Arc = - zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?; - let child1: Arc = - zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?; - let child2 = block1.make_fake_child(); - - let mut queue = QueuedBlocks::default(); - queue.queue(block1.clone().into_queued()); - queue.queue(child1.clone().into_queued()); - queue.queue(child2.clone().into_queued()); - assert_eq!(3, queue.blocks.len()); - assert_eq!(2, queue.by_parent.len()); - assert_eq!(2, queue.by_height.len()); - assert_eq!(634, queue.known_utxos.len()); - - // Pruning the first height removes only block1 - queue.prune_by_height(block1.coinbase_height().unwrap()); - assert_eq!(2, queue.blocks.len()); - assert_eq!(1, queue.by_parent.len()); - assert_eq!(1, queue.by_height.len()); - assert!(queue.get_mut(&block1.hash()).is_none()); - assert!(queue.get_mut(&child1.hash()).is_some()); - assert!(queue.get_mut(&child2.hash()).is_some()); - assert_eq!(632, queue.known_utxos.len()); - - // Pruning the children of the first block removes both of the other - // blocks, and empties all lists - queue.prune_by_height(child1.coinbase_height().unwrap()); - assert_eq!(0, queue.blocks.len()); - assert_eq!(0, queue.by_parent.len()); - assert_eq!(0, queue.by_height.len()); - assert!(queue.get_mut(&child1.hash()).is_none()); - assert!(queue.get_mut(&child2.hash()).is_none()); - assert_eq!(0, queue.known_utxos.len()); - - Ok(()) - } -} diff --git a/zebra-state/src/service/queued_blocks/tests.rs b/zebra-state/src/service/queued_blocks/tests.rs new file mode 100644 index 00000000000..823e8b6d7c5 --- /dev/null +++ b/zebra-state/src/service/queued_blocks/tests.rs @@ -0,0 +1,3 @@ +//! Tests for block queues. + +mod vectors; diff --git a/zebra-state/src/service/queued_blocks/tests/vectors.rs b/zebra-state/src/service/queued_blocks/tests/vectors.rs new file mode 100644 index 00000000000..bd8dcbeb8e2 --- /dev/null +++ b/zebra-state/src/service/queued_blocks/tests/vectors.rs @@ -0,0 +1,139 @@ +//! Fixed test vectors for block queues. + +use std::sync::Arc; + +use tokio::sync::oneshot; + +use zebra_chain::{block::Block, serialization::ZcashDeserializeInto}; +use zebra_test::prelude::*; + +use crate::{ + arbitrary::Prepare, + service::queued_blocks::{QueuedBlocks, QueuedNonFinalized}, + tests::FakeChainHelper, +}; + +// Quick helper trait for making queued blocks with throw away channels +trait IntoQueued { + fn into_queued(self) -> QueuedNonFinalized; +} + +impl IntoQueued for Arc { + fn into_queued(self) -> QueuedNonFinalized { + let (rsp_tx, _) = oneshot::channel(); + (self.prepare(), rsp_tx) + } +} + +#[test] +fn dequeue_gives_right_children() -> Result<()> { + let _init_guard = zebra_test::init(); + + let block1: Arc = + zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?; + let child1: Arc = + zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?; + let child2 = block1.make_fake_child(); + + let parent = block1.header.previous_block_hash; + + let mut queue = QueuedBlocks::default(); + // Empty to start + assert_eq!(0, queue.blocks.len()); + assert_eq!(0, queue.by_parent.len()); + assert_eq!(0, queue.by_height.len()); + assert_eq!(0, queue.known_utxos.len()); + + // Inserting the first block gives us 1 in each table, and some UTXOs + queue.queue(block1.clone().into_queued()); + assert_eq!(1, queue.blocks.len()); + assert_eq!(1, queue.by_parent.len()); + assert_eq!(1, queue.by_height.len()); + assert_eq!(2, queue.known_utxos.len()); + + // The second gives us another in each table because its a child of the first, + // and a lot of UTXOs + queue.queue(child1.clone().into_queued()); + assert_eq!(2, queue.blocks.len()); + assert_eq!(2, queue.by_parent.len()); + assert_eq!(2, queue.by_height.len()); + assert_eq!(632, queue.known_utxos.len()); + + // The 3rd only increments blocks, because it is also a child of the + // first block, so for the second and third tables it gets added to the + // existing HashSet value + queue.queue(child2.clone().into_queued()); + assert_eq!(3, queue.blocks.len()); + assert_eq!(2, queue.by_parent.len()); + assert_eq!(2, queue.by_height.len()); + assert_eq!(634, queue.known_utxos.len()); + + // Dequeueing the first block removes 1 block from each list + let children = queue.dequeue_children(parent); + assert_eq!(1, children.len()); + assert_eq!(block1, children[0].0.block); + assert_eq!(2, queue.blocks.len()); + assert_eq!(1, queue.by_parent.len()); + assert_eq!(1, queue.by_height.len()); + assert_eq!(632, queue.known_utxos.len()); + + // Dequeueing the children of the first block removes both of the other + // blocks, and empties all lists + let parent = children[0].0.block.hash(); + let children = queue.dequeue_children(parent); + assert_eq!(2, children.len()); + assert!(children + .iter() + .any(|(block, _)| block.hash == child1.hash())); + assert!(children + .iter() + .any(|(block, _)| block.hash == child2.hash())); + assert_eq!(0, queue.blocks.len()); + assert_eq!(0, queue.by_parent.len()); + assert_eq!(0, queue.by_height.len()); + assert_eq!(0, queue.known_utxos.len()); + + Ok(()) +} + +#[test] +fn prune_removes_right_children() -> Result<()> { + let _init_guard = zebra_test::init(); + + let block1: Arc = + zebra_test::vectors::BLOCK_MAINNET_419200_BYTES.zcash_deserialize_into()?; + let child1: Arc = + zebra_test::vectors::BLOCK_MAINNET_419201_BYTES.zcash_deserialize_into()?; + let child2 = block1.make_fake_child(); + + let mut queue = QueuedBlocks::default(); + queue.queue(block1.clone().into_queued()); + queue.queue(child1.clone().into_queued()); + queue.queue(child2.clone().into_queued()); + assert_eq!(3, queue.blocks.len()); + assert_eq!(2, queue.by_parent.len()); + assert_eq!(2, queue.by_height.len()); + assert_eq!(634, queue.known_utxos.len()); + + // Pruning the first height removes only block1 + queue.prune_by_height(block1.coinbase_height().unwrap()); + assert_eq!(2, queue.blocks.len()); + assert_eq!(1, queue.by_parent.len()); + assert_eq!(1, queue.by_height.len()); + assert!(queue.get_mut(&block1.hash()).is_none()); + assert!(queue.get_mut(&child1.hash()).is_some()); + assert!(queue.get_mut(&child2.hash()).is_some()); + assert_eq!(632, queue.known_utxos.len()); + + // Pruning the children of the first block removes both of the other + // blocks, and empties all lists + queue.prune_by_height(child1.coinbase_height().unwrap()); + assert_eq!(0, queue.blocks.len()); + assert_eq!(0, queue.by_parent.len()); + assert_eq!(0, queue.by_height.len()); + assert!(queue.get_mut(&child1.hash()).is_none()); + assert!(queue.get_mut(&child2.hash()).is_none()); + assert_eq!(0, queue.known_utxos.len()); + + Ok(()) +}