Skip to content

Commit

Permalink
refactor(pageserver): make partitioning an ArcSwap (#10377)
Browse files Browse the repository at this point in the history
## Problem

gc-compaction needs the partitioning data to decide the job split. This
refactor allows concurrent access/computing the partitioning.

## Summary of changes

Make `partitioning` an ArcSwap so that others can access the
partitioning while we compute it. Fully eliminate the `repartition is
called concurrently` warning when gc-compaction is going on.

---------

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh authored Jan 16, 2025
1 parent e436dca commit cccc196
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 19 deletions.
54 changes: 54 additions & 0 deletions libs/utils/src/guard_arc_swap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes
//! don't block reads.
use arc_swap::ArcSwap;
use std::sync::Arc;
use tokio::sync::TryLockError;

pub struct GuardArcSwap<T> {
inner: ArcSwap<T>,
guard: tokio::sync::Mutex<()>,
}

pub struct Guard<'a, T> {
_guard: tokio::sync::MutexGuard<'a, ()>,
inner: &'a ArcSwap<T>,
}

impl<T> GuardArcSwap<T> {
pub fn new(inner: T) -> Self {
Self {
inner: ArcSwap::new(Arc::new(inner)),
guard: tokio::sync::Mutex::new(()),
}
}

pub fn read(&self) -> Arc<T> {
self.inner.load_full()
}

pub async fn write_guard(&self) -> Guard<'_, T> {
Guard {
_guard: self.guard.lock().await,
inner: &self.inner,
}
}

pub fn try_write_guard(&self) -> Result<Guard<'_, T>, TryLockError> {
let guard = self.guard.try_lock()?;
Ok(Guard {
_guard: guard,
inner: &self.inner,
})
}
}

impl<T> Guard<'_, T> {
pub fn read(&self) -> Arc<T> {
self.inner.load_full()
}

pub fn write(&mut self, value: T) {
self.inner.store(Arc::new(value));
}
}
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub mod try_rcu;

pub mod pprof;

pub mod guard_arc_swap;

// Re-export used in macro. Avoids adding git-version as dep in target crates.
#[doc(hidden)]
pub use git_version;
Expand Down
26 changes: 13 additions & 13 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
fs_ext, pausable_failpoint,
fs_ext,
guard_arc_swap::GuardArcSwap,
pausable_failpoint,
postgres_client::PostgresClientProtocol,
sync::gate::{Gate, GateGuard},
};
Expand Down Expand Up @@ -353,8 +355,8 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,

/// When did we last calculate the partitioning? Make it pub to test cases.
pub(super) partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// The repartitioning result. Allows a single writer and multiple readers.
pub(crate) partitioning: GuardArcSwap<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,

/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
Expand Down Expand Up @@ -2340,7 +2342,8 @@ impl Timeline {
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: tokio::sync::Mutex::new((

partitioning: GuardArcSwap::new((
(KeyPartitioning::new(), KeyPartitioning::new().into_sparse()),
Lsn(0),
)),
Expand Down Expand Up @@ -4028,18 +4031,15 @@ impl Timeline {
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
let Ok(mut guard) = self.partitioning.try_write_guard() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
// Note that there are a third "caller" that will take the `partitioning` lock. It is `gc_compaction_split_jobs` for
// gc-compaction where it uses the repartition data to determine the split jobs. In the future, it might use its own
// heuristics, but for now, we should allow concurrent access to it and let the caller retry compaction.
return Err(CompactionError::Other(anyhow!(
"repartition() called concurrently, this is rare and a retry should be fine"
"repartition() called concurrently"
)));
};
let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
if lsn < *partition_lsn {
return Err(CompactionError::Other(anyhow!(
"repartition() called with LSN going backwards, this should not happen"
Expand Down Expand Up @@ -4067,9 +4067,9 @@ impl Timeline {
let sparse_partitioning = SparseKeyPartitioning {
parts: vec![sparse_ks],
}; // no partitioning for metadata keys for now
*partitioning_guard = ((dense_partitioning, sparse_partitioning), lsn);

Ok((partitioning_guard.0.clone(), partitioning_guard.1))
let result = ((dense_partitioning, sparse_partitioning), lsn);
guard.write(result.clone());
Ok(result)
}

// Is it time to create a new image layer for the given partition?
Expand Down
7 changes: 1 addition & 6 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2146,12 +2146,7 @@ impl Timeline {
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let ((dense_ks, sparse_ks), _) = {
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock during gc-compaction");
};
partition.clone()
};
let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
// Truncate the key range to be within user specified compaction range.
fn truncate_to(
source_start: &Key,
Expand Down

1 comment on commit cccc196

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7477 tests run: 7090 passed, 0 failed, 387 skipped (full report)


Code coverage* (full report)

  • functions: 33.7% (8426 of 25029 functions)
  • lines: 49.2% (70493 of 143338 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
cccc196 at 2025-01-16T17:53:45.448Z :recycle:

Please sign in to comment.