From 8ee66fc3bf9a99910b48c9c5e78dd7ff9d2f1fa1 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Tue, 24 Sep 2024 09:41:01 +0000 Subject: [PATCH 1/3] fix: hold runtime ref and handle to prevent spawn after shutdown Signed-off-by: MrCroxx --- foyer-common/src/runtime.rs | 82 +++++++++++++++++++++++++ foyer-memory/src/cache.rs | 3 +- foyer-memory/src/generic.rs | 13 +++- foyer-storage/src/large/flusher.rs | 10 ++-- foyer-storage/src/large/generic.rs | 35 ++++------- foyer-storage/src/large/reclaimer.rs | 14 ++--- foyer-storage/src/large/recover.rs | 7 ++- foyer-storage/src/lib.rs | 1 + foyer-storage/src/prelude.rs | 3 +- foyer-storage/src/runtime.rs | 89 ++++++++++++++++++++++++++++ foyer-storage/src/store.rs | 84 ++++++++------------------ foyer/src/hybrid/cache.rs | 7 ++- foyer/src/prelude.rs | 4 +- 13 files changed, 244 insertions(+), 108 deletions(-) create mode 100644 foyer-storage/src/runtime.rs diff --git a/foyer-common/src/runtime.rs b/foyer-common/src/runtime.rs index 703171b2..40e5cce3 100644 --- a/foyer-common/src/runtime.rs +++ b/foyer-common/src/runtime.rs @@ -18,6 +18,10 @@ use std::{ ops::{Deref, DerefMut}, }; +use std::future::Future; + +use tokio::{runtime::Handle, task::JoinHandle}; + use tokio::runtime::Runtime; /// A wrapper around [`Runtime`] that shuts down the runtime in the background when dropped. @@ -62,3 +66,81 @@ impl From for BackgroundShutdownRuntime { Self(ManuallyDrop::new(runtime)) } } + +/// A non-clonable runtime handle. +#[derive(Debug)] +pub struct SingletonHandle(Handle); + +impl From for SingletonHandle { + fn from(handle: Handle) -> Self { + Self(handle) + } +} + +impl SingletonHandle { + /// Spawns a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// The provided future will start running in the background immediately + /// when `spawn` is called, even if you don't await the returned + /// `JoinHandle`. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Spawn a future onto the runtime using the handle + /// handle.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.0.spawn(future) + } + + /// Runs the provided function on an executor dedicated to blocking + /// operations. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Spawn a blocking function onto the runtime using the handle + /// handle.spawn_blocking(|| { + /// println!("now running on a worker thread"); + /// }); + /// # } + pub fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.0.spawn_blocking(func) + } +} diff --git a/foyer-memory/src/cache.rs b/foyer-memory/src/cache.rs index 8e50ca02..6d8fc3c1 100644 --- a/foyer-memory/src/cache.rs +++ b/foyer-memory/src/cache.rs @@ -19,6 +19,7 @@ use foyer_common::{ code::{HashBuilder, Key, Value}, event::EventListener, future::Diversion, + runtime::SingletonHandle, }; use futures::Future; use pin_project::pin_project; @@ -834,7 +835,7 @@ where key: K, context: CacheContext, fetch: F, - runtime: &tokio::runtime::Handle, + runtime: &SingletonHandle, ) -> Fetch where F: FnOnce() -> FU, diff --git a/foyer-memory/src/generic.rs b/foyer-memory/src/generic.rs index 8f20b089..8d8d7094 100644 --- a/foyer-memory/src/generic.rs +++ b/foyer-memory/src/generic.rs @@ -35,6 +35,7 @@ use foyer_common::{ future::{Diversion, DiversionFuture}, metrics::Metrics, object_pool::ObjectPool, + runtime::SingletonHandle, strict_assert, strict_assert_eq, }; use hashbrown::hash_map::{Entry as HashMapEntry, HashMap}; @@ -739,7 +740,12 @@ where FU: Future> + Send + 'static, ER: Send + 'static + Debug, { - self.fetch_inner(key, CacheContext::default(), fetch, &tokio::runtime::Handle::current()) + self.fetch_inner( + key, + CacheContext::default(), + fetch, + &tokio::runtime::Handle::current().into(), + ) } pub fn fetch_with_context( @@ -753,15 +759,16 @@ where FU: Future> + Send + 'static, ER: Send + 'static + Debug, { - self.fetch_inner(key, context, fetch, &tokio::runtime::Handle::current()) + self.fetch_inner(key, context, fetch, &tokio::runtime::Handle::current().into()) } + #[doc(hidden)] pub fn fetch_inner( self: &Arc, key: K, context: CacheContext, fetch: F, - runtime: &tokio::runtime::Handle, + runtime: &SingletonHandle, ) -> GenericFetch where F: FnOnce() -> FU, diff --git a/foyer-storage/src/large/flusher.rs b/foyer-storage/src/large/flusher.rs index 5bff8c7f..7a179701 100644 --- a/foyer-storage/src/large/flusher.rs +++ b/foyer-storage/src/large/flusher.rs @@ -24,10 +24,7 @@ use foyer_common::{ }; use foyer_memory::CacheEntry; use futures::future::{try_join, try_join_all}; -use tokio::{ - runtime::Handle, - sync::{oneshot, OwnedSemaphorePermit, Semaphore}, -}; +use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore}; use super::{ batch::{Batch, BatchMut, InvalidStats, TombstoneInfo}, @@ -41,6 +38,7 @@ use crate::{ device::MonitoredDevice, error::{Error, Result}, region::RegionManager, + runtime::Runtime, Compression, Statistics, }; @@ -138,7 +136,7 @@ where tombstone_log: Option, stats: Arc, metrics: Arc, - runtime: Handle, + runtime: &Runtime, ) -> Result { let (tx, rx) = flume::unbounded(); @@ -164,7 +162,7 @@ where metrics: metrics.clone(), }; - runtime.spawn(async move { + runtime.write().spawn(async move { if let Err(e) = runner.run().await { tracing::error!("[flusher]: flusher exit with error: {e}"); } diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index ae2d710d..bdf73fd8 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -32,7 +32,7 @@ use foyer_common::{ }; use foyer_memory::CacheEntry; use futures::future::{join_all, try_join_all}; -use tokio::{runtime::Handle, sync::Semaphore}; +use tokio::sync::Semaphore; use super::{ batch::InvalidStats, @@ -52,6 +52,7 @@ use crate::{ }, picker::{EvictionPicker, ReinsertionPicker}, region::RegionManager, + runtime::Runtime, serde::EntryDeserializer, statistics::Statistics, storage::Storage, @@ -79,9 +80,7 @@ where pub reinsertion_picker: Arc>, pub tombstone_log_config: Option, pub statistics: Arc, - pub read_runtime_handle: Handle, - pub write_runtime_handle: Handle, - pub user_runtime_handle: Handle, + pub runtime: Runtime, pub marker: PhantomData<(V, S)>, } @@ -108,8 +107,7 @@ where .field("reinsertion_pickers", &self.reinsertion_picker) .field("tombstone_log_config", &self.tombstone_log_config) .field("statistics", &self.statistics) - .field("read_runtime_handle", &self.read_runtime_handle) - .field("write_runtime_handle", &self.write_runtime_handle) + .field("runtime", &self.runtime) .finish() } } @@ -153,9 +151,7 @@ where sequence: AtomicSequence, - _read_runtime_handle: Handle, - write_runtime_handle: Handle, - _user_runtime_handle: Handle, + runtime: Runtime, active: AtomicBool, @@ -225,7 +221,7 @@ where ®ion_manager, &tombstones, metrics.clone(), - config.user_runtime_handle.clone(), + config.runtime.clone(), ) .await?; @@ -238,7 +234,7 @@ where tombstone_log.clone(), stats.clone(), metrics.clone(), - config.write_runtime_handle.clone(), + &config.runtime, ) .await })) @@ -254,7 +250,7 @@ where stats.clone(), config.flush, metrics.clone(), - config.write_runtime_handle.clone(), + &config.runtime, ) .await })) @@ -270,9 +266,7 @@ where statistics: stats, flush: config.flush, sequence, - _read_runtime_handle: config.read_runtime_handle, - write_runtime_handle: config.write_runtime_handle, - _user_runtime_handle: config.user_runtime_handle, + runtime: config.runtime, active: AtomicBool::new(true), metrics, }), @@ -390,7 +384,7 @@ where }); let this = self.clone(); - self.inner.write_runtime_handle.spawn(async move { + self.inner.runtime.write().spawn(async move { let sequence = this.inner.sequence.fetch_add(1, Ordering::Relaxed); this.inner.flushers[sequence as usize % this.inner.flushers.len()].submit(Submission::Tombstone { tombstone: Tombstone { hash, sequence }, @@ -497,6 +491,7 @@ mod tests { use ahash::RandomState; use foyer_memory::{Cache, CacheBuilder, FifoConfig}; use itertools::Itertools; + use tokio::runtime::Handle; use super::*; use crate::{ @@ -560,9 +555,7 @@ mod tests { tombstone_log_config: None, buffer_threshold: 16 * 1024 * 1024, statistics: Arc::::default(), - read_runtime_handle: Handle::current(), - write_runtime_handle: Handle::current(), - user_runtime_handle: Handle::current(), + runtime: Runtime::new(None, None, Handle::current()), marker: PhantomData, }; GenericLargeStorage::open(config).await.unwrap() @@ -591,9 +584,7 @@ mod tests { tombstone_log_config: Some(TombstoneLogConfigBuilder::new(path).with_flush(true).build()), buffer_threshold: 16 * 1024 * 1024, statistics: Arc::::default(), - read_runtime_handle: Handle::current(), - write_runtime_handle: Handle::current(), - user_runtime_handle: Handle::current(), + runtime: Runtime::new(None, None, Handle::current()), marker: PhantomData, }; GenericLargeStorage::open(config).await.unwrap() diff --git a/foyer-storage/src/large/reclaimer.rs b/foyer-storage/src/large/reclaimer.rs index b7a6b249..dafbff26 100644 --- a/foyer-storage/src/large/reclaimer.rs +++ b/foyer-storage/src/large/reclaimer.rs @@ -20,10 +20,7 @@ use foyer_common::{ }; use futures::future::join_all; use itertools::Itertools; -use tokio::{ - runtime::Handle, - sync::{mpsc, oneshot, Semaphore, SemaphorePermit}, -}; +use tokio::sync::{mpsc, oneshot, Semaphore, SemaphorePermit}; use crate::{ device::IO_BUFFER_ALLOCATOR, @@ -36,6 +33,7 @@ use crate::{ }, picker::ReinsertionPicker, region::{Region, RegionManager}, + runtime::Runtime, statistics::Statistics, IoBytes, }; @@ -56,7 +54,7 @@ impl Reclaimer { stats: Arc, flush: bool, metrics: Arc, - runtime: Handle, + runtime: &Runtime, ) -> Self where K: StorageKey, @@ -78,7 +76,7 @@ impl Reclaimer { runtime: runtime.clone(), }; - let _handle = runtime.spawn(async move { runner.run().await }); + let _handle = runtime.write().spawn(async move { runner.run().await }); Self { wait_tx } } @@ -116,7 +114,7 @@ where wait_rx: mpsc::UnboundedReceiver>, - runtime: Handle, + runtime: Runtime, } impl ReclaimRunner @@ -223,7 +221,7 @@ where let unpicked_count = unpicked.len(); let waits = self.flushers.iter().map(|flusher| flusher.wait()).collect_vec(); - self.runtime.spawn(async move { + self.runtime.write().spawn(async move { join_all(waits).await; }); self.indexer.remove_batch(&unpicked); diff --git a/foyer-storage/src/large/recover.rs b/foyer-storage/src/large/recover.rs index 74ed3bcb..007258a0 100644 --- a/foyer-storage/src/large/recover.rs +++ b/foyer-storage/src/large/recover.rs @@ -27,7 +27,7 @@ use foyer_common::{ use futures::future::try_join_all; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use tokio::{runtime::Handle, sync::Semaphore}; +use tokio::sync::Semaphore; use super::{ generic::GenericLargeStorageConfig, @@ -43,6 +43,7 @@ use crate::{ tombstone::Tombstone, }, region::{Region, RegionManager}, + runtime::Runtime, }; /// The recover mode of the disk cache. @@ -72,7 +73,7 @@ impl RecoverRunner { region_manager: &RegionManager, tombstones: &[Tombstone], metrics: Arc, - runtime: Handle, + runtime: Runtime, ) -> Result<()> where K: StorageKey, @@ -86,7 +87,7 @@ impl RecoverRunner { let semaphore = semaphore.clone(); let region = region_manager.region(id).clone(); let metrics = metrics.clone(); - runtime.spawn(async move { + runtime.user().spawn(async move { let permit = semaphore.acquire().await; let res = RegionRecoverRunner::run(mode, region, metrics).await; drop(permit); diff --git a/foyer-storage/src/lib.rs b/foyer-storage/src/lib.rs index 2aac3b26..ceb30ebb 100644 --- a/foyer-storage/src/lib.rs +++ b/foyer-storage/src/lib.rs @@ -27,6 +27,7 @@ mod io_buffer_pool; mod large; mod picker; mod region; +mod runtime; mod serde; mod small; mod statistics; diff --git a/foyer-storage/src/prelude.rs b/foyer-storage/src/prelude.rs index 2744dee1..c2564896 100644 --- a/foyer-storage/src/prelude.rs +++ b/foyer-storage/src/prelude.rs @@ -30,7 +30,8 @@ pub use crate::{ utils::{AdmitAllPicker, FifoPicker, InvalidRatioPicker, RateLimitPicker, RejectAllPicker}, AdmissionPicker, EvictionPicker, ReinsertionPicker, }, + runtime::Runtime, statistics::Statistics, storage::{either::Order, Storage}, - store::{CombinedConfig, DeviceConfig, RuntimeConfig, RuntimeHandles, Store, StoreBuilder, TokioRuntimeConfig}, + store::{CombinedConfig, DeviceConfig, RuntimeConfig, Store, StoreBuilder, TokioRuntimeConfig}, }; diff --git a/foyer-storage/src/runtime.rs b/foyer-storage/src/runtime.rs new file mode 100644 index 00000000..fc361cb5 --- /dev/null +++ b/foyer-storage/src/runtime.rs @@ -0,0 +1,89 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use foyer_common::runtime::{BackgroundShutdownRuntime, SingletonHandle}; +use tokio::runtime::Handle; + +#[derive(Debug)] +struct RuntimeInner { + _read_runtime: Option>, + _write_runtime: Option>, + + read_runtime_handle: SingletonHandle, + write_runtime_handle: SingletonHandle, + user_runtime_handle: SingletonHandle, +} + +/// [`Runtime`] holds the runtime reference and non-clonable handles to prevent handle usage after runtime shutdown. +#[derive(Debug, Clone)] +pub struct Runtime { + inner: Arc, +} + +impl Runtime { + /// Create a new runtime with runtimes if given. + pub fn new( + read_runtime: Option>, + write_runtime: Option>, + user_runtime_handle: Handle, + ) -> Self { + let read_runtime_handle = read_runtime + .as_ref() + .map(|rt| rt.handle().clone()) + .unwrap_or(user_runtime_handle.clone()); + let write_runtime_handle = write_runtime + .as_ref() + .map(|rt| rt.handle().clone()) + .unwrap_or(user_runtime_handle.clone()); + Self { + inner: Arc::new(RuntimeInner { + _read_runtime: read_runtime, + _write_runtime: write_runtime, + read_runtime_handle: read_runtime_handle.into(), + write_runtime_handle: write_runtime_handle.into(), + user_runtime_handle: user_runtime_handle.into(), + }), + } + } + + /// Create a new runtime with current runtime env only. + pub fn current() -> Self { + Self { + inner: Arc::new(RuntimeInner { + _read_runtime: None, + _write_runtime: None, + read_runtime_handle: Handle::current().into(), + write_runtime_handle: Handle::current().into(), + user_runtime_handle: Handle::current().into(), + }), + } + } + + /// Get the non-clonable read runtime handle. + pub fn read(&self) -> &SingletonHandle { + &self.inner.read_runtime_handle + } + + /// Get the non-clonable write runtime handle. + pub fn write(&self) -> &SingletonHandle { + &self.inner.write_runtime_handle + } + + /// Get the non-clonable user runtime handle. + pub fn user(&self) -> &SingletonHandle { + &self.inner.user_runtime_handle + } +} diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index e9ecb23e..80dd8598 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -38,6 +38,7 @@ use crate::{ utils::{AdmitAllPicker, FifoPicker, InvalidRatioPicker, RejectAllPicker}, AdmissionPicker, EvictionPicker, ReinsertionPicker, }, + runtime::Runtime, serde::EntrySerializer, small::generic::GenericSmallStorageConfig, statistics::Statistics, @@ -72,12 +73,7 @@ where compression: Compression, - read_runtime: Option>, - write_runtime: Option>, - - read_runtime_handle: Handle, - write_runtime_handle: Handle, - user_runtime_handle: Handle, + runtime: Runtime, statistics: Arc, metrics: Arc, @@ -95,11 +91,7 @@ where .field("engine", &self.inner.engine) .field("admission_picker", &self.inner.admission_picker) .field("compression", &self.inner.compression) - .field("read_runtime", &self.inner.read_runtime) - .field("write_runtime", &self.inner.write_runtime) - .field("read_runtime_handle", &self.inner.read_runtime_handle) - .field("write_runtime_handle", &self.inner.write_runtime_handle) - .field("user_runtime_handle", &self.inner.user_runtime_handle) + .field("runtimes", &self.inner.runtime) .finish() } } @@ -156,7 +148,7 @@ where { let hash = self.inner.memory.hash(key); let future = self.inner.engine.load(hash); - match self.inner.read_runtime_handle.spawn(future).await.unwrap() { + match self.inner.runtime.read().spawn(future).await.unwrap() { Ok(Some((k, v))) if k.borrow() == key => Ok(Some((k, v))), Ok(_) => Ok(None), Err(e) => Err(e), @@ -196,12 +188,17 @@ where } /// Get the runtime handles. - pub fn runtimes(&self) -> RuntimeHandles<'_> { - RuntimeHandles { - read_runtime_handle: &self.inner.read_runtime_handle, - write_runtime_handle: &self.inner.write_runtime_handle, - user_runtime_handle: &self.inner.user_runtime_handle, - } + #[deprecated( + since = "0.11.5", + note = "The function will be renamed to \"runtime()\", use it instead." + )] + pub fn runtimes(&self) -> &Runtime { + &self.inner.runtime + } + + /// Get the runtime. + pub fn runtime(&self) -> &Runtime { + &self.inner.runtime } } @@ -311,16 +308,6 @@ pub enum RuntimeConfig { }, } -/// Runtime handles. -pub struct RuntimeHandles<'a> { - /// Runtime handle for reads. - pub read_runtime_handle: &'a Handle, - /// Runtime handle for writes. - pub write_runtime_handle: &'a Handle, - /// User runtime handle. - pub user_runtime_handle: &'a Handle, -} - /// The builder of the disk cache. pub struct StoreBuilder where @@ -582,19 +569,15 @@ where Ok::<_, Error>(Arc::new(runtime)) }; - let (read_runtime, write_runtime, read_runtime_handle, write_runtime_handle) = match self.runtime_config { + let user_runtime_handle = Handle::current(); + let (read_runtime, write_runtime) = match self.runtime_config { RuntimeConfig::Disabled => { tracing::warn!("[store]: Dedicated runtime is disabled"); - (None, None, Handle::current(), Handle::current()) + (None, None) } RuntimeConfig::Unified(runtime_config) => { let runtime = build_runtime(&runtime_config, "unified")?; - ( - Some(runtime.clone()), - Some(runtime.clone()), - runtime.handle().clone(), - runtime.handle().clone(), - ) + (Some(runtime.clone()), Some(runtime.clone())) } RuntimeConfig::Separated { read_runtime_config, @@ -602,24 +585,15 @@ where } => { let read_runtime = build_runtime(&read_runtime_config, "read")?; let write_runtime = build_runtime(&write_runtime_config, "write")?; - let read_runtime_handle = read_runtime.handle().clone(); - let write_runtime_handle = write_runtime.handle().clone(); - ( - Some(read_runtime), - Some(write_runtime), - read_runtime_handle, - write_runtime_handle, - ) + (Some(read_runtime), Some(write_runtime)) } }; - let user_runtime_handle = Handle::current(); + let runtime = Runtime::new(read_runtime, write_runtime, user_runtime_handle); let engine = { let statistics = statistics.clone(); let metrics = metrics.clone(); - let write_runtime_handle = write_runtime_handle.clone(); - let read_runtime_handle = read_runtime_handle.clone(); - let user_runtime_handle = user_runtime_handle.clone(); + let runtime = runtime.clone(); // Use the user runtime to open engine. tokio::spawn(async move { match self.device_config { @@ -658,9 +632,7 @@ where tombstone_log_config: self.tombstone_log_config, buffer_threshold: self.buffer_pool_size, statistics: statistics.clone(), - write_runtime_handle, - read_runtime_handle, - user_runtime_handle, + runtime, marker: PhantomData, })) .await @@ -701,9 +673,7 @@ where tombstone_log_config: self.tombstone_log_config, buffer_threshold: self.buffer_pool_size, statistics: statistics.clone(), - write_runtime_handle, - read_runtime_handle, - user_runtime_handle, + runtime, marker: PhantomData, }, load_order, @@ -721,11 +691,7 @@ where engine, admission_picker, compression, - read_runtime, - write_runtime, - read_runtime_handle, - write_runtime_handle, - user_runtime_handle, + runtime, statistics, metrics, }; diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index 9baf52c1..5dbc3e20 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -485,7 +485,7 @@ where context, || { let metrics = self.metrics.clone(); - let user_runtime_handle = self.storage().runtimes().user_runtime_handle.clone(); + let runtime = self.storage().runtime().clone(); async move { match store.load(&key).await.map_err(anyhow::Error::from) { @@ -502,7 +502,8 @@ where metrics.hybrid_miss.increment(1); metrics.hybrid_miss_duration.record(now.elapsed()); - user_runtime_handle + runtime + .user() .spawn( future .map(|res| Diversion { @@ -515,7 +516,7 @@ where .unwrap() } }, - self.storage().runtimes().read_runtime_handle, + self.storage().runtime().read(), ); if inner.state() == FetchState::Hit { diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index fd470ffd..d8af4d38 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -27,8 +27,8 @@ pub use storage::{ AdmissionPicker, AdmitAllPicker, Compression, Dev, DevExt, DevOptions, DeviceStats, DirectFileDevice, DirectFileDeviceOptions, DirectFileDeviceOptionsBuilder, DirectFsDevice, DirectFsDeviceOptions, DirectFsDeviceOptionsBuilder, EvictionPicker, FifoPicker, InvalidRatioPicker, RateLimitPicker, RecoverMode, - ReinsertionPicker, RejectAllPicker, RuntimeConfig, RuntimeHandles, Storage, Store, StoreBuilder, - TokioRuntimeConfig, TombstoneLogConfigBuilder, + ReinsertionPicker, RejectAllPicker, Runtime, RuntimeConfig, Storage, Store, StoreBuilder, TokioRuntimeConfig, + TombstoneLogConfigBuilder, }; pub use crate::hybrid::{ From 71489ac779d2a21d260618d40df23d813f0497f1 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Tue, 24 Sep 2024 09:44:29 +0000 Subject: [PATCH 2/3] fix: make ffmt happy Signed-off-by: MrCroxx --- foyer-common/src/runtime.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/foyer-common/src/runtime.rs b/foyer-common/src/runtime.rs index 40e5cce3..d2b269c2 100644 --- a/foyer-common/src/runtime.rs +++ b/foyer-common/src/runtime.rs @@ -14,15 +14,15 @@ use std::{ fmt::Debug, + future::Future, mem::ManuallyDrop, ops::{Deref, DerefMut}, }; -use std::future::Future; - -use tokio::{runtime::Handle, task::JoinHandle}; - -use tokio::runtime::Runtime; +use tokio::{ + runtime::{Handle, Runtime}, + task::JoinHandle, +}; /// A wrapper around [`Runtime`] that shuts down the runtime in the background when dropped. /// From ab65b31e1a9ab56d372294a357c4f96233c7eba5 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Tue, 24 Sep 2024 13:05:58 +0000 Subject: [PATCH 3/3] fix: make device hold runtime, too Signed-off-by: MrCroxx --- foyer-common/src/asyncify.rs | 8 +++---- foyer-storage/src/device/direct_file.rs | 20 ++++++++--------- foyer-storage/src/device/direct_fs.rs | 25 ++++++++++----------- foyer-storage/src/device/mod.rs | 11 +++++----- foyer-storage/src/device/monitor.rs | 10 ++++----- foyer-storage/src/large/generic.rs | 29 +++++++++++++++---------- foyer-storage/src/large/scanner.rs | 24 +++++++++++--------- foyer-storage/src/large/tombstone.rs | 25 ++++++++++++++------- foyer-storage/src/store.rs | 2 +- 9 files changed, 85 insertions(+), 69 deletions(-) diff --git a/foyer-common/src/asyncify.rs b/foyer-common/src/asyncify.rs index 9dacfb5b..ba441b49 100644 --- a/foyer-common/src/asyncify.rs +++ b/foyer-common/src/asyncify.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use tokio::runtime::Handle; +use crate::runtime::SingletonHandle; /// Convert the block call to async call. #[cfg(not(madsim))] @@ -36,9 +36,9 @@ where f() } -/// Convert the block call to async call with given runtime. +/// Convert the block call to async call with given runtime handle. #[cfg(not(madsim))] -pub async fn asyncify_with_runtime(runtime: &Handle, f: F) -> T +pub async fn asyncify_with_runtime(runtime: &SingletonHandle, f: F) -> T where F: FnOnce() -> T + Send + 'static, T: Send + 'static, @@ -50,7 +50,7 @@ where /// Convert the block call to async call with given runtime. /// /// madsim compatible mode. -pub async fn asyncify_with_runtime(_: &Handle, f: F) -> T +pub async fn asyncify_with_runtime(_: &SingletonHandle, f: F) -> T where F: FnOnce() -> T + Send + 'static, T: Send + 'static, diff --git a/foyer-storage/src/device/direct_file.rs b/foyer-storage/src/device/direct_file.rs index 6b8170f3..ec02e2c2 100644 --- a/foyer-storage/src/device/direct_file.rs +++ b/foyer-storage/src/device/direct_file.rs @@ -21,13 +21,12 @@ use std::{ use foyer_common::{asyncify::asyncify_with_runtime, bits}; use fs4::free_space; use serde::{Deserialize, Serialize}; -use tokio::runtime::Handle; use super::{Dev, DevExt, DevOptions, RegionId}; use crate::{ device::ALIGN, error::{Error, Result}, - IoBytes, IoBytesMut, + IoBytes, IoBytesMut, Runtime, }; /// Options for the direct file device. @@ -49,7 +48,7 @@ pub struct DirectFileDevice { capacity: usize, region_size: usize, - runtime: Handle, + runtime: Runtime, } impl DevOptions for DirectFileDeviceOptions { @@ -90,7 +89,7 @@ impl DirectFileDevice { let file = self.file.clone(); - asyncify_with_runtime(&self.runtime, move || { + asyncify_with_runtime(self.runtime.write(), move || { #[cfg(target_family = "windows")] let written = { use std::os::windows::fs::FileExt; @@ -133,7 +132,7 @@ impl DirectFileDevice { let file = self.file.clone(); - let mut buffer = asyncify_with_runtime(&self.runtime, move || { + let mut buffer = asyncify_with_runtime(self.runtime.read(), move || { #[cfg(target_family = "windows")] let read = { use std::os::windows::fs::FileExt; @@ -172,9 +171,7 @@ impl Dev for DirectFileDevice { } #[fastrace::trace(name = "foyer::storage::device::direct_file::open")] - async fn open(options: Self::Options) -> Result { - let runtime = Handle::current(); - + async fn open(options: Self::Options, runtime: Runtime) -> Result { options.verify()?; let dir = options @@ -253,7 +250,7 @@ impl Dev for DirectFileDevice { #[fastrace::trace(name = "foyer::storage::device::direct_file::flush")] async fn flush(&self, _: Option) -> Result<()> { let file = self.file.clone(); - asyncify_with_runtime(&self.runtime, move || file.sync_all().map_err(Error::from)).await + asyncify_with_runtime(self.runtime.write(), move || file.sync_all().map_err(Error::from)).await } } @@ -360,6 +357,7 @@ mod tests { #[test_log::test(tokio::test)] async fn test_direct_file_device_io() { let dir = tempfile::tempdir().unwrap(); + let runtime = Runtime::current(); let options = DirectFileDeviceOptionsBuilder::new(dir.path().join("test-direct-file")) .with_capacity(4 * 1024 * 1024) @@ -368,7 +366,7 @@ mod tests { tracing::debug!("{options:?}"); - let device = DirectFileDevice::open(options.clone()).await.unwrap(); + let device = DirectFileDevice::open(options.clone(), runtime.clone()).await.unwrap(); let mut buf = IoBytesMut::with_capacity(64 * 1024); buf.extend(repeat_n(b'x', 64 * 1024 - 100)); @@ -383,7 +381,7 @@ mod tests { drop(device); - let device = DirectFileDevice::open(options).await.unwrap(); + let device = DirectFileDevice::open(options, runtime).await.unwrap(); let b = device.read(0, 4096, 64 * 1024 - 100).await.unwrap().freeze(); assert_eq!(buf, b); diff --git a/foyer-storage/src/device/direct_fs.rs b/foyer-storage/src/device/direct_fs.rs index 307e8e7b..94fc3f11 100644 --- a/foyer-storage/src/device/direct_fs.rs +++ b/foyer-storage/src/device/direct_fs.rs @@ -23,13 +23,12 @@ use fs4::free_space; use futures::future::try_join_all; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use tokio::runtime::Handle; use super::{Dev, DevExt, DevOptions, RegionId}; use crate::{ device::ALIGN, error::{Error, Result}, - IoBytes, IoBytesMut, + IoBytes, IoBytesMut, Runtime, }; /// Options for the direct fs device. @@ -56,7 +55,7 @@ struct DirectFsDeviceInner { capacity: usize, file_size: usize, - runtime: Handle, + runtime: Runtime, } impl DevOptions for DirectFsDeviceOptions { @@ -106,17 +105,16 @@ impl Dev for DirectFsDevice { } #[fastrace::trace(name = "foyer::storage::device::direct_fs::open")] - async fn open(options: Self::Options) -> Result { - let runtime = Handle::current(); - + async fn open(options: Self::Options, runtime: Runtime) -> Result { options.verify()?; // TODO(MrCroxx): write and read options to a manifest file for pinning let regions = options.capacity / options.file_size; - let path = options.dir.clone(); - asyncify_with_runtime(&runtime, move || create_dir_all(path)).await?; + if !options.dir.exists() { + create_dir_all(&options.dir)?; + } let futures = (0..regions) .map(|i| { @@ -165,7 +163,7 @@ impl Dev for DirectFsDevice { let file = self.file(region).clone(); - asyncify_with_runtime(&self.inner.runtime, move || { + asyncify_with_runtime(self.inner.runtime.write(), move || { #[cfg(target_family = "windows")] let written = { use std::os::windows::fs::FileExt; @@ -207,7 +205,7 @@ impl Dev for DirectFsDevice { let file = self.file(region).clone(); - let mut buffer = asyncify_with_runtime(&self.inner.runtime, move || { + let mut buffer = asyncify_with_runtime(self.inner.runtime.read(), move || { #[cfg(target_family = "unix")] let read = { use std::os::unix::fs::FileExt; @@ -237,7 +235,7 @@ impl Dev for DirectFsDevice { async fn flush(&self, region: Option) -> Result<()> { let flush = |region: RegionId| { let file = self.file(region).clone(); - asyncify_with_runtime(&self.inner.runtime, move || file.sync_all().map_err(Error::from)) + asyncify_with_runtime(self.inner.runtime.write(), move || file.sync_all().map_err(Error::from)) }; if let Some(region) = region { @@ -352,6 +350,7 @@ mod tests { #[test_log::test(tokio::test)] async fn test_direct_fd_device_io() { let dir = tempfile::tempdir().unwrap(); + let runtime = Runtime::current(); let options = DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(4 * 1024 * 1024) @@ -360,7 +359,7 @@ mod tests { tracing::debug!("{options:?}"); - let device = DirectFsDevice::open(options.clone()).await.unwrap(); + let device = DirectFsDevice::open(options.clone(), runtime.clone()).await.unwrap(); let mut buf = IoBytesMut::with_capacity(64 * 1024); buf.extend(repeat_n(b'x', 64 * 1024 - 100)); @@ -375,7 +374,7 @@ mod tests { drop(device); - let device = DirectFsDevice::open(options).await.unwrap(); + let device = DirectFsDevice::open(options, runtime).await.unwrap(); let b = device.read(0, 4096, 64 * 1024 - 100).await.unwrap().freeze(); assert_eq!(buf, b); diff --git a/foyer-storage/src/device/mod.rs b/foyer-storage/src/device/mod.rs index 23faa656..26dfe5f7 100644 --- a/foyer-storage/src/device/mod.rs +++ b/foyer-storage/src/device/mod.rs @@ -25,7 +25,7 @@ use monitor::Monitored; use crate::{ error::Result, DirectFileDevice, DirectFileDeviceOptions, DirectFsDevice, DirectFsDeviceOptions, IoBytes, - IoBytesMut, + IoBytesMut, Runtime, }; pub const ALIGN: usize = 4096; @@ -52,9 +52,10 @@ pub trait Dev: Send + Sync + 'static + Sized + Clone + Debug { /// The region size of the device, must be 4K aligned. fn region_size(&self) -> usize; + // TODO(MrCroxx): Refactor the builder. /// Open the device with the given options. #[must_use] - fn open(options: Self::Options) -> impl Future> + Send; + fn open(options: Self::Options, runtime: Runtime) -> impl Future> + Send; /// Write API for the device. #[must_use] @@ -134,10 +135,10 @@ impl Dev for Device { } } - async fn open(options: Self::Options) -> Result { + async fn open(options: Self::Options, runtime: Runtime) -> Result { match options { - DeviceOptions::DirectFile(opts) => Ok(Self::DirectFile(DirectFileDevice::open(opts).await?)), - DeviceOptions::DirectFs(opts) => Ok(Self::DirectFs(DirectFsDevice::open(opts).await?)), + DeviceOptions::DirectFile(opts) => Ok(Self::DirectFile(DirectFileDevice::open(opts, runtime).await?)), + DeviceOptions::DirectFs(opts) => Ok(Self::DirectFs(DirectFsDevice::open(opts, runtime).await?)), } } diff --git a/foyer-storage/src/device/monitor.rs b/foyer-storage/src/device/monitor.rs index 35354ace..3e338d1d 100644 --- a/foyer-storage/src/device/monitor.rs +++ b/foyer-storage/src/device/monitor.rs @@ -24,7 +24,7 @@ use std::{ use foyer_common::{bits, metrics::Metrics}; use super::RegionId; -use crate::{error::Result, Dev, DevExt, DevOptions, DirectFileDevice, IoBytes, IoBytesMut}; +use crate::{error::Result, Dev, DevExt, DevOptions, DirectFileDevice, IoBytes, IoBytesMut, Runtime}; /// The statistics information of the device. #[derive(Debug, Default)] @@ -87,8 +87,8 @@ impl Monitored where D: Dev, { - async fn open(options: MonitoredOptions) -> Result { - let device = D::open(options.options).await?; + async fn open(options: MonitoredOptions, runtime: Runtime) -> Result { + let device = D::open(options.options, runtime).await?; Ok(Self { device, stats: Arc::default(), @@ -159,8 +159,8 @@ where self.device.region_size() } - async fn open(options: Self::Options) -> Result { - Self::open(options).await + async fn open(options: Self::Options, runtime: Runtime) -> Result { + Self::open(options, runtime).await } async fn write(&self, buf: IoBytes, region: RegionId, offset: u64) -> Result<()> { diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index bdf73fd8..489c6be0 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -186,13 +186,14 @@ where let mut tombstones = vec![]; let tombstone_log = match &config.tombstone_log_config { None => None, - Some(config) => { + Some(tombstone_log_config) => { let log = TombstoneLog::open( - &config.path, + &tombstone_log_config.path, device.clone(), - config.flush, + tombstone_log_config.flush, &mut tombstones, metrics.clone(), + config.runtime.clone(), ) .await?; Some(log) @@ -514,15 +515,19 @@ mod tests { } async fn device_for_test(dir: impl AsRef) -> MonitoredDevice { - Monitored::open(MonitoredOptions { - options: DirectFsDeviceOptions { - dir: dir.as_ref().into(), - capacity: 64 * KB, - file_size: 16 * KB, - } - .into(), - metrics: Arc::new(Metrics::new("test")), - }) + let runtime = Runtime::current(); + Monitored::open( + MonitoredOptions { + options: DirectFsDeviceOptions { + dir: dir.as_ref().into(), + capacity: 64 * KB, + file_size: 16 * KB, + } + .into(), + metrics: Arc::new(Metrics::new("test")), + }, + runtime, + ) .await .unwrap() } diff --git a/foyer-storage/src/large/scanner.rs b/foyer-storage/src/large/scanner.rs index ce5eeb08..20f43309 100644 --- a/foyer-storage/src/large/scanner.rs +++ b/foyer-storage/src/large/scanner.rs @@ -248,21 +248,25 @@ mod tests { Dev, MonitoredDevice, }, region::RegionStats, - DirectFsDeviceOptions, + DirectFsDeviceOptions, Runtime, }; const KB: usize = 1024; async fn device_for_test(dir: impl AsRef) -> MonitoredDevice { - Monitored::open(MonitoredOptions { - options: DirectFsDeviceOptions { - dir: dir.as_ref().into(), - capacity: 64 * KB, - file_size: 16 * KB, - } - .into(), - metrics: Arc::new(Metrics::new("test")), - }) + let runtime = Runtime::current(); + Monitored::open( + MonitoredOptions { + options: DirectFsDeviceOptions { + dir: dir.as_ref().into(), + capacity: 64 * KB, + file_size: 16 * KB, + } + .into(), + metrics: Arc::new(Metrics::new("test")), + }, + runtime, + ) .await .unwrap() } diff --git a/foyer-storage/src/large/tombstone.rs b/foyer-storage/src/large/tombstone.rs index b44a9a6d..09a9ee27 100644 --- a/foyer-storage/src/large/tombstone.rs +++ b/foyer-storage/src/large/tombstone.rs @@ -30,7 +30,7 @@ use crate::{ Dev, DevExt, RegionId, }, error::{Error, Result}, - IoBytesMut, + IoBytesMut, Runtime, }; /// The configurations for the tombstone log. @@ -121,6 +121,7 @@ impl TombstoneLog { flush: bool, tombstones: &mut Vec, metrics: Arc, + runtime: Runtime, ) -> Result where D: Dev, @@ -134,13 +135,16 @@ impl TombstoneLog { // For the alignment is 4K and the slot size is 16B, tombstone log requires 1/256 of the cache device size. let capacity = bits::align_up(align, (cache_device.capacity() / align) * Tombstone::serialized_len()); - let device = Monitored::open(MonitoredOptions { - options: DirectFileDeviceOptionsBuilder::new(path) - .with_region_size(align) - .with_capacity(capacity) - .build(), - metrics, - }) + let device = Monitored::open( + MonitoredOptions { + options: DirectFileDeviceOptionsBuilder::new(path) + .with_region_size(align) + .with_capacity(capacity) + .build(), + metrics, + }, + runtime, + ) .await?; let tasks = bits::align_up(Self::RECOVER_IO_SIZE, capacity) / Self::RECOVER_IO_SIZE; @@ -312,6 +316,8 @@ mod tests { #[test_log::test(tokio::test)] async fn test_tombstone_log() { + let runtime = Runtime::current(); + let dir = tempdir().unwrap(); // 4 MB cache device => 16 KB tombstone log => 1K tombstones @@ -319,6 +325,7 @@ mod tests { DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(4 * 1024 * 1024) .build(), + runtime.clone(), ) .await .unwrap(); @@ -329,6 +336,7 @@ mod tests { true, &mut vec![], Arc::new(Metrics::new("test")), + runtime.clone(), ) .await .unwrap(); @@ -358,6 +366,7 @@ mod tests { true, &mut vec![], Arc::new(Metrics::new("test")), + runtime, ) .await .unwrap(); diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index 80dd8598..88a04d00 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -607,7 +607,7 @@ where let device = match Monitored::open(MonitoredOptions { options, metrics: metrics.clone(), - }) + }, runtime.clone()) .await { Ok(device) => device, Err(e) =>return Err(e),