Skip to content

Commit

Permalink
3. change(state): Move the finalized queue to the StateService (#5152)
Browse files Browse the repository at this point in the history
* Move the finalized block queue into the StateService

* Move the queued_blocks module to the state service

* Move QueuedFinalized into queued_blocks

* Move the queued_blocks tests into their own module

* Make the FinalizedState cloneable
  • Loading branch information
teor2345 authored Sep 16, 2022
1 parent 20d80ad commit bfdb29b
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 262 deletions.
142 changes: 109 additions & 33 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! chain tip changes.
use std::{
collections::HashMap,
convert,
future::Future,
pin::Pin,
Expand All @@ -41,8 +42,9 @@ use crate::{
service::{
chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
finalized_state::{FinalizedState, ZebraDb},
non_finalized_state::{NonFinalizedState, QueuedBlocks},
non_finalized_state::NonFinalizedState,
pending_utxos::PendingUtxos,
queued_blocks::QueuedBlocks,
watch_receiver::WatchReceiver,
},
BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, ReadRequest,
Expand All @@ -58,6 +60,7 @@ pub(crate) mod check;
mod finalized_state;
mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;

#[cfg(any(test, feature = "proptest-impl"))]
Expand All @@ -68,10 +71,7 @@ mod tests;

pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation};

pub type QueuedFinalized = (
FinalizedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
);
use self::queued_blocks::QueuedFinalized;

/// A read-write service for Zebra's cached blockchain state.
///
Expand All @@ -98,19 +98,30 @@ pub(crate) struct StateService {
/// The configured Zcash network.
network: Network,

// Exclusively Writeable State
// Queued Blocks
//
/// The finalized chain state, including its on-disk database.
pub(crate) disk: FinalizedState,
/// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
/// These blocks are awaiting their parent blocks before they can do contextual verification.
queued_non_finalized_blocks: QueuedBlocks,

/// Queued blocks for the [`FinalizedState`] that arrived out of order.
/// These blocks are awaiting their parent blocks before they can do contextual verification.
///
/// Indexed by their parent block hash.
queued_finalized_blocks: HashMap<block::Hash, QueuedFinalized>,

// Exclusively Writeable State
//
/// The non-finalized chain state, including its in-memory chain forks.
//
// TODO: get rid of this struct member, and just let the block write task own the NonFinalizedState.
mem: NonFinalizedState,

// Queued Blocks
/// The finalized chain state, including its on-disk database.
//
/// Blocks for the [`NonFinalizedState`], which are awaiting their parent blocks
/// before they can do contextual verification.
queued_blocks: QueuedBlocks,
// TODO: get rid of this struct member, and just let the ReadStateService
// and block write task share ownership of the database.
pub(crate) disk: FinalizedState,

// Pending UTXO Request Tracking
//
Expand All @@ -133,6 +144,14 @@ pub(crate) struct StateService {
///
/// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
read_service: ReadStateService,

// Metrics
//
/// A metric tracking the maximum height that's currently in `queued_finalized_blocks`
///
/// Set to `f64::NAN` if `queued_finalized_blocks` is empty, because grafana shows NaNs
/// as a break in the graph.
max_queued_height: f64,
}

/// A read-only service for accessing Zebra's cached blockchain state.
Expand All @@ -155,6 +174,12 @@ pub struct ReadStateService {

// Shared Concurrently Readable State
//
/// A watch channel for a recent [`NonFinalizedState`].
///
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

/// The shared inner on-disk database for the finalized state.
///
/// RocksDB allows reads and writes via a shared reference,
Expand All @@ -163,12 +188,6 @@ pub struct ReadStateService {
/// This chain is updated concurrently with requests,
/// so it might include some block data that is also in `best_mem`.
db: ZebraDb,

/// A watch channel for a recent [`NonFinalizedState`].
///
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
}

impl StateService {
Expand All @@ -182,6 +201,7 @@ impl StateService {
network: Network,
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
let timer = CodeTimer::start();

let disk = FinalizedState::new(&config, network);
timer.finish(module_path!(), line!(), "opening finalized state database");

Expand All @@ -201,19 +221,21 @@ impl StateService {

let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk);

let queued_blocks = QueuedBlocks::default();
let queued_non_finalized_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default();

let state = Self {
network,
disk,
queued_non_finalized_blocks,
queued_finalized_blocks: HashMap::new(),
mem,
queued_blocks,
disk,
pending_utxos,
last_prune: Instant::now(),
chain_tip_sender,
non_finalized_state_sender,
read_service: read_service.clone(),
max_queued_height: f64::NAN,
};
timer.finish(module_path!(), line!(), "initializing state service");

Expand Down Expand Up @@ -262,15 +284,64 @@ impl StateService {
// - run the set_finalized_tip() in this function in the state block commit task
// - move all that code to the inner service
let tip_block = self
.disk
.queue_and_commit_finalized((finalized, rsp_tx))
.drain_queue_and_commit_finalized((finalized, rsp_tx))
.map(ChainTipBlock::from);

self.chain_tip_sender.set_finalized_tip(tip_block);

rsp_rx
}

/// Queue a finalized block to be committed to the state.
///
/// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state.
///
/// Returns the highest finalized tip block committed from the queue,
/// or `None` if no blocks were committed in this call.
/// (Use `tip_block` to get the finalized tip, regardless of when it was committed.)
pub fn drain_queue_and_commit_finalized(
&mut self,
queued: QueuedFinalized,
) -> Option<FinalizedBlock> {
let mut highest_queue_commit = None;

let prev_hash = queued.0.block.header.previous_block_hash;
let height = queued.0.height;
self.queued_finalized_blocks.insert(prev_hash, queued);

while let Some(queued_block) = self
.queued_finalized_blocks
.remove(&self.disk.db().finalized_tip_hash())
{
if let Ok(finalized) = self.disk.commit_finalized(queued_block) {
highest_queue_commit = Some(finalized);
} else {
// the last block in the queue failed, so we can't commit the next block
break;
}
}

if self.queued_finalized_blocks.is_empty() {
self.max_queued_height = f64::NAN;
} else if self.max_queued_height.is_nan() || self.max_queued_height < height.0 as f64 {
// if there are still blocks in the queue, then either:
// - the new block was lower than the old maximum, and there was a gap before it,
// so the maximum is still the same (and we skip this code), or
// - the new block is higher than the old maximum, and there is at least one gap
// between the finalized tip and the new maximum
self.max_queued_height = height.0 as f64;
}

metrics::gauge!("state.checkpoint.queued.max.height", self.max_queued_height);
metrics::gauge!(
"state.checkpoint.queued.block.count",
self.queued_finalized_blocks.len() as f64,
);

highest_queue_commit
}

/// Queue a non finalized block for verification and check if any queued
/// blocks are ready to be verified and committed to the state.
///
Expand All @@ -297,15 +368,17 @@ impl StateService {
// Request::CommitBlock contract: a request to commit a block which has
// been queued but not yet committed to the state fails the older
// request and replaces it with the newer request.
let rsp_rx = if let Some((_, old_rsp_tx)) = self.queued_blocks.get_mut(&prepared.hash) {
let rsp_rx = if let Some((_, old_rsp_tx)) =
self.queued_non_finalized_blocks.get_mut(&prepared.hash)
{
tracing::debug!("replacing older queued request with new request");
let (mut rsp_tx, rsp_rx) = oneshot::channel();
std::mem::swap(old_rsp_tx, &mut rsp_tx);
let _ = rsp_tx.send(Err("replaced by newer request".into()));
rsp_rx
} else {
let (rsp_tx, rsp_rx) = oneshot::channel();
self.queued_blocks.queue((prepared, rsp_tx));
self.queued_non_finalized_blocks.queue((prepared, rsp_tx));
rsp_rx
};

Expand Down Expand Up @@ -337,7 +410,8 @@ impl StateService {
let finalized_tip_height = self.disk.db().finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
);
self.queued_blocks.prune_by_height(finalized_tip_height);
self.queued_non_finalized_blocks
.prune_by_height(finalized_tip_height);

let tip_block_height = self.update_latest_chain_channels();

Expand Down Expand Up @@ -415,7 +489,9 @@ impl StateService {
vec![(new_parent, Ok(()))];

while let Some((parent_hash, parent_result)) = new_parents.pop() {
let queued_children = self.queued_blocks.dequeue_children(parent_hash);
let queued_children = self
.queued_non_finalized_blocks
.dequeue_children(parent_hash);

for (child, rsp_tx) in queued_children {
let child_hash = child.hash;
Expand Down Expand Up @@ -572,7 +648,7 @@ impl Service<Request> for StateService {
#[instrument(name = "state", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future {
match req {
// Uses queued_blocks and pending_utxos in the StateService
// Uses queued_non_finalized_blocks and pending_utxos in the StateService
// Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
Request::CommitBlock(prepared) => {
metrics::counter!(
Expand Down Expand Up @@ -624,8 +700,8 @@ impl Service<Request> for StateService {
.boxed()
}

// Uses queued_by_prev_hash in the FinalizedState and pending_utxos in the StateService.
// Accesses shared writeable state in the StateService, FinalizedState, and ZebraDb.
// Uses queued_finalized_blocks and pending_utxos in the StateService.
// Accesses shared writeable state in the StateService and ZebraDb.
Request::CommitFinalizedBlock(finalized) => {
metrics::counter!(
"state.requests",
Expand Down Expand Up @@ -679,7 +755,7 @@ impl Service<Request> for StateService {
.boxed()
}

// Uses pending_utxos and queued_blocks in the StateService.
// Uses pending_utxos and queued_non_finalized_blocks in the StateService.
// If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
Request::AwaitUtxo(outpoint) => {
metrics::counter!(
Expand All @@ -700,7 +776,7 @@ impl Service<Request> for StateService {

// Check the non-finalized block queue outside the returned future,
// so we can access mutable state fields.
if let Some(utxo) = self.queued_blocks.utxo(&outpoint) {
if let Some(utxo) = self.queued_non_finalized_blocks.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo);

// We're finished, the returned future gets the UTXO from the respond() channel.
Expand All @@ -709,7 +785,7 @@ impl Service<Request> for StateService {
return response_fut;
}

// We ignore any UTXOs in FinalizedState.queued_by_prev_hash,
// We ignore any UTXOs in FinalizedState.queued_finalized_blocks,
// because it is only used during checkpoint verification.
//
// This creates a rare race condition, but it doesn't seem to happen much in practice.
Expand Down
Loading

0 comments on commit bfdb29b

Please sign in to comment.