diff --git a/crates/amaru/src/bin/amaru/cmd/import.rs b/crates/amaru/src/bin/amaru/cmd/import.rs index 8d5d964..1de77fe 100644 --- a/crates/amaru/src/bin/amaru/cmd/import.rs +++ b/crates/amaru/src/bin/amaru/cmd/import.rs @@ -14,9 +14,10 @@ use indicatif::{ProgressBar, ProgressStyle}; use miette::{Diagnostic, IntoDiagnostic}; use pallas_codec::minicbor as cbor; use std::{collections::HashMap, fs, iter, path::PathBuf}; -use tracing::info; +use tracing::{info, info_span}; const BATCH_SIZE: usize = 5000; +const EVENT_TARGET: &str = "amaru::import"; #[derive(Debug, Parser)] pub struct Args { @@ -72,6 +73,18 @@ impl From> for Option { pub async fn run(args: Args) -> miette::Result<()> { let point = super::parse_point(&args.date, Error::MalformedDate).into_diagnostic()?; + let import_span = info_span!( + target: EVENT_TARGET, + "import", + point.slot = point.slot_or_default(), + point.hash = tracing::field::Empty, + out_dir = &args.out.display().to_string(), + snapshot = &args.snapshot.display().to_string(), + ) + .entered(); + if let Point::Specific(_, ref header_hash) = point { + import_span.record("point.hash", hex::encode(header_hash)); + } fs::create_dir_all(&args.out).into_diagnostic()?; let mut db = ledger::store::rocksdb::RocksDB::empty(&args.out).into_diagnostic()?; @@ -81,202 +94,307 @@ pub async fn run(args: Args) -> miette::Result<()> { // Epoch State { - let _epoch_state_len = d.array().into_diagnostic()?; + let epoch_span = info_span!( + target: EVENT_TARGET, + "epoch_state", + length = d.array().into_diagnostic()?, + current_epoch = tracing::field::Empty, + ) + .entered(); // Epoch State / Account State - d.skip().into_diagnostic()?; + { + let account_span = info_span!( + target: EVENT_TARGET, + "account_state", + skipped = true, + ) + .entered(); + d.skip().into_diagnostic()?; + } // Epoch State / Ledger State - let _ledger_state_len = d.array().into_diagnostic()?; - - // Epoch State / Ledger State / Cert State { - let _cert_state_len = d.array().into_diagnostic()?; + let ledger_span = info_span!( + target: EVENT_TARGET, + "ledger_state", + length = d.array().into_diagnostic()?, + ) + .entered(); + + // Epoch State / Ledger State / Cert State + { + let cert_span = info_span!( + target: EVENT_TARGET, + "cert_state", + length = d.array().into_diagnostic()?, + ) + .entered(); - // Epoch State / Ledger State / Cert State / Voting State - d.skip().into_diagnostic()?; + // Epoch State / Ledger State / Cert State / Voting State + { + let vote_span = info_span!( + target: EVENT_TARGET, + "voting_state", + skipped = true, + ) + .entered(); + d.skip().into_diagnostic()?; + } - // Epoch State / Ledger State / Cert State / Pool State - { - let _pool_state_len = d.array().into_diagnostic()?; + // Epoch State / Ledger State / Cert State / Pool State + { + let pool_span = info_span!( + target: EVENT_TARGET, + "pool_state", + length = d.array().into_diagnostic()?, + ) + .entered(); - let pools: HashMap = d.decode().into_diagnostic()?; + let pools: HashMap = d.decode().into_diagnostic()?; - let updates: HashMap = d.decode().into_diagnostic()?; + let updates: HashMap = d.decode().into_diagnostic()?; - let retirements: HashMap = d.decode().into_diagnostic()?; + let retirements: HashMap = d.decode().into_diagnostic()?; - // Deposits - d.skip().into_diagnostic()?; + // Deposits + { + let deposit_span = info_span!( + target: EVENT_TARGET, + "deposits", + skipped = true, + ) + .entered(); + d.skip().into_diagnostic()?; + } - let mut state = ledger::state::diff_epoch_reg::DiffEpochReg::default(); - for (pool, params) in pools.into_iter() { - state.register(pool, params); - } + let mut state = ledger::state::diff_epoch_reg::DiffEpochReg::default(); + for (pool, params) in pools.into_iter() { + state.register(pool, params); + } - for (pool, params) in updates.into_iter() { - state.register(pool, params); - } + for (pool, params) in updates.into_iter() { + state.register(pool, params); + } - for (pool, epoch) in retirements.into_iter() { - state.unregister(pool, epoch); - } + for (pool, epoch) in retirements.into_iter() { + state.unregister(pool, epoch); + } - info!( - what = "stake_pools", - registered = state.registered.len(), - retiring = state.unregistered.len(), - ); - - let current_epoch = epoch_from_slot(point.slot_or_default()); - - db.save( - &Point::Origin, - store::Columns { - utxo: iter::empty(), - pools: state - .registered - .into_iter() - .flat_map(move |(_, registrations)| { - registrations - .into_iter() - .map(|r| (r, current_epoch)) - .collect::>() - }), - accounts: iter::empty(), - }, - store::Columns { - pools: state.unregistered.into_iter(), - utxo: iter::empty(), - accounts: iter::empty(), - }, - ) - .into_diagnostic()?; - } + info!( + what = "stake_pools", + registered = state.registered.len(), + retiring = state.unregistered.len(), + ); - // Epoch State / Ledger State / Cert State / Delegation state - { - let _delegation_state_len = d.array().into_diagnostic()?; + let current_epoch = epoch_from_slot(point.slot_or_default()); + epoch_span.record("current_epoch", current_epoch); - // Epoch State / Ledger State / Cert State / Delegation state / dsUnified + db.save( + &Point::Origin, + store::Columns { + utxo: iter::empty(), + pools: state.registered.into_iter().flat_map( + move |(_, registrations)| { + registrations + .into_iter() + .map(|r| (r, current_epoch)) + .collect::>() + }, + ), + accounts: iter::empty(), + }, + store::Columns { + pools: state.unregistered.into_iter(), + utxo: iter::empty(), + accounts: iter::empty(), + }, + ) + .into_diagnostic()?; + } + + // Epoch State / Ledger State / Cert State / Delegation state { - let _stake_credentials_len = d.array().into_diagnostic()?; + let delegation_span = info_span!( + target: EVENT_TARGET, + "delegation_state", + length = d.array().into_diagnostic()?, + ) + .entered(); - // credentials + // Epoch State / Ledger State / Cert State / Delegation state / dsUnified { - let mut credentials = - d.decode::, - Set<()>, - StrictMaybe, - StrictMaybe, - ), - >>() - .into_diagnostic()? - .into_iter() - .map( - |(credential, (rewards_and_deposit, _pointers, pool, _drep))| { - let (rewards, deposit) = - Option::<(Lovelace, Lovelace)>::from(rewards_and_deposit) - .unwrap_or((0, STAKE_CREDENTIAL_DEPOSIT as u64)); - + let unified_span = info_span!( + target: EVENT_TARGET, + "unified", + length = d.array().into_diagnostic()?, + ) + .entered(); + + // credentials + { + let credentials_span = info_span!( + target: EVENT_TARGET, + "credentials" + ) + .entered(); + let mut credentials = + d.decode::, + Set<()>, + StrictMaybe, + StrictMaybe, + ), + >>() + .into_diagnostic()? + .into_iter() + .map( + |( credential, - Option::::from(pool), - Some(deposit), - rewards, - ) - }, - ) - .collect::, - Option, - Lovelace, - )>>(); - - info!(what = "credentials", size = credentials.len()); + (rewards_and_deposit, _pointers, pool, _drep), + )| { + let (rewards, deposit) = + Option::<(Lovelace, Lovelace)>::from( + rewards_and_deposit, + ) + .unwrap_or((0, STAKE_CREDENTIAL_DEPOSIT as u64)); - let progress = ProgressBar::new(credentials.len() as u64).with_style( - ProgressStyle::with_template(" Accounts {bar:70} {pos:>7}/{len:7}") + ( + credential, + Option::::from(pool), + Some(deposit), + rewards, + ) + }, + ) + .collect::, + Option, + Lovelace, + )>>( + ); + + info!(what = "credentials", size = credentials.len()); + + let progress = ProgressBar::new(credentials.len() as u64).with_style( + ProgressStyle::with_template( + " Accounts {bar:70} {pos:>7}/{len:7}", + ) .unwrap(), - ); - - while !credentials.is_empty() { - let n = std::cmp::min(BATCH_SIZE, credentials.len()); - let chunk = credentials.drain(0..n); - - db.save( - &Point::Origin, - store::Columns { - utxo: iter::empty(), - pools: iter::empty(), - accounts: chunk, - }, - Default::default(), - ) - .into_diagnostic()?; - - progress.inc(n as u64); + ); + + while !credentials.is_empty() { + let n = std::cmp::min(BATCH_SIZE, credentials.len()); + let chunk = credentials.drain(0..n); + + db.save( + &Point::Origin, + store::Columns { + utxo: iter::empty(), + pools: iter::empty(), + accounts: chunk, + }, + Default::default(), + ) + .into_diagnostic()?; + + progress.inc(n as u64); + } + + progress.finish(); } - progress.finish(); + // pointers + { + let pointers_span = info_span!( + target: EVENT_TARGET, + "pointers", + skipped = true, + ) + .entered(); + d.skip().into_diagnostic()?; + } } - // pointers + // Epoch State / Ledger State / Cert State / Delegation state / dsFutureGenDelegs { + let future_gen_span = info_span!( + target: EVENT_TARGET, + "future_gen_delegations", + skipped = true, + ) + .entered(); d.skip().into_diagnostic()?; } - } - // Epoch State / Ledger State / Cert State / Delegation state / dsFutureGenDelegs - d.skip().into_diagnostic()?; - - // Epoch State / Ledger State / Cert State / Delegation state / dsGenDelegs - d.skip().into_diagnostic()?; - - // Epoch State / Ledger State / Cert State / Delegation state / dsIRewards - d.skip().into_diagnostic()?; - } - - // Epoch State / Ledger State / UTxO State - { - let _utxo_state_len = d.array().into_diagnostic()?; - - let mut utxo: Vec<(TransactionInput, TransactionOutput)> = d - .decode::>() - .into_diagnostic()? - .into_iter() - .collect::>(); - - info!(what = "utxo_entries", size = utxo.len()); - - let progress = ProgressBar::new(utxo.len() as u64).with_style( - ProgressStyle::with_template(" UTxO entries {bar:70} {pos:>7}/{len:7}") - .unwrap(), - ); + // Epoch State / Ledger State / Cert State / Delegation state / dsGenDelegs + { + let gen_delegs_span = info_span!( + target: EVENT_TARGET, + "gen_delegations", + skipped = true, + ) + .entered(); + d.skip().into_diagnostic()?; + } - while !utxo.is_empty() { - let n = std::cmp::min(BATCH_SIZE, utxo.len()); - let chunk = utxo.drain(0..n); + // Epoch State / Ledger State / Cert State / Delegation state / dsIRewards + { + let rewards_span = info_span!( + target: EVENT_TARGET, + "rewards", + skipped = true, + ) + .entered(); + d.skip().into_diagnostic()?; + } + } - db.save( - &Point::Origin, - store::Columns { - utxo: chunk, - pools: iter::empty(), - accounts: iter::empty(), - }, - Default::default(), + // Epoch State / Ledger State / UTxO State + { + let utxo_span = info_span!( + target: EVENT_TARGET, + "utxo_state", + length = d.array().into_diagnostic()?, ) - .into_diagnostic()?; + .entered(); + + let mut utxo: Vec<(TransactionInput, TransactionOutput)> = d + .decode::>() + .into_diagnostic()? + .into_iter() + .collect::>(); + + info!(what = "utxo_entries", size = utxo.len()); + + let progress = ProgressBar::new(utxo.len() as u64).with_style( + ProgressStyle::with_template(" UTxO entries {bar:70} {pos:>7}/{len:7}") + .unwrap(), + ); + + while !utxo.is_empty() { + let n = std::cmp::min(BATCH_SIZE, utxo.len()); + let chunk = utxo.drain(0..n); + + db.save( + &Point::Origin, + store::Columns { + utxo: chunk, + pools: iter::empty(), + accounts: iter::empty(), + }, + Default::default(), + ) + .into_diagnostic()?; + + progress.inc(n as u64); + } - progress.inc(n as u64); + progress.finish(); } - - progress.finish(); } } } @@ -286,14 +404,28 @@ pub async fn run(args: Args) -> miette::Result<()> { let epoch = epoch_from_slot(point.slot_or_default()); - db.next_snapshot(epoch).into_diagnostic()?; + { + let snapshot_span = info_span!( + target: EVENT_TARGET, + "snapshot" + ) + .entered(); + db.next_snapshot(epoch).into_diagnostic()?; + } - db.with_pools(|iterator| { - for (_, pool) in iterator { - pools::Row::tick(pool, epoch + 1) - } - }) - .into_diagnostic()?; + { + let pool_span = info_span!( + target: EVENT_TARGET, + "pools" + ) + .entered(); + db.with_pools(|iterator| { + for (_, pool) in iterator { + pools::Row::tick(pool, epoch + 1) + } + }) + .into_diagnostic()?; + } Ok(()) }