diff --git a/libs/utils/src/guard_arc_swap.rs b/libs/utils/src/guard_arc_swap.rs
new file mode 100644
index 000000000000..cec520246067
--- /dev/null
+++ b/libs/utils/src/guard_arc_swap.rs
@@ -0,0 +1,54 @@
+//! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes
+//! don't block reads.
+
+use arc_swap::ArcSwap;
+use std::sync::Arc;
+use tokio::sync::TryLockError;
+
+pub struct GuardArcSwap<T> {
+    inner: ArcSwap<T>,
+    guard: tokio::sync::Mutex<()>,
+}
+
+pub struct Guard<'a, T> {
+    _guard: tokio::sync::MutexGuard<'a, ()>,
+    inner: &'a ArcSwap<T>,
+}
+
+impl<T> GuardArcSwap<T> {
+    pub fn new(inner: T) -> Self {
+        Self {
+            inner: ArcSwap::new(Arc::new(inner)),
+            guard: tokio::sync::Mutex::new(()),
+        }
+    }
+
+    pub fn read(&self) -> Arc<T> {
+        self.inner.load_full()
+    }
+
+    pub async fn write_guard(&self) -> Guard<'_, T> {
+        Guard {
+            _guard: self.guard.lock().await,
+            inner: &self.inner,
+        }
+    }
+
+    pub fn try_write_guard(&self) -> Result<Guard<'_, T>, TryLockError> {
+        let guard = self.guard.try_lock()?;
+        Ok(Guard {
+            _guard: guard,
+            inner: &self.inner,
+        })
+    }
+}
+
+impl<T> Guard<'_, T> {
+    pub fn read(&self) -> Arc<T> {
+        self.inner.load_full()
+    }
+
+    pub fn write(&mut self, value: T) {
+        self.inner.store(Arc::new(value));
+    }
+}
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index 2c56dd750f75..1fb18e9e9a21 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -98,6 +98,8 @@ pub mod try_rcu;
 
 pub mod pprof;
 
+pub mod guard_arc_swap;
+
 // Re-export used in macro. Avoids adding git-version as dep in target crates.
 #[doc(hidden)]
 pub use git_version;
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index d6ae11e67d8d..f24611e1d833 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -51,7 +51,9 @@ use tokio::{
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::{
-    fs_ext, pausable_failpoint,
+    fs_ext,
+    guard_arc_swap::GuardArcSwap,
+    pausable_failpoint,
     postgres_client::PostgresClientProtocol,
     sync::gate::{Gate, GateGuard},
 };
@@ -353,8 +355,8 @@ pub struct Timeline {
     // though let's keep them both for better error visibility.
     pub initdb_lsn: Lsn,
 
-    /// When did we last calculate the partitioning? Make it pub to test cases.
-    pub(super) partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
+    /// The repartitioning result. Allows a single writer and multiple readers.
+    pub(crate) partitioning: GuardArcSwap<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
 
     /// Configuration: how often should the partitioning be recalculated.
     repartition_threshold: u64,
@@ -2340,7 +2342,8 @@ impl Timeline {
                 // initial logical size is 0.
                 LogicalSize::empty_initial()
             },
-            partitioning: tokio::sync::Mutex::new((
+
+            partitioning: GuardArcSwap::new((
                 (KeyPartitioning::new(), KeyPartitioning::new().into_sparse()),
                 Lsn(0),
             )),
@@ -4028,18 +4031,15 @@ impl Timeline {
         flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
     ) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
-        let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
+        let Ok(mut guard) = self.partitioning.try_write_guard() else {
             // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
             // The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
             // and hence before the compaction task starts.
-            // Note that there are a third "caller" that will take the `partitioning` lock. It is `gc_compaction_split_jobs` for
-            // gc-compaction where it uses the repartition data to determine the split jobs. In the future, it might use its own
-            // heuristics, but for now, we should allow concurrent access to it and let the caller retry compaction.
             return Err(CompactionError::Other(anyhow!(
-                "repartition() called concurrently, this is rare and a retry should be fine"
+                "repartition() called concurrently"
             )));
         };
-        let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
+        let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
         if lsn < *partition_lsn {
             return Err(CompactionError::Other(anyhow!(
                 "repartition() called with LSN going backwards, this should not happen"
@@ -4067,9 +4067,9 @@ impl Timeline {
         let sparse_partitioning = SparseKeyPartitioning {
             parts: vec![sparse_ks],
         }; // no partitioning for metadata keys for now
-        *partitioning_guard = ((dense_partitioning, sparse_partitioning), lsn);
-
-        Ok((partitioning_guard.0.clone(), partitioning_guard.1))
+        let result = ((dense_partitioning, sparse_partitioning), lsn);
+        guard.write(result.clone());
+        Ok(result)
     }
 
     // Is it time to create a new image layer for the given partition?
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 2042a18e96e1..06a21f6b3c42 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -2146,12 +2146,7 @@ impl Timeline {
         let mut compact_jobs = Vec::new();
         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
-        let ((dense_ks, sparse_ks), _) = {
-            let Ok(partition) = self.partitioning.try_lock() else {
-                bail!("failed to acquire partition lock during gc-compaction");
-            };
-            partition.clone()
-        };
+        let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
         // Truncate the key range to be within user specified compaction range.
         fn truncate_to(
             source_start: &Key,
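
For context, here is a minimal usage sketch of the new wrapper, not part of the PR itself. It assumes the `utils` crate from this change as a dependency; the `Config` type and the task structure are hypothetical, for illustration only.

```rust
// A minimal sketch of how `GuardArcSwap` is intended to be used: readers call
// `read()` and never block, while writers serialize through the inner mutex.
// `Config` is a hypothetical payload type.
use std::sync::Arc;

use utils::guard_arc_swap::GuardArcSwap;

struct Config {
    version: u64,
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(GuardArcSwap::new(Config { version: 0 }));

    // Readers never block: `read()` is a plain `ArcSwap::load_full`, so it can
    // run on hot paths concurrently with a writer holding the guard.
    let reader = {
        let shared = Arc::clone(&shared);
        tokio::spawn(async move {
            let snapshot = shared.read();
            println!("reader saw version {}", snapshot.version);
        })
    };

    // Writers serialize through the tokio mutex inside the wrapper: take the
    // guard, then read-modify-write. Readers observe either the old or the new
    // `Arc`, never a partial update.
    {
        let mut guard = shared.write_guard().await;
        let current = guard.read();
        guard.write(Config {
            version: current.version + 1,
        });
    }

    // A writer that must not wait (like `repartition()` in this PR) can use
    // `try_write_guard()` and turn contention into a retryable error.
    if let Ok(mut guard) = shared.try_write_guard() {
        let current = guard.read();
        guard.write(Config {
            version: current.version + 1,
        });
    }

    reader.await.unwrap();
}
```

The design point is that the mutex only arbitrates between writers, while readers get a consistent `Arc` snapshot without touching it. That is why `gc_compaction_split_jobs` can now clone the partitioning via `read()` without a fallible `try_lock`, and `repartition()` still fails fast when another writer holds the guard.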