Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions nodedb-lite/src/nodedb/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

use serde::Serialize;

use nodedb_types::error::NodeDbResult;

use crate::memory::{EngineId, PressureLevel};
use crate::storage::engine::StorageEngine;

Expand Down Expand Up @@ -124,6 +126,19 @@ impl<S: StorageEngine> NodeDbLite<S> {
&self.storage
}

/// Compact the backing storage engine, reclaiming dead pages and
/// truncating the file to bound on-disk growth.
///
/// Forwards to [`StorageEngine::compact`]. For the pagedb-backed engine this
/// drains the deferred-free list and truncates `main.db`; for in-memory or
/// test engines it is a no-op returning a zero
/// [`CompactionOutcome`](crate::storage::engine::CompactionOutcome).
pub async fn compact(
&self,
) -> NodeDbResult<crate::storage::engine::CompactionOutcome> {
Ok(self.storage.compact().await?)
}

/// Get a structured health report.
///
/// This is a cheap, non-blocking call — reads atomic counters and lock-free state.
Expand Down
28 changes: 28 additions & 0 deletions nodedb-lite/src/storage/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ use nodedb_types::Namespace;
/// `StorageEngine` trait's scan interface.
pub type KvPair = (Vec<u8>, Vec<u8>);

/// Summary of what a [`StorageEngine::compact`] call reclaimed.
///
/// Lite-owned (not a pagedb type) so the trait doesn't force pagedb types on
/// non-pagedb impls. The pagedb-backed engine maps `pagedb::CompactStats` into
/// this; other engines return the `Default` (all-zero) value from the trait's
/// default no-op `compact`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CompactionOutcome {
/// Number of underlying pages reclaimed (moved to free-list or freed by
/// repacking). Zero for engines with nothing to compact.
pub reclaimed_pages: u64,
/// Number of segment files repacked.
pub segments_repacked: u32,
/// Bytes truncated from the backing file by lowering the high-water mark.
pub file_bytes_freed: u64,
}

/// A write operation for batch writes.
#[derive(Debug, Clone)]
pub enum WriteOp {
Expand Down Expand Up @@ -72,6 +89,17 @@ pub trait StorageEngine: Send + Sync + 'static {
/// Useful for cold-start progress reporting and memory governor decisions.
async fn count(&self, ns: Namespace) -> Result<u64, LiteError>;

/// Compact the backing store, reclaiming dead pages and (when possible)
/// truncating the file to bound on-disk growth.
///
/// The default implementation is a no-op returning a zero
/// [`CompactionOutcome`], so engines with nothing to compact (in-memory
/// stores, test doubles) need not override it. The pagedb-backed engine
/// overrides this to drain the deferred-free list and truncate `main.db`.
async fn compact(&self) -> Result<CompactionOutcome, LiteError> {
Ok(CompactionOutcome::default())
}

/// Range scan: return up to `limit` entries where key >= `start`.
///
/// Results are ordered by key (lexicographic byte order).
Expand Down
79 changes: 78 additions & 1 deletion nodedb-lite/src/storage/pagedb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use pagedb::vfs::traits::Vfs;
use pagedb::{Db, RealmId};

use crate::error::LiteError;
use crate::storage::engine::{KvPair, StorageEngine, WriteOp};
use crate::storage::engine::{CompactionOutcome, KvPair, StorageEngine, WriteOp};
use nodedb_types::Namespace;

// ─── VFS aliases ─────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -453,6 +453,15 @@ where
.collect())
}

async fn compact(&self) -> Result<CompactionOutcome, LiteError> {
let stats = self.db.compact_now().await.map_err(LiteError::from)?;
Ok(CompactionOutcome {
reclaimed_pages: stats.main_db_pages_reclaimed,
segments_repacked: stats.segments_repacked,
file_bytes_freed: stats.bytes_truncated,
})
}

fn as_vector_segment_ext(
&self,
) -> Option<&dyn crate::storage::vector_segment_ext::VectorSegmentExt> {
Expand Down Expand Up @@ -744,6 +753,15 @@ impl<V: Vfs + Clone + 'static> StorageEngine for PagedbStorage<V> {
.map(|(k, v)| (strip_prefix(&k).to_vec(), v))
.collect())
}

async fn compact(&self) -> Result<CompactionOutcome, LiteError> {
let stats = self.db.compact_now().await.map_err(LiteError::from)?;
Ok(CompactionOutcome {
reclaimed_pages: stats.main_db_pages_reclaimed,
segments_repacked: stats.segments_repacked,
file_bytes_freed: stats.bytes_truncated,
})
}
}

// ─── VectorSegmentExt impl ────────────────────────────────────────────────────
Expand Down Expand Up @@ -1227,6 +1245,65 @@ mod tests {
assert_eq!(results[2].0, &[4u8]);
}

/// In-memory engine: `compact()` is a successful no-op (nothing to reclaim).
#[tokio::test]
async fn compact_mem_is_ok_noop() {
let s = make_storage().await;
s.put(Namespace::Vector, b"v1", b"hello").await.unwrap();
s.put(Namespace::Graph, b"g1", b"world").await.unwrap();
let outcome = s.compact().await.unwrap();
// Data still readable after compaction.
assert_eq!(
s.get(Namespace::Vector, b"v1").await.unwrap().as_deref(),
Some(b"hello".as_slice())
);
// MemVfs has no file truncation, but the call must succeed regardless.
let _ = outcome.reclaimed_pages;
}

/// Disk-backed engine on a tempdir: write rows (including churn that leaves
/// dead pages), then `compact()` must succeed and report a non-negative
/// outcome. Data must remain intact afterward.
#[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
async fn compact_default_disk_is_ok() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("compact-test.db");
let s = PagedbStorage::<DefaultVfs>::open(
&path,
crate::storage::encryption::Encryption::Plaintext,
)
.await
.unwrap();

// Churn: write then overwrite/delete a batch of keys so the
// deferred-free list has pages to reclaim.
for i in 0u32..200 {
let key = i.to_be_bytes();
s.put(Namespace::Meta, &key, &vec![0xCDu8; 512])
.await
.unwrap();
}
for i in 0u32..150 {
let key = i.to_be_bytes();
s.delete(Namespace::Meta, &key).await.unwrap();
}

let outcome = s.compact().await.unwrap();

// Surviving keys still readable.
let survivor = 175u32.to_be_bytes();
assert!(s.get(Namespace::Meta, &survivor).await.unwrap().is_some());

// Outcome fields are well-formed (u64/u32 — always >= 0); just touch
// them so the assertion documents the reported shape.
let _ = (
outcome.reclaimed_pages,
outcome.segments_repacked,
outcome.file_bytes_freed,
);
}

/// Keys in namespace N must not appear in a scan of namespace N+1, and
/// vice versa. Verifies the single-byte prefix boundary.
#[tokio::test]
Expand Down