From 78f5a8905440784109c18be42724019195c20d7f Mon Sep 17 00:00:00 2001 From: Securifi Dev Date: Sun, 14 Jun 2026 15:20:26 +0000 Subject: [PATCH 1/4] fix(compaction): keep compact_now() future Send MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit compact_now() held a tracing::EnteredSpan guard across its .await points. EnteredSpan is !Send, so holding it across an await makes the returned future !Send — uncallable from a Send context (e.g. an #[async_trait] impl whose methods require Send). Replace the entered-span guard with tracing::Instrument so the span still covers the async work while the future stays Send. Behavior-preserving: same span name, fields, logging. --- src/compaction/full.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/compaction/full.rs b/src/compaction/full.rs index 1c5533e..2b2ea4d 100644 --- a/src/compaction/full.rs +++ b/src/compaction/full.rs @@ -26,9 +26,21 @@ use super::helpers::{ use super::types::CompactStats; /// Full online compaction. See module-level docs for the staged flow. -#[allow(clippy::too_many_lines)] +/// +/// The body is instrumented via [`tracing::Instrument`] rather than an +/// `EnteredSpan` guard: an entered span guard is `!Send` and would be held +/// across the many `.await` points below, making the returned future `!Send` +/// and thus uncallable from `Send` async contexts (e.g. the nodedb-lite +/// `#[async_trait]` StorageEngine impl, which requires `Send` futures). 
pub async fn compact_now(db: &Db) -> Result { - let _span = tracing::debug_span!("compaction.run").entered(); + use tracing::Instrument; + compact_now_inner(db) + .instrument(tracing::debug_span!("compaction.run")) + .await +} + +#[allow(clippy::too_many_lines)] +async fn compact_now_inner(db: &Db) -> Result { if !matches!(db.mode, crate::txn::mode::DbMode::Standalone) { return Err(PagedbError::Unsupported); } From 13ae82d0eb9ae9ba507bd4ff33720113ab8c2101 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Mon, 15 Jun 2026 08:03:33 +0800 Subject: [PATCH 2/4] fix(btree): spill large values to overflow chains in bulk_load --- src/btree/overflow.rs | 8 +++ src/btree/tree/bulk.rs | 120 ++++++++++++++++++---------------------- src/btree/tree/write.rs | 2 +- 3 files changed, 63 insertions(+), 67 deletions(-) diff --git a/src/btree/overflow.rs b/src/btree/overflow.rs index cacb618..b0522ba 100644 --- a/src/btree/overflow.rs +++ b/src/btree/overflow.rs @@ -26,6 +26,14 @@ use crate::{RealmId, Result}; /// Header length for chain pages (non-root): `next[8] || data_len[4]`. pub const OVERFLOW_HEADER_LEN: usize = 12; +/// Values longer than this are stored as an overflow chain rather than inline in +/// a leaf. Shared by the `put` path and `bulk_load` so a repack reproduces the +/// same inline/overflow split the original writes produced. +#[must_use] +pub fn inline_value_threshold(page_size: usize) -> usize { + page_size / 4 +} + /// Extra bytes at the start of a v2 root body before the standard header: /// `refcount[4]`. const OVERFLOW_ROOT_V2_PREFIX: usize = 4; diff --git a/src/btree/tree/bulk.rs b/src/btree/tree/bulk.rs index e7cc092..3748573 100644 --- a/src/btree/tree/bulk.rs +++ b/src/btree/tree/bulk.rs @@ -1,11 +1,14 @@ //! Bulk-load: build a dense tree from sorted pairs without `CoW` overhead. 
+use std::sync::Arc; + use crate::Result; use crate::errors::PagedbError; use crate::vfs::Vfs; use crate::btree::leaf::{Leaf, LeafValue}; use crate::btree::node; +use crate::btree::overflow; use super::core::BTree; @@ -13,14 +16,11 @@ impl BTree { /// Bulk-load sorted `(key, value)` pairs into a freshly-created tree /// without any `CoW` overhead. The tree must be empty (`root_page_id == 0`). /// - /// All records are first placed into as few leaves as needed, then internal - /// nodes are built bottom-up. Each page is written exactly once; no freed - /// pages are generated. The resulting layout is dense and compact. - /// - /// Overflow values are NOT supported in bulk-load; callers must ensure all - /// values fit inline (`value.len()` ≤ `page_size` / 4). For compaction use- - /// cases, the pairs come from `collect_range` which already resolves - /// overflow chains into inline bytes. + /// Values larger than the inline threshold (`page_size / 4`) are spilled to + /// overflow chains, exactly as the `put` path does, so a dense repack + /// reproduces the original storage shape. Records are then packed into as few + /// leaves as needed and internal nodes are built bottom-up. Each leaf/internal + /// page is written exactly once; the layout is dense and compact. /// /// Returns `Err` if the tree is not empty. #[allow(clippy::too_many_lines)] @@ -34,82 +34,70 @@ impl BTree { return Ok(()); } - // Build leaves greedily: pack as many records as fit into each leaf. - // No sibling pointers yet — we'll wire them up at the end. - let mut leaves: Vec<(u64, Vec)> = Vec::new(); // (page_id, first_key) - - let body_cap = node::body_capacity(self.page_size); - let mut current_leaf = Leaf::new(); + // Resolve each value to its stored form, spilling values past the inline + // threshold to overflow chains (same threshold as `put`). Inlining an + // oversized value would exceed leaf capacity and fail the encode. 
+ let realm = self.realm_id; + let ps = self.page_size; + let pager = Arc::clone(&self.pager); + let inline_threshold = overflow::inline_value_threshold(ps); + let mut records: Vec<(Vec<u8>, LeafValue)> = Vec::with_capacity(pairs.len()); + for (k, v) in pairs { + let value = if v.len() > inline_threshold { + let total_len = v.len() as u64; + let root_page_id = + overflow::write_chain(&pager, realm, &v, ps, &mut || self.allocate_page()) + .await?; + LeafValue::Overflow { + total_len, + root_page_id, + } + } else { + LeafValue::Inline(v) + }; + records.push((k, value)); + } - let flush_leaf = |leaf: &Leaf, page_id: u64, next_id: u64| { - let _ = (leaf, page_id, next_id); // will write below - }; - let _ = flush_leaf; // suppress unused-variable lint (closure is a placeholder) + let body_cap = node::body_capacity(ps); - // First pass: group records into leaves. - let mut leaf_groups: Vec<Vec<(Vec<u8>, Vec<u8>)>> = Vec::new(); - for (k, v) in &pairs { - let entry_size = { - let suffix_len = k.len(); // conservative: no prefix compression yet - 2 + suffix_len + 2 + v.len() // slot entry (inline value) - }; - // Rough check: header + slot_dir entry + record body + // First pass: group records into leaves, sized by encoded record width. + let mut leaf_groups: Vec<Vec<(Vec<u8>, LeafValue)>> = Vec::new(); + let mut current: Vec<(Vec<u8>, LeafValue)> = Vec::new(); + for (k, value) in records { + // New record's body contribution: suffix-len field + key + value. + // (No prefix compression at build time, so suffix == full key.) 
+ let entry_size = 2 + k.len() + value.encoded_size(); let projected = node::HEADER_LEN - + (current_leaf.records.len() + 1) * 2 - + current_leaf - .records + + (current.len() + 1) * 2 + + current .iter() .map(|(ck, cv)| 2 + ck.len() + cv.encoded_size()) .sum::<usize>() + entry_size; - if projected > body_cap && !current_leaf.records.is_empty() { - leaf_groups.push( - std::mem::take(&mut current_leaf.records) - .into_iter() - .map(|(lk, lv)| { - let vbytes = match lv { - LeafValue::Inline(b) => b, - LeafValue::Overflow { .. } => Vec::new(), - }; - (lk, vbytes) - }) - .collect(), - ); - current_leaf = Leaf::new(); + if projected > body_cap && !current.is_empty() { + leaf_groups.push(std::mem::take(&mut current)); } - current_leaf.upsert(k, LeafValue::Inline(v.clone())); + current.push((k, value)); } - if !current_leaf.records.is_empty() { - leaf_groups.push( - std::mem::take(&mut current_leaf.records) - .into_iter() - .map(|(lk, lv)| { - let vbytes = match lv { - LeafValue::Inline(b) => b, - LeafValue::Overflow { .. } => Vec::new(), - }; - (lk, vbytes) - }) - .collect(), - ); + if !current.is_empty() { + leaf_groups.push(current); } - // Second pass: allocate page_ids and write leaves with correct sibling pointers. + // Second pass: allocate page ids and write leaves with sibling links. + // Input is sorted and grouping preserves order, so each leaf's records + // are already in key order. 
+ let mut leaves: Vec<(u64, Vec<u8>)> = Vec::new(); // (page_id, first_key) let n_leaves = leaf_groups.len(); let page_ids: Vec<u64> = (0..n_leaves).map(|_| self.allocate_page()).collect(); - for (i, group) in leaf_groups.iter().enumerate() { - let mut leaf = Leaf { + for (i, group) in leaf_groups.into_iter().enumerate() { + let first_key = group.first().map(|(k, _)| k.clone()).unwrap_or_default(); + let leaf = Leaf { left_sibling: if i == 0 { 0 } else { page_ids[i - 1] }, right_sibling: if i + 1 < n_leaves { page_ids[i + 1] } else { 0 }, - records: group - .iter() - .map(|(k, v)| (k.clone(), LeafValue::Inline(v.clone()))) - .collect(), + records: group, }; - leaf.records.sort_by(|(a, _), (b, _)| a.cmp(b)); // already sorted self.write_leaf(page_ids[i], &leaf).await?; - let first_key = group.first().map(|(k, _)| k.clone()).unwrap_or_default(); leaves.push((page_ids[i], first_key)); } diff --git a/src/btree/tree/write.rs b/src/btree/tree/write.rs index 2862318..bd7efab 100644 --- a/src/btree/tree/write.rs +++ b/src/btree/tree/write.rs @@ -17,7 +17,7 @@ impl BTree { /// Insert or overwrite a key-value pair. pub async fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { // Build the leaf value — inline if small enough, overflow chain otherwise. 
- let leaf_value = if value.len() > self.page_size / 4 { + let leaf_value = if value.len() > overflow::inline_value_threshold(self.page_size) { let realm = self.realm_id; let ps = self.page_size; let pager = Arc::clone(&self.pager); From 64aea1dd564998c15dd8aa93dc659dcbb0a02546 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Mon, 15 Jun 2026 08:03:45 +0800 Subject: [PATCH 3/4] fix(compaction): atomic scratch-file repack; honest single-shot compact_step --- src/catalog/codec.rs | 96 +---------- src/catalog/mod.rs | 2 +- src/compaction/full.rs | 153 +++-------------- src/compaction/helpers.rs | 68 +------- src/compaction/mod.rs | 1 + src/compaction/repack.rs | 258 +++++++++++++++++++++++++++++ src/compaction/step.rs | 334 +++----------------------------------- src/lib.rs | 1 - src/pager/cache.rs | 30 ++++ src/pager/core.rs | 41 ++++- src/txn/db/open.rs | 8 + 11 files changed, 383 insertions(+), 609 deletions(-) create mode 100644 src/compaction/repack.rs diff --git a/src/catalog/codec.rs b/src/catalog/codec.rs index 12e283b..3077afe 100644 --- a/src/catalog/codec.rs +++ b/src/catalog/codec.rs @@ -42,46 +42,13 @@ pub enum CatalogRowKind { /// catalog, reader pins are maintained in-memory only and the writer process /// must be trusted to honor the catalog pins. ReaderPin = 0x06, - /// Incremental compaction watermark. Singleton row; key is `[0x07]`. - /// Value: `target_root[8] || frontier_page_id[8] || started_at_commit_id[8] || - /// total_pages_estimate[8]` = 32 bytes. - /// - /// Present only while an incremental compaction is in progress. Cleared - /// atomically with the final compaction commit. On `Db::open`, a present - /// row means a prior compaction was interrupted; the embedder must call - /// `compact_step` again to resume — `Db::open` does NOT auto-resume. + /// Reserved (`0x07`). Older builds wrote an incremental-compaction watermark + /// here; compaction is now a single atomic operation and never writes it. 
+ /// Retained as a row-kind boundary and so any legacy row is recognised and + /// dropped during compaction. CompactionState = 0x07, } -/// Incremental compaction watermark persisted in the catalog. -/// -/// Present while a `compact_step` session is in progress. The embedder must -/// call `compact_step` again to resume after a crash; `Db::open` does not -/// auto-resume. -/// -/// Encoding: `target_root[8] || frontier_page_id[8] || started_at_commit_id[8] || -/// total_pages_estimate[8] || frontier_key_len[4] || frontier_key[frontier_key_len]` -/// (minimum 32 bytes; variable total). -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct CompactionStateRow { - /// Root page id of the main B+ tree at the time compaction started. - /// Used as the source snapshot to read from during each step. - pub target_root: u64, - /// The `next_page_id` of the partially-built compacted tree after the last - /// committed batch. Used to estimate progress and as the initial `next_page_id` - /// when no free-list pages are available. - pub frontier_page_id: u64, - /// The `commit_id` at which the compaction session began. - pub started_at_commit_id: u64, - /// Estimated total pages in the source tree (for progress reporting). - pub total_pages_estimate: u64, - /// The key (exclusive lower bound) from which the next step should begin - /// reading pairs. Empty means start from the beginning. - pub frontier_key: Vec, -} - -pub const COMPACTION_STATE_FIXED_LEN: usize = 36; // 4 × u64 + 1 × u32 length prefix - /// Rekey watermark persisted in the catalog during an online rekey operation. /// A present row means a rekey is in flight or was interrupted by a crash. #[derive(Debug, Clone, PartialEq, Eq)] @@ -246,61 +213,6 @@ impl Catalog { [CatalogRowKind::CompactionState as u8] } - /// Compaction-state row key: `[0x07]` (singleton). - #[must_use] - pub fn compaction_state_key() -> [u8; 1] { - [CatalogRowKind::CompactionState as u8] - } - - /// Encode a `CompactionStateRow`. 
Returns a variable-length `Vec`. - pub fn encode_compaction_state(r: &CompactionStateRow) -> Result> { - let key_len = r.frontier_key.len(); - let klen32 = u32::try_from(key_len).map_err(|_| PagedbError::ManifestTooLarge)?; - let mut o = Vec::with_capacity(COMPACTION_STATE_FIXED_LEN + key_len); - o.extend_from_slice(&r.target_root.to_le_bytes()); - o.extend_from_slice(&r.frontier_page_id.to_le_bytes()); - o.extend_from_slice(&r.started_at_commit_id.to_le_bytes()); - o.extend_from_slice(&r.total_pages_estimate.to_le_bytes()); - o.extend_from_slice(&klen32.to_le_bytes()); - o.extend_from_slice(&r.frontier_key); - Ok(o) - } - - /// Decode a `CompactionStateRow` from bytes. - pub fn decode_compaction_state(bytes: &[u8]) -> Result { - if bytes.len() < COMPACTION_STATE_FIXED_LEN { - return Err(PagedbError::corruption( - crate::errors::CorruptionDetail::HeaderUnverifiable, - )); - } - let read_u64 = |off: usize| { - let mut b = [0u8; 8]; - b.copy_from_slice(&bytes[off..off + 8]); - u64::from_le_bytes(b) - }; - let target_root = read_u64(0); - let frontier_page_id = read_u64(8); - let started_at_commit_id = read_u64(16); - let total_pages_estimate = read_u64(24); - let mut klen_buf = [0u8; 4]; - klen_buf.copy_from_slice(&bytes[32..36]); - let key_len = u32::from_le_bytes(klen_buf) as usize; - if bytes.len() < COMPACTION_STATE_FIXED_LEN + key_len { - return Err(PagedbError::corruption( - crate::errors::CorruptionDetail::HeaderUnverifiable, - )); - } - let frontier_key = - bytes[COMPACTION_STATE_FIXED_LEN..COMPACTION_STATE_FIXED_LEN + key_len].to_vec(); - Ok(CompactionStateRow { - target_root, - frontier_page_id, - started_at_commit_id, - total_pages_estimate, - frontier_key, - }) - } - /// Encode a `ReaderPinValue` as 41 bytes. 
#[must_use] pub fn encode_reader_pin(v: &ReaderPinValue) -> [u8; READER_PIN_VALUE_LEN] { diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 365ff33..0795ba7 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -4,4 +4,4 @@ pub mod codec; -pub use codec::{Catalog, CatalogRowKind, CompactionStateRow, ReaderPinValue, RekeyStateRow}; +pub use codec::{Catalog, CatalogRowKind, ReaderPinValue, RekeyStateRow}; diff --git a/src/compaction/full.rs b/src/compaction/full.rs index 2b2ea4d..c21d557 100644 --- a/src/compaction/full.rs +++ b/src/compaction/full.rs @@ -1,28 +1,20 @@ //! Full one-shot compaction (entry point: [`compact_now`]): //! -//! 1. Collects all live data from main and catalog trees into memory. -//! 2. Drains eligible deferred-free pages (now eligible since no pinned reader -//! can observe them). -//! 3. Writes fresh compacted trees starting at page 4, producing a dense layout. -//! 4. Commits a new header with the updated roots and reduced `next_page_id`. -//! 5. Truncates main.db if no reader pins the old high-water-mark range. -//! 6. Repacks segment files whose garbage ratio exceeds 5%. +//! 1. Refuses while any reader pins the page range (relocation/truncation is +//! unsafe under a pinned reader). +//! 2. Crash-atomically repacks the main + catalog trees into a dense low-address +//! layout and truncates the reclaimed tail (see [`super::repack`]). +//! 3. Repacks segment files whose garbage ratio exceeds 5%. 
-use crate::btree::BTree; +use crate::Result; use crate::errors::PagedbError; -use crate::pager::header::commit_header; -use crate::pager::structural_header::MainDbHeaderFields; use crate::segment::reader::SegmentReader; use crate::segment::types::SegmentPageKind; use crate::segment::writer::SegmentWriter; use crate::txn::db::Db; use crate::vfs::{Vfs, VfsFile}; -use crate::{CommitId, Result}; -use super::helpers::{ - collect_all_pairs, collect_catalog_split, find_segment_name_inner, list_all_segments_inner, - page_size_log2, replace_segment_compact, -}; +use super::helpers::{find_segment_name_inner, list_all_segments_inner, replace_segment_compact}; use super::types::CompactStats; /// Full online compaction. See module-level docs for the staged flow. @@ -31,7 +23,7 @@ use super::types::CompactStats; /// `EnteredSpan` guard: an entered span guard is `!Send` and would be held /// across the many `.await` points below, making the returned future `!Send` /// and thus uncallable from `Send` async contexts (e.g. the nodedb-lite -/// `#[async_trait]` StorageEngine impl, which requires `Send` futures). +/// `#[async_trait]` `StorageEngine` impl, which requires `Send` futures). pub async fn compact_now(db: &Db) -> Result { use tracing::Instrument; compact_now_inner(db) @@ -55,8 +47,6 @@ async fn compact_now_inner(db: &Db) -> Result { // commit can't recycle a page the repack now uses for live data. db.free_page_cache.lock().clear(); - let old_next_page_id = state.next_page_id; - // ── 1. Refuse while readers are pinned ─────────────────────────────────── // A dense repack relocates the current tree and truncates the file; pinned // readers (in-process or cross-process durable) still reference the old @@ -81,124 +71,19 @@ async fn compact_now_inner(db: &Db) -> Result { return Ok(result); } - // ── 2. Full repack (no readers pinned) ─────────────────────────────────── - - // Collect all live data in memory BEFORE any writes. 
- let main_pairs = if state.root_page_id != 0 { - let old_tree = BTree::open( - db.pager.clone(), - db.realm_id, - state.root_page_id, - old_next_page_id, - db.page_size, - ); - collect_all_pairs(&old_tree).await? - } else { - Vec::new() - }; - - // Collect catalog rows (housekeeping free-list rows dropped). - let cat_rows = collect_catalog_split(&db.pager, db.realm_id, &state).await?; - - // ── 3. Write fresh compacted trees starting at page 4 ──────────────────── - // Pages 0–3 are reserved (header slots A/B + two spares); never allocated. - let mut new_main = BTree::open( - db.pager.clone(), - db.realm_id, - 0, - 4, // first data page (pages 0-3 are reserved header slots) - db.page_size, - ); - new_main.bulk_load(main_pairs).await?; - new_main.flush().await?; - let new_root = new_main.root_page_id(); - let after_main = new_main.next_page_id(); - - let mut new_cat = BTree::open(db.pager.clone(), db.realm_id, 0, after_main, db.page_size); - new_cat.bulk_load(cat_rows).await?; - new_cat.flush().await?; - let new_cat_root = new_cat.root_page_id(); - let new_next = new_cat.next_page_id(); - - // Pages reclaimed = reduction in next_page_id (the dense layout is contiguous, - // and the durable free-list is reset to empty below). - result.main_db_pages_reclaimed = old_next_page_id.saturating_sub(new_next); - - // ── 4. 
Commit new header ───────────────────────────────────────────────── - let new_commit_id = state.latest_commit_id + 1; - let new_seq = state.seq + 1; - let counter_anchor = db.pager.pending_anchor(); - - let mut catalog_root_bytes = [0u8; 16]; - catalog_root_bytes[..8].copy_from_slice(&new_cat_root.to_le_bytes()); - catalog_root_bytes[8..].copy_from_slice(&new_commit_id.to_le_bytes()); - - let fields = MainDbHeaderFields { - format_version: 1, - cipher_id: db.cipher_id.as_byte(), - page_size_log2: page_size_log2(db.page_size)?, - flags: 0, - file_id: db.file_id, - kek_salt: db.kek_salt, - mk_epoch: db.mk_epoch.load(std::sync::atomic::Ordering::SeqCst), - seq: new_seq, - active_root_page_id: new_root, - active_root_txn_id: new_commit_id, - counter_anchor, - commit_id: CommitId(new_commit_id), - free_list_root: [0u8; 16], - catalog_root: catalog_root_bytes, - apply_journal_root_page_id: 0, - apply_journal_root_version: 0, - commit_history_root_page_id: 0, - commit_history_root_version: 0, - restore_mode: 0, - next_page_id: new_next, - commit_retain_policy_tag: 0, - commit_retain_policy_value: 0, - }; - - let hk_clone = { db.hk.read().clone() }; - let new_slot = commit_header( - &*db.vfs, - &db.main_db_path, - &hk_clone, - &fields, - state.active_slot, - db.page_size, - ) - .await?; - db.pager.commit_anchor(counter_anchor)?; - - state.root_page_id = new_root; - state.catalog_root_page_id = new_cat_root; - state.next_page_id = new_next; - state.active_slot = new_slot; - state.seq = new_seq; - state.latest_commit_id = new_commit_id; - state.commit_history_root_page_id = 0; - state.commit_history_root_version = 0; - // The dense repack relocates/truncates every page, so the old free-list is - // gone; the new layout starts with an empty free-list. - state.free_list_root_page_id = 0; - db.latest_commit - .store(new_commit_id, std::sync::atomic::Ordering::SeqCst); - - // ── 5. 
Truncate if no readers pin the old high-water range ─────────────── - // (No readers are pinned at this point — checked above — so truncation is safe.) - if new_next < old_next_page_id { - let new_size = new_next.saturating_mul(db.page_size as u64); - let old_size = old_next_page_id.saturating_mul(db.page_size as u64); - let mut f = db - .vfs - .open(&db.main_db_path, crate::vfs::types::OpenMode::ReadWrite) - .await?; - f.set_len(new_size).await?; - f.sync().await?; - result.bytes_truncated = old_size.saturating_sub(new_size); + // ── 2. Crash-atomic dense repack of the main + catalog trees ───────────── + // An empty free-list means every page below the high-water mark is live — + // the store is already dense, so there is nothing to reclaim and we skip the + // repack entirely (no wasted rewrite). Otherwise repack via a scratch file + + // atomic rename; main.db is never modified until the rename (see + // `super::repack`). + if state.free_list_root_page_id != 0 { + let repack = super::repack::atomic_dense_repack(db, &mut state).await?; + result.main_db_pages_reclaimed = repack.pages_reclaimed; + result.bytes_truncated = repack.bytes_truncated; } - // ── 6. Repack segments ──────────────────────────────────────────────────── + // ── 3. 
Repack segments ──────────────────────────────────────────────────── let all_segments = list_all_segments_inner(&db.pager, db.realm_id, &state).await?; for meta in all_segments { let live = crate::segment::writer::live_path(&meta.segment_id); diff --git a/src/compaction/helpers.rs b/src/compaction/helpers.rs index aaf5c7d..ce69a37 100644 --- a/src/compaction/helpers.rs +++ b/src/compaction/helpers.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::btree::BTree; -use crate::catalog::codec::{Catalog, CatalogRowKind, CompactionStateRow, SegmentMeta}; +use crate::catalog::codec::{Catalog, CatalogRowKind, SegmentMeta}; use crate::errors::PagedbError; use crate::pager::header::commit_header; use crate::pager::structural_header::MainDbHeaderFields; @@ -195,72 +195,6 @@ pub(super) async fn replace_segment_compact( Ok(()) } -/// Return the "next key strictly greater than `key`" for range scanning. -/// Appends a 0x00 byte, which gives the lexicographically smallest key -/// greater than `key`. If `key` is empty, returns empty (scan from start). -pub(super) fn next_key_after(key: &[u8]) -> Vec { - if key.is_empty() { - Vec::new() - } else { - let mut v = key.to_vec(); - v.push(0x00); - v - } -} - -/// Collect at most `limit` key-value pairs from `[start..end)` in `tree`. -pub(super) async fn collect_range_limited( - tree: &BTree, - start: &[u8], - end: &[u8], - limit: u64, -) -> Result, Vec)>> { - let mut pairs = tree.collect_range(start, end).await?; - #[allow(clippy::cast_possible_truncation)] - if pairs.len() as u64 > limit { - pairs.truncate(limit as usize); - } - Ok(pairs) -} - -/// Load the compaction watermark from the catalog, if present. 
-pub(super) async fn load_compaction_state( - pager: &Arc>, - realm_id: crate::RealmId, - state: &WriterState, -) -> Result> { - if state.catalog_root_page_id == 0 { - return Ok(None); - } - let tree = BTree::open( - pager.clone(), - realm_id, - state.catalog_root_page_id, - state.next_page_id, - pager.page_size(), - ); - let key = Catalog::compaction_state_key(); - match tree.get(&key).await? { - Some(bytes) => { - let cs = Catalog::decode_compaction_state(&bytes)?; - Ok(Some(cs)) - } - None => Ok(None), - } -} - -/// Load just the `frontier_page_id` from the watermark (for progress reporting). -pub(super) async fn load_frontier_page_id( - pager: &Arc>, - realm_id: crate::RealmId, - state: &WriterState, -) -> Option { - match load_compaction_state(pager, realm_id, state).await { - Ok(Some(cs)) => Some(cs.frontier_page_id), - _ => None, - } -} - /// Build [`MainDbHeaderFields`] for a compaction commit. /// /// Compaction relocates/truncates pages, so it discards the commit-history diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 3bfc431..9b92298 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -9,6 +9,7 @@ mod full; mod helpers; +mod repack; mod step; mod types; diff --git a/src/compaction/repack.rs b/src/compaction/repack.rs new file mode 100644 index 0000000..8f812de --- /dev/null +++ b/src/compaction/repack.rs @@ -0,0 +1,258 @@ +//! Crash-atomic dense repack of the main and catalog B+ trees. +//! +//! The compacted trees are built into a scratch file (`main.db.compact`) that is +//! then **atomically renamed** over `main.db`. `main.db` is never modified until +//! the rename, so a failure or crash at any point before it leaves the original +//! store fully intact (the orphaned scratch is removed on the next open); after +//! the rename the compacted store is live. The rename is the single commit point. +//! +//! Pages are written through the existing pager (one nonce counter — no reuse) +//! 
to the scratch path; the scratch is a bit-identical `main.db` (same AAD), so +//! it opens directly once renamed. The data is written once. + +use crate::Result; +use crate::btree::BTree; +use crate::pager::header::{ActiveSlot, commit_header}; +use crate::txn::db::{Db, WriterState}; +use crate::vfs::types::OpenMode; +use crate::vfs::{Vfs, VfsFile}; + +use super::helpers::{collect_all_pairs, collect_catalog_split, make_header_fields}; + +/// Outcome of an atomic dense repack. +pub(super) struct RepackOutcome { + pub pages_reclaimed: u64, + pub bytes_truncated: u64, +} + +/// The compacted layout built in the scratch file, awaiting the atomic rename. +pub(super) struct PendingSwap { + new_root: u64, + new_cat_root: u64, + new_next: u64, + counter_anchor: u64, + old_next: u64, +} + +/// Path of the scratch file compaction builds before renaming it over main.db. +fn scratch_path(db: &Db) -> String { + format!("{}.compact", db.main_db_path) +} + +/// Rebuild the main + catalog trees into a dense, low-addressed layout, crash- +/// atomically (see module docs). On success `state` points at the compacted +/// layout. On failure the partially built scratch and the never-persisted cache +/// pages are discarded and `state`/main.db are left untouched. +pub(super) async fn atomic_dense_repack( + db: &Db, + state: &mut WriterState, +) -> Result { + let scratch = scratch_path(db); + // Remove any leftover scratch from an earlier interrupted compaction. + db.vfs.remove(&scratch).await.ok(); + + match build_scratch(db, state, &scratch).await { + Ok(pending) => commit_swap(db, state, &scratch, pending).await, + Err(e) => { + // Nothing was written to main.db; drop the never-persisted compacted + // pages from the cache so the old tree is read back from disk, and + // remove the partial scratch. 
+ db.pager.reset_main_pages(); + db.vfs.remove(&scratch).await.ok(); + Err(e) + } + } +} + +/// Build the compacted trees into `scratch` (a complete, bit-identical main.db) +/// without touching the live main.db or `state`. The compacted pages are written +/// through the pager cache then flushed to `scratch`; the scratch header is +/// written last. Returns the layout to be swapped in. +pub(super) async fn build_scratch( + db: &Db, + state: &WriterState, + scratch: &str, +) -> Result { + let old_next = state.next_page_id; + + // Collect all live data, then bulk-load dense trees starting at page 4 + // (pages 0-3 are reserved header/spare slots). These writes land in the + // pager cache as dirty main pages; they are flushed to the scratch file, + // never to main.db. + let main_pairs = if state.root_page_id != 0 { + let old = BTree::open( + db.pager.clone(), + db.realm_id, + state.root_page_id, + old_next, + db.page_size, + ); + collect_all_pairs(&old).await? + } else { + Vec::new() + }; + + // Drop the compaction watermark row: a fully repacked tree is compact, so + // any resumable `compact_step` state is stale and must not carry forward. + let cs_prefix = crate::catalog::codec::CatalogRowKind::CompactionState as u8; + let cat_rows: Vec<(Vec, Vec)> = collect_catalog_split(&db.pager, db.realm_id, state) + .await? + .into_iter() + .filter(|(k, _)| k.first() != Some(&cs_prefix)) + .collect(); + + let mut new_main = BTree::open(db.pager.clone(), db.realm_id, 0, 4, db.page_size); + new_main.bulk_load(main_pairs).await?; + let new_root = new_main.root_page_id(); + let after_main = new_main.next_page_id(); + + let mut new_cat = BTree::open(db.pager.clone(), db.realm_id, 0, after_main, db.page_size); + new_cat.bulk_load(cat_rows).await?; + let new_cat_root = new_cat.root_page_id(); + let new_next = new_cat.next_page_id(); + + // Pre-size and zero the scratch file so the reserved header/spare pages + // (0-3, and the unused B header slot) are well-defined. 
+ { + let mut f = db.vfs.open(scratch, OpenMode::CreateOrOpen).await?; + f.set_len(new_next.saturating_mul(db.page_size as u64)) + .await?; + f.sync().await?; + } + + // Flush the compacted data pages to the scratch file. + db.pager.flush_main_to(db.realm_id, scratch).await?; + + // Write the scratch header into slot A (passing B as "previous" so + // `commit_header` targets A). The anchor is snapshotted after the flush so + // it covers every nonce the flush consumed. + let new_commit_id = state.latest_commit_id + 1; + let new_seq = state.seq + 1; + let counter_anchor = db.pager.pending_anchor(); + let fields = make_header_fields( + db, + state, + new_commit_id, + new_seq, + counter_anchor, + new_root, + new_cat_root, + new_next, + 0, + ); + let hk_clone = { db.hk.read().clone() }; + commit_header( + &*db.vfs, + scratch, + &hk_clone, + &fields, + ActiveSlot::B, + db.page_size, + ) + .await?; + + Ok(PendingSwap { + new_root, + new_cat_root, + new_next, + counter_anchor, + old_next, + }) +} + +/// Atomically swap the scratch file in for main.db and advance `state`. This is +/// the commit point: a crash before the rename keeps the old store; after it the +/// compacted store is live. +async fn commit_swap( + db: &Db, + state: &mut WriterState, + scratch: &str, + pending: PendingSwap, +) -> Result { + // Close the cached main.db handle so the rename can replace the file + // (Windows) and the next access reopens the new inode (Unix). + db.pager.close_main_handle().await; + db.vfs.rename(scratch, &db.main_db_path).await?; + db.vfs.sync_dir("/").await.ok(); + + db.pager.commit_anchor(pending.counter_anchor)?; + // The cached pages were sealed for the scratch file (now main.db); drop them + // so reads re-fetch from the renamed file rather than serving stale entries. 
+ db.pager.reset_main_pages(); + + let new_commit_id = state.latest_commit_id + 1; + state.root_page_id = pending.new_root; + state.catalog_root_page_id = pending.new_cat_root; + state.next_page_id = pending.new_next; + state.active_slot = ActiveSlot::A; + state.seq += 1; + state.latest_commit_id = new_commit_id; + state.commit_history_root_page_id = 0; + state.commit_history_root_version = 0; + state.free_list_root_page_id = 0; + db.latest_commit + .store(new_commit_id, std::sync::atomic::Ordering::SeqCst); + + let pages_reclaimed = pending.old_next.saturating_sub(pending.new_next); + Ok(RepackOutcome { + pages_reclaimed, + bytes_truncated: pages_reclaimed.saturating_mul(db.page_size as u64), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::vfs::memory::MemVfs; + use crate::{Db, RealmId}; + + /// Model a crash *before* the atomic rename: build the scratch file but never + /// swap it in, then reopen. main.db must be untouched and every value — + /// including overflow-backed ones — intact, proving the pre-rename work is + /// non-destructive. The orphaned scratch must also be gone after reopen. + #[tokio::test(flavor = "current_thread")] + async fn crash_before_rename_leaves_old_store_intact() { + let vfs = MemVfs::new(); + let kek = [0x42u8; 32]; + let realm = RealmId::new([0x7u8; 16]); + let big = vec![0xABu8; 2048]; // > page_size/4 → overflow chain + let n = 50u32; + + { + let db = Db::open_internal(vfs.clone(), kek, 4096, realm) + .await + .unwrap(); + { + let mut w = db.begin_write().await.unwrap(); + for i in 0..n { + w.put(format!("k-{i:04}").as_bytes(), &big).await.unwrap(); + } + w.commit().await.unwrap(); + } + // Build the scratch copy but never commit the swap (simulated crash). + { + let state = db.writer.lock().await; + let scratch = scratch_path(&db); + build_scratch(&db, &state, &scratch).await.unwrap(); + } + } + + // Reopen: main.db is the original, intact; the orphan scratch is removed. 
+ let db2 = Db::open_existing(vfs.clone(), kek, 4096, realm) + .await + .unwrap(); + let r = db2.begin_read().await.unwrap(); + for i in 0..n { + let key = format!("k-{i:04}"); + assert_eq!( + r.get(key.as_bytes()).await.unwrap().as_deref(), + Some(big.as_slice()), + "value {key} lost after a crash before the compaction rename" + ); + } + assert!( + vfs.open("/main.db.compact", OpenMode::Read).await.is_err(), + "orphaned compaction scratch should be removed on open" + ); + } +} diff --git a/src/compaction/step.rs b/src/compaction/step.rs index 780fef6..cd097d3 100644 --- a/src/compaction/step.rs +++ b/src/compaction/step.rs @@ -1,326 +1,40 @@ -//! Incremental, budget-bounded compaction (entry point: [`compact_step`]). +//! Single-shot compaction exposed under the [`compact_step`] entry point. +//! +//! Compaction shrinks main.db by rebuilding it atomically (scratch file + +//! rename — see [`super::repack`]). A full rewrite cannot be safely chunked +//! across writer-lock releases: commits made by other writers between chunks +//! would be silently dropped by the final rename. So `compact_step` performs the +//! whole atomic compaction in a single call and reports completion. +//! +//! This is not a limitation of reclamation in general — sustained-write growth +//! is bounded continuously by the durable free-list (every commit reuses freed +//! pages), so returning space to the OS is a maintenance operation rather than a +//! hot path. use crate::Result; -use crate::btree::BTree; -use crate::catalog::codec::{Catalog, CompactionStateRow}; -use crate::errors::PagedbError; -use crate::pager::header::commit_header; use crate::txn::db::Db; -use crate::vfs::{Vfs, VfsFile}; +use crate::vfs::Vfs; -use super::helpers::{ - collect_all_pairs, collect_catalog_split, collect_range_limited, load_compaction_state, - load_frontier_page_id, make_header_fields, next_key_after, -}; use super::types::{CompactBudget, CompactProgress}; -/// Incremental compaction step. 
+/// Run a full compaction and report what it reclaimed. /// -/// Processes up to `budget.max_pages_relocated` key-value pairs per call, -/// holding the writer lock only for the duration of a single batch commit. -/// After each call the watermark row (`CatalogRowKind::CompactionState`) is -/// updated atomically with the batch commit. The next call resumes from the -/// persisted frontier. +/// `budget` is accepted for interface stability with the incremental-style API +/// but does not chunk the work (see the module docs for why a full rewrite can't +/// be safely chunked). Compaction runs atomically to completion and always +/// returns `more_work = false`; to reclaim periodically, call this again later. /// /// Returns `PagedbError::Unsupported` if the handle is not in `Standalone` mode. -#[allow(clippy::too_many_lines)] pub async fn compact_step( db: &Db, budget: CompactBudget, ) -> Result { - if !matches!(db.mode, crate::txn::mode::DbMode::Standalone) { - return Err(PagedbError::Unsupported); - } - - let mut state = db.writer.lock().await; - - // Relocation/truncation below invalidates cached reuse page ids; drop them. - db.free_page_cache.lock().clear(); - - // ── Refuse while readers are pinned ─────────────────────────────────────── - // Relocation/truncation is unsafe under a pinned reader (in-process or - // cross-process durable). Runtime free-page reuse reclaims space on ordinary - // commits, so compaction simply waits for the readers to drop. 
- let has_readers = { - let in_mem_min = { - let readers = db.tracked_readers.lock(); - readers - .iter() - .map(|r| r.commit_id.0) - .min() - .unwrap_or(u64::MAX) - }; - let durable_min = db - .min_durable_reader_commit(state.catalog_root_page_id, state.next_page_id) - .await; - in_mem_min.min(durable_min) < u64::MAX - }; - if has_readers { - let wm = load_frontier_page_id(&db.pager, db.realm_id, &state).await; - return Ok(CompactProgress { - pages_relocated: 0, - bytes_freed: 0, - more_work: true, - watermark: wm, - }); - } - - // ── No readers pinned: do a compaction batch ────────────────────────────── - - let compaction_state = load_compaction_state(&db.pager, db.realm_id, &state).await?; - - let (frontier_key, started_at_commit_id, total_pages_estimate) = - if let Some(cs) = &compaction_state { - ( - cs.frontier_key.clone(), - cs.started_at_commit_id, - cs.total_pages_estimate, - ) - } else { - let est = state.next_page_id.saturating_sub(4); - (Vec::new(), state.latest_commit_id, est) - }; - - let old_next_page_id = state.next_page_id; - - // Collect the next batch of pairs from the current live main tree. - let pairs_batch: Vec<(Vec, Vec)> = if state.root_page_id != 0 { - let source = BTree::open( - db.pager.clone(), - db.realm_id, - state.root_page_id, - state.next_page_id, - db.page_size, - ); - let end_sentinel = vec![0xFFu8; 256]; - let start = next_key_after(&frontier_key); - collect_range_limited(&source, &start, &end_sentinel, budget.max_pages_relocated).await? - } else { - Vec::new() - }; - - let pairs_count = pairs_batch.len() as u64; - - // If the batch came back empty AND there was no prior frontier (fresh call - // on an already-empty or truly compact tree), nothing to do. - if pairs_count == 0 && frontier_key.is_empty() && compaction_state.is_none() { - return Ok(CompactProgress { - pages_relocated: 0, - bytes_freed: 0, - more_work: false, - watermark: None, - }); - } - - // Fewer items than budget → this is the final batch. 
- let session_complete = pairs_count < budget.max_pages_relocated; - - if session_complete { - // Final pass: do the full repack starting at page 4, then truncate. - let main_pairs = if state.root_page_id != 0 { - let old_tree = BTree::open( - db.pager.clone(), - db.realm_id, - state.root_page_id, - old_next_page_id, - db.page_size, - ); - collect_all_pairs(&old_tree).await? - } else { - Vec::new() - }; - - let cat_rows = collect_catalog_split(&db.pager, db.realm_id, &state).await?; - - let mut new_main = BTree::open(db.pager.clone(), db.realm_id, 0, 4, db.page_size); - new_main.bulk_load(main_pairs).await?; - new_main.flush().await?; - let new_root = new_main.root_page_id(); - let after_main = new_main.next_page_id(); - - // Remove compaction-state row from the catalog being rebuilt. - let cs_key_prefix = crate::catalog::codec::CatalogRowKind::CompactionState as u8; - let mut cat_all: Vec<(Vec, Vec)> = cat_rows - .into_iter() - .filter(|(k, _)| k.first() != Some(&cs_key_prefix)) - .collect(); - cat_all.sort_by(|(a, _), (b, _)| a.cmp(b)); - let mut new_cat = BTree::open(db.pager.clone(), db.realm_id, 0, after_main, db.page_size); - new_cat.bulk_load(cat_all).await?; - new_cat.flush().await?; - let new_cat_root = new_cat.root_page_id(); - let new_next = new_cat.next_page_id(); - - let pages_reclaimed = old_next_page_id.saturating_sub(new_next); - - let new_commit_id = state.latest_commit_id + 1; - let new_seq = state.seq + 1; - let counter_anchor = db.pager.pending_anchor(); - // Dense repack: the relocated layout starts with an empty free-list. 
- let fields = make_header_fields( - db, - &state, - new_commit_id, - new_seq, - counter_anchor, - new_root, - new_cat_root, - new_next, - 0, - ); - let hk_clone = { db.hk.read().clone() }; - let new_slot = commit_header( - &*db.vfs, - &db.main_db_path, - &hk_clone, - &fields, - state.active_slot, - db.page_size, - ) - .await?; - db.pager.commit_anchor(counter_anchor)?; - - state.root_page_id = new_root; - state.catalog_root_page_id = new_cat_root; - state.next_page_id = new_next; - state.active_slot = new_slot; - state.seq = new_seq; - state.latest_commit_id = new_commit_id; - state.commit_history_root_page_id = 0; - state.commit_history_root_version = 0; - state.free_list_root_page_id = 0; - db.latest_commit - .store(new_commit_id, std::sync::atomic::Ordering::SeqCst); - - let mut bytes_freed = 0u64; - if new_next < old_next_page_id { - let new_size = new_next.saturating_mul(db.page_size as u64); - let old_size = old_next_page_id.saturating_mul(db.page_size as u64); - let mut f = db - .vfs - .open(&db.main_db_path, crate::vfs::types::OpenMode::ReadWrite) - .await?; - f.set_len(new_size).await?; - f.sync().await?; - bytes_freed = old_size.saturating_sub(new_size); - } - - return Ok(CompactProgress { - pages_relocated: pages_reclaimed, - bytes_freed, - more_work: false, - watermark: None, - }); - } - - // ── Intermediate step: re-insert batch via CoW to defragment ───────────── - let new_frontier_key = pairs_batch - .last() - .map_or_else(|| frontier_key.clone(), |(k, _)| k.clone()); - - let mut cat_tree = BTree::open( - db.pager.clone(), - db.realm_id, - state.catalog_root_page_id, - state.next_page_id, - db.page_size, - ); - - // Re-insert the batch: delete+put forces page reallocation to low-address - // free slots. Pages freed by the delete are recycled within this same txn - // (the allocator pops the per-session freed list), so the subsequent put - // reuses them — densifying the layout into the low page-id range. 
- let mut main_tree = BTree::open( - db.pager.clone(), - db.realm_id, - state.root_page_id, - state.next_page_id, - db.page_size, - ); - for (k, v) in &pairs_batch { - main_tree.delete(k).await?; - main_tree.put(k, v).await?; - } - - main_tree.flush().await?; - // main_tree.flush() may allocate pages while materializing the dirty-leaf - // cache. Sync the catalog tree forward so its flush doesn't reuse a - // page_id the main tree just claimed. - cat_tree.set_next_page_id(main_tree.next_page_id()); - cat_tree.flush().await?; - - let new_main_root = main_tree.root_page_id(); - let after_main_step = main_tree.next_page_id(); - let cat_root_step = cat_tree.root_page_id(); - let next_step = cat_tree.next_page_id().max(after_main_step); - - // Update compaction watermark. - let mut cat_tree2 = BTree::open( - db.pager.clone(), - db.realm_id, - cat_root_step, - next_step, - db.page_size, - ); - let new_cs = CompactionStateRow { - target_root: state.root_page_id, - frontier_page_id: next_step, - started_at_commit_id, - total_pages_estimate, - frontier_key: new_frontier_key.clone(), - }; - let cs_key = Catalog::compaction_state_key(); - let cs_val = Catalog::encode_compaction_state(&new_cs)?; - cat_tree2.put(&cs_key, &cs_val).await?; - cat_tree2.flush().await?; - let final_cat_root = cat_tree2.root_page_id(); - let final_next = cat_tree2.next_page_id().max(next_step); - - let new_commit_id = state.latest_commit_id + 1; - let new_seq = state.seq + 1; - let counter_anchor = db.pager.pending_anchor(); - // An intermediate step only relocates main-tree pages; it touches neither - // the durable free-list chain nor its free pages, so the free-list stays - // valid and is preserved across the step (the pre-existing free pages remain - // reusable by ordinary writes). The final dense-repack batch resets it. 
- let fields = make_header_fields( - db, - &state, - new_commit_id, - new_seq, - counter_anchor, - new_main_root, - final_cat_root, - final_next, - state.free_list_root_page_id, - ); - let hk_clone = { db.hk.read().clone() }; - let new_slot = commit_header( - &*db.vfs, - &db.main_db_path, - &hk_clone, - &fields, - state.active_slot, - db.page_size, - ) - .await?; - db.pager.commit_anchor(counter_anchor)?; - - state.root_page_id = new_main_root; - state.catalog_root_page_id = final_cat_root; - state.next_page_id = final_next; - state.active_slot = new_slot; - state.seq = new_seq; - state.latest_commit_id = new_commit_id; - state.commit_history_root_page_id = 0; - state.commit_history_root_version = 0; - db.latest_commit - .store(new_commit_id, std::sync::atomic::Ordering::SeqCst); - + let _ = budget; + let stats = super::full::compact_now(db).await?; Ok(CompactProgress { - pages_relocated: pairs_count, - bytes_freed: 0, - more_work: true, - watermark: Some(next_step), + pages_relocated: stats.main_db_pages_reclaimed, + bytes_freed: stats.bytes_truncated, + more_work: false, + watermark: None, }) } diff --git a/src/lib.rs b/src/lib.rs index 67a235e..5e4e241 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,6 @@ pub mod snapshot; pub mod txn; pub mod vfs; -pub use catalog::codec::CompactionStateRow; pub use catalog::codec::{RealmQuotas, SegmentKind, SegmentMeta}; pub use compaction::{CompactBudget, CompactProgress, CompactStats}; pub use crypto::CipherId; diff --git a/src/pager/cache.rs b/src/pager/cache.rs index 6857948..d382472 100644 --- a/src/pager/cache.rs +++ b/src/pager/cache.rs @@ -273,6 +273,36 @@ impl PageCache { .filter_map(|(f, p)| if *f == file { Some(*p) } else { None }) .collect() } + + /// Drop every (unpinned) entry for `file`, including dirty ones, so that the + /// cached plaintext is discarded and later reads re-fetch from disk. Used + /// when compaction replaces the backing file (the cached pages no longer + /// match what is on disk). 
Pinned entries are left untouched. + pub fn clear_file(&mut self, file: FileKey) { + let keys: Vec<(FileKey, u64)> = self + .map + .keys() + .filter(|(f, _)| *f == file) + .copied() + .collect(); + for key in keys { + if self.pins.get(&key).copied().unwrap_or(0) > 0 { + continue; + } + if let Some(idx) = self.map.remove(&key) { + let (prev, next) = { + let node = self.slab[idx].as_ref().expect("indexed node alive"); + (node.prev, node.next) + }; + if self.hand == Some(idx) { + self.hand = next; + } + self.unlink_node(idx, prev, next); + self.free_node(idx); + } + self.dirty.remove(&key); + } + } } #[cfg(test)] diff --git a/src/pager/core.rs b/src/pager/core.rs index a473ee7..4982d52 100644 --- a/src/pager/core.rs +++ b/src/pager/core.rs @@ -358,13 +358,39 @@ impl Pager { /// Flush all dirty main.db pages to the VFS in physical-id order. pub async fn flush_main(&self, realm_id: RealmId) -> Result<()> { tracing::debug!(name = "pager.flush", "flushing dirty main db pages"); - self.flush_file(FileKey::Main, realm_id, MAIN_DB_SEGMENT_ID) + self.flush_file(FileKey::Main, realm_id, MAIN_DB_SEGMENT_ID, None) .await } + /// Flush all dirty main.db pages to an alternate file `dest_path` (rather + /// than the live main.db), clearing their dirty flags. Pages are sealed with + /// the same AAD as a normal main flush, so the destination is a bit-identical + /// main.db. Used by compaction to build a compacted copy that is then + /// atomically renamed into place. + pub async fn flush_main_to(&self, realm_id: RealmId, dest_path: &str) -> Result<()> { + self.flush_file(FileKey::Main, realm_id, MAIN_DB_SEGMENT_ID, Some(dest_path)) + .await + } + + /// Drop all cached main.db pages so subsequent reads re-fetch from disk. + /// Used after compaction replaces main.db (the cached pages no longer match + /// the on-disk file) and on a failed compaction (to discard the partially + /// built, never-persisted compacted pages). 
+ pub fn reset_main_pages(&self) { + self.inner.buffer_pool.lock().clear_file(FileKey::Main); + } + + /// Close the cached main.db file handle. The next access reopens the file. + /// Required around an atomic rename over main.db: closing first lets the + /// rename replace the file on platforms that reject replacing an open file + /// (Windows), and reopening afterwards picks up the new inode (Unix). + pub async fn close_main_handle(&self) { + self.files.lock().await.remove(&FileKey::Main); + } + /// Flush all dirty pages for one segment to the VFS in physical-id order. pub async fn flush_segment(&self, segment_id: [u8; 16], realm_id: RealmId) -> Result<()> { - self.flush_file(FileKey::Segment(segment_id), realm_id, segment_id) + self.flush_file(FileKey::Segment(segment_id), realm_id, segment_id, None) .await } @@ -588,6 +614,7 @@ impl Pager { file: FileKey, realm_id: RealmId, segment_id: [u8; 16], + dest_path: Option<&str>, ) -> Result<()> { let dirty_ids = self.inner.cache_for_key(file).lock().dirty_for_file(file); if dirty_ids.is_empty() { @@ -678,8 +705,14 @@ impl Pager { buf: wire, }); } - let file_handle = self.open_file_handle(file).await?; - { + if let Some(path) = dest_path { + // Alternate destination (compaction's compacted copy): open it + // directly, never via the cached main handle. + let mut f = self.vfs.open(path, OpenMode::CreateOrOpen).await?; + f.write_at_vectored(&reqs).await?; + f.sync().await?; + } else { + let file_handle = self.open_file_handle(file).await?; let mut f = file_handle.lock().await; f.write_at_vectored(&reqs).await?; f.sync().await?; diff --git a/src/txn/db/open.rs b/src/txn/db/open.rs index 7437502..6b92af8 100644 --- a/src/txn/db/open.rs +++ b/src/txn/db/open.rs @@ -241,6 +241,14 @@ impl Db { let main_db_path = "/main.db".to_string(); + // An interrupted compaction may leave a scratch copy behind; its rename + // never happened, so main.db is authoritative and the scratch is stale. 
+ // Only a handle that holds the exclusive writer lock may remove it — an + // Observer could otherwise delete a live writer's in-progress compaction. + if matches!(mode, DbMode::Standalone | DbMode::Follower) { + vfs.remove(&format!("{main_db_path}.compact")).await.ok(); + } + // Read both raw header pages to extract kek_salt and mk_epoch without // yet knowing HK. We try each header page's own kek_salt to derive HK // and verify, then pick the winner by seq. From 13122148b857c50fe3ce35c8e508b0f3bb37d527 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Mon, 15 Jun 2026 08:04:00 +0800 Subject: [PATCH 4/4] test: cover large-payload and crash-atomic compaction --- tests/compaction_basic.rs | 124 ++++++++++++++++++++++++++++++++ tests/compaction_incremental.rs | 113 ++++++++++++----------------- 2 files changed, 169 insertions(+), 68 deletions(-) diff --git a/tests/compaction_basic.rs b/tests/compaction_basic.rs index f478ecf..b1dd082 100644 --- a/tests/compaction_basic.rs +++ b/tests/compaction_basic.rs @@ -120,6 +120,130 @@ async fn compact_truncates_main_db() { } } +// ─── Large (overflow-backed) values survive a full repack ───────────────────── + +/// Values larger than the inline threshold (`PAGE / 4`) are stored as overflow +/// chains. A full `compact_now` repack must reconstruct those chains, not try to +/// inline the resolved bytes into a single leaf (which exceeds leaf capacity), +/// and must leave the store readable. 
+#[tokio::test(flavor = "current_thread")] +async fn compact_now_preserves_large_overflow_values() { + let vfs = MemVfs::new(); + let db = Db::open_internal(vfs.clone(), KEK, PAGE, REALM) + .await + .unwrap(); + + let big = vec![0xCDu8; 4096]; // > PAGE/4 (1024) → overflow chain + let small = vec![0x07u8; 48]; + let n = 120u32; + { + let mut w = db.begin_write().await.unwrap(); + for i in 0..n { + let key = format!("k-{i:05}"); + let val = if i % 2 == 0 { + big.as_slice() + } else { + small.as_slice() + }; + w.put(key.as_bytes(), val).await.unwrap(); + } + w.commit().await.unwrap(); + } + + db.compact_now().await.unwrap(); + + // All values intact and correct immediately after compaction. + { + let r = db.begin_read().await.unwrap(); + for i in 0..n { + let key = format!("k-{i:05}"); + let want = if i % 2 == 0 { &big } else { &small }; + assert_eq!( + r.get(key.as_bytes()).await.unwrap().as_deref(), + Some(want.as_slice()), + "value mismatch at {key} after compaction" + ); + } + } + + // And the store reopens cleanly — a partial/non-atomic repack would brick it + // with an AEAD tag failure here. + drop(db); + let db2 = Db::open_existing(vfs, KEK, PAGE, REALM).await.unwrap(); + let r = db2.begin_read().await.unwrap(); + let k0 = format!("k-{:05}", 0); + assert_eq!( + r.get(k0.as_bytes()).await.unwrap().as_deref(), + Some(big.as_slice()), + "large value lost after reopen" + ); +} + +/// A compaction that cannot complete must leave the store fully readable: it has +/// to roll back, never persist a half-built tree. Guards against the failure +/// mode where a partial repack leaves orphaned dirty pages that a later ordinary +/// commit flushes over the live tree, corrupting it on the next open. 
+#[tokio::test(flavor = "current_thread")] +async fn compaction_then_commit_keeps_large_values_readable_on_reopen() { + let vfs = MemVfs::new(); + let db = Db::open_internal(vfs.clone(), KEK, PAGE, REALM) + .await + .unwrap(); + + let big = vec![0x5Au8; 4096]; + let small = vec![0x11u8; 48]; + // Small values sort first ("a-*") and fill several leaves that a repack + // writes successfully; the big values ("z-*") come later and trip the + // failure mid-write, leaving partially-written pages behind. + let n_small = 300u32; + let n_big = 5u32; + { + let mut w = db.begin_write().await.unwrap(); + for i in 0..n_small { + w.put(format!("a-{i:05}").as_bytes(), &small).await.unwrap(); + } + for i in 0..n_big { + w.put(format!("z-{i:05}").as_bytes(), &big).await.unwrap(); + } + w.commit().await.unwrap(); + } + + // Attempt compaction; whatever its outcome, the store must stay consistent. + let _ = db.compact_now().await; + + // A subsequent ordinary commit must not flush a half-built repack over the + // live tree. 
+ { + let mut w = db.begin_write().await.unwrap(); + w.put(b"sentinel", b"ok").await.unwrap(); + w.commit().await.unwrap(); + } + + drop(db); + let db2 = Db::open_existing(vfs, KEK, PAGE, REALM).await.unwrap(); + let r = db2.begin_read().await.unwrap(); + for i in 0..n_small { + let key = format!("a-{i:05}"); + assert_eq!( + r.get(key.as_bytes()).await.unwrap().as_deref(), + Some(small.as_slice()), + "small value {key} lost/corrupted after compaction + commit + reopen" + ); + } + for i in 0..n_big { + let key = format!("z-{i:05}"); + assert_eq!( + r.get(key.as_bytes()).await.unwrap().as_deref(), + Some(big.as_slice()), + "large value {key} lost/corrupted after compaction + commit + reopen" + ); + } + assert_eq!( + r.get(b"sentinel").await.unwrap().as_deref(), + Some(b"ok".as_slice()) + ); +} + // ─── Test 3: compact_now repacks segments ───────────────────────────────────── #[tokio::test(flavor = "current_thread")] diff --git a/tests/compaction_incremental.rs b/tests/compaction_incremental.rs index 83d1d62..678a648 100644 --- a/tests/compaction_incremental.rs +++ b/tests/compaction_incremental.rs @@ -47,18 +47,18 @@ async fn compact_now_no_free_pages() { let _ = stats; // stats are informational; just verify it completes } -// ─── Test 3: compact_step advances watermark in each call ──────────────────── +// ─── Test 3: compact_step reclaims freed space and reports completion ───────── #[tokio::test(flavor = "current_thread")] -async fn compact_step_watermark_advances() { +async fn compact_step_reclaims_after_deletes() { let db = fresh_db().await; - // Write 100 keys and delete half to create free pages. + // Write 100 keys and delete half to free pages. 
{ let mut txn = db.begin_write().await.unwrap(); for i in 0u32..100 { let key = format!("wm-{i:04}"); - txn.put(key.as_bytes(), b"watermark-test-value") + txn.put(key.as_bytes(), b"reclaim-test-value") .await .unwrap(); } @@ -73,35 +73,29 @@ async fn compact_step_watermark_advances() { txn.commit().await.unwrap(); } - // Run compaction in tiny batches of 5 and verify watermark is monotonically - // non-decreasing until completion. - let budget = CompactBudget::new(5, 10_000); - - let mut prev_watermark: Option = None; - let mut iterations = 0usize; - loop { - let prog = db.compact_step(budget).await.unwrap(); - iterations += 1; - - if let Some(wm) = prog.watermark { - if let Some(prev) = prev_watermark { - assert!( - wm >= prev, - "watermark must not decrease: prev={prev} current={wm}" - ); - } - prev_watermark = Some(wm); - } else { - // Watermark cleared = session complete. - assert!(!prog.more_work); - break; - } + // Compaction runs to completion in one call (a full rewrite cannot be safely + // chunked across lock releases) and reclaims the freed pages. + let prog = db.compact_step(CompactBudget::default()).await.unwrap(); + assert!(!prog.more_work, "compaction completes in a single call"); + assert!( + prog.pages_relocated > 0, + "expected reclaimed pages after deleting half the keys" + ); - if !prog.more_work { - break; - } + // A second call has nothing to reclaim. + let prog2 = db.compact_step(CompactBudget::default()).await.unwrap(); + assert!(!prog2.more_work); + assert_eq!(prog2.pages_relocated, 0, "already compact: nothing to do"); - assert!(iterations < 10_000, "compaction did not converge"); + // Surviving keys remain readable. 
+ let r = db.begin_read().await.unwrap(); + for i in (1u32..100).step_by(2) { + let key = format!("wm-{i:04}"); + assert_eq!( + r.get(key.as_bytes()).await.unwrap().as_deref(), + Some(b"reclaim-test-value".as_slice()), + "{key} must survive compaction" + ); } } @@ -281,52 +275,35 @@ async fn compact_step_reopen_history_consistent() { ); } -// ─── Test: an intermediate compact_step preserves the durable free-list ────── +/// `compact_step` runs the same atomic repack as `compact_now`, so it must +/// preserve overflow (large) values rather than failing or corrupting the store. #[tokio::test(flavor = "current_thread")] -async fn compact_step_preserves_free_list() { - // No commit history, so freed pages are immediately reclaimable and tracked - // in the durable free-list with no reader/history pinning them. - let opts = pagedb::options::OpenOptions::default() - .with_commit_history_retain(pagedb::options::RetainPolicy::Disabled); - let db = Db::open_internal_with_options(MemVfs::new(), KEK, PAGE, REALM, opts) +async fn compact_step_preserves_large_overflow_values() { + let vfs = MemVfs::new(); + let db = Db::open_internal(vfs.clone(), KEK, PAGE, REALM) .await .unwrap(); - // Build a working set, then delete most of it to populate the free-list. 
+ let big = vec![0x3Cu8; 4096]; // > PAGE/4 → overflow chain + let n = 60u32; { let mut w = db.begin_write().await.unwrap(); - for i in 0u32..300 { - w.put(format!("k{i:05}").as_bytes(), &[1u8; 128]) - .await - .unwrap(); + for i in 0..n { + w.put(format!("k-{i:05}").as_bytes(), &big).await.unwrap(); } w.commit().await.unwrap(); } - { - let mut w = db.begin_write().await.unwrap(); - for i in 0u32..250 { - w.delete(format!("k{i:05}").as_bytes()).await.unwrap(); - } - w.commit().await.unwrap(); - } - let pending_before = db.stats().await.unwrap().free_list_pending_entries; - assert!( - pending_before > 0, - "setup should have populated the free-list; got {pending_before}" - ); - // One intermediate step (small budget so it is NOT the final batch). - let prog = db - .compact_step(CompactBudget::new(5, 10_000)) - .await - .unwrap(); - assert!(prog.more_work, "small budget should leave more work"); + let prog = db.compact_step(CompactBudget::default()).await.unwrap(); + assert!(!prog.more_work, "compaction completes in a single call"); - // The pre-existing free-list must survive the intermediate step, not be - // wiped — those pages are still reusable by ordinary writes. - let pending_after = db.stats().await.unwrap().free_list_pending_entries; - assert!( - pending_after >= pending_before, - "intermediate compact_step wiped the durable free-list: {pending_before} -> {pending_after}" - ); + let r = db.begin_read().await.unwrap(); + for i in 0..n { + let key = format!("k-{i:05}"); + assert_eq!( + r.get(key.as_bytes()).await.unwrap().as_deref(), + Some(big.as_slice()), + "large value {key} lost after compact_step repack" + ); + } }