diff --git a/src/db/blockstore_with_read_cache.rs b/src/db/blockstore_with_read_cache.rs
new file mode 100644
index 000000000000..2a184d028361
--- /dev/null
+++ b/src/db/blockstore_with_read_cache.rs
@@ -0,0 +1,255 @@
+// Copyright 2019-2025 ChainSafe Systems
+// SPDX-License-Identifier: Apache-2.0, MIT
+
+use cid::Cid;
+use fvm_ipld_blockstore::Blockstore;
+use lru::LruCache;
+use parking_lot::Mutex;
+use std::{
+    num::NonZeroUsize,
+    sync::{
+        Arc,
+        atomic::{self, AtomicUsize},
+    },
+};
+
+pub trait BlockstoreReadCache {
+    fn get(&self, k: &Cid) -> Option<Vec<u8>>;
+
+    fn put(&self, k: Cid, block: Vec<u8>);
+
+    fn len(&self) -> usize;
+
+    fn size_in_bytes(&self) -> usize;
+}
+
+pub struct LruBlockstoreReadCache {
+    lru: Mutex<LruCache<Cid, Vec<u8>>>,
+    size_in_bytes: AtomicUsize,
+}
+
+impl LruBlockstoreReadCache {
+    pub fn new(cap: NonZeroUsize) -> Self {
+        Self {
+            lru: Mutex::new(LruCache::new(cap)),
+            size_in_bytes: AtomicUsize::default(),
+        }
+    }
+}
+
+impl BlockstoreReadCache for LruBlockstoreReadCache {
+    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
+        self.lru.lock().get(k).cloned()
+    }
+
+    fn put(&self, k: Cid, block: Vec<u8>) {
+        let block_size = block.len();
+        if let Some((_, old_block)) = self.lru.lock().push(k, block) {
+            let old_block_size = old_block.len();
+            if block_size >= old_block_size {
+                self.size_in_bytes
+                    .fetch_add(block_size - old_block_size, atomic::Ordering::Relaxed);
+            } else {
+                self.size_in_bytes
+                    .fetch_sub(old_block_size - block_size, atomic::Ordering::Relaxed);
+            }
+        } else {
+            self.size_in_bytes.fetch_add(
+                std::mem::size_of::<Cid>() + block_size,
+                atomic::Ordering::Relaxed,
+            );
+        }
+    }
+
+    fn len(&self) -> usize {
+        self.lru.lock().len()
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.size_in_bytes.load(atomic::Ordering::Relaxed)
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct VoidBlockstoreReadCache;
+
+impl BlockstoreReadCache for VoidBlockstoreReadCache {
+    fn get(&self, _: &Cid) -> Option<Vec<u8>> {
+        None
+    }
+
+    fn put(&self, _: Cid, _: Vec<u8>) {}
+
+    fn len(&self) -> usize {
+        0
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        0
+    }
+}
+
+impl<T: BlockstoreReadCache> BlockstoreReadCache for Arc<T> {
+    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
+        self.as_ref().get(k)
+    }
+
+    fn put(&self, k: Cid, block: Vec<u8>) {
+        self.as_ref().put(k, block)
+    }
+
+    fn len(&self) -> usize {
+        self.as_ref().len()
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.as_ref().size_in_bytes()
+    }
+}
+
+pub trait BlockstoreReadCacheStats {
+    fn hit(&self) -> usize;
+
+    fn track_hit(&self);
+
+    fn miss(&self) -> usize;
+
+    fn track_miss(&self);
+}
+
+#[derive(Debug, Default)]
+pub struct DefaultBlockstoreReadCacheStats {
+    hit: AtomicUsize,
+    miss: AtomicUsize,
+}
+
+impl BlockstoreReadCacheStats for DefaultBlockstoreReadCacheStats {
+    fn hit(&self) -> usize {
+        self.hit.load(atomic::Ordering::Relaxed)
+    }
+
+    fn track_hit(&self) {
+        self.hit.fetch_add(1, atomic::Ordering::Relaxed);
+    }
+
+    fn miss(&self) -> usize {
+        self.miss.load(atomic::Ordering::Relaxed)
+    }
+
+    fn track_miss(&self) {
+        self.miss.fetch_add(1, atomic::Ordering::Relaxed);
+    }
+}
+
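+/// A [`Blockstore`] wrapper that serves `get` requests from a [`BlockstoreReadCache`]
+/// before falling back to the inner store, optionally tracking hits and misses.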
+pub struct BlockstoreWithReadCache<
+    DB: Blockstore,
+    CACHE: BlockstoreReadCache,
+    STATS: BlockstoreReadCacheStats,
+> {
+    inner: DB,
+    cache: CACHE,
+    stats: Option<STATS>,
+}
+
+impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats>
+    BlockstoreWithReadCache<DB, CACHE, STATS>
+{
+    pub fn new(db: DB, cache: CACHE, stats: Option<STATS>) -> Self {
+        Self {
+            inner: db,
+            cache,
+            stats,
+        }
+    }
+
+    pub fn stats(&self) -> Option<&STATS> {
+        self.stats.as_ref()
+    }
+}
+
+impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats> Blockstore
+    for BlockstoreWithReadCache<DB, CACHE, STATS>
+{
+    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
+        if let Some(cached) = self.cache.get(k) {
+            self.stats.as_ref().map(BlockstoreReadCacheStats::track_hit);
+            Ok(Some(cached))
+        } else {
+            let block = self.inner.get(k)?;
+            self.stats
+                .as_ref()
+                .map(BlockstoreReadCacheStats::track_miss);
+            if let Some(block) = &block {
+                self.cache.put(*k, block.clone());
+            }
+            Ok(block)
+        }
+    }
+
+    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        self.inner.put_keyed(k, block)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{db::MemoryDB, utils::rand::forest_rng};
+    use fvm_ipld_encoding::DAG_CBOR;
+    use multihash_codetable::Code::Blake2b256;
+    use multihash_codetable::MultihashDigest as _;
+    use rand::Rng as _;
+
+    #[test]
+    fn test_blockstore_read_cache() {
+        const N_RECORDS: usize = 4;
+        const CACHE_SIZE: usize = 2;
+        let mem_db = Arc::new(MemoryDB::default());
+        let mut records = Vec::with_capacity(N_RECORDS);
+        for _ in 0..N_RECORDS {
+            let mut record = [0; 1024];
+            forest_rng().fill(&mut record);
+            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
+            mem_db.put_keyed(&key, &record).unwrap();
+            records.push((key, record));
+        }
+        let cache = Arc::new(LruBlockstoreReadCache::new(CACHE_SIZE.try_into().unwrap()));
+        let db = BlockstoreWithReadCache::new(
+            mem_db.clone(),
+            cache.clone(),
+            Some(DefaultBlockstoreReadCacheStats::default()),
+        );
+
+        assert_eq!(cache.len(), 0);
+        assert_eq!(db.stats().unwrap().hit(), 0);
+        assert_eq!(db.stats().unwrap().miss(), 0);
+
+        for (i, (k, v)) in records.iter().enumerate() {
+            assert_eq!(&db.get(k).unwrap().unwrap(), v);
+
+            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
+            assert_eq!(db.stats().unwrap().hit(), i);
+            assert_eq!(db.stats().unwrap().miss(), i + 1);
+
+            assert_eq!(&db.get(k).unwrap().unwrap(), v);
+
+            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
+            assert_eq!(db.stats().unwrap().hit(), i + 1);
+            assert_eq!(db.stats().unwrap().miss(), i + 1);
+        }
+
+        let (k0, v0) = &records[0];
+
+        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);
+
+        assert_eq!(cache.len(), CACHE_SIZE);
+        assert_eq!(db.stats().unwrap().hit(), 4);
+        assert_eq!(db.stats().unwrap().miss(), 5);
+
+        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);
+
+        assert_eq!(cache.len(), CACHE_SIZE);
+        assert_eq!(db.stats().unwrap().hit(), 5);
+        assert_eq!(db.stats().unwrap().miss(), 5);
+    }
+}
diff --git a/src/db/mod.rs b/src/db/mod.rs
index c97057d55923..70eb8f4a7b55 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -1,6 +1,7 @@
 // Copyright 2019-2025 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
 
+mod blockstore_with_read_cache;
 mod blockstore_with_write_buffer;
 pub mod car;
 mod memory;
@@ -9,6 +10,7 @@ pub mod parity_db_config;
 pub mod gc;
 pub mod ttl;
 
+pub use blockstore_with_read_cache::*;
 pub use blockstore_with_write_buffer::BlockstoreWithWriteBuffer;
 pub use memory::MemoryDB;
 mod db_mode;
diff --git a/src/rpc/methods/f3.rs b/src/rpc/methods/f3.rs
index f6fcc0f552aa..33290b92d46d 100644
--- a/src/rpc/methods/f3.rs
+++ b/src/rpc/methods/f3.rs
@@ -19,6 +19,10 @@ use crate::{
     blocks::Tipset,
     chain::index::ResolveNullTipset,
     chain_sync::TipsetValidator,
+    db::{
+        BlockstoreReadCache as _, BlockstoreReadCacheStats as _, BlockstoreWithReadCache,
+        DefaultBlockstoreReadCacheStats, LruBlockstoreReadCache,
+    },
     libp2p::{NetRPCMethods, NetworkMessage},
     lotus_json::HasLotusJson as _,
     rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError, types::ApiTipsetKey},
@@ -51,7 +55,7 @@ use num::Signed as _;
 use once_cell::sync::Lazy;
 use once_cell::sync::OnceCell;
 use parking_lot::RwLock;
-use std::{borrow::Cow, fmt::Display, num::NonZeroU64, str::FromStr as _, sync::Arc};
+use std::{borrow::Cow, fmt::Display, str::FromStr as _, sync::Arc};
 
 pub static F3_LEASE_MANAGER: OnceCell<F3LeaseManager> = OnceCell::new();
 
@@ -157,24 +161,34 @@ impl GetPowerTable {
         ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
         ts: &Arc<Tipset>,
     ) -> anyhow::Result<Vec<F3PowerEntry>> {
+        // The RAM overhead on mainnet is ~14MiB
+        const BLOCKSTORE_CACHE_CAP: usize = 65536;
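+        // The cache is a process-wide static, so it is shared across GetPowerTable calls.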
+        static BLOCKSTORE_CACHE: Lazy<Arc<LruBlockstoreReadCache>> = Lazy::new(|| {
+            Arc::new(LruBlockstoreReadCache::new(
+                BLOCKSTORE_CACHE_CAP.try_into().expect("Infallible"),
+            ))
+        });
+        let db = BlockstoreWithReadCache::new(
+            ctx.store_owned(),
+            BLOCKSTORE_CACHE.clone(),
+            Some(DefaultBlockstoreReadCacheStats::default()),
+        );
+
         macro_rules! handle_miner_state_v12_on {
             ($version:tt, $id_power_worker_mappings:ident, $ts:expr, $state:expr, $policy:expr) => {
                 fn map_err<E: Display>(e: E) -> fil_actors_shared::$version::ActorError {
                     fil_actors_shared::$version::ActorError::unspecified(e.to_string())
                 }
 
-                let claims = $state.load_claims(ctx.store())?;
+                let claims = $state.load_claims(&db)?;
                 claims.for_each(|miner, claim| {
                     if !claim.quality_adj_power.is_positive() {
                         return Ok(());
                     }
 
                     let id = miner.id().map_err(map_err)?;
-                    let (_, ok) = $state.miner_nominal_power_meets_consensus_minimum(
-                        $policy,
-                        ctx.store(),
-                        id,
-                    )?;
+                    let (_, ok) =
+                        $state.miner_nominal_power_meets_consensus_minimum($policy, &db, id)?;
                     if !ok {
                         return Ok(());
                     }
@@ -188,7 +202,7 @@ impl GetPowerTable {
                         // fee debt don't add the miner to power table
                         return Ok(());
                     }
-                    let miner_info = miner_state.info(ctx.store()).map_err(map_err)?;
+                    let miner_info = miner_state.info(&db).map_err(map_err)?;
                     // check consensus faults
                     if $ts.epoch() <= miner_info.consensus_fault_elapsed {
                         return Ok(());
                     }
@@ -210,7 +224,7 @@ impl GetPowerTable {
                 let claims = fil_actors_shared::v8::make_map_with_root::<
                     _,
                     fil_actor_power_state::v8::Claim,
-                >(&s.claims, ctx.store())?;
+                >(&s.claims, &db)?;
                 claims.for_each(|key, claim| {
                     let miner = Address::from_bytes(key)?;
                     if !claim.quality_adj_power.is_positive() {
@@ -220,7 +234,7 @@ impl GetPowerTable {
                     let id = miner.id().map_err(map_err)?;
                     let ok = s.miner_nominal_power_meets_consensus_minimum(
                         &from_policy_v13_to_v9(&ctx.chain_config().policy),
-                        ctx.store(),
+                        &db,
                         &miner.into(),
                     )?;
                     if !ok {
@@ -236,7 +250,7 @@ impl GetPowerTable {
                         // fee debt don't add the miner to power table
                         return Ok(());
                     }
-                    let miner_info = miner_state.info(ctx.store()).map_err(map_err)?;
+                    let miner_info = miner_state.info(&db).map_err(map_err)?;
                     // check consensus faults
                     if ts.epoch() <= miner_info.consensus_fault_elapsed {
                         return Ok(());
                     }
@@ -253,7 +267,7 @@ impl GetPowerTable {
                 let claims = fil_actors_shared::v9::make_map_with_root::<
                     _,
                     fil_actor_power_state::v9::Claim,
-                >(&s.claims, ctx.store())?;
+                >(&s.claims, &db)?;
                 claims.for_each(|key, claim| {
                     let miner = Address::from_bytes(key)?;
                     if !claim.quality_adj_power.is_positive() {
@@ -263,7 +277,7 @@ impl GetPowerTable {
                     let id = miner.id().map_err(map_err)?;
                     let ok = s.miner_nominal_power_meets_consensus_minimum(
                         &from_policy_v13_to_v9(&ctx.chain_config().policy),
-                        ctx.store(),
+                        &db,
                         &miner.into(),
                     )?;
                     if !ok {
@@ -279,7 +293,7 @@ impl GetPowerTable {
                         // fee debt don't add the miner to power table
                         return Ok(());
                     }
-                    let miner_info = miner_state.info(ctx.store()).map_err(map_err)?;
+                    let miner_info = miner_state.info(&db).map_err(map_err)?;
                     // check consensus faults
                     if ts.epoch() <= miner_info.consensus_fault_elapsed {
                         return Ok(());
                     }
@@ -296,7 +310,7 @@ impl GetPowerTable {
                 let claims = fil_actors_shared::v10::make_map_with_root::<
                     _,
                     fil_actor_power_state::v10::Claim,
-                >(&s.claims, ctx.store())?;
+                >(&s.claims, &db)?;
                 claims.for_each(|key, claim| {
                     let miner = Address::from_bytes(key)?;
                     if !claim.quality_adj_power.is_positive() {
@@ -306,7 +320,7 @@ impl GetPowerTable {
                     let id = miner.id().map_err(map_err)?;
                     let (_, ok) = s.miner_nominal_power_meets_consensus_minimum(
                         &from_policy_v13_to_v10(&ctx.chain_config().policy),
-                        ctx.store(),
+                        &db,
                         id,
                     )?;
                     if !ok {
@@ -322,7 +336,7 @@ impl GetPowerTable {
                         // fee debt don't add the miner to power table
                         return Ok(());
                     }
-                    let miner_info = miner_state.info(ctx.store()).map_err(map_err)?;
+                    let miner_info = miner_state.info(&db).map_err(map_err)?;
                     // check consensus faults
                     if ts.epoch() <= miner_info.consensus_fault_elapsed {
                         return Ok(());
                     }
@@ -339,7 +353,7 @@ impl GetPowerTable {
                 let claims = fil_actors_shared::v11::make_map_with_root::<
                     _,
                     fil_actor_power_state::v11::Claim,
-                >(&s.claims, ctx.store())?;
+                >(&s.claims, &db)?;
                 claims.for_each(|key, claim| {
                     let miner = Address::from_bytes(key)?;
                     if !claim.quality_adj_power.is_positive() {
@@ -349,7 +363,7 @@ impl GetPowerTable {
                     let id = miner.id().map_err(map_err)?;
                     let (_, ok) = s.miner_nominal_power_meets_consensus_minimum(
                         &from_policy_v13_to_v11(&ctx.chain_config().policy),
-                        ctx.store(),
+                        &db,
                         id,
                     )?;
                     if !ok {
@@ -365,7 +379,7 @@ impl GetPowerTable {
                         // fee debt don't add the miner to power table
                         return Ok(());
                     }
-                    let miner_info = miner_state.info(ctx.store()).map_err(map_err)?;
+                    let miner_info = miner_state.info(&db).map_err(map_err)?;
                     // check consensus faults
                     if ts.epoch() <= miner_info.consensus_fault_elapsed {
                         return Ok(());
                     }
@@ -433,6 +447,11 @@ impl GetPowerTable {
             power_entries.push(F3PowerEntry { id, power, pub_key });
         }
         power_entries.sort();
+
+        if let Some(stats) = db.stats() {
+            tracing::debug!(epoch=%ts.epoch(), hit=%stats.hit(), miss=%stats.miss(), cache_len=%BLOCKSTORE_CACHE.len(), cache_size=%human_bytes::human_bytes(BLOCKSTORE_CACHE.size_in_bytes() as f64), "F3.GetPowerTable blockstore read cache");
+        }
+
         Ok(power_entries)
     }
 }
@@ -542,7 +561,7 @@ impl RpcMethod<1> for Finalize {
             Some(ts) => ts,
             None => ctx
                 .sync_network_context
-                .chain_exchange_headers(None, &tsk, NonZeroU64::new(1).expect("Infallible"))
+                .chain_exchange_headers(None, &tsk, 1.try_into().expect("Infallible"))
                 .await?
                 .first()
                 .cloned()