From 8c6ce74426c2a5fb4589aea034217704318150d8 Mon Sep 17 00:00:00 2001
From: Andre da Silva
Date: Fri, 12 Dec 2025 14:49:19 -0300
Subject: [PATCH] Cache deserialized immutable data on storage

---
 Cargo.lock                              |   2 +
 linera-base/Cargo.toml                  |   2 +
 linera-base/src/lib.rs                  |   1 +
 .../src/value_cache.rs                  |  37 +++-
 linera-core/src/chain_worker/actor.rs   |   3 +-
 linera-core/src/chain_worker/state.rs   |  15 +-
 linera-core/src/lib.rs                  |   5 +-
 .../src/unit_tests/value_cache_tests.rs |  17 +-
 linera-core/src/worker.rs               |   2 +-
 linera-service/src/cli/main.rs          |  10 +-
 linera-service/src/cli/options.rs       |   2 +
 linera-service/src/exporter/main.rs     |   3 +-
 linera-service/src/proxy/main.rs        |   2 +
 linera-service/src/server.rs            |   3 +-
 linera-service/src/storage.rs           |  74 +++++--
 linera-storage/src/db_storage.rs        | 201 ++++++++++++++++--
 linera-storage/src/lib.rs               |   4 +-
 17 files changed, 309 insertions(+), 74 deletions(-)
 rename {linera-core => linera-base}/src/value_cache.rs (86%)

diff --git a/Cargo.lock b/Cargo.lock
index 46d18e9653b3..dd1013ca6dff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5267,9 +5267,11 @@ dependencies = [
  "linera-base",
  "linera-kywasmtime",
  "linera-witty",
+ "lru 0.12.5",
  "port-selector",
  "prometheus",
  "proptest",
+ "quick_cache",
  "rand 0.8.5",
  "reqwest 0.11.27",
  "ruzstd",
diff --git a/linera-base/Cargo.toml b/linera-base/Cargo.toml
index 8aad1add62cf..e8f6f1d04c1f 100644
--- a/linera-base/Cargo.toml
+++ b/linera-base/Cargo.toml
@@ -49,7 +49,9 @@ hex.workspace = true
 k256.workspace = true
 linera-kywasmtime = { workspace = true, optional = true }
 linera-witty = { workspace = true, features = ["macros"] }
+lru.workspace = true
 prometheus = { workspace = true, optional = true }
+quick_cache.workspace = true
 proptest = { workspace = true, optional = true, features = ["alloc"] }
 rand.workspace = true
 reqwest = { workspace = true, optional = true }
diff --git a/linera-base/src/lib.rs b/linera-base/src/lib.rs
index 3d8d810e809c..510221a037a2 100644
--- a/linera-base/src/lib.rs
+++ b/linera-base/src/lib.rs
@@ -39,6 +39,7 @@ pub mod time;
 #[cfg(test)]
 mod unit_tests;
 pub mod util;
+pub mod value_cache;
 pub mod vm;
 
 pub use graphql::BcsHexParseError;
diff --git a/linera-core/src/value_cache.rs b/linera-base/src/value_cache.rs
similarity index 86%
rename from linera-core/src/value_cache.rs
rename to linera-base/src/value_cache.rs
index ffaea6d60985..c11fabb4be7a 100644
--- a/linera-core/src/value_cache.rs
+++ b/linera-base/src/value_cache.rs
@@ -3,26 +3,24 @@
 
 //! Concurrent caches for values.
 
-#[cfg(test)]
-#[path = "unit_tests/value_cache_tests.rs"]
-mod unit_tests;
-
 #[cfg(with_metrics)]
 use std::any::type_name;
 use std::{borrow::Cow, hash::Hash, num::NonZeroUsize, sync::Mutex};
 
-use linera_base::{crypto::CryptoHash, hashed::Hashed};
 use lru::LruCache;
 use quick_cache::sync::Cache;
 
+use crate::{crypto::CryptoHash, hashed::Hashed};
+
 /// A counter metric for the number of cache hits in the [`ValueCache`].
 #[cfg(with_metrics)]
 mod metrics {
     use std::sync::LazyLock;
 
-    use linera_base::prometheus_util::register_int_counter_vec;
     use prometheus::IntCounterVec;
 
+    use crate::prometheus_util::register_int_counter_vec;
+
     pub static CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
         register_int_counter_vec(
             "value_cache_hit",
@@ -65,11 +63,27 @@ where
         }
     }
 
+    /// Inserts a value into the cache with the given key.
+    /// Returns `true` if the value was newly inserted, `false` if it already existed.
+    pub fn insert(&self, key: K, value: V) -> bool {
+        if self.contains_key(&key) {
+            false
+        } else {
+            self.cache.insert(key, value);
+            true
+        }
+    }
+
     /// Returns a `V` from the cache, if present.
     pub fn get(&self, key: &K) -> Option<V> {
         Self::track_cache_usage(self.cache.get(key))
     }
 
+    /// Returns `true` if the cache contains the given key.
+    pub fn contains_key(&self, key: &K) -> bool {
+        self.cache.get(key).is_some()
+    }
+
     fn track_cache_usage(maybe_value: Option<V>) -> Option<V> {
         #[cfg(with_metrics)]
         {
@@ -94,7 +108,7 @@ impl<T> ValueCache<CryptoHash, Hashed<T>> {
     /// inserted in the cache.
     ///
     /// Returns [`true`] if the value was not already present in the cache.
-    pub fn insert(&self, value: Cow<Hashed<T>>) -> bool {
+    pub fn insert_hashed(&self, value: Cow<Hashed<T>>) -> bool {
         let hash = (*value).hash();
         if self.cache.get(&hash).is_some() {
             false
@@ -108,7 +122,7 @@
     ///
     /// The `values` are wrapped in [`Cow`]s so that each `value` is only cloned if it
     /// needs to be inserted in the cache.
-    #[cfg(test)]
+    #[cfg(with_testing)]
     pub fn insert_all<'a>(&self, values: impl IntoIterator<Item = Cow<'a, Hashed<T>>>)
     where
         T: 'a,
@@ -122,7 +136,7 @@
     }
 }
 
-#[cfg(test)]
+#[cfg(with_testing)]
 impl<K, V> ValueCache<K, V>
 where
     K: Hash + Eq + Clone,
@@ -137,6 +151,11 @@
     pub fn len(&self) -> usize {
         self.cache.len()
     }
+
+    /// Returns [`true`] if the cache is empty.
+    pub fn is_empty(&self) -> bool {
+        self.cache.len() == 0
+    }
 }
 
 /// A cache for values that need to be "parked" temporarily and taken out for exclusive use.
diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs
index f532a25ff4b2..febf84c1da20 100644
--- a/linera-core/src/chain_worker/actor.rs
+++ b/linera-core/src/chain_worker/actor.rs
@@ -32,11 +32,12 @@ use linera_views::context::InactiveContext;
 use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
 use tracing::{debug, instrument, trace, Instrument as _};
 
+use linera_base::value_cache::{ParkingCache, ValueCache};
+
 use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
 use crate::{
     chain_worker::BlockOutcome,
     data_types::{ChainInfoQuery, ChainInfoResponse},
-    value_cache::{ParkingCache, ValueCache},
     worker::{NetworkActions, WorkerError},
 };
diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs
index 6755501ebf80..11e255fc2496 100644
--- a/linera-core/src/chain_worker/state.rs
+++ b/linera-core/src/chain_worker/state.rs
@@ -43,10 +43,11 @@ use linera_views::{
 use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard};
 use tracing::{debug, instrument, trace, warn};
 
+use linera_base::value_cache::{ParkingCache, ValueCache};
+
 use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier, EventSubscriptionsResult};
 use crate::{
     data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
-    value_cache::{ParkingCache, ValueCache},
     worker::{NetworkActions, Notification, Reason, WorkerError},
 };
 
@@ -589,7 +590,7 @@ where
         for block in blocks {
             let hashed_block = block.into_inner();
             let height = hashed_block.inner().header.height;
-            self.block_values.insert(Cow::Owned(hashed_block.clone()));
+            self.block_values.insert_hashed(Cow::Owned(hashed_block.clone()));
             height_to_blocks.insert(height, hashed_block);
         }
     }
@@ -782,7 +783,7 @@ where
         }
 
         self.block_values
-            .insert(Cow::Borrowed(certificate.inner().inner()));
+            .insert_hashed(Cow::Borrowed(certificate.inner().inner()));
         let required_blob_ids = block.required_blob_ids();
         let maybe_blobs = self
             .maybe_get_required_blobs(required_blob_ids, Some(&block.created_blobs()))
@@ -1027,7 +1028,7 @@ where
         self.save().await?;
 
         self.block_values
-            .insert(Cow::Owned(certificate.into_inner().into_inner()));
+            .insert_hashed(Cow::Owned(certificate.into_inner().into_inner()));
 
         self.register_delivery_notifier(height, &actions, notify_when_messages_are_delivered)
             .await;
@@ -1670,10 +1671,12 @@ where
         match manager.create_vote(proposal, block, key_pair, local_time, blobs)? {
             // Cache the value we voted on, so the client doesn't have to send it again.
             Some(Either::Left(vote)) => {
-                self.block_values.insert(Cow::Borrowed(vote.value.inner()));
+                self.block_values
+                    .insert_hashed(Cow::Borrowed(vote.value.inner()));
             }
             Some(Either::Right(vote)) => {
-                self.block_values.insert(Cow::Borrowed(vote.value.inner()));
+                self.block_values
+                    .insert_hashed(Cow::Borrowed(vote.value.inner()));
             }
             None => (),
         }
diff --git a/linera-core/src/lib.rs b/linera-core/src/lib.rs
index 47c6cf507b51..2dd76e1d0bed 100644
--- a/linera-core/src/lib.rs
+++ b/linera-core/src/lib.rs
@@ -23,7 +23,10 @@ pub mod test_utils;
 pub mod worker;
 
 pub(crate) mod updater;
-mod value_cache;
+
+#[cfg(test)]
+#[path = "unit_tests/value_cache_tests.rs"]
+mod value_cache_tests;
 
 pub use local_node::LocalNodeError;
 pub use updater::DEFAULT_QUORUM_GRACE_PERIOD;
diff --git a/linera-core/src/unit_tests/value_cache_tests.rs b/linera-core/src/unit_tests/value_cache_tests.rs
index 7f4bc981ff28..2731a34b0387 100644
--- a/linera-core/src/unit_tests/value_cache_tests.rs
+++ b/linera-core/src/unit_tests/value_cache_tests.rs
@@ -8,11 +8,10 @@ use linera_base::{
     data_types::{BlockHeight, Epoch},
     hashed::Hashed,
     identifiers::ChainId,
+    value_cache::ValueCache,
 };
 use linera_chain::types::Timeout;
 
-use super::ValueCache;
-
 /// Test cache size for unit tests.
 const TEST_CACHE_SIZE: usize = 10;
 
@@ -33,7 +32,7 @@ fn test_insert_single_certificate_value() {
     let value = create_dummy_certificate_value(0);
     let hash = value.hash();
 
-    assert!(cache.insert(Cow::Borrowed(&value)));
+    assert!(cache.insert_hashed(Cow::Borrowed(&value)));
     assert!(cache.contains(&hash));
     assert_eq!(cache.get(&hash), Some(value));
     assert_eq!(cache.len(), 1);
}
@@ -46,7 +45,7 @@ fn test_insert_many_certificate_values_individually() {
     let values = create_dummy_certificate_values(0..(TEST_CACHE_SIZE as u64)).collect::<Vec<_>>();
 
     for value in &values {
-        assert!(cache.insert(Cow::Borrowed(value)));
+        assert!(cache.insert_hashed(Cow::Borrowed(value)));
     }
 
     for value in &values {
@@ -82,7 +81,7 @@ fn test_reinsertion_of_values() {
     cache.insert_all(values.iter().map(Cow::Borrowed));
 
     for value in &values {
-        assert!(!cache.insert(Cow::Borrowed(value)));
+        assert!(!cache.insert_hashed(Cow::Borrowed(value)));
     }
 
     for value in &values {
@@ -105,7 +104,7 @@ fn test_eviction_occurs() {
         create_dummy_certificate_values(0..((TEST_CACHE_SIZE as u64) * 2)).collect::<Vec<_>>();
 
     for value in &values {
-        cache.insert(Cow::Borrowed(value));
+        cache.insert_hashed(Cow::Borrowed(value));
     }
 
     // Cache size should be bounded by capacity
@@ -175,7 +174,7 @@ fn test_access_affects_eviction() {
         .collect::<Vec<_>>();
 
     for value in &extra_values {
-        cache.insert(Cow::Borrowed(value));
+        cache.insert_hashed(Cow::Borrowed(value));
    }
 
     // The frequently accessed first value should still be present
@@ -195,7 +194,7 @@ fn test_promotion_of_reinsertion() {
     cache.insert_all(values.iter().map(Cow::Borrowed));
 
     // Re-insert the first value (this should "promote" it)
-    assert!(!cache.insert(Cow::Borrowed(&values[0])));
+    assert!(!cache.insert_hashed(Cow::Borrowed(&values[0])));
 
     // Insert additional values to trigger eviction
     let extra_values =
@@ -203,7 +202,7 @@
             .collect::<Vec<_>>();
 
     for value in &extra_values {
-        cache.insert(Cow::Borrowed(value));
+        cache.insert_hashed(Cow::Borrowed(value));
     }
 
     // The re-inserted first value should still be present
diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs
index 2aeb67ddd84a..b2c6b1530fb7 100644
--- a/linera-core/src/worker.rs
+++ b/linera-core/src/worker.rs
@@ -47,9 +47,9 @@ use crate::{
     data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
     join_set_ext::{JoinSet, JoinSetExt},
     notifier::Notifier,
-    value_cache::{ParkingCache, ValueCache},
     CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
 };
+use linera_base::value_cache::{ParkingCache, ValueCache};
 
 #[cfg(test)]
 #[path = "unit_tests/worker_tests.rs"]
diff --git a/linera-service/src/cli/main.rs b/linera-service/src/cli/main.rs
index 34362db9dc9a..fb4c4fa69dac 100644
--- a/linera-service/src/cli/main.rs
+++ b/linera-service/src/cli/main.rs
@@ -81,7 +81,7 @@ use linera_service::{
     task_processor::TaskProcessor,
     util,
 };
-use linera_storage::{DbStorage, Storage};
+use linera_storage::{DbStorage, Storage, StorageCacheConfig};
 use linera_views::store::{KeyValueDatabase, KeyValueStore};
 use options::Options;
 use serde_json::Value;
@@ -1776,7 +1776,7 @@ impl RunnableWithStore for DatabaseToolJob<'_> {
         } => {
             let genesis_config: GenesisConfig = util::read_json(genesis_config_path)?;
             let mut storage =
-                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None).await?;
+                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None, StorageCacheConfig::default()).await?;
             genesis_config.initialize_storage(&mut storage).await?;
             info!(
                 "Namespace {namespace} was initialized in {} ms",
@@ -1796,7 +1796,7 @@
         }
         DatabaseToolCommand::ListBlobIds => {
             let storage =
-                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None).await?;
+                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None, StorageCacheConfig::default()).await?;
             let blob_ids = storage.list_blob_ids().await?;
             info!("Blob IDs listed in {} ms", start_time.elapsed().as_millis());
             info!("The list of blob IDs is:");
@@ -1806,7 +1806,7 @@
         }
         DatabaseToolCommand::ListChainIds => {
             let storage =
-                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None).await?;
+                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None, StorageCacheConfig::default()).await?;
             let chain_ids = storage.list_chain_ids().await?;
             info!(
                 "Chain IDs listed in {} ms",
                 start_time.elapsed().as_millis()
             );
@@ -1819,7 +1819,7 @@
         }
         DatabaseToolCommand::ListEventIds => {
             let storage =
-                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None).await?;
+                DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None, StorageCacheConfig::default()).await?;
             let event_ids = storage.list_event_ids().await?;
             info!(
                 "Event IDs listed in {} ms",
diff --git a/linera-service/src/cli/options.rs b/linera-service/src/cli/options.rs
index 1cf4c398ac1d..cfa08e9a73aa 100644
--- a/linera-service/src/cli/options.rs
+++ b/linera-service/src/cli/options.rs
@@ -132,8 +132,10 @@ impl Options {
         debug!("Running command using storage configuration: {storage_config}");
         let store_config =
             storage_config.add_common_storage_options(&self.common_storage_options)?;
+        let db_storage_cache_config = self.common_storage_options.db_storage_cache_config();
         let output = Box::pin(store_config.run_with_storage(
             self.wasm_runtime.with_wasm_default(),
+            db_storage_cache_config,
             self.application_logs,
             job,
         ))
diff --git a/linera-service/src/exporter/main.rs b/linera-service/src/exporter/main.rs
index b51c47618402..9b70d809bde7 100644
--- a/linera-service/src/exporter/main.rs
+++ b/linera-service/src/exporter/main.rs
@@ -183,10 +183,11 @@ impl ExporterOptions {
                 .storage_config
                 .add_common_storage_options(&self.common_storage_options)
                 .unwrap();
+            let db_storage_cache_config = self.common_storage_options.db_storage_cache_config();
             // Exporters are part of validator infrastructure and should not output contract logs.
             let allow_application_logs = false;
             store_config
-                .run_with_storage(None, allow_application_logs, context)
+                .run_with_storage(None, db_storage_cache_config, allow_application_logs, context)
                 .boxed()
                 .await
         };
diff --git a/linera-service/src/proxy/main.rs b/linera-service/src/proxy/main.rs
index 33151defa695..6cbfa45b7a2f 100644
--- a/linera-service/src/proxy/main.rs
+++ b/linera-service/src/proxy/main.rs
@@ -506,11 +506,13 @@ impl ProxyOptions {
         let store_config = self
             .storage_config
             .add_common_storage_options(&self.common_storage_options)?;
+        let db_storage_cache_config = self.common_storage_options.db_storage_cache_config();
         // Proxies are part of validator infrastructure and should not output contract logs.
         let allow_application_logs = false;
         store_config
             .run_with_storage(
                 None,
+                db_storage_cache_config,
                 allow_application_logs,
                 ProxyContext::from_options(self)?,
             )
diff --git a/linera-service/src/server.rs b/linera-service/src/server.rs
index 6082506b3d18..8c7f58d97175 100644
--- a/linera-service/src/server.rs
+++ b/linera-service/src/server.rs
@@ -561,10 +561,11 @@ async fn run(options: ServerOptions) {
             let store_config = storage_config
                 .add_common_storage_options(&common_storage_options)
                 .unwrap();
+            let db_storage_cache_config = common_storage_options.db_storage_cache_config();
             // Validators should not output contract logs.
             let allow_application_logs = false;
             store_config
-                .run_with_storage(wasm_runtime, allow_application_logs, job)
+                .run_with_storage(wasm_runtime, db_storage_cache_config, allow_application_logs, job)
                 .boxed()
                 .await
                 .unwrap()
diff --git a/linera-service/src/storage.rs b/linera-service/src/storage.rs
index f5ea0110607e..fcc4cb242257 100644
--- a/linera-service/src/storage.rs
+++ b/linera-service/src/storage.rs
@@ -7,7 +7,9 @@ use anyhow::{anyhow, bail};
 use async_trait::async_trait;
 use linera_client::config::GenesisConfig;
 use linera_execution::WasmRuntime;
-use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
+use linera_storage::{
+    DbStorage, Storage, StorageCacheConfig as DbStorageCacheConfig, DEFAULT_NAMESPACE,
+};
 #[cfg(feature = "storage-service")]
 use linera_storage_service::{
     client::StorageServiceDatabase,
@@ -85,6 +87,18 @@ pub struct CommonStorageOptions {
     /// The replication factor for the keyspace
     #[arg(long, default_value = "1", global = true)]
     pub storage_replication_factor: u32,
+
+    /// Number of blobs to cache (application-level cache).
+    #[arg(long, default_value = "1000", global = true)]
+    pub blob_cache_size: usize,
+
+    /// Number of certificates to cache (application-level cache).
+    #[arg(long, default_value = "1000", global = true)]
+    pub certificate_cache_size: usize,
+
+    /// Number of confirmed blocks to cache (application-level cache).
+    #[arg(long, default_value = "1000", global = true)]
+    pub confirmed_block_cache_size: usize,
 }
 
 impl CommonStorageOptions {
@@ -100,6 +114,14 @@
             max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
         }
     }
+
+    pub fn db_storage_cache_config(&self) -> DbStorageCacheConfig {
+        DbStorageCacheConfig {
+            blob_cache_size: self.blob_cache_size,
+            certificate_cache_size: self.certificate_cache_size,
+            confirmed_block_cache_size: self.confirmed_block_cache_size,
+        }
+    }
 }
 
 /// The configuration of the key value store in use.
@@ -629,6 +651,7 @@ impl StoreConfig {
     pub async fn run_with_storage<Job>(
         self,
         wasm_runtime: Option<WasmRuntime>,
+        db_storage_cache_config: DbStorageCacheConfig,
         allow_application_logs: bool,
         job: Job,
     ) -> Result<Job::Output, anyhow::Error>
@@ -645,6 +668,7 @@
                 &config,
                 &namespace,
                 wasm_runtime,
+                db_storage_cache_config,
             )
             .await?
             .with_allow_application_logs(allow_application_logs);
@@ -659,6 +683,7 @@
                 &config,
                 &namespace,
                 wasm_runtime,
+                db_storage_cache_config,
             )
             .await?
             .with_allow_application_logs(allow_application_logs);
@@ -666,26 +691,38 @@
             Ok(job.run(storage).await)
         }
         #[cfg(feature = "rocksdb")]
         StoreConfig::RocksDb { config, namespace } => {
-            let storage =
-                DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
-                    .await?
-                    .with_allow_application_logs(allow_application_logs);
+            let storage = DbStorage::<RocksDbDatabase, _>::connect(
+                &config,
+                &namespace,
+                wasm_runtime,
+                db_storage_cache_config,
+            )
+            .await?
+            .with_allow_application_logs(allow_application_logs);
             Ok(job.run(storage).await)
         }
         #[cfg(feature = "dynamodb")]
         StoreConfig::DynamoDb { config, namespace } => {
-            let storage =
-                DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
-                    .await?
-                    .with_allow_application_logs(allow_application_logs);
+            let storage = DbStorage::<DynamoDbDatabase, _>::connect(
+                &config,
+                &namespace,
+                wasm_runtime,
+                db_storage_cache_config,
+            )
+            .await?
+            .with_allow_application_logs(allow_application_logs);
             Ok(job.run(storage).await)
        }
         #[cfg(feature = "scylladb")]
         StoreConfig::ScyllaDb { config, namespace } => {
-            let storage =
-                DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
-                    .await?
-                    .with_allow_application_logs(allow_application_logs);
+            let storage = DbStorage::<ScyllaDbDatabase, _>::connect(
+                &config,
+                &namespace,
+                wasm_runtime,
+                db_storage_cache_config,
+            )
+            .await?
+            .with_allow_application_logs(allow_application_logs);
             Ok(job.run(storage).await)
         }
         #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
@@ -693,7 +730,7 @@
         StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
             let storage = DbStorage::<
                 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
                 _,
-            >::connect(&config, &namespace, wasm_runtime)
+            >::connect(&config, &namespace, wasm_runtime, db_storage_cache_config)
             .await?
             .with_allow_application_logs(allow_application_logs);
             Ok(job.run(storage).await)
         }
@@ -756,8 +793,13 @@ impl RunnableWithStore for InitializeStorageJob<'_> {
         D::Store: KeyValueStore + Clone + Send + Sync + 'static,
         D::Error: Send + Sync,
     {
-        let mut storage =
-            DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
+        let mut storage = DbStorage::<D, _>::maybe_create_and_connect(
+            &config,
+            &namespace,
+            None,
+            DbStorageCacheConfig::default(),
+        )
+        .await?;
         self.0.initialize_storage(&mut storage).await?;
         Ok(())
     }
 }
diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs
index 8724104e7a76..78aba275a29e 100644
--- a/linera-storage/src/db_storage.rs
+++ b/linera-storage/src/db_storage.rs
@@ -10,6 +10,7 @@ use linera_base::{
     crypto::CryptoHash,
     data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
     identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
+    value_cache::ValueCache,
 };
 use linera_chain::{
     types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
@@ -236,6 +237,30 @@ pub mod metrics {
     });
 }
 
+/// Default cache size for caches.
+const DEFAULT_CACHE_SIZE: usize = 1000;
+
+/// Configuration for storage caches.
+#[derive(Clone, Debug)]
+pub struct StorageCacheConfig {
+    /// Size of the blob cache (keyed by BlobId).
+    pub blob_cache_size: usize,
+    /// Size of the certificate cache (keyed by CryptoHash).
+    pub certificate_cache_size: usize,
+    /// Size of the confirmed block cache (keyed by CryptoHash).
+    pub confirmed_block_cache_size: usize,
+}
+
+impl Default for StorageCacheConfig {
+    fn default() -> Self {
+        Self {
+            blob_cache_size: DEFAULT_CACHE_SIZE,
+            certificate_cache_size: DEFAULT_CACHE_SIZE,
+            confirmed_block_cache_size: DEFAULT_CACHE_SIZE,
+        }
+    }
+}
+
 /// The key used for blobs. The Blob ID itself is contained in the root key.
 const BLOB_KEY: &[u8] = &[0];
 
@@ -348,6 +373,12 @@ pub struct DbStorage<Database, C> {
     user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
     user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
     execution_runtime_config: ExecutionRuntimeConfig,
+    /// Cache for blobs, keyed by BlobId.
+    blob_cache: Arc<ValueCache<BlobId, Blob>>,
+    /// Cache for certificates, keyed by CryptoHash.
+    certificate_cache: Arc<ValueCache<CryptoHash, ConfirmedBlockCertificate>>,
+    /// Cache for confirmed blocks, keyed by CryptoHash.
+    confirmed_block_cache: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -593,6 +624,10 @@ where
     #[instrument(level = "trace", skip_all, fields(%blob_id))]
     async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
+        // Check cache first
+        if self.blob_cache.contains_key(&blob_id) {
+            return Ok(true);
+        }
         let root_key = RootKey::BlobId(blob_id).bytes();
         let store = self.database.open_shared(&root_key)?;
         let test = store.contains_key(BLOB_KEY).await?;
@@ -605,6 +640,10 @@
     async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
         let mut missing_blobs = Vec::new();
         for blob_id in blob_ids {
+            // Check cache first
+            if self.blob_cache.contains_key(blob_id) {
+                continue;
+            }
             let root_key = RootKey::BlobId(*blob_id).bytes();
             let store = self.database.open_shared(&root_key)?;
             if !store.contains_key(BLOB_KEY).await? {
@@ -633,13 +672,23 @@
         &self,
         hash: CryptoHash,
     ) -> Result<Option<ConfirmedBlock>, ViewError> {
+        // Check cache first
+        if let Some(block) = self.confirmed_block_cache.get(&hash) {
+            return Ok(Some(block));
+        }
+
         let root_key = RootKey::BlockHash(hash).bytes();
         let store = self.database.open_shared(&root_key)?;
-        let value = store.read_value(BLOCK_KEY).await?;
+        let value: Option<ConfirmedBlock> = store.read_value(BLOCK_KEY).await?;
         #[cfg(with_metrics)]
         metrics::READ_CONFIRMED_BLOCK_COUNTER
             .with_label_values(&[])
             .inc();
+
+        // Cache the block if found
+        if let Some(ref block) = value {
+            self.confirmed_block_cache.insert(hash, block.clone());
+        }
         Ok(value)
     }
 
@@ -667,12 +716,24 @@
     #[instrument(skip_all, fields(%blob_id))]
     async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
+        // Check cache first
+        if let Some(blob) = self.blob_cache.get(&blob_id) {
+            return Ok(Some(blob));
+        }
+
         let root_key = RootKey::BlobId(blob_id).bytes();
         let store = self.database.open_shared(&root_key)?;
         let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
         #[cfg(with_metrics)]
         metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
-        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
+
+        // Cache the blob if found
+        let result =
+            maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes));
+        if let Some(ref blob) = result {
+            self.blob_cache.insert(blob_id, blob.clone());
+        }
+        Ok(result)
     }
 
     #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
@@ -727,6 +788,8 @@
         let mut batch = MultiPartitionBatch::new();
         batch.add_blob(blob)?;
         self.write_batch(batch).await?;
+        // Cache the blob after writing
+        self.blob_cache.insert(blob.id(), blob.clone());
         Ok(())
     }
 
@@ -773,6 +836,7 @@
         }
         let mut batch = MultiPartitionBatch::new();
         let mut blob_states = Vec::new();
+        let mut blobs_written = Vec::new();
         for blob in blobs {
             let root_key = RootKey::BlobId(blob.id()).bytes();
             let store = self.database.open_shared(&root_key)?;
             let has_state = store.contains_key(BLOB_STATE_KEY).await?;
             blob_states.push(has_state);
             if has_state {
                 batch.add_blob(blob)?;
+                blobs_written.push(blob);
             }
         }
         self.write_batch(batch).await?;
+        // Cache blobs that were written
+        for blob in blobs_written {
+            self.blob_cache.insert(blob.id(), blob.clone());
+        }
         Ok(blob_states)
     }
 
@@ -795,7 +864,12 @@
         for blob in blobs {
             batch.add_blob(blob)?;
         }
-        self.write_batch(batch).await
+        self.write_batch(batch).await?;
+        // Cache all blobs after writing
+        for blob in blobs {
+            self.blob_cache.insert(blob.id(), blob.clone());
+        }
+        Ok(())
     }
 
     #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
@@ -809,11 +883,25 @@
             batch.add_blob(blob)?;
         }
         batch.add_certificate(certificate)?;
-        self.write_batch(batch).await
+        self.write_batch(batch).await?;
+        // Cache blobs after writing
+        for blob in blobs {
+            self.blob_cache.insert(blob.id(), blob.clone());
+        }
+        // Cache certificate and confirmed block after writing
+        let hash = certificate.hash();
+        self.certificate_cache.insert(hash, certificate.clone());
+        self.confirmed_block_cache
+            .insert(hash, certificate.value().clone());
+        Ok(())
     }
 
     #[instrument(skip_all, fields(%hash))]
     async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
+        // Check cache first
+        if self.certificate_cache.contains_key(&hash) {
+            return Ok(true);
+        }
         let root_key = RootKey::BlockHash(hash).bytes();
         let store = self.database.open_shared(&root_key)?;
         let results = store.contains_keys(&get_block_keys()).await?;
@@ -829,6 +917,11 @@
         &self,
         hash: CryptoHash,
     ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
+        // Check cache first
+        if let Some(certificate) = self.certificate_cache.get(&hash) {
+            return Ok(Some(certificate));
+        }
+
         let root_key = RootKey::BlockHash(hash).bytes();
         let store = self.database.open_shared(&root_key)?;
         let values = store.read_multi_values_bytes(&get_block_keys()).await?;
         #[cfg(with_metrics)]
         metrics::READ_CERTIFICATE_COUNTER
             .with_label_values(&[])
             .inc();
-        Self::deserialize_certificate(&values, hash)
+        let result = Self::deserialize_certificate(&values, hash)?;
+
+        // Cache the certificate and confirmed block if found
+        if let Some(ref certificate) = result {
+            self.certificate_cache.insert(hash, certificate.clone());
+            self.confirmed_block_cache
+                .insert(hash, certificate.value().clone());
+        }
+        Ok(result)
     }
 
     #[instrument(skip_all)]
@@ -848,22 +949,52 @@ where
         if hashes.is_empty() {
             return Ok(Vec::new());
         }
-        let root_keys = Self::get_root_keys_for_certificates(&hashes);
-        let mut values = Vec::new();
-        for root_key in root_keys {
-            let store = self.database.open_shared(&root_key)?;
-            values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
+
+        // Check cache first, collect hashes that need to be fetched from DB
+        let mut results: Vec<Option<ConfirmedBlockCertificate>> = Vec::with_capacity(hashes.len());
+        let mut missing_indices = Vec::new();
+        let mut missing_hashes = Vec::new();
+
+        for (i, hash) in hashes.iter().enumerate() {
+            if let Some(certificate) = self.certificate_cache.get(hash) {
+                results.push(Some(certificate));
+            } else {
+                results.push(None);
+                missing_indices.push(i);
+                missing_hashes.push(*hash);
+            }
         }
-        #[cfg(with_metrics)]
-        metrics::READ_CERTIFICATES_COUNTER
-            .with_label_values(&[])
-            .inc_by(hashes.len() as u64);
-        let mut certificates = Vec::new();
-        for (pair, hash) in values.chunks_exact(2).zip(hashes) {
-            let certificate = Self::deserialize_certificate(pair, hash)?;
-            certificates.push(certificate);
+
+        // Fetch missing certificates from DB
+        if !missing_hashes.is_empty() {
+            let root_keys = Self::get_root_keys_for_certificates(&missing_hashes);
+            let mut values = Vec::new();
+            for root_key in root_keys {
+                let store = self.database.open_shared(&root_key)?;
+                values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
+            }
+            #[cfg(with_metrics)]
+            metrics::READ_CERTIFICATES_COUNTER
+                .with_label_values(&[])
+                .inc_by(missing_hashes.len() as u64);
+
+            for ((pair, hash), idx) in values
+                .chunks_exact(2)
+                .zip(missing_hashes.iter())
+                .zip(missing_indices.iter())
+            {
+                let certificate = Self::deserialize_certificate(pair, *hash)?;
+                // Cache if found
+                if let Some(ref cert) = certificate {
+                    self.certificate_cache.insert(*hash, cert.clone());
+                    self.confirmed_block_cache
+                        .insert(*hash, cert.value().clone());
+                }
+                results[*idx] = certificate;
+            }
         }
-        Ok(certificates)
+
+        Ok(results)
     }
 
     /// Reads certificates by hashes.
@@ -872,6 +1003,9 @@
     /// and the second element is confirmed block.
     ///
     /// It does not check if all hashes all returned.
+    ///
+    /// Note: No application-level caching is done here because the raw bytes
+    /// are already cached by `LruCachingDatabase` at the storage layer.
     #[instrument(skip_all)]
     async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
         &self,
@@ -1116,7 +1250,12 @@
 }
 
 impl<Database, C> DbStorage<Database, C> {
-    fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
+    fn new(
+        database: Database,
+        wasm_runtime: Option<WasmRuntime>,
+        clock: C,
+        cache_config: StorageCacheConfig,
+    ) -> Self {
         Self {
             database: Arc::new(database),
             clock,
@@ -1127,6 +1266,11 @@
             user_contracts: Arc::new(papaya::HashMap::new()),
             user_services: Arc::new(papaya::HashMap::new()),
             execution_runtime_config: ExecutionRuntimeConfig::default(),
+            blob_cache: Arc::new(ValueCache::new(cache_config.blob_cache_size)),
+            certificate_cache: Arc::new(ValueCache::new(cache_config.certificate_cache_size)),
+            confirmed_block_cache: Arc::new(ValueCache::new(
+                cache_config.confirmed_block_cache_size,
+            )),
         }
     }
 
@@ -1143,22 +1287,26 @@ where
     Database::Error: Send + Sync,
     Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
 {
+    /// Creates a new storage, creating the namespace if necessary.
     pub async fn maybe_create_and_connect(
         config: &Database::Config,
         namespace: &str,
         wasm_runtime: Option<WasmRuntime>,
+        cache_config: StorageCacheConfig,
     ) -> Result<Self, Database::Error> {
         let database = Database::maybe_create_and_connect(config, namespace).await?;
-        Ok(Self::new(database, wasm_runtime, WallClock))
+        Ok(Self::new(database, wasm_runtime, WallClock, cache_config))
     }
 
+    /// Connects to existing storage.
     pub async fn connect(
         config: &Database::Config,
         namespace: &str,
         wasm_runtime: Option<WasmRuntime>,
+        cache_config: StorageCacheConfig,
     ) -> Result<Self, Database::Error> {
         let database = Database::connect(config, namespace).await?;
-        Ok(Self::new(database, wasm_runtime, WallClock))
+        Ok(Self::new(database, wasm_runtime, WallClock, cache_config))
     }
 }
 
@@ -1169,6 +1317,7 @@ where
     Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
     Database::Error: Send + Sync,
 {
+    /// Creates a test storage with default cache configuration.
     pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
         let config = Database::new_test_config().await.unwrap();
         let namespace = generate_test_namespace();
@@ -1182,6 +1331,7 @@
             .unwrap()
     }
 
+    /// Creates a test storage with the specified clock.
     pub async fn new_for_testing(
         config: Database::Config,
         namespace: &str,
         wasm_runtime: Option<WasmRuntime>,
         clock: TestClock,
     ) -> Result<Self, Database::Error> {
         let database = Database::recreate_and_connect(&config, namespace).await?;
-        Ok(Self::new(database, wasm_runtime, clock))
+        Ok(Self::new(
+            database,
+            wasm_runtime,
+            clock,
+            StorageCacheConfig::default(),
+        ))
     }
 }
 
diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs
index 330bf73e1ee7..9a0a2302b8f4 100644
--- a/linera-storage/src/lib.rs
+++ b/linera-storage/src/lib.rs
@@ -39,7 +39,9 @@ use linera_views::{context::Context, views::RootView, ViewError};
 pub use crate::db_storage::metrics;
 #[cfg(with_testing)]
 pub use crate::db_storage::TestClock;
-pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
+pub use crate::db_storage::{
+    ChainStatesFirstAssignment, DbStorage, StorageCacheConfig, WallClock,
+};
 
 /// The default namespace to be used when none is specified
 pub const DEFAULT_NAMESPACE: &str = "default";
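
Usage note (not part of the diff): the keyed `insert`/`get`/`contains_key` API added to `ValueCache` is the general-purpose entry point used by `DbStorage`, while `insert_hashed` remains the right call when the key is the value's own `CryptoHash`. A minimal sketch of the keyed API, assuming only that the value type is `Clone`; the `Payload` type below is illustrative and not part of the patch:

    use linera_base::{crypto::CryptoHash, value_cache::ValueCache};

    // Illustrative value type; `Blob` and `ConfirmedBlock` play this role in `DbStorage`.
    #[derive(Clone, Debug, PartialEq)]
    struct Payload(Vec<u8>);

    fn demo(hash: CryptoHash, payload: Payload) {
        let cache: ValueCache<CryptoHash, Payload> = ValueCache::new(1000);
        // The first insertion for a key returns `true`; reinsertion returns `false`.
        assert!(cache.insert(hash, payload.clone()));
        assert!(!cache.insert(hash, payload.clone()));
        assert!(cache.contains_key(&hash));
        // `get` clones the value out of the cache, recording a hit when metrics are enabled.
        assert_eq!(cache.get(&hash), Some(payload));
    }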
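Configuration note: the three new CLI options map one-to-one onto `StorageCacheConfig` via `CommonStorageOptions::db_storage_cache_config`, and clap derives the kebab-case flag names from the field names. A sketch of building a non-default configuration in code; the sizes here are arbitrary, chosen only for illustration:

    use linera_storage::StorageCacheConfig;

    fn example_cache_config() -> StorageCacheConfig {
        // Roughly equivalent to passing `--blob-cache-size 4096
        // --certificate-cache-size 2048 --confirmed-block-cache-size 2048`.
        StorageCacheConfig {
            blob_cache_size: 4096,
            certificate_cache_size: 2048,
            confirmed_block_cache_size: 2048,
        }
    }

    fn main() {
        // Each cache defaults to 1000 entries (DEFAULT_CACHE_SIZE).
        assert_eq!(StorageCacheConfig::default().blob_cache_size, 1000);
        let _config = example_cache_config();
    }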
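Design note: every cached read in `DbStorage` follows the same read-through shape, namely check the cache, fall back to the store on a miss, then populate the cache so later readers skip both the round-trip and the deserialization. A generic sketch of that shape follows; this is a hypothetical helper, not part of the patch, and it assumes `K: Hash + Eq + Clone` and `V: Clone`, matching how the caches are used above:

    use std::hash::Hash;

    use linera_base::value_cache::ValueCache;

    // Hypothetical read-through helper mirroring `read_blob`/`read_certificate`.
    fn read_through<K, V>(
        cache: &ValueCache<K, V>,
        key: K,
        fetch: impl FnOnce(&K) -> Option<V>,
    ) -> Option<V>
    where
        K: Hash + Eq + Clone,
        V: Clone,
    {
        if let Some(value) = cache.get(&key) {
            return Some(value); // hit: no storage access, no deserialization
        }
        let value = fetch(&key)?; // miss: read and deserialize from the store
        cache.insert(key, value.clone()); // populate for subsequent readers
        Some(value)
    }

The write paths invert this: `write_blob`, `write_blobs`, and `write_blobs_and_certificate` populate the caches only after `write_batch` succeeds, so the caches never hold data the store might not have.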