Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/btree/overflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ use crate::{RealmId, Result};
/// Header length for chain pages (non-root): `next[8] || data_len[4]`.
pub const OVERFLOW_HEADER_LEN: usize = 12;

/// Values longer than this are stored as an overflow chain rather than inline in
/// a leaf. Shared by the `put` path and `bulk_load` so a repack reproduces the
/// same inline/overflow split the original writes produced.
#[must_use]
pub fn inline_value_threshold(page_size: usize) -> usize {
page_size / 4
}

/// Extra bytes at the start of a v2 root body before the standard header:
/// `refcount[4]`.
const OVERFLOW_ROOT_V2_PREFIX: usize = 4;
Expand Down
120 changes: 54 additions & 66 deletions src/btree/tree/bulk.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
//! Bulk-load: build a dense tree from sorted pairs without `CoW` overhead.

use std::sync::Arc;

use crate::Result;
use crate::errors::PagedbError;
use crate::vfs::Vfs;

use crate::btree::leaf::{Leaf, LeafValue};
use crate::btree::node;
use crate::btree::overflow;

use super::core::BTree;

impl<V: Vfs> BTree<V> {
/// Bulk-load sorted `(key, value)` pairs into a freshly-created tree
/// without any `CoW` overhead. The tree must be empty (`root_page_id == 0`).
///
/// All records are first placed into as few leaves as needed, then internal
/// nodes are built bottom-up. Each page is written exactly once; no freed
/// pages are generated. The resulting layout is dense and compact.
///
/// Overflow values are NOT supported in bulk-load; callers must ensure all
/// values fit inline (`value.len()` ≤ `page_size` / 4). For compaction use-
/// cases, the pairs come from `collect_range` which already resolves
/// overflow chains into inline bytes.
/// Values larger than the inline threshold (`page_size / 4`) are spilled to
/// overflow chains, exactly as the `put` path does, so a dense repack
/// reproduces the original storage shape. Records are then packed into as few
/// leaves as needed and internal nodes are built bottom-up. Each leaf/internal
/// page is written exactly once; the layout is dense and compact.
///
/// Returns `Err` if the tree is not empty.
#[allow(clippy::too_many_lines)]
Expand All @@ -34,82 +34,70 @@ impl<V: Vfs> BTree<V> {
return Ok(());
}

// Build leaves greedily: pack as many records as fit into each leaf.
// No sibling pointers yet — we'll wire them up at the end.
let mut leaves: Vec<(u64, Vec<u8>)> = Vec::new(); // (page_id, first_key)

let body_cap = node::body_capacity(self.page_size);
let mut current_leaf = Leaf::new();
// Resolve each value to its stored form, spilling values past the inline
// threshold to overflow chains (same threshold as `put`). Inlining an
// oversized value would exceed leaf capacity and fail the encode.
let realm = self.realm_id;
let ps = self.page_size;
let pager = Arc::clone(&self.pager);
let inline_threshold = overflow::inline_value_threshold(ps);
let mut records: Vec<(Vec<u8>, LeafValue)> = Vec::with_capacity(pairs.len());
for (k, v) in pairs {
let value = if v.len() > inline_threshold {
let total_len = v.len() as u64;
let root_page_id =
overflow::write_chain(&pager, realm, &v, ps, &mut || self.allocate_page())
.await?;
LeafValue::Overflow {
total_len,
root_page_id,
}
} else {
LeafValue::Inline(v)
};
records.push((k, value));
}

let flush_leaf = |leaf: &Leaf, page_id: u64, next_id: u64| {
let _ = (leaf, page_id, next_id); // will write below
};
let _ = flush_leaf; // suppress unused-variable lint (closure is a placeholder)
let body_cap = node::body_capacity(ps);

// First pass: group records into leaves.
let mut leaf_groups: Vec<Vec<(Vec<u8>, Vec<u8>)>> = Vec::new();
for (k, v) in &pairs {
let entry_size = {
let suffix_len = k.len(); // conservative: no prefix compression yet
2 + suffix_len + 2 + v.len() // slot entry (inline value)
};
// Rough check: header + slot_dir entry + record body
// First pass: group records into leaves, sized by encoded record width.
let mut leaf_groups: Vec<Vec<(Vec<u8>, LeafValue)>> = Vec::new();
let mut current: Vec<(Vec<u8>, LeafValue)> = Vec::new();
for (k, value) in records {
// New record's body contribution: suffix-len field + key + value.
// (No prefix compression at build time, so suffix == full key.)
let entry_size = 2 + k.len() + value.encoded_size();
let projected = node::HEADER_LEN
+ (current_leaf.records.len() + 1) * 2
+ current_leaf
.records
+ (current.len() + 1) * 2
+ current
.iter()
.map(|(ck, cv)| 2 + ck.len() + cv.encoded_size())
.sum::<usize>()
+ entry_size;
if projected > body_cap && !current_leaf.records.is_empty() {
leaf_groups.push(
std::mem::take(&mut current_leaf.records)
.into_iter()
.map(|(lk, lv)| {
let vbytes = match lv {
LeafValue::Inline(b) => b,
LeafValue::Overflow { .. } => Vec::new(),
};
(lk, vbytes)
})
.collect(),
);
current_leaf = Leaf::new();
if projected > body_cap && !current.is_empty() {
leaf_groups.push(std::mem::take(&mut current));
}
current_leaf.upsert(k, LeafValue::Inline(v.clone()));
current.push((k, value));
}
if !current_leaf.records.is_empty() {
leaf_groups.push(
std::mem::take(&mut current_leaf.records)
.into_iter()
.map(|(lk, lv)| {
let vbytes = match lv {
LeafValue::Inline(b) => b,
LeafValue::Overflow { .. } => Vec::new(),
};
(lk, vbytes)
})
.collect(),
);
if !current.is_empty() {
leaf_groups.push(current);
}

// Second pass: allocate page_ids and write leaves with correct sibling pointers.
// Second pass: allocate page ids and write leaves with sibling links.
// Input is sorted and grouping preserves order, so each leaf's records
// are already in key order.
let mut leaves: Vec<(u64, Vec<u8>)> = Vec::new(); // (page_id, first_key)
let n_leaves = leaf_groups.len();
let page_ids: Vec<u64> = (0..n_leaves).map(|_| self.allocate_page()).collect();

for (i, group) in leaf_groups.iter().enumerate() {
let mut leaf = Leaf {
for (i, group) in leaf_groups.into_iter().enumerate() {
let first_key = group.first().map(|(k, _)| k.clone()).unwrap_or_default();
let leaf = Leaf {
left_sibling: if i == 0 { 0 } else { page_ids[i - 1] },
right_sibling: if i + 1 < n_leaves { page_ids[i + 1] } else { 0 },
records: group
.iter()
.map(|(k, v)| (k.clone(), LeafValue::Inline(v.clone())))
.collect(),
records: group,
};
leaf.records.sort_by(|(a, _), (b, _)| a.cmp(b)); // already sorted
self.write_leaf(page_ids[i], &leaf).await?;
let first_key = group.first().map(|(k, _)| k.clone()).unwrap_or_default();
leaves.push((page_ids[i], first_key));
}

Expand Down
2 changes: 1 addition & 1 deletion src/btree/tree/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl<V: Vfs> BTree<V> {
/// Insert or overwrite a key-value pair.
pub async fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
// Build the leaf value — inline if small enough, overflow chain otherwise.
let leaf_value = if value.len() > self.page_size / 4 {
let leaf_value = if value.len() > overflow::inline_value_threshold(self.page_size) {
let realm = self.realm_id;
let ps = self.page_size;
let pager = Arc::clone(&self.pager);
Expand Down
96 changes: 4 additions & 92 deletions src/catalog/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,46 +42,13 @@ pub enum CatalogRowKind {
/// catalog, reader pins are maintained in-memory only and the writer process
/// must be trusted to honor the catalog pins.
ReaderPin = 0x06,
/// Incremental compaction watermark. Singleton row; key is `[0x07]`.
/// Value: `target_root[8] || frontier_page_id[8] || started_at_commit_id[8] ||
/// total_pages_estimate[8]` = 32 bytes.
///
/// Present only while an incremental compaction is in progress. Cleared
/// atomically with the final compaction commit. On `Db::open`, a present
/// row means a prior compaction was interrupted; the embedder must call
/// `compact_step` again to resume — `Db::open` does NOT auto-resume.
/// Reserved (`0x07`). Older builds wrote an incremental-compaction watermark
/// here; compaction is now a single atomic operation and never writes it.
/// Retained as a row-kind boundary and so any legacy row is recognised and
/// dropped during compaction.
CompactionState = 0x07,
}

/// Incremental compaction watermark persisted in the catalog.
///
/// Present while a `compact_step` session is in progress. The embedder must
/// call `compact_step` again to resume after a crash; `Db::open` does not
/// auto-resume.
///
/// Encoding: `target_root[8] || frontier_page_id[8] || started_at_commit_id[8] ||
/// total_pages_estimate[8] || frontier_key_len[4] || frontier_key[frontier_key_len]`
/// (minimum 32 bytes; variable total).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionStateRow {
/// Root page id of the main B+ tree at the time compaction started.
/// Used as the source snapshot to read from during each step.
pub target_root: u64,
/// The `next_page_id` of the partially-built compacted tree after the last
/// committed batch. Used to estimate progress and as the initial `next_page_id`
/// when no free-list pages are available.
pub frontier_page_id: u64,
/// The `commit_id` at which the compaction session began.
pub started_at_commit_id: u64,
/// Estimated total pages in the source tree (for progress reporting).
pub total_pages_estimate: u64,
/// The key (exclusive lower bound) from which the next step should begin
/// reading pairs. Empty means start from the beginning.
pub frontier_key: Vec<u8>,
}

pub const COMPACTION_STATE_FIXED_LEN: usize = 36; // 4 × u64 + 1 × u32 length prefix

/// Rekey watermark persisted in the catalog during an online rekey operation.
/// A present row means a rekey is in flight or was interrupted by a crash.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -246,61 +213,6 @@ impl Catalog {
[CatalogRowKind::CompactionState as u8]
}

/// Compaction-state row key: `[0x07]` (singleton).
#[must_use]
pub fn compaction_state_key() -> [u8; 1] {
[CatalogRowKind::CompactionState as u8]
}

/// Encode a `CompactionStateRow`. Returns a variable-length `Vec<u8>`.
pub fn encode_compaction_state(r: &CompactionStateRow) -> Result<Vec<u8>> {
let key_len = r.frontier_key.len();
let klen32 = u32::try_from(key_len).map_err(|_| PagedbError::ManifestTooLarge)?;
let mut o = Vec::with_capacity(COMPACTION_STATE_FIXED_LEN + key_len);
o.extend_from_slice(&r.target_root.to_le_bytes());
o.extend_from_slice(&r.frontier_page_id.to_le_bytes());
o.extend_from_slice(&r.started_at_commit_id.to_le_bytes());
o.extend_from_slice(&r.total_pages_estimate.to_le_bytes());
o.extend_from_slice(&klen32.to_le_bytes());
o.extend_from_slice(&r.frontier_key);
Ok(o)
}

/// Decode a `CompactionStateRow` from bytes.
pub fn decode_compaction_state(bytes: &[u8]) -> Result<CompactionStateRow> {
if bytes.len() < COMPACTION_STATE_FIXED_LEN {
return Err(PagedbError::corruption(
crate::errors::CorruptionDetail::HeaderUnverifiable,
));
}
let read_u64 = |off: usize| {
let mut b = [0u8; 8];
b.copy_from_slice(&bytes[off..off + 8]);
u64::from_le_bytes(b)
};
let target_root = read_u64(0);
let frontier_page_id = read_u64(8);
let started_at_commit_id = read_u64(16);
let total_pages_estimate = read_u64(24);
let mut klen_buf = [0u8; 4];
klen_buf.copy_from_slice(&bytes[32..36]);
let key_len = u32::from_le_bytes(klen_buf) as usize;
if bytes.len() < COMPACTION_STATE_FIXED_LEN + key_len {
return Err(PagedbError::corruption(
crate::errors::CorruptionDetail::HeaderUnverifiable,
));
}
let frontier_key =
bytes[COMPACTION_STATE_FIXED_LEN..COMPACTION_STATE_FIXED_LEN + key_len].to_vec();
Ok(CompactionStateRow {
target_root,
frontier_page_id,
started_at_commit_id,
total_pages_estimate,
frontier_key,
})
}

/// Encode a `ReaderPinValue` as 41 bytes.
#[must_use]
pub fn encode_reader_pin(v: &ReaderPinValue) -> [u8; READER_PIN_VALUE_LEN] {
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

pub mod codec;

pub use codec::{Catalog, CatalogRowKind, CompactionStateRow, ReaderPinValue, RekeyStateRow};
pub use codec::{Catalog, CatalogRowKind, ReaderPinValue, RekeyStateRow};
Loading
Loading