feat(pageserver): gc-compaction split job and partial scheduler (#9897)
## Problem

Part of #9114; stacked PR over #9809.

The compaction scheduler now schedules partial compaction jobs.

## Summary of changes

* Add a compaction job splitter that splits jobs based on size (see the sketch after this list).
* Schedule subcompactions through the compaction scheduler.
* Exercise the subcompaction scheduler in the smoke regress test.
* Temporarily disable the layer map checks.
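
A condensed, self-contained sketch of the queueing behavior (the `Job`/`Task` names below are illustrative stand-ins, not the real `CompactOptions`/`ScheduledCompactionTask` types; see the `pageserver/src/tenant.rs` hunk below for the actual code):

```rust
use std::collections::VecDeque;

use tokio::sync::oneshot;

// Illustrative stand-ins for CompactOptions and ScheduledCompactionTask.
struct Job;
struct Task {
    job: Job,
    done_tx: Option<oneshot::Sender<()>>,
}

/// Enqueue the sub-jobs produced by the splitter. Only the last sub-job
/// carries the completion channel, so a caller waiting on the scheduled
/// compaction is notified exactly once, after every sub-job has finished.
fn enqueue_split_jobs(
    jobs: Vec<Job>,
    mut done_tx: Option<oneshot::Sender<()>>,
    queue: &mut VecDeque<Task>,
) {
    let jobs_len = jobs.len();
    for (idx, job) in jobs.into_iter().enumerate() {
        queue.push_back(Task {
            job,
            done_tx: if idx == jobs_len - 1 { done_tx.take() } else { None },
        });
    }
}
```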

---------

Signed-off-by: Alex Chi Z <[email protected]>
skyzh authored Dec 6, 2024
1 parent e4837b0 commit c42c28b
Showing 5 changed files with 209 additions and 20 deletions.
10 changes: 9 additions & 1 deletion pageserver/src/http/routes.rs
@@ -2036,15 +2036,23 @@ async fn timeline_compact_handler(
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
.unwrap_or(false);

let sub_compaction = compact_request
.as_ref()
.map(|r| r.sub_compaction)
.unwrap_or(false);
let options = CompactOptions {
compact_range: compact_request
.as_ref()
.and_then(|r| r.compact_range.clone()),
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
flags,
sub_compaction,
};

let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false);
let scheduled = compact_request
.as_ref()
.map(|r| r.scheduled)
.unwrap_or(false);

async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
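For reference, a sketch of the request body this handler now parses; the field names follow `CompactRequest`, but the key values are hypothetical placeholders:

```rust
// Hypothetical body for the timeline compaction endpoint; the key strings
// are illustrative placeholders, not meaningful boundaries.
let body = serde_json::json!({
    "scheduled": true,        // enqueue on the compaction scheduler
    "sub_compaction": true,   // split the job across key ranges
    "compact_range": {
        "start": "000000000000000000000000000000000000",
        "end":   "030000000000000000000000000000000000"
    }
});
```

Callers can additionally set the `wait_until_scheduled_compaction_done` query parameter (parsed at the top of this hunk) to block until the queued job completes.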
49 changes: 41 additions & 8 deletions pageserver/src/tenant.rs
@@ -49,6 +49,7 @@ use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -2987,10 +2988,16 @@ impl Tenant {
if has_pending_l0_compaction_task {
Some(true)
} else {
let has_pending_scheduled_compaction_task;
let mut has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
if !tline_pending_tasks.is_empty() {
info!(
"{} tasks left in the compaction schedule queue",
tline_pending_tasks.len()
);
}
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
@@ -3007,6 +3014,32 @@
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = timeline
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
} else {
has_pending_scheduled_compaction_task = true;
let jobs_len = jobs.len();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(ScheduledCompactionTask {
options: job,
result_tx: if idx == jobs_len - 1 {
// The last compaction job sends the completion signal
next_scheduled_compaction_task.result_tx.take()
} else {
None
},
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
}
} else {
let _ = timeline
.compact_with_options(
@@ -9244,7 +9277,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -9481,7 +9514,7 @@
CompactOptions {
flags: dryrun_flags,
compact_range: None,
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -9973,7 +10006,7 @@
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(2)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10020,7 +10053,7 @@
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(2)..get_key(4)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10072,7 +10105,7 @@
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(4)..get_key(9)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10123,7 +10156,7 @@
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(9)..get_key(10)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
@@ -10179,7 +10212,7 @@
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(10)).into()),
compact_below_lsn: None,
..Default::default()
},
&ctx,
)
7 changes: 7 additions & 0 deletions pageserver/src/tenant/timeline.rs
@@ -785,6 +785,9 @@ pub(crate) struct CompactRequest {
/// Whether the compaction job should be scheduled.
#[serde(default)]
pub scheduled: bool,
/// Whether the compaction job should be split across key ranges.
#[serde(default)]
pub sub_compaction: bool,
}

#[serde_with::serde_as]
@@ -814,6 +817,9 @@ pub(crate) struct CompactOptions {
/// If set, the compaction will only compact the LSN below this value.
/// This option is only used by GC compaction.
pub compact_below_lsn: Option<Lsn>,
/// Enable sub-compaction (split compaction job across key ranges).
/// This option is only used by GC compaction.
pub sub_compaction: bool,
}

impl std::fmt::Debug for Timeline {
@@ -1637,6 +1643,7 @@ impl Timeline {
flags,
compact_range: None,
compact_below_lsn: None,
sub_compaction: false,
},
ctx,
)
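Since `CompactOptions` keeps gaining fields, call sites that only care about one or two of them can fill in the rest from `Default`, as the updated tests in `pageserver/src/tenant.rs` above now do (`get_key` is the helper those tests use):

```rust
// Mirrors the updated tests: compact_below_lsn and sub_compaction fall
// back to their Default values (None and false, respectively).
let options = CompactOptions {
    flags: EnumSet::new(),
    compact_range: Some((get_key(0)..get_key(2)).into()),
    ..Default::default()
};
```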
162 changes: 151 additions & 11 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -10,8 +10,8 @@ use std::sync::Arc;

use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
RecordedDuration, Timeline,
CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
ImageLayerCreationMode, RecordedDuration, Timeline,
};

use anyhow::{anyhow, bail, Context};
@@ -29,7 +29,6 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -1752,6 +1751,116 @@ impl Timeline {
Ok(())
}

/// Split a gc-compaction job into multiple compaction jobs. Ideally, this function would return a vector of
/// `GcCompactionJobDesc`, but we want to keep the tenant scheduling side simple without exposing too much
/// ad-hoc information about gc-compaction itself.
pub(crate) async fn gc_compaction_split_jobs(
self: &Arc<Self>,
options: CompactOptions,
) -> anyhow::Result<Vec<CompactOptions>> {
if !options.sub_compaction {
return Ok(vec![options]);
}
let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
start: Key::MIN,
end: Key::MAX,
});
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
compact_below_lsn
} else {
let gc_info = self.gc_info.read().unwrap();
gc_info.cutoffs.select_min() // use the real gc cutoff
};
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock");
};
let ((dense_ks, sparse_ks), _) = &*partition;
// Truncate the key range to be within user specified compaction range.
fn truncate_to(
source_start: &Key,
source_end: &Key,
target_start: &Key,
target_end: &Key,
) -> Option<(Key, Key)> {
let start = source_start.max(target_start);
let end = source_end.min(target_end);
if start < end {
Some((*start, *end))
} else {
None
}
}
let mut split_key_ranges = Vec::new();
let ranges = dense_ks
.parts
.iter()
.map(|partition| partition.ranges.iter())
.chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
.flatten()
.cloned()
.collect_vec();
for range in ranges.iter() {
let Some((start, end)) = truncate_to(
&range.start,
&range.end,
&compact_range.start,
&compact_range.end,
) else {
continue;
};
split_key_ranges.push((start, end));
}
split_key_ranges.sort();
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut current_start = None;
// Split the compaction job into chunks of roughly `GC_COMPACT_MAX_SIZE_MB` each.
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be a configuration option in the future
let ranges_num = split_key_ranges.len();
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
if current_start.is_none() {
current_start = Some(start);
}
let start = current_start.unwrap();
if start >= end {
// We have already processed this partition.
continue;
}
let res = layer_map.range_search(start..end, compact_below_lsn);
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
let mut compact_options = options.clone();
// Try to extend the compaction range so that we include at least one full layer file.
let extended_end = res
.found
.keys()
.map(|layer| layer.layer.key_range.end)
.min();
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
// In this case, we simply use the specified key range end.
let end = if let Some(extended_end) = extended_end {
extended_end.max(end)
} else {
end
};
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_options.compact_range = Some(CompactRange { start, end });
compact_options.compact_below_lsn = Some(compact_below_lsn);
compact_options.sub_compaction = false;
compact_jobs.push(compact_options);
current_start = Some(end);
}
}
drop(guard);
Ok(compact_jobs)
}

/// An experimental compaction building block that combines compaction with garbage collection.
///
/// The current implementation picks all delta + image layers that are below or intersecting with
@@ -1774,6 +1883,36 @@
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if options.sub_compaction {
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = self.gc_compaction_split_jobs(options).await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx).await?;
}
if jobs_len == 0 {
info!("no jobs to run, skipping gc bottom-most compaction");
}
return Ok(());
}
self.compact_with_gc_inner(cancel, options, ctx).await
}

async fn compact_with_gc_inner(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
assert!(
!options.sub_compaction,
"sub-compaction should be handled by the outer function"
);
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
@@ -1943,14 +2082,15 @@

// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
let layer_names = job_desc
.selected_layers
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
}
// The check is disabled for now because it needs to be adjusted for partial compaction; it will be re-enabled later.
// let layer_names = job_desc
// .selected_layers
// .iter()
// .map(|layer| layer.layer_desc().layer_name())
// .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers
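A self-contained toy version of the splitting loop in `gc_compaction_split_jobs` above. One simplification to note: the toy sums per-range size estimates directly, whereas the real code re-estimates the size of the growing key range with `layer_map.range_search` on every iteration:

```rust
/// Toy splitter: merge consecutive key ranges until the estimated read
/// size exceeds the cap, then cut one job. The final chunk is always
/// emitted even if it is under the cap.
const MAX_JOB_SIZE_MB: u64 = 4 * 1024; // mirrors GC_COMPACT_MAX_SIZE_MB

fn split_jobs(ranges: &[(u64, u64, u64)]) -> Vec<(u64, u64)> {
    // Each entry is (start_key, end_key, estimated_size_mb), sorted by key.
    let mut jobs = Vec::new();
    let mut current_start: Option<u64> = None;
    let mut acc_mb = 0;
    for (idx, &(start, end, size_mb)) in ranges.iter().enumerate() {
        let job_start = *current_start.get_or_insert(start);
        acc_mb += size_mb;
        if acc_mb > MAX_JOB_SIZE_MB || idx + 1 == ranges.len() {
            jobs.push((job_start, end));
            current_start = None;
            acc_mb = 0;
        }
    }
    jobs
}
```

For example, ranges with estimated sizes of 3000, 2000, and 100 MB under the 4096 MB cap split into two jobs: the first covers the first two ranges (cut once the running estimate exceeds the cap), and the second covers the remainder.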
1 change: 1 addition & 0 deletions test_runner/regress/test_compaction.py
@@ -159,6 +159,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
# skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this

1 comment on commit c42c28b

@github-actions

7040 tests run: 6728 passed, 3 failed, 309 skipped (full report)


Failures on Postgres 15: test_pageserver_gc_compaction_smoke[release-pg15], test_pull_timeline[release-pg15-True]

Failures on Postgres 14: test_pageserver_gc_compaction_smoke[release-pg14]

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_gc_compaction_smoke[release-pg14] or test_pageserver_gc_compaction_smoke[release-pg15] or test_pull_timeline[release-pg15-True]"
Flaky tests (7)


Test coverage report is not available

The comment gets automatically updated with the latest test results
c42c28b at 2024-12-06T19:34:25.965Z :recycle:
