diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index f600f1bd03988..511107e53b4af 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -17,11 +17,28 @@ use std::mem; use std::sync::Arc; use std::time::Instant; +use crate::async_runtime::IsolatedRuntime; +use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB}; +use crate::error::InvalidUsage; +use crate::internal::compact::{CompactConfig, Compactor}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas}; +use crate::internal::machine::{Machine, retry_external}; +use crate::internal::merge::{MergeTree, Pending}; +use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics}; +use crate::internal::paths::{PartId, PartialBatchKey, WriterKey}; +use crate::internal::state::{ + BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart, + RunMeta, RunOrder, RunPart, +}; +use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns}; +use crate::{PersistConfig, ShardId}; use arrow::array::{Array, Int64Array}; +use arrow::compute::kernels::filter; use bytes::Bytes; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; +use differential_dataflow::trace::implementations::merge_batcher::container::vec; use futures_util::stream::StreamExt; use futures_util::{FutureExt, stream}; use mz_dyncfg::Config; @@ -45,23 +62,7 @@ use semver::Version; use timely::PartialOrder; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; -use tracing::{Instrument, debug_span, trace_span, warn}; - -use crate::async_runtime::IsolatedRuntime; -use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB}; -use crate::error::InvalidUsage; -use crate::internal::compact::{CompactConfig, Compactor}; -use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas}; -use crate::internal::machine::retry_external; -use crate::internal::merge::{MergeTree, Pending}; -use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics}; -use crate::internal::paths::{PartId, PartialBatchKey, WriterKey}; -use crate::internal::state::{ - BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart, - RunMeta, RunOrder, RunPart, -}; -use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns}; -use crate::{PersistConfig, ShardId}; +use tracing::{Instrument, debug_span, info, trace_span, warn}; include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs")); @@ -676,6 +677,50 @@ where } } + pub fn batch_with_finished_parts( + &self, + registered_desc: Description, + ) -> Option> { + let runs = self.parts.finish_completed_runs(); + + if runs.is_empty() { + return None; + } + + let mut run_parts = vec![]; + let mut run_splits = vec![]; + let mut run_meta = vec![]; + for (order, parts) in runs { + if parts.is_empty() { + continue; + } + if run_parts.len() != 0 { + run_splits.push(run_parts.len()); + } + run_meta.push(RunMeta { + order: Some(order), + schema: self.write_schemas.id, + // Field has been deprecated but kept around to roundtrip state. 
+ deprecated_schema: None, + }); + run_parts.extend(parts); + } + let desc = registered_desc; + + let len = run_parts.iter().fold(0, |len, part| { + let stats = part.stats(); + if let Some(stats) = stats { + len + stats.decode().key.len + } else { + len + } + }); + + let batch = HollowBatch::new(desc, run_parts, len, run_meta, run_splits); + + Some(batch) + } + /// Finish writing this batch and return a handle to the written batch. /// /// This fails if any of the updates in this batch are beyond the given @@ -843,6 +888,8 @@ impl BatchParts { shard_metrics, isolated_runtime, write_schemas, + &None, + None, ) .await .expect("successful compaction"); @@ -1019,14 +1066,21 @@ impl BatchParts { let part = Pending::new(mz_ore::task::spawn(|| name, write_future)); run.push(part); + let take_num = run + .iter() + .filter(|p| !p.is_finished()) + .count() + .saturating_sub(self.cfg.batch_builder_max_outstanding_parts); + println!("take_num: {take_num}"); // If there are more than the max outstanding parts, block on all but the // most recent. for part in run .iter_mut() - .rev() - .skip(self.cfg.batch_builder_max_outstanding_parts) - .take_while(|p| !p.is_finished()) + .filter(|p| !p.is_finished()) + // .take_while(|p| !p.is_finished()) + .take(take_num) { + println!("blocking on part"); self.batch_metrics.write_stalls.inc(); part.block_until_ready().await; } @@ -1229,6 +1283,38 @@ impl BatchParts { }) } + pub(crate) fn finish_completed_runs(&self) -> Vec<(RunOrder, Vec>)> { + match &self.writing_runs { + WritingRuns::Ordered(RunOrder::Unordered, tree) => tree + .iter() + .take_while(|part| matches!(part, Pending::Finished(_))) + .map(|part| match part { + Pending::Finished(p) => (RunOrder::Unordered, vec![p.clone()]), + _ => (RunOrder::Unordered, vec![]), + }) + .collect(), + WritingRuns::Ordered(order, tree) => { + let parts = tree + .iter() + .take_while(|part| matches!(part, Pending::Finished(_))) + .filter_map(|part| match part { + Pending::Finished(p) => Some(p.clone()), + _ => None, + }) + .collect(); + vec![(order.clone(), parts)] + } + WritingRuns::Compacting(tree) => tree + .iter() + .take_while(|(_, run)| matches!(run, Pending::Finished(_))) + .map(|(order, run)| match run { + Pending::Finished(parts) => (order.clone(), parts.clone()), + _ => (order.clone(), vec![]), + }) + .collect(), + } + } + #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))] pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec>)> { match self.writing_runs { diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs index bb5e000081a2f..d1416b2242ba7 100644 --- a/src/persist-client/src/cli/admin.rs +++ b/src/persist-client/src/cli/admin.rs @@ -462,6 +462,7 @@ where .into_iter() .map(|b| Arc::unwrap_or_clone(b.batch)) .collect(), + prev_batch: None, }; let parts = req.inputs.iter().map(|x| x.part_count()).sum::(); let bytes = req @@ -499,6 +500,7 @@ where Arc::new(IsolatedRuntime::default()), req.clone(), schemas, + &machine, ); pin_mut!(stream); diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 7cf476773f4c6..dae9126ccdde2 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -15,6 +15,23 @@ use std::pin::pin; use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::async_runtime::IsolatedRuntime; +use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes}; +use crate::cfg::{ + 
COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS, + COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES, + GC_BLOB_DELETE_CONCURRENCY_LIMIT, INCREMENTAL_COMPACTION_DISABLED, MiB, +}; +use crate::fetch::{EncodedPart, FetchBatchFilter}; +use crate::internal::encoding::Schemas; +use crate::internal::gc::GarbageCollector; +use crate::internal::machine::Machine; +use crate::internal::maintenance::RoutineMaintenance; +use crate::internal::metrics::ShardMetrics; +use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart}; +use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; +use crate::iter::{Consolidator, LowerBound, StructuredSort}; +use crate::{Metrics, PersistConfig, ShardId}; use anyhow::anyhow; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; @@ -26,31 +43,14 @@ use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; use mz_ore::now::SYSTEM_TIME; use mz_persist::location::Blob; +use mz_persist_types::arrow::ArrayBound; use mz_persist_types::part::Part; use mz_persist_types::{Codec, Codec64}; use timely::PartialOrder; use timely::progress::{Antichain, Timestamp}; use tokio::sync::mpsc::Sender; use tokio::sync::{TryAcquireError, mpsc, oneshot}; -use tracing::{Instrument, Span, debug, debug_span, error, trace, warn}; - -use crate::async_runtime::IsolatedRuntime; -use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes}; -use crate::cfg::{ - COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS, - COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES, - GC_BLOB_DELETE_CONCURRENCY_LIMIT, INCREMENTAL_COMPACTION_DISABLED, MiB, -}; -use crate::fetch::FetchBatchFilter; -use crate::internal::encoding::Schemas; -use crate::internal::gc::GarbageCollector; -use crate::internal::machine::Machine; -use crate::internal::maintenance::RoutineMaintenance; -use crate::internal::metrics::ShardMetrics; -use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart}; -use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; -use crate::iter::{Consolidator, StructuredSort}; -use crate::{Metrics, PersistConfig, ShardId}; +use tracing::{Instrument, Span, debug, debug_span, error, info, trace, warn}; use super::trace::ActiveCompaction; @@ -68,6 +68,10 @@ pub struct CompactReq { /// The updates to include in the output batch. Any data in these outside of /// the output descriptions bounds should be ignored. pub inputs: Vec>, + + /// If this compaction is a resume of a previously interrupted compaction + /// then prev_batch contains the work done so far. + pub prev_batch: Option>, } /// A response from compaction. @@ -393,6 +397,7 @@ where Arc::clone(&machine_clone.isolated_runtime), req.clone(), compaction_schema, + &machine_clone, ); let maintenance = @@ -566,6 +571,7 @@ where isolated_runtime: Arc, req: CompactReq, write_schemas: Schemas, + machine: &Machine, ) -> impl Stream, anyhow::Error>> { async_stream::stream! 
{ let _ = Self::validate_req(&req)?; @@ -609,6 +615,8 @@ where let chunked_runs = Self::chunk_runs(&ordered_runs, &cfg, &*metrics, run_reserved_memory_bytes); + let (incremental_tx, mut incremental_rx) = mpsc::channel(1); + let total_chunked_runs = chunked_runs.len(); let mut applied = 0; for (runs, run_chunk_max_memory_usage) in chunked_runs { @@ -624,7 +632,8 @@ where / cfg.batch.blob_target_size; let mut run_cfg = cfg.clone(); - run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts; + // run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts; + run_cfg.batch.batch_builder_max_outstanding_parts = 1; let batch = Self::compact_runs( &run_cfg, @@ -636,9 +645,17 @@ where Arc::clone(&shard_metrics), Arc::clone(&isolated_runtime), write_schemas.clone(), + &req.prev_batch, + Some(incremental_tx.clone()) ) .await?; + let incremental = incremental_rx.try_recv(); + if let Ok(res) = incremental { + let now = SYSTEM_TIME.clone(); + machine.checkpoint_compaction_progress(&res, now()).await; + } + let (parts, run_splits, run_meta, updates) = (batch.parts, batch.run_splits, batch.run_meta, batch.len); @@ -657,7 +674,7 @@ where // Set up active compaction metadata let clock = SYSTEM_TIME.clone(); let active_compaction = if applied < total_chunked_runs - 1 { - Some(ActiveCompaction { start_ms: clock() }) + Some(ActiveCompaction { start_ms: clock(), batch_so_far: None }) } else { None }; @@ -681,6 +698,7 @@ where applied += 1; } + drop(incremental_tx); } } @@ -818,6 +836,28 @@ where Ok(ordered_runs) } + fn combine_hollow_batch_with_previous( + previous_batch: &HollowBatch, + batch: &HollowBatch, + ) -> HollowBatch { + info!("combine_hollow_batch_with_previous"); + // Simplifying assumption: you can't combine batches with different descriptions. + assert_eq!(previous_batch.desc, batch.desc); + let len = previous_batch.len + batch.len; + let mut parts = Vec::with_capacity(previous_batch.parts.len() + batch.parts.len()); + parts.extend(previous_batch.parts.clone()); + parts.extend(batch.parts.clone()); + assert!(previous_batch.run_splits.is_empty()); + assert!(batch.run_splits.is_empty()); + HollowBatch::new( + previous_batch.desc.clone(), + parts, + len, + previous_batch.run_meta.clone(), + previous_batch.run_splits.clone(), + ) + } + /// Compacts runs together. If the input runs are sorted, a single run will be created as output. /// /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]` @@ -825,12 +865,14 @@ where cfg: &CompactConfig, shard_id: &ShardId, desc: &Description, - runs: Vec<(&Description, &RunMeta, &[RunPart])>, + mut runs: Vec<(&Description, &RunMeta, &[RunPart])>, blob: Arc, metrics: Arc, shard_metrics: Arc, isolated_runtime: Arc, write_schemas: Schemas, + batch_so_far: &Option>, + incremental_tx: Option>>, ) -> Result, anyhow::Error> { // TODO: Figure out a more principled way to allocate our memory budget. // Currently, we give any excess budget to write parallelism. If we had @@ -845,12 +887,65 @@ where let mut batch_cfg = cfg.batch.clone(); + let mut lower_bound = None; + // Use compaction as a method of getting inline writes out of state, to // make room for more inline writes. We could instead do this at the end // of compaction by flushing out the batch, but doing it here based on // the config allows BatchBuilder to do its normal pipelining of writes. 
batch_cfg.inline_writes_single_max_bytes = 0; + if let Some(batch_so_far) = batch_so_far.as_ref() { + let last_part = batch_so_far + .last_part(shard_id.clone(), &*blob, &metrics) + .await; + if let Some(last_part) = last_part { + let fetched = EncodedPart::fetch( + shard_id, + &*blob, + &metrics, + &shard_metrics, + &metrics.read.batch_fetcher, + &batch_so_far.desc, + &last_part, + ) + .await + .map_err(|blob_key| anyhow!("missing key {blob_key}"))?; + + let updates = fetched.normalize(&metrics.columnar); + let structured = updates + .as_structured::(write_schemas.key.as_ref(), write_schemas.val.as_ref()); + let part = match structured.as_part() { + Some(p) => p, + None => return Err(anyhow!("unexpected empty part")), + }; + + let last = part.len() - 1; + let key_bound = ArrayBound::new(part.key.clone(), last); + let val_bound = ArrayBound::new(part.val.clone(), last); + let t = T::decode(part.time.values()[last].to_le_bytes()); + lower_bound = Some(LowerBound { + val_bound, + key_bound, + t, + }); + } + }; + + if let Some(lower_bound) = lower_bound.as_ref() { + for (_, _, run) in &mut runs { + let start = run + .iter() + .position(|part| { + part.structured_key_lower() + .map_or(true, |lower| lower.get() >= lower_bound.key_bound.get()) + }) + .unwrap_or(run.len()); + + *run = &run[start.saturating_sub(1)..]; + } + } + let parts = BatchParts::new_ordered( batch_cfg, cfg.batch.preferred_order, @@ -887,6 +982,7 @@ where FetchBatchFilter::Compaction { since: desc.since().clone(), }, + lower_bound, prefetch_budget_bytes, ); @@ -928,7 +1024,31 @@ where else { break; }; + batch.flush_part(desc.clone(), updates).await; + + if let Some(tx) = incremental_tx.as_ref() { + // This is where we record whatever parts were successfully flushed + // to blob. That way we can resume an interrupted compaction later. 
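+ // `batch_with_finished_parts` only includes parts whose uploads have already
+ // completed, so the checkpoint sent below never references a part that is
+ // still in flight.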
+ let partial_batch = batch.batch_with_finished_parts(desc.clone()); + + if let Some(partial_batch) = partial_batch { + let hollow_batch = if let Some(batch_so_far) = batch_so_far.as_ref() { + Self::combine_hollow_batch_with_previous(batch_so_far, &partial_batch) + } else { + partial_batch + }; + match tx.send(hollow_batch).await { + Ok(_) => { + // metrics.compaction.incremental_batch_sent.inc(); + } + Err(e) => { + error!("Failed to send batch to incremental compaction: {}", e); + // metrics.compaction.incremental_batch_send_fail.inc() + } + }; + } + } } let mut batch = batch.finish(desc.clone()).await?; @@ -950,8 +1070,19 @@ where .await; } + let hollow_batch = if let Some(batch_so_far) = batch_so_far.as_ref() { + let hollow_batch = batch.into_hollow_batch(); + info!( + "combining hollow batch {:?} with previous batch", + hollow_batch + ); + Self::combine_hollow_batch_with_previous(batch_so_far, &hollow_batch) + } else { + batch.into_hollow_batch() + }; + timings.record(&metrics); - Ok(batch.into_hollow_batch()) + Ok(hollow_batch) } fn validate_req(req: &CompactReq) -> Result<(), anyhow::Error> { @@ -1013,15 +1144,17 @@ impl Timings { #[cfg(test)] mod tests { + use differential_dataflow::trace::implementations::merge_batcher::container::vec; use mz_dyncfg::ConfigUpdates; use mz_ore::{assert_contains, assert_err}; use mz_persist_types::codec_impls::StringSchema; use timely::order::Product; use timely::progress::Antichain; - use crate::PersistLocation; use crate::batch::BLOB_TARGET_SIZE; + use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS; use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache}; + use crate::{PersistLocation, batch}; use super::*; @@ -1062,6 +1195,7 @@ mod tests { Antichain::from_elem(10u64), ), inputs: vec![b0, b1], + prev_batch: None, }; let schemas = Schemas { id: None, @@ -1076,6 +1210,7 @@ mod tests { Arc::new(IsolatedRuntime::default()), req.clone(), schemas.clone(), + &write.machine, ); let res = Compactor::::compact_all(stream, req.clone()) @@ -1141,6 +1276,7 @@ mod tests { Antichain::from_elem(Product::new(10, 0)), ), inputs: vec![b0, b1], + prev_batch: None, }; let schemas = Schemas { id: None, @@ -1155,6 +1291,7 @@ mod tests { Arc::new(IsolatedRuntime::default()), req.clone(), schemas.clone(), + &write.machine, ); let res = @@ -1211,6 +1348,7 @@ mod tests { Antichain::from_elem(10u64), ), inputs: vec![b0, b1], + prev_batch: None, }; write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1); let compactor = write.compact.as_ref().expect("compaction hard disabled"); @@ -1256,6 +1394,7 @@ mod tests { Antichain::from_elem(20u64), ), inputs: vec![b2, b3], + prev_batch: None, }; let compactor = write.compact.as_ref().expect("compaction hard disabled"); @@ -1270,4 +1409,184 @@ mod tests { .expect("channel closed") .expect("compaction success"); } + + #[mz_persist_proc::test(tokio::test)] + // #[mz_ore::test(tokio::test)] + #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented + async fn incremental_compaction(dyncfgs: ConfigUpdates) { + // let dyncfgs = ::mz_dyncfg::ConfigUpdates::default(); + // Generate a bunch of data and batches for testing incremental compaction. 
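+ // The plan: compact these runs once while capturing the incremental
+ // checkpoints, then run the same compaction again seeded with the last
+ // checkpoint, and assert that the resumed run reproduces the original
+ // output (modulo the final, still-in-progress part).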
+ let mut data = Vec::new(); + let num_keys = 10; + let num_times = 10; + for time in 0..num_times { + for key in 0..num_keys { + // Ensure time is monotonically increasing across all keys + let t = time * num_keys + key; + data.push(((key.to_string(), format!("val_{key}")), t as u64, 1)); + } + } + + let cache = new_test_client_cache(&dyncfgs); + cache.cfg.set_config(&BLOB_TARGET_SIZE, 100); + + let (mut write, _) = cache + .open(PersistLocation::new_in_mem()) + .await + .expect("client construction failed") + .expect_open::(ShardId::new()) + .await; + + // Split data into batches of 3 updates each. + let batch_size = 3; + let mut batches = Vec::new(); + let mut lower = 0; + while lower < data.len() { + let upper = (lower + batch_size).min(data.len()); + let batch = write + .expect_batch(&data[lower..upper], lower as u64, upper as u64) + .await + .into_hollow_batch(); + batches.push(batch); + lower = upper; + } + + let req = CompactReq { + shard_id: write.machine.shard_id(), + desc: Description::new( + batches.first().unwrap().desc.lower().clone(), + batches.last().unwrap().desc.upper().clone(), + Antichain::from_elem(10u64), + ), + inputs: batches.clone(), + prev_batch: None, + }; + let schemas = Schemas { + id: None, + key: Arc::new(StringSchema), + val: Arc::new(StringSchema), + }; + let ordered_runs = Compactor::::order_runs( + &req, + RunOrder::Structured, + &*write.blob, + &write.metrics, + ) + .await + .expect("order runs failed"); + + // Set this to an arbitrarily small number to force writes to flush + // to blob. + write + .cfg + .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1); + + // Set this to an arbitrarily small number to force multiple parts to be + // written. + write.cfg.set_config(&BLOB_TARGET_SIZE, 10); + + let cfg = CompactConfig::new(&write.cfg, write.shard_id()); + + let chunked_runs = Compactor::::chunk_runs( + &ordered_runs, + &cfg, + &write.metrics, + 1000000, + ); + + let mut incremental_result = None; + let mut first_batch = None; + let mut second_batch = None; + + let chunked_runs_clone = chunked_runs.clone(); + + for (runs, _max_memory) in chunked_runs_clone.iter() { + let (incremental_tx, mut incremental_rx) = tokio::sync::mpsc::channel(1); + let shard_id = write.shard_id(); + + let batch_handle = Compactor::::compact_runs( + &cfg, + &shard_id, + &req.desc, + runs.clone(), + Arc::clone(&write.blob), + Arc::clone(&write.metrics), + write.metrics.shards.shard(&write.machine.shard_id(), ""), + Arc::new(IsolatedRuntime::default()), + schemas.clone(), + &None, + Some(incremental_tx), + ); + + let incremental_handle = tokio::spawn(async move { + let mut batches = vec![]; + while let Some(b) = incremental_rx.recv().await { + batches.push(b); + } + batches + }); + + let (batch_result, incremental) = tokio::join! 
{ + batch_handle, + incremental_handle, + }; + let incremental = incremental.unwrap(); + + incremental_result = Some(incremental[incremental.len() - 1].clone()); + first_batch = Some(batch_result.unwrap()); + } + + for (runs, _max_memory) in chunked_runs.iter() { + let (incremental_tx, mut incremental_rx) = tokio::sync::mpsc::channel(1); + let shard_id = write.shard_id(); + + let batch_handle = Compactor::::compact_runs( + &cfg, + &shard_id, + &req.desc, + runs.clone(), + Arc::clone(&write.blob), + Arc::clone(&write.metrics), + write.metrics.shards.shard(&write.machine.shard_id(), ""), + Arc::new(IsolatedRuntime::default()), + schemas.clone(), + &incremental_result, + Some(incremental_tx), + ); + + let incremental_handle = tokio::spawn(async move { + let mut batch = None; + while let Some(b) = incremental_rx.recv().await { + batch = Some(b); + } + batch + }); + + let (batch_result, _incremental) = tokio::join! { + batch_handle, + incremental_handle, + }; + + let batch_result = batch_result.unwrap(); + second_batch = Some(batch_result.clone()); + } + + println!("first_batch={:#?}", first_batch); + println!("second_batch={:#?}", second_batch); + + // We want to assert that the first batch is equal to the second batch, + // _except_ for the last part, which is where the second run should have + // picked up incrementally. + let (mut first_batch, mut second_batch) = ( + first_batch.expect("first batch"), + second_batch.expect("second batch"), + ); + + first_batch.parts.pop(); + second_batch.parts.pop(); + + assert_eq!(first_batch, second_batch); + + println!("chunked_runs={:#?}", chunked_runs.iter().len()); + } } diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index e81a462dd78e4..f65262454b373 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -1099,16 +1099,18 @@ impl ProtoMapEntry> for Proto } } -impl RustType for ActiveCompaction { +impl RustType for ActiveCompaction { fn into_proto(&self) -> ProtoCompaction { ProtoCompaction { start_ms: self.start_ms, + batch_so_far: self.batch_so_far.as_ref().map(|x| x.into_proto()), } } fn from_proto(proto: ProtoCompaction) -> Result { Ok(Self { start_ms: proto.start_ms, + batch_so_far: proto.batch_so_far.map(|x| x.into_rust()).transpose()?, }) } } diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 3cbd669f314b3..0f105d998f8fe 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -278,6 +278,7 @@ where .into_iter() .map(|b| Arc::unwrap_or_clone(b.batch)) .collect(), + prev_batch: None, }) .collect(); (reqs, maintenance) @@ -500,6 +501,7 @@ where .into_iter() .map(|b| Arc::unwrap_or_clone(b.batch)) .collect(), + prev_batch: req.active_compaction, }; compact_reqs.push(req); } @@ -596,6 +598,30 @@ where } } + /// As we build up batches during compaction, we incrementally commit + /// information about the completed (merged and persisted) parts. + /// That way if compaction is interrupted, we can safely resume the work. + /// + /// `checkpoint_compaction_progress` stores that in progress batch in state. 
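+ /// The checkpoint is written with `apply_unbatched_idempotent_cmd`, so it is
+ /// safe to retry; a lost checkpoint only means some compaction work may be
+ /// redone on resume.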
+ pub async fn checkpoint_compaction_progress( + &self, + batch_so_far: &HollowBatch, + current_ts: u64, + ) { + let metrics = Arc::clone(&self.applier.metrics); + + //TODO(dov): new metric + let _ = self + .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| { + let ret = state.apply_compaction_progress(batch_so_far, current_ts); + if let Continue(_) = ret { + // metrics.state.compaction_progress_applied.inc(); + } + ret + }) + .await; + } + pub async fn merge_res( &self, res: &FueledMergeRes, @@ -1997,6 +2023,7 @@ pub mod datadriven { shard_id: datadriven.shard_id, desc: Description::new(lower, upper, since), inputs, + prev_batch: None, }; let req_clone = req.clone(); @@ -2008,6 +2035,7 @@ pub mod datadriven { Arc::clone(&datadriven.client.isolated_runtime), req_clone, SCHEMAS.clone(), + &datadriven.machine, ); let res = Compactor::::compact_all(stream, req.clone()).await?; diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto index f89a4e678f851..4a370011cf1b6 100644 --- a/src/persist-client/src/internal/state.proto +++ b/src/persist-client/src/internal/state.proto @@ -131,6 +131,7 @@ message ProtoIdSpineBatch { message ProtoCompaction { uint64 start_ms = 1; + optional ProtoHollowBatch batch_so_far = 2; } message ProtoMerge { diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index dd47b77c476ec..1a237cf91c673 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::ops::ControlFlow::{self, Break, Continue}; -use std::ops::{Deref, DerefMut}; +use std::ops::{Add, Deref, DerefMut}; use std::time::Duration; use arrow::array::{Array, ArrayData, make_array}; @@ -638,6 +638,27 @@ impl RunPart { } } } + pub async fn last_part<'a>( + &'a self, + shard_id: ShardId, + blob: &'a dyn Blob, + metrics: &'a Metrics, + ) -> Result>, MissingBlob> { + match self { + RunPart::Single(p) => Ok(Box::new(p.clone())), + RunPart::Many(r) => { + let fetched = r + .get(shard_id, blob, metrics) + .await + .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?; + let last_part = fetched + .parts + .last() + .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?; + Ok(Box::pin(last_part.last_part(shard_id, blob, metrics)).await?) + } + } + } } impl PartialOrd for BatchPart { @@ -883,6 +904,20 @@ impl HollowBatch { } } } + + pub async fn last_part<'a>( + &'a self, + shard_id: ShardId, + blob: &'a dyn Blob, + metrics: &'a Metrics, + ) -> Option> { + let last_part = self.parts.last()?; + let last_part = last_part.last_part(shard_id, blob, metrics).await; + match last_part { + Ok(part) => Some(*part), + Err(MissingBlob(_)) => None, + } + } } impl HollowBatch { /// Construct an in-memory hollow batch from the given metadata. @@ -1658,6 +1693,7 @@ where req.id, ActiveCompaction { start_ms: heartbeat_timestamp_ms, + batch_so_far: None, }, ) } @@ -1684,6 +1720,28 @@ where Continue(merge_reqs) } + /// This is a best effort attempt to apply incremental compaction + /// to the spine. 
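+ /// Tombstoned shards are a no-op; otherwise the in-progress batch is recorded
+ /// as `batch_so_far` on the matching spine batch's active compaction, where a
+ /// later `FueledMergeReq` can pick it up and resume from it.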
+ pub fn apply_compaction_progress( + &mut self, + batch_so_far: &HollowBatch, + new_ts: u64, + ) -> ControlFlow, ()> { + if self.is_tombstone() { + return Break(NoOpStateTransition(())); + } + + let new_active_compaction = ActiveCompaction { + start_ms: new_ts, + batch_so_far: Some(batch_so_far.clone()), + }; + + self.trace + .apply_incremental_compaction(&new_active_compaction); + + Continue(()) + } + pub fn apply_merge_res( &mut self, res: &FueledMergeRes, diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index fcf27e21afd87..d58f0cf217a02 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -72,12 +72,13 @@ pub struct FueledMergeReq { pub id: SpineId, pub desc: Description, pub inputs: Vec>, + pub active_compaction: Option>, } #[derive(Debug)] pub struct FueledMergeRes { pub output: HollowBatch, - pub new_active_compaction: Option, + pub new_active_compaction: Option>, } /// An append-only collection of compactable update batches. @@ -140,7 +141,7 @@ impl PartialEq for ThinSpineBatch { pub struct ThinMerge { pub(crate) since: Antichain, pub(crate) remaining_work: usize, - pub(crate) active_compaction: Option, + pub(crate) active_compaction: Option>, } impl ThinMerge { @@ -517,7 +518,7 @@ impl Trace { Self::remove_redundant_merge_reqs(merge_reqs) } - pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) { + pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) { // TODO: we ought to be able to look up the id for a batch by binary searching the levels. // In the meantime, search backwards, since most compactions are for recent batches. for batch in self.spine.spine_batches_mut().rev() { @@ -563,6 +564,15 @@ impl Trace { self.spine.validate() } + pub fn apply_incremental_compaction(&mut self, compaction: &ActiveCompaction) { + for batch in self.spine.spine_batches_mut().rev() { + let result = batch.maybe_update_active_compaction(compaction); + if result.matched() { + return; + } + } + } + pub fn apply_merge_res(&mut self, res: &FueledMergeRes) -> ApplyMergeResult { for batch in self.spine.spine_batches_mut().rev() { let result = batch.maybe_replace(res); @@ -601,10 +611,23 @@ impl Trace { .as_ref() .map_or(true, move |c| c.start_ms <= threshold_ms) }) - .map(|b| FueledMergeReq { - id: b.id, - desc: b.desc.clone(), - inputs: b.parts.clone(), + .map(|b| { + let inputs = b + .active_compaction + .as_ref() + .and_then(|ac| ac.batch_so_far.as_ref()) // If Some, chain to Option<&BatchSoFarType> + .map_or_else(|| b.parts.clone(), |bsf| b.filtered_batches(&bsf.desc)); + let active_compaction_hollow_batch = b + .active_compaction + .as_ref() + .and_then(|ac| ac.batch_so_far.clone()) + .map_or_else(|| None, |bsf| Some(bsf)); + FueledMergeReq { + id: b.id, + desc: b.desc.clone(), + inputs, + active_compaction: active_compaction_hollow_batch, + } }) } @@ -690,8 +713,9 @@ pub struct IdHollowBatch { } #[derive(Debug, Clone, Eq, PartialEq, Serialize)] -pub struct ActiveCompaction { +pub struct ActiveCompaction { pub start_ms: u64, + pub batch_so_far: Option>, } #[derive(Debug, Clone, PartialEq)] @@ -699,13 +723,13 @@ struct SpineBatch { id: SpineId, desc: Description, parts: Vec>, - active_compaction: Option, + active_compaction: Option>, // A cached version of parts.iter().map(|x| x.len).sum() len: usize, } impl SpineBatch { - fn merged(batch: IdHollowBatch, active_compaction: Option) -> Self + fn merged(batch: IdHollowBatch, active_compaction: Option>) -> 
Self where T: Clone, { @@ -828,6 +852,61 @@ impl SpineBatch { }) } + fn maybe_update_active_compaction( + &mut self, + active_compaction: &ActiveCompaction, + ) -> ApplyMergeResult { + let batch = match active_compaction.batch_so_far.as_ref() { + Some(batch) => batch, + None => return ApplyMergeResult::NotAppliedNoMatch, + }; + + // The spine's and merge res's sinces don't need to match (which could occur if Spine + // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine + // since must be in advance of the merge res since. + if !PartialOrder::less_equal(batch.desc.since(), self.desc().since()) { + return ApplyMergeResult::NotAppliedInvalidSince; + } + + // If our merge result exactly matches a spine batch, we can swap it in directly + let exact_match = + batch.desc.lower() == self.desc().lower() && batch.desc.upper() == self.desc().upper(); + if exact_match { + self.active_compaction = Some(active_compaction.clone()); + + return ApplyMergeResult::AppliedExact; + } + + let SpineBatch { + id: _, + parts: _, + desc, + active_compaction: _, + len: _, + } = self; + + if PartialOrder::less_equal(desc.lower(), batch.desc.lower()) + && PartialOrder::less_equal(batch.desc.upper(), desc.upper()) + { + self.active_compaction = Some(active_compaction.clone()); + ApplyMergeResult::AppliedSubset + } else { + ApplyMergeResult::NotAppliedNoMatch + } + } + + fn filtered_batches(&self, desc: &Description) -> Vec> { + self.parts + .iter() + .filter(|b| { + let part_desc = &b.batch.desc; + PartialOrder::less_equal(part_desc.lower(), desc.lower()) + && PartialOrder::less_equal(desc.upper(), part_desc.upper()) + }) + .cloned() + .collect() + } + // TODO: Roundtrip the SpineId through FueledMergeReq/FueledMergeRes? fn maybe_replace(&mut self, res: &FueledMergeRes) -> ApplyMergeResult { // The spine's and merge res's sinces don't need to match (which could occur if Spine @@ -1033,6 +1112,7 @@ impl FuelingMerge { id, desc: desc.clone(), inputs: merged_parts.clone(), + active_compaction: None, }); } @@ -1876,7 +1956,13 @@ pub(crate) mod tests { .fueled_merge_reqs_before_ms(timeout_ms, None) .collect(); for req in reqs { - trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 }) + trace.claim_compaction( + req.id, + ActiveCompaction { + start_ms: 0, + batch_so_far: None, + }, + ) } trace.roundtrip_structure = roundtrip_structure; trace @@ -1946,6 +2032,7 @@ pub(crate) mod tests { Antichain::new(), ), inputs: vec![], + active_compaction: None, } } @@ -2015,6 +2102,7 @@ pub(crate) mod tests { Antichain::from_elem(5), ), inputs: vec![], + active_compaction: None, }; assert_eq!( Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]), diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 129b85f065292..2f2fc44554c9d 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -32,9 +32,9 @@ use mz_persist::metrics::ColumnarMetrics; use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd}; use mz_persist_types::part::Part; use mz_persist_types::{Codec, Codec64}; -use semver::Version; +use semver::{Op, Version}; use timely::progress::Timestamp; -use tracing::{Instrument, debug_span}; +use tracing::{Instrument, debug_span, info}; use crate::ShardId; use crate::fetch::{EncodedPart, FetchBatchFilter}; @@ -157,7 +157,7 @@ impl StructuredSort { } } -type SortKV<'a> = (ArrayIdx<'a>, Option>); +pub type SortKV<'a> = (ArrayIdx<'a>, Option>); fn kv_lower(data: &FetchData) -> Option> { let key_idx = 
data.structured_lower.as_ref().map(|l| l.get())?; @@ -343,6 +343,7 @@ pub(crate) struct Consolidator> { runs: Vec, usize)>>, filter: FetchBatchFilter, budget: usize, + lower_bound: Option>, // NB: this is the tricky part! // One hazard of streaming consolidation is that we may start consolidating a particular KVT, // but not be able to finish, because some other part that might also contain the same KVT @@ -351,6 +352,22 @@ pub(crate) struct Consolidator> { drop_stash: Option, } +#[derive(Debug)] +pub struct LowerBound { + pub(crate) key_bound: ArrayBound, + pub(crate) val_bound: ArrayBound, + pub(crate) t: T, +} + +impl LowerBound { + pub fn kvt_bound(&self) -> (SortKV<'_>, T) { + ( + (self.key_bound.get(), Some(self.val_bound.get())), + self.t.clone(), + ) + } +} + impl Consolidator where T: Timestamp + Codec64 + Lattice, @@ -369,6 +386,7 @@ where shard_metrics: Arc, read_metrics: ReadMetrics, filter: FetchBatchFilter, + lower_bound: Option>, prefetch_budget_bytes: usize, ) -> Self { Self { @@ -383,6 +401,7 @@ where filter, budget: prefetch_budget_bytes, drop_stash: None, + lower_bound, } } } @@ -485,7 +504,9 @@ where return None; } - let mut iter = ConsolidatingIter::new(&self.context, &self.filter, &mut self.drop_stash); + let bound = self.lower_bound.as_ref().map(|b| b.kvt_bound()); + let mut iter = + ConsolidatingIter::new(&self.context, &self.filter, bound, &mut self.drop_stash); for run in &mut self.runs { let last_in_run = run.len() < 2; @@ -837,6 +858,7 @@ where parts: Vec<&'a StructuredUpdates>, heap: BinaryHeap>, upper_bound: Option<(SortKV<'a>, T)>, + lower_bound: Option<(SortKV<'a>, T)>, state: Option<(Indices, SortKV<'a>, T, D)>, drop_stash: &'a mut Option, } @@ -849,6 +871,7 @@ where fn new( context: &'a str, filter: &'a FetchBatchFilter, + lower_bound: Option<(SortKV<'a>, T)>, drop_stash: &'a mut Option, ) -> Self { Self { @@ -857,6 +880,7 @@ where parts: vec![], heap: BinaryHeap::new(), upper_bound: None, + lower_bound, state: None, drop_stash, } @@ -926,6 +950,21 @@ where } } + // This code checks our inclusive lower bound against + if let Some((kv_lower, t_lower)) = &self.lower_bound { + if (kv_lower, t_lower) >= (kv1, t1) { + info!( + "discarding {kv1:?} {t1:?} from {context} because it's below our lower bound", + context = self.context + ); + // Discard this item from the part, since it's past our lower bound. + let _ = part.pop(&self.parts, self.filter); + + // Continue to the next part, since it might still be relevant. + continue; + } + } + self.state = part.pop(&self.parts, self.filter); } } else { @@ -1093,6 +1132,7 @@ mod tests { filter, budget: 0, drop_stash: None, + lower_bound: None, }; let mut out = vec![]; @@ -1164,6 +1204,7 @@ mod tests { FetchBatchFilter::Compaction { since: desc.since().clone(), }, + None, budget, ); diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 03cebe1e64a76..01e4c17286e34 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -1001,6 +1001,7 @@ where Arc::clone(&self.machine.applier.shard_metrics), self.metrics.read.snapshot.clone(), filter, + None, COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg), ); for batch in batches {