diff --git a/Cargo.lock b/Cargo.lock index 93d30a07d34..fad35167e84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,15 +216,6 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" -[[package]] -name = "array-util" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e509844de8f09b90a2c3444684a2b6695f4071360e13d2fda0af9f749cc2ed6" -dependencies = [ - "arrayvec", -] - [[package]] name = "arrayvec" version = "0.7.6" @@ -574,18 +565,6 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" -[[package]] -name = "auto_enums" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "459b77b7e855f875fd15f101064825cd79eb83185a961d66e6298560126facfb" -dependencies = [ - "derive_utils", - "proc-macro2 1.0.95", - "quote 1.0.40", - "syn 2.0.102", -] - [[package]] name = "autocfg" version = "1.4.0" @@ -2336,6 +2315,17 @@ dependencies = [ "libc", ] +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "cpufeatures" version = "0.2.16" @@ -2767,17 +2757,6 @@ dependencies = [ "syn 2.0.102", ] -[[package]] -name = "derive_utils" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65f152f4b8559c4da5d574bafc7af85454d706b4c5fe8b530d508cacbb6807ea" -dependencies = [ - "proc-macro2 1.0.95", - "quote 1.0.40", - "syn 2.0.102", -] - [[package]] name = "dialoguer" version = "0.11.0" @@ -3016,9 +2995,9 @@ dependencies = [ [[package]] name = "fastant" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bf7fa928ce0c4a43bd6e7d1235318fc32ac3a3dea06a2208c44e729449471a" +checksum = "2e825441bfb2d831c47c97d05821552db8832479f44c571b97fededbf0099c07" dependencies = [ "small_ctor", "web-time", @@ -3169,6 +3148,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -3222,9 +3207,9 @@ dependencies = [ [[package]] name = "foyer" -version = "0.17.3" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "635c7077026867cb5e5ea576c461f29b1c4151fce7a9d7cc3a1b1a9902d95c65" +checksum = "8d6e14f449dffa7c8e7bf17c4847f3a1072a4115cce4e5c84f0e25b5326df561" dependencies = [ "anyhow", "equivalent", @@ -3232,6 +3217,7 @@ dependencies = [ "foyer-common", "foyer-memory", "foyer-storage", + "futures-util", "madsim-tokio", "mixtrics", "pin-project", @@ -3242,11 +3228,11 @@ dependencies = [ [[package]] name = "foyer-common" -version = "0.17.3" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ed2316785e80137c7b91bb74dab1dc1967c3272df05825397b73ae8fc527041" +checksum = "269e6bab6c54c5e3d5cc50e1466304aebef2f91e29223db51f395edc06fa2d50" dependencies = [ - "ahash", + "anyhow", "bincode", "bytes", "cfg-if", @@ -3257,8 +3243,8 @@ dependencies = [ "parking_lot", "pin-project", "serde", - "thiserror 2.0.4", "tokio", + "twox-hash 2.0.1", ] [[package]] @@ -3272,11 +3258,11 @@ dependencies = [ [[package]] name = "foyer-memory" -version = "0.17.3" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "090cf5b89d49fd61e7da9bfae3a1aef605f03196d542b2f8171c74f3add013f4" +checksum = "41abc6230d1dcb24f1ac6746a2542a2c1381406f1f5b6ef4a0860ca17fd0900c" dependencies = [ - "ahash", + "anyhow", "arc-swap", "bitflags 2.9.0", "cmsketch", @@ -3284,32 +3270,31 @@ dependencies = [ "fastrace", "foyer-common", "foyer-intrusive-collections", - "hashbrown 0.15.2", + "futures-util", + "hashbrown 0.16.1", "itertools 0.14.0", "madsim-tokio", "mixtrics", "parking_lot", + "paste", "pin-project", "serde", - "thiserror 2.0.4", "tokio", "tracing", ] [[package]] name = "foyer-storage" -version = "0.17.3" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "095e857c97d6339d4a4a6424b88d08fe08ad0366bfbfaf65d6ddf55baf3d2a38" +checksum = "a3d238ff6f0ea10c1cc585f3046fb7227a5033d1f31f180890ad17806eebf2ed" dependencies = [ - "ahash", "allocator-api2", "anyhow", - "array-util", - "auto_enums", "bytes", - "clap", + "core_affinity", "equivalent", + "fastant", "fastrace", "flume", "foyer-common", @@ -3317,17 +3302,16 @@ dependencies = [ "fs4 0.13.1", "futures-core", "futures-util", + "hashbrown 0.16.1", + "io-uring", "itertools 0.14.0", "libc", "lz4", "madsim-tokio", - "ordered_hash_map", "parking_lot", - "paste", "pin-project", "rand 0.9.2", "serde", - "thiserror 2.0.4", "tokio", "tracing", "twox-hash 2.0.1", @@ -3705,32 +3689,34 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.13.2" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", + "allocator-api2", ] [[package]] name = "hashbrown" -version = "0.14.5" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ - "ahash", "allocator-api2", + "equivalent", + "foldhash 0.1.3", ] [[package]] name = "hashbrown" -version = "0.15.2" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.2.0", ] [[package]] @@ -4420,6 +4406,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd7bddefd0a8833b88a4b68f90dae22c7450d11b354198baee3874fd811b344" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.10.1" @@ -5166,9 +5163,9 @@ dependencies = [ [[package]] name = "mixtrics" -version = "0.1.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "749ed12bab176c8a42c13a679dd2de12876d5ad4abe7525548e31ae001a9ebbf" +checksum = "fb252c728b9d77c6ef9103f0c81524fa0a3d3b161d0a936295d7fbeff6e04c11" dependencies = [ "itertools 0.14.0", "opentelemetry", @@ -5773,15 +5770,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "ordered_hash_map" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab0e5f22bf6dd04abd854a8874247813a8fa2c8c1260eba6fbb150270ce7c176" -dependencies = [ - "hashbrown 0.13.2", -] - [[package]] name = "outref" version = "0.5.1" diff --git a/rust/blockstore/src/arrow/provider.rs b/rust/blockstore/src/arrow/provider.rs index 122db416d59..383843ae041 100644 --- a/rust/blockstore/src/arrow/provider.rs +++ b/rust/blockstore/src/arrow/provider.rs @@ -469,7 +469,7 @@ impl BlockManager { id: &Uuid, priority: StorageRequestPriority, ) -> Result, GetError> { - let block = self.block_cache.obtain(*id).await.ok().flatten(); + let block = self.block_cache.get(id).await.ok().flatten(); if let Some(block) = block { return Ok(Some(block)); } @@ -664,7 +664,7 @@ impl RootManager { prefix_path: &str, max_block_size_bytes: usize, ) -> Result, RootManagerError> { - let index = self.cache.obtain(*id).await.ok().flatten(); + let index = self.cache.get(id).await.ok().flatten(); match index { Some(index) => Ok(Some(index)), None => { diff --git a/rust/cache/Cargo.toml b/rust/cache/Cargo.toml index 3f7efe87a2d..9418ffbb5a3 100644 --- a/rust/cache/Cargo.toml +++ b/rust/cache/Cargo.toml @@ -8,8 +8,8 @@ path = "src/lib.rs" [dependencies] clap = { workspace = true } -foyer = { version = "0.17.3", features = ["tracing", "serde"] } -mixtrics = { version = "0.1.0", features = ["opentelemetry_0_27"] } +foyer = { version = "0.21.0", features = ["tracing", "serde"] } +mixtrics = { version = "0.2.3", features = ["opentelemetry_0_27"] } anyhow = "1.0" opentelemetry = { version = "0.27.0", default-features = false, features = ["trace", "metrics"] } diff --git a/rust/cache/src/foyer.rs b/rust/cache/src/foyer.rs index 3c95bbb8f76..ed352b3f8b7 100644 --- a/rust/cache/src/foyer.rs +++ b/rust/cache/src/foyer.rs @@ -4,9 +4,9 @@ use chroma_error::ChromaError; use chroma_tracing::util::{StopWatchUnit, Stopwatch}; use clap::Parser; use foyer::{ - CacheBuilder, DirectFsDeviceOptions, Engine, FifoConfig, FifoPicker, HybridCacheBuilder, - InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, S3FifoConfig, StorageKey, - StorageValue, Throttle, TracingOptions, + BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoConfig, FifoPicker, FsDeviceBuilder, + HybridCacheBuilder, InvalidRatioPicker, IoEngineBuilder, LfuConfig, LruConfig, + PsyncIoEngineBuilder, S3FifoConfig, StorageKey, StorageValue, Throttle, TracingOptions, }; use opentelemetry::{global, KeyValue}; use serde::{Deserialize, Serialize}; @@ -15,6 +15,8 @@ use std::hash::Hash; use std::sync::Arc; use std::time::Duration; +pub use foyer::Error as FoyerError; + pub const MIB: usize = 1024 * 1024; const fn default_capacity() -> usize { @@ -37,10 +39,6 @@ const fn default_flushers() -> usize { 4 } -const fn default_flush() -> bool { - false -} - const fn default_reclaimers() -> usize { 2 } @@ -83,10 +81,6 @@ const fn default_trace_get_us() -> usize { 1000 * 100 } -const fn default_trace_obtain_us() -> usize { - 1000 * 100 -} - const fn default_trace_remove_us() -> usize { 1000 * 100 } @@ -142,11 +136,6 @@ pub struct FoyerCacheConfig { #[serde(default = "default_buffer_pool_size")] pub buffer_pool: usize, - /// AKA fsync - #[arg(long, default_value_t = false)] - #[serde(default = "default_flush")] - pub flush: bool, - /// Reclaimer count. #[arg(long, default_value_t = 2)] #[serde(default = "default_reclaimers")] @@ -195,11 +184,6 @@ pub struct FoyerCacheConfig { #[serde(default = "default_trace_get_us")] pub trace_get_us: usize, - /// Record obtain trace threshold. Only effective with "mtrace" feature. - #[arg(long, default_value_t = 1000 * 100)] - #[serde(default = "default_trace_obtain_us")] - pub trace_obtain_us: usize, - /// Record remove trace threshold. Only effective with "mtrace" feature. #[arg(long, default_value_t = 1000 * 100)] #[serde(default = "default_trace_remove_us")] @@ -279,7 +263,6 @@ impl Default for FoyerCacheConfig { disk: default_disk(), file_size: default_file_size(), flushers: default_flushers(), - flush: default_flush(), reclaimers: default_reclaimers(), recover_concurrency: default_recover_concurrency(), deterministic_hashing: default_deterministic_hashing(), @@ -289,7 +272,6 @@ impl Default for FoyerCacheConfig { invalid_ratio: default_invalid_ratio(), trace_insert_us: default_trace_insert_us(), trace_get_us: default_trace_get_us(), - trace_obtain_us: default_trace_obtain_us(), trace_remove_us: default_trace_remove_us(), trace_fetch_us: default_trace_fetch_us(), buffer_pool: default_buffer_pool_size(), @@ -307,7 +289,6 @@ where cache_hit: opentelemetry::metrics::Counter, cache_miss: opentelemetry::metrics::Counter, get_latency: opentelemetry::metrics::Histogram, - obtain_latency: opentelemetry::metrics::Histogram, insert_latency: opentelemetry::metrics::Histogram, remove_latency: opentelemetry::metrics::Histogram, clear_latency: opentelemetry::metrics::Histogram, @@ -336,9 +317,10 @@ where let tracing_options = TracingOptions::new() .with_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _)) .with_record_hybrid_get_threshold(Duration::from_micros(config.trace_get_us as _)) - .with_record_hybrid_obtain_threshold(Duration::from_micros(config.trace_obtain_us as _)) .with_record_hybrid_remove_threshold(Duration::from_micros(config.trace_remove_us as _)) - .with_record_hybrid_fetch_threshold(Duration::from_micros(config.trace_fetch_us as _)); + .with_record_hybrid_get_or_fetch_threshold(Duration::from_micros( + config.trace_fetch_us as _, + )); let otel_0_27_metrics = Box::new( mixtrics::registry::opentelemetry_0_27::OpenTelemetryMetricsRegistry::new( @@ -386,25 +368,28 @@ where ))); }; - let mut device_options = DirectFsDeviceOptions::new(dir) - .with_capacity(config.disk * MIB) - .with_file_size(config.file_size * MIB); + let io_engine = PsyncIoEngineBuilder::new().build().await.map_err(|e| { + CacheError::InvalidCacheConfig(format!("build io engine failed: {:?}", e)).boxed() + })?; + let mut io_device_builder = FsDeviceBuilder::new(dir).with_capacity(config.disk * MIB); if config.admission_rate_limit > 0 { - device_options = device_options.with_throttle( + io_device_builder = io_device_builder.with_throttle( Throttle::new().with_write_throughput(config.admission_rate_limit * MIB), ); } + let io_device = io_device_builder.build().map_err(|e| { + CacheError::InvalidCacheConfig(format!("build io device failed: {:?}", e)).boxed() + })?; let builder = builder .with_weighter(|_, v| v.weight()) - .storage(Engine::Large) - .with_device_options(device_options) - .with_flush(config.flush) - .with_recover_mode(foyer::RecoverMode::Strict) - .with_large_object_disk_cache_options( - LargeEngineOptions::new() - .with_indexer_shards(config.shards) + .storage() + .with_recover_mode(foyer::RecoverMode::Quiet) + .with_io_engine(io_engine) + .with_engine_config( + BlockEngineBuilder::new(io_device) + .with_block_size(config.file_size * MIB) .with_recover_concurrency(config.recover_concurrency) .with_flushers(config.flushers) .with_reclaimers(config.reclaimers) @@ -424,7 +409,6 @@ where let cache_hit = meter.u64_counter("cache_hit").build(); let cache_miss = meter.u64_counter("cache_miss").build(); let get_latency = meter.u64_histogram("get_latency").build(); - let obtain_latency = meter.u64_histogram("obtain_latency").build(); let insert_latency = meter.u64_histogram("insert_latency").build(); let remove_latency = meter.u64_histogram("remove_latency").build(); let clear_latency = meter.u64_histogram("clear_latency").build(); @@ -435,7 +419,6 @@ where cache_hit, cache_miss, get_latency, - obtain_latency, insert_latency, remove_latency, clear_latency, @@ -485,18 +468,6 @@ where Ok(self.cache.clear().await?) } - async fn obtain(&self, key: K) -> Result, CacheError> { - let hostname = &[self.hostname.clone()]; - let _stopwatch = Stopwatch::new(&self.obtain_latency, hostname, StopWatchUnit::Millis); - let res = self.cache.obtain(key).await?.map(|v| v.value().clone()); - if res.is_some() { - self.cache_hit.add(1, hostname); - } else { - self.cache_miss.add(1, hostname); - } - Ok(res) - } - async fn may_contain(&self, key: &K) -> bool { self.cache.contains(key) } @@ -519,7 +490,6 @@ where cache_hit: opentelemetry::metrics::Counter, cache_miss: opentelemetry::metrics::Counter, get_latency: opentelemetry::metrics::Histogram, - obtain_latency: opentelemetry::metrics::Histogram, insert_latency: opentelemetry::metrics::Histogram, remove_latency: opentelemetry::metrics::Histogram, clear_latency: opentelemetry::metrics::Histogram, @@ -554,7 +524,6 @@ where let cache_hit = meter.u64_counter("cache_hit").build(); let cache_miss = meter.u64_counter("cache_miss").build(); let get_latency = meter.u64_histogram("get_latency").build(); - let obtain_latency = meter.u64_histogram("obtain_latency").build(); let insert_latency = meter.u64_histogram("insert_latency").build(); let remove_latency = meter.u64_histogram("remove_latency").build(); let clear_latency = meter.u64_histogram("clear_latency").build(); @@ -565,7 +534,6 @@ where cache_hit, cache_miss, get_latency, - obtain_latency, insert_latency, remove_latency, clear_latency, @@ -612,7 +580,6 @@ where let cache_hit = meter.u64_counter("cache_hit").build(); let cache_miss = meter.u64_counter("cache_miss").build(); let get_latency = meter.u64_histogram("get_latency").build(); - let obtain_latency = meter.u64_histogram("obtain_latency").build(); let insert_latency = meter.u64_histogram("insert_latency").build(); let remove_latency = meter.u64_histogram("remove_latency").build(); let clear_latency = meter.u64_histogram("clear_latency").build(); @@ -623,7 +590,6 @@ where cache_hit, cache_miss, get_latency, - obtain_latency, insert_latency, remove_latency, clear_latency, @@ -668,18 +634,6 @@ where self.cache.clear(); Ok(()) } - - async fn obtain(&self, key: K) -> Result, CacheError> { - let hostname = &[self.hostname.clone()]; - let _stopwatch = Stopwatch::new(&self.obtain_latency, hostname, StopWatchUnit::Millis); - let res = self.cache.get(&key).map(|v| v.value().clone()); - if res.is_some() { - self.cache_hit.add(1, hostname); - } else { - self.cache_miss.add(1, hostname); - } - Ok(res) - } } impl super::PersistentCache for FoyerPlainCache @@ -796,7 +750,6 @@ mod test { .to_string(); let cache = FoyerCacheConfig { dir: Some(dir.clone()), - flush: true, ..Default::default() } .build_hybrid_test::() @@ -826,7 +779,6 @@ mod test { .to_string(); let cache = FoyerCacheConfig { dir: Some(dir.clone()), - flush: true, ..Default::default() } .build_hybrid_test::() @@ -853,7 +805,6 @@ mod test { .to_string(); let cache = FoyerCacheConfig { dir: Some(dir.clone()), - flush: true, ..Default::default() } .build_hybrid_test::() diff --git a/rust/cache/src/lib.rs b/rust/cache/src/lib.rs index fb3a0da6edb..45beb069adc 100644 --- a/rust/cache/src/lib.rs +++ b/rust/cache/src/lib.rs @@ -27,14 +27,14 @@ pub enum CacheError { #[error("Invalid cache config")] InvalidCacheConfig(String), #[error("I/O error when serving from cache {0}")] - DiskError(#[from] anyhow::Error), + FoyerError(#[from] foyer::FoyerError), } impl ChromaError for CacheError { fn code(&self) -> ErrorCodes { match self { CacheError::InvalidCacheConfig(_) => ErrorCodes::InvalidArgument, - CacheError::DiskError(_) => ErrorCodes::Unavailable, + CacheError::FoyerError(_) => ErrorCodes::Unavailable, } } } @@ -76,7 +76,6 @@ where } async fn remove(&self, key: &K); async fn clear(&self) -> Result<(), CacheError>; - async fn obtain(&self, key: K) -> Result, CacheError>; } /// A persistent cache extends the traits of a cache to require StorageKey and StorageValue. diff --git a/rust/cache/src/nop.rs b/rust/cache/src/nop.rs index 8a86bff48de..a1e428be82a 100644 --- a/rust/cache/src/nop.rs +++ b/rust/cache/src/nop.rs @@ -22,10 +22,6 @@ where async fn clear(&self) -> Result<(), CacheError> { Ok(()) } - - async fn obtain(&self, _: K) -> Result, CacheError> { - Ok(None) - } } impl Debug for NopCache { diff --git a/rust/cache/src/unbounded.rs b/rust/cache/src/unbounded.rs index 02c5bfde799..a844d4640cb 100644 --- a/rust/cache/src/unbounded.rs +++ b/rust/cache/src/unbounded.rs @@ -65,12 +65,6 @@ where self.cache.write().clear(); Ok(()) } - - async fn obtain(&self, key: K) -> Result, CacheError> { - let read_guard = self.cache.read(); - let value = read_guard.get(&key); - Ok(value.cloned()) - } } impl Debug for UnboundedCache