pageserver: upload flushed layers in parallel
erikgrinaker committed Dec 17, 2024
1 parent 3d30a7a commit e65230b
Showing 1 changed file with 136 additions and 53 deletions.
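As context for the diff below: the commit replaces the bare (u64, Lsn) tuples on the flush watch channels with explicit FlushRequest/FlushResponse types, and batches metadata index uploads so that layer uploads can proceed in parallel. Here is a minimal, standalone sketch of the watch-channel request/response handshake involved; the u64 stand-in for Lsn, the demo values, and the flush body are illustrative assumptions, not pageserver code.

use tokio::sync::watch;

// Simplified stand-ins for the request/response types added by the diff.
#[derive(Copy, Clone)]
struct FlushRequest {
    seq: u64,
    flush_lsn: u64, // stand-in for Lsn
}

#[derive(Copy, Clone)]
struct FlushResponse {
    seq: u64,
    flushed_lsn: u64,
}

#[tokio::main]
async fn main() {
    let (req_tx, mut req_rx) = watch::channel(FlushRequest { seq: 0, flush_lsn: 0 });
    let (done_tx, mut done_rx) = watch::channel(FlushResponse { seq: 0, flushed_lsn: 0 });

    // Flush loop: wake when the request changes, "flush", then acknowledge the
    // sequence number that was handled.
    let flusher = tokio::spawn(async move {
        while req_rx.changed().await.is_ok() {
            let req = *req_rx.borrow();
            // ... freeze and flush in-memory layers up to req.flush_lsn here ...
            done_tx.send_replace(FlushResponse { seq: req.seq, flushed_lsn: req.flush_lsn });
            if req.flush_lsn >= 100 {
                break; // end of demo
            }
        }
    });

    // Caller: bump the request (monotonic seq, max of flush LSNs), then wait for
    // the response sequence number to catch up.
    let mut my_seq = 0;
    req_tx.send_modify(|req| {
        req.seq += 1;
        req.flush_lsn = req.flush_lsn.max(100);
        my_seq = req.seq;
    });
    while done_rx.borrow_and_update().seq < my_seq {
        done_rx.changed().await.expect("flush loop exited");
    }
    println!("flushed up to LSN {}", done_rx.borrow().flushed_lsn);
    flusher.await.unwrap();
}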
189 changes: 136 additions & 53 deletions pageserver/src/tenant/timeline.rs
@@ -45,6 +45,7 @@ use storage_broker::BrokerClientChannel;
use tokio::{
runtime::Handle,
sync::{oneshot, watch},
time::sleep,
};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -75,7 +76,7 @@ use crate::{
tenant::{
config::AttachmentMode,
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
metadata::{MetadataUpdate, TimelineMetadata},
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
},
walingest::WalLagCooldown,
@@ -328,9 +329,9 @@ pub struct Timeline {
/// the flush finishes. You can use that to wait for the flush to finish.
/// - The LSN is updated to max() of its current value and the latest disk_consistent_lsn
/// read by whoever sends an update
layer_flush_start_tx: tokio::sync::watch::Sender<(u64, Lsn)>,
layer_flush_start_tx: watch::Sender<FlushRequest>,
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
layer_flush_done_tx: watch::Sender<FlushResponse>,

// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -660,6 +661,59 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
}
}

/// A layer flush request, which wakes the flush loop when changed.
#[derive(Copy, Clone)]
pub(crate) struct FlushRequest {
/// The sequence number of this flush request. Must be monotonically increasing.
seq: FlushRequestID,
/// The LSN to freeze and flush to disk. Later LSNs may also be flushed.
flush_lsn: Lsn,
}

pub(crate) type FlushRequestID = u64;

impl FlushRequest {
/// Creates a new, empty flush request.
pub(crate) fn new(disk_consistent_lsn: Lsn) -> Self {
Self {
seq: 0,
flush_lsn: disk_consistent_lsn,
}
}

/// Bumps the flush request to a later flush LSN, and increases the sequence number.
/// Returns the sequence number.
pub(crate) fn bump(&mut self, flush_lsn: Lsn) -> FlushRequestID {
self.seq += 1;
self.flush_lsn = std::cmp::max(self.flush_lsn, flush_lsn);
self.seq
}
}

/// A layer flush response.
pub(crate) struct FlushResponse {
/// The latest request sequence number that has been flushed.
seq: FlushRequestID,
/// The last flush result, containing the flush LSN. An error may be replaced by a later
/// success. This is fine, as long as we're making progress.
result: Result<Lsn, FlushLayerError>,
}

impl FlushResponse {
/// Creates a new flush response.
pub(crate) fn new(seq: FlushRequestID, result: Result<Lsn, FlushLayerError>) -> Self {
Self { seq, result }
}

/// Creates an empty flush response.
pub(crate) fn empty() -> Self {
Self {
seq: 0,
result: Ok(Lsn(0)),
}
}
}

#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -2298,8 +2352,8 @@ impl Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(state);

let (layer_flush_start_tx, _) = tokio::sync::watch::channel((0, disk_consistent_lsn));
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
let (layer_flush_start_tx, _) = watch::channel(FlushRequest::new(disk_consistent_lsn));
let (layer_flush_done_tx, _) = watch::channel(FlushResponse::empty());

let evictions_low_residence_duration_metric_threshold = {
let loaded_tenant_conf = tenant_conf.load();
@@ -3599,11 +3653,8 @@ impl Timeline {
return Err(FlushLayerError::NotRunning(flush_loop_state));
}

self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
*lsn = std::cmp::max(at, *lsn);
});
self.layer_flush_start_tx
.send_modify(|req| my_flush_request = req.bump(at));

assert_ne!(my_flush_request, 0);

@@ -3613,27 +3664,48 @@
/// Layer flusher task's main loop.
async fn flush_loop(
self: &Arc<Self>,
mut layer_flush_start_rx: tokio::sync::watch::Receiver<(u64, Lsn)>,
mut layer_flush_start_rx: watch::Receiver<FlushRequest>,
ctx: &RequestContext,
) {
/// To parallelize uploads, we only schedule metadata index uploads every few layers, or
/// when no flushes have happened in a while. Metadata uploads act as upload queue barriers.
const UPLOAD_METADATA_LAYERS: usize = 8;
const UPLOAD_METADATA_DELAY: Duration = Duration::from_secs(1);
let mut layers_since_metadata_upload = 0;

info!("started flush loop");
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
info!("shutting down layer flush task due to Timeline::cancel");
// TODO: is this necessary? Upload queue may be shutting down too.
if layers_since_metadata_upload > 0 {
_ = self.schedule_metadata_upload(self.disk_consistent_lsn.load());
}
break;
},
_ = layer_flush_start_rx.changed() => {}
_ = sleep(UPLOAD_METADATA_DELAY), if layers_since_metadata_upload > 0 => {
let disk_consistent_lsn = self.disk_consistent_lsn.load();
if let Err(err) = self.schedule_metadata_upload(disk_consistent_lsn) {
error!("failed to schedule index upload: {err}");
}
layers_since_metadata_upload = 0;
continue;
}
}
trace!("waking up");
let (flush_counter, frozen_to_lsn) = *layer_flush_start_rx.borrow();
let req = *layer_flush_start_rx.borrow();

// The highest LSN to which we flushed in the loop over frozen layers
let mut flushed_to_lsn = Lsn(0);

let result = loop {
if self.cancel.is_cancelled() {
info!("dropping out of flush loop for timeline shutdown");
if layers_since_metadata_upload > 0 {
_ = self.schedule_metadata_upload(self.disk_consistent_lsn.load());
}
// Note: we do not bother transmitting into [`layer_flush_done_tx`], because
// anyone waiting on that will respect self.cancel as well: they will stop
// waiting at the same time as we drop out of this loop.
Expand All @@ -3660,7 +3732,7 @@ impl Timeline {
// drop 'layers' lock to allow concurrent reads and writes
};
let Some(layer_to_flush) = layer_to_flush else {
break Ok(());
break Ok(flushed_to_lsn);
};
if num_frozen_layers
> std::cmp::max(
Expand All @@ -3673,6 +3745,8 @@ impl Timeline {
"too many frozen layers: {num_frozen_layers} layers with estimated in-mem size of {frozen_layer_total_size} bytes",
);
}
// Flush the layer and schedule it for upload, but don't update the index yet. The
// index update acts as an upload queue barrier and would prevent parallel uploads.
match self.flush_frozen_layer(layer_to_flush, ctx).await {
Ok(this_layer_to_lsn) => {
flushed_to_lsn = std::cmp::max(flushed_to_lsn, this_layer_to_lsn);
@@ -3681,46 +3755,54 @@
info!("dropping out of flush loop for timeline shutdown");
return;
}
err @ Err(
FlushLayerError::NotRunning(_)
| FlushLayerError::Other(_)
| FlushLayerError::CreateImageLayersError(_),
Err(
err @ FlushLayerError::NotRunning(_)
| err @ FlushLayerError::Other(_)
| err @ FlushLayerError::CreateImageLayersError(_),
) => {
error!("could not flush frozen layer: {err:?}");
break err.map(|_| ());
error!("could not flush frozen layer: {err}");
break Err(err);
}
}
timer.stop_and_record();

layers_since_metadata_upload += 1;
if layers_since_metadata_upload >= UPLOAD_METADATA_LAYERS {
if let Err(err) = self.schedule_metadata_upload(flushed_to_lsn) {
break Err(FlushLayerError::from_anyhow(self, err));
}
layers_since_metadata_upload = 0;
}
};

// Unsharded tenants should never advance their LSN beyond the end of the
// highest layer they write: such gaps between layer data and the frozen LSN
// are only legal on sharded tenants.
debug_assert!(
self.shard_identity.count.count() > 1
|| flushed_to_lsn >= frozen_to_lsn
|| flushed_to_lsn >= req.flush_lsn
|| !flushed_to_lsn.is_valid()
);

if flushed_to_lsn < frozen_to_lsn && self.shard_identity.count.count() > 1 {
if flushed_to_lsn < req.flush_lsn && self.shard_identity.count.count() > 1 {
// If our layer flushes didn't carry disk_consistent_lsn up to the `to_lsn` advertised
// to us via layer_flush_start_rx, then advance it here.
// to us via layer_flush_start_rx, then advance it here. We don't upload metadata
// until the next scheduled flush.
//
// This path is only taken for tenants with multiple shards: single sharded tenants should
// never encounter a gap in the wal.
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
tracing::debug!("Advancing disk_consistent_lsn across layer gap {old_disk_consistent_lsn}->{frozen_to_lsn}");
if self.set_disk_consistent_lsn(frozen_to_lsn) {
if let Err(e) = self.schedule_uploads(frozen_to_lsn, vec![]) {
tracing::warn!("Failed to schedule metadata upload after updating disk_consistent_lsn: {e}");
}
}
debug!(
"advancing disk_consistent_lsn across layer gap {}->{}",
self.disk_consistent_lsn.load(),
req.flush_lsn
);
self.set_disk_consistent_lsn(req.flush_lsn);
}

// Notify any listeners that we're done
// Notify any listeners that we're done.
let _ = self
.layer_flush_done_tx
.send_replace((flush_counter, result));
.send_replace(FlushResponse::new(req.seq, result));
}
}

Expand All @@ -3729,16 +3811,9 @@ impl Timeline {
let mut rx = self.layer_flush_done_tx.subscribe();
loop {
{
let (last_result_counter, last_result) = &*rx.borrow();
if *last_result_counter >= request {
if let Err(err) = last_result {
// We already logged the original error in
// flush_loop. We cannot propagate it to the caller
// here, because it might not be Cloneable
return Err(err.clone());
} else {
return Ok(());
}
let resp = &*rx.borrow();
if resp.seq >= request {
return resp.result.clone().map(|_| ());
}
}
trace!("waiting for flush to complete");
@@ -3757,7 +3832,10 @@
}
}

/// Flush one frozen in-memory layer to disk, as a new delta layer.
/// Flush one frozen in-memory layer to disk, as a new delta layer, and schedule
/// it for upload.
///
/// NB: this does not schedule a metadata index update, use schedule_metadata_upload().
///
/// Return value is the last lsn (inclusive) of the layer that was frozen.
#[instrument(skip_all, fields(layer=%frozen_layer))]
@@ -3886,8 +3964,8 @@
);

if self.set_disk_consistent_lsn(disk_consistent_lsn) {
// Schedule remote uploads that will reflect our new disk_consistent_lsn
self.schedule_uploads(disk_consistent_lsn, layers_to_upload)
// Schedule remote uploads that will reflect our new disk_consistent_lsn.
self.schedule_layer_upload(layers_to_upload)
.map_err(|e| FlushLayerError::from_anyhow(self, e))?;
}
// release lock on 'layers'
@@ -3919,12 +3997,20 @@ impl Timeline {
new_value != old_value
}

/// Update metadata file
fn schedule_uploads(
/// Schedule upload of layer files. Note that this does not upload a new index,
/// see schedule_metadata_upload().
fn schedule_layer_upload(
&self,
disk_consistent_lsn: Lsn,
layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
layers: impl IntoIterator<Item = ResidentLayer>,
) -> anyhow::Result<()> {
for layer in layers {
self.remote_client.schedule_layer_file_upload(layer)?;
}
Ok(())
}

/// Schedule upload of an index file based on a partial metadata update.
fn schedule_metadata_upload(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -3941,7 +4027,7 @@
None
};

let update = crate::tenant::metadata::MetadataUpdate::new(
let update = MetadataUpdate::new(
disk_consistent_lsn,
ondisk_prev_record_lsn,
*self.latest_gc_cutoff_lsn.read(),
Expand All @@ -3952,9 +4038,6 @@ impl Timeline {
x.unwrap()
));

for layer in layers_to_upload {
self.remote_client.schedule_layer_file_upload(layer)?;
}
self.remote_client
.schedule_index_upload_for_metadata_update(&update)?;

@@ -5304,7 +5387,7 @@ impl Timeline {
// This unconditionally also schedules an index_part.json update, even though we will
// be doing one a bit later with the unlinked gc'd layers.
let disk_consistent_lsn = self.disk_consistent_lsn.load();
self.schedule_uploads(disk_consistent_lsn, None)
self.schedule_metadata_upload(disk_consistent_lsn)
.map_err(|e| {
if self.cancel.is_cancelled() {
GcError::TimelineCancelled

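For reference, the batching policy the new flush loop uses (schedule each flushed layer's upload immediately, but schedule an index/metadata upload only every UPLOAD_METADATA_LAYERS flushes or after UPLOAD_METADATA_DELAY of inactivity, because an index upload acts as an upload queue barrier) can be sketched roughly as follows. The mpsc channel, the print-based upload stubs, and the ten-layer producer are illustrative assumptions, not the pageserver's actual types.

use std::time::Duration;
use tokio::{sync::mpsc, time::sleep};

const UPLOAD_METADATA_LAYERS: usize = 8; // same thresholds as the diff
const UPLOAD_METADATA_DELAY: Duration = Duration::from_secs(1);

// Stand-ins for the real upload scheduling: layer uploads can run in parallel,
// while an index upload waits behind all previously scheduled layer uploads.
fn schedule_layer_upload(layer: &str) {
    println!("schedule upload of {layer}");
}
fn schedule_metadata_upload() {
    println!("schedule index_part.json upload (upload queue barrier)");
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(16);

    // Producer: pretend ten frozen layers get flushed to disk.
    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(format!("layer-{i}")).await.unwrap();
        }
    });

    let mut layers_since_metadata_upload = 0;
    loop {
        tokio::select! {
            layer = rx.recv() => {
                let Some(layer) = layer else { break };
                schedule_layer_upload(&layer);
                layers_since_metadata_upload += 1;
                // Emit the barrier only every few layers, so the uploads in
                // between can proceed in parallel.
                if layers_since_metadata_upload >= UPLOAD_METADATA_LAYERS {
                    schedule_metadata_upload();
                    layers_since_metadata_upload = 0;
                }
            }
            // If flushing goes quiet, persist the pending metadata after a delay.
            _ = sleep(UPLOAD_METADATA_DELAY), if layers_since_metadata_upload > 0 => {
                schedule_metadata_upload();
                layers_since_metadata_upload = 0;
            }
        }
    }
    // Final barrier for any trailing layers.
    if layers_since_metadata_upload > 0 {
        schedule_metadata_upload();
    }
}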