
PagePool::{default -> new_for_test} + temporary hack for IN_MEMORY_CONFIG / test_index_scans #2707

Merged: 2 commits, May 12, 2025
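In short, the blanket `PagePool::default()` is removed: tests and benches now call the bounded `PagePool::new_for_test()`, and the non-test caller in this diff (`extract_schema`) passes `PagePool::new(None)` explicitly. A minimal sketch of the call-site change, with the module path assumed from the crate layout:

```rust
use spacetimedb_table::page_pool::PagePool; // path assumed from crates/table/src/page_pool.rs

fn main() {
    // Before this PR, callers reached for the blanket default:
    // let pool = PagePool::default();

    // After this PR the `Default` impl is gone; tests and benches opt into a
    // small bounded pool, and non-test code passes a size (or None) explicitly.
    let _test_pool = PagePool::new_for_test(); // 100 pages, for tests and benches
    let _schema_pool = PagePool::new(None);    // default is now a single page (pooling effectively off)
}
```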
2 changes: 1 addition & 1 deletion crates/bench/benches/special.rs
@@ -141,7 +141,7 @@ fn serialize_benchmarks<
Arc::new(table_schema),
spacetimedb_table::indexes::SquashedOffset::COMMITTED_STATE,
);
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = spacetimedb_table::blob_store::HashMapBlobStore::default();

let ptrs = data_pv
@@ -1157,7 +1157,7 @@ mod tests {
}

fn get_datastore() -> Result<Locking> {
- Locking::bootstrap(Identity::ZERO, <_>::default())
+ Locking::bootstrap(Identity::ZERO, PagePool::new_for_test())
}

fn col(col: u16) -> ColList {
9 changes: 5 additions & 4 deletions crates/core/src/db/relational_db.rs
@@ -1622,7 +1622,7 @@ pub mod tests_utils {
history,
durability,
snapshot_repo,
- PagePool::default(),
+ PagePool::new_for_test(),
)?;
assert_eq!(connected_clients.len(), expected_num_clients);
let db = db.with_row_count(Self::row_count_fn());
@@ -1828,7 +1828,7 @@ mod tests {
EmptyHistory::new(),
None,
None,
- PagePool::default(),
+ PagePool::new_for_test(),
) {
Ok(_) => {
panic!("Allowed to open database twice")
@@ -2770,7 +2770,7 @@ mod tests {
Identity::ZERO,
Some(&repo),
Some(last_compress),
- PagePool::default(),
+ PagePool::new_for_test(),
)?;

Ok(())
@@ -2795,7 +2795,8 @@
);

let last = repo.latest_snapshot()?;
- let stdb = RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, PagePool::default())?;
+ let stdb =
+ RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, PagePool::new_for_test())?;

let out = TempDir::with_prefix("snapshot_test")?;
let dir = SnapshotsPath::from_path_unchecked(out.path());
2 changes: 1 addition & 1 deletion crates/core/src/host/host_controller.rs
@@ -972,7 +972,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an
};

let runtimes = HostRuntimes::new(None);
- let page_pool = PagePool::default();
+ let page_pool = PagePool::new(None);
let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program).await?;
let module_info = Arc::into_inner(module_info).unwrap();

2 changes: 1 addition & 1 deletion crates/snapshot/tests/remote.rs
@@ -60,7 +60,7 @@ async fn can_sync_a_snapshot() -> anyhow::Result<()> {
assert_eq!(stats.objects_written, total_objects);

// Assert that the copied snapshot is valid.
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let dst_snapshot_full = dst_repo.read_snapshot(src.offset, &pool)?;
Locking::restore_from_snapshot(dst_snapshot_full, pool)?;

26 changes: 13 additions & 13 deletions crates/table/benches/page_manager.rs
@@ -181,15 +181,15 @@ fn reserve_empty_page(c: &mut Criterion) {
let mut group = c.benchmark_group("reserve_empty_page");
group.throughput(Throughput::Bytes(PAGE_DATA_SIZE as _));
group.bench_function("leave_uninit", |b| {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut pages = Pages::default();
b.iter(|| {
let _ = black_box(pages.reserve_empty_page(&pool, RESERVE_SIZE));
});
});

let fill_with_zeros = |_, _, pages: &mut Pages| {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let page = pages.reserve_empty_page(&pool, RESERVE_SIZE).unwrap();
let page = pages.get_page_mut(page);
unsafe { page.zero_data() };
@@ -222,7 +222,7 @@ fn insert_one_page_fixed_len(c: &mut Criterion) {
rows_per_page::<R>() as u64 * mem::size_of::<R>() as u64,
));
group.bench_function(name, |b| {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut pages = Pages::default();
// `0xa5` is the alternating bit pattern, which makes incorrect accesses obvious.
insert_one_page_worth_fixed_len(&pool, &mut pages, visitor, &R::from_u64(0xa5a5a5a5_a5a5a5a5));
@@ -283,7 +283,7 @@ fn delete_one_page_fixed_len(c: &mut Criterion) {
};
iter_time_with(
b,
- &mut (Pages::default(), PagePool::default()),
+ &mut (Pages::default(), PagePool::new_for_test()),
pre,
|ptrs, _, (pages, _)| {
for ptr in ptrs {
@@ -315,7 +315,7 @@ fn retrieve_one_page_fixed_len(c: &mut Criterion) {
group.throughput(Throughput::Bytes(rows_per_page as u64 * mem::size_of::<R>() as u64));

group.bench_function(name, |b| {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut pages = Pages::default();

let ptrs = fill_page_with_fixed_len_collect_row_pointers(
@@ -367,7 +367,7 @@ fn insert_with_holes_fixed_len(c: &mut Criterion) {
group.throughput(Throughput::Bytes(num_to_delete_in_bytes as u64));

group.bench_function(delete_ratio.to_string(), |b| {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut pages = Pages::default();

let mut rng = StdRng::seed_from_u64(0xa5a5a5a5_a5a5a5a5);
@@ -437,7 +437,7 @@ fn copy_filter_fixed_len(c: &mut Criterion) {
let val = R::from_u64(0xdeadbeef_0badbeef);
for keep_ratio in [0.1, 0.25, 0.5, 0.75, 0.9, 1.0] {
let visitor = &NullVarLenVisitor;
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut pages = Pages::default();

let num_pages = 16;
@@ -537,7 +537,7 @@ fn table_insert_one_row(c: &mut Criterion) {
let val = black_box(val.to_product());

// Insert before benching to alloc and fault in a page.
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut ctx = (table, NullBlobStore);
let ptr = ctx.0.insert(&pool, &mut ctx.1, &val).unwrap().1.pointer();
let pre = |_, (table, bs): &mut (Table, NullBlobStore)| {
@@ -588,7 +588,7 @@ fn table_delete_one_row(c: &mut Criterion) {
let val = val.to_product();

// Insert before benching to alloc and fault in a page.
- let mut ctx = (table, NullBlobStore, PagePool::default());
+ let mut ctx = (table, NullBlobStore, PagePool::new_for_test());
let insert = |_: u64, (table, bs, pool): &mut (Table, NullBlobStore, PagePool)| {
table.insert(pool, bs, &val).unwrap().1.pointer()
};
@@ -637,7 +637,7 @@ fn table_extract_one_row(c: &mut Criterion) {
let mut table = make_table_for_row_type::<R>(name);
let val = val.to_product();

- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = NullBlobStore;
let row = black_box(table.insert(&pool, &mut blob_store, &val).unwrap().1);
group.bench_function(name, |b| {
@@ -848,7 +848,7 @@ fn index_insert(c: &mut Criterion) {
same_ratio: f64,
) {
let make_row_move = &mut make_row;
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let (tbl, index_id, num_same, _) =
make_table_with_same_ratio::<R>(&pool, make_row_move, num_rows, same_ratio, false);
let mut ctx = (tbl, NullBlobStore, pool);
@@ -905,7 +905,7 @@ fn index_seek(c: &mut Criterion) {
) {
let make_row_move = &mut make_row;
let (tbl, index_id, num_same, num_diff) =
- make_table_with_same_ratio::<R>(&PagePool::default(), make_row_move, num_rows, same_ratio, unique);
+ make_table_with_same_ratio::<R>(&PagePool::new_for_test(), make_row_move, num_rows, same_ratio, unique);

group.bench_with_input(
bench_id_for_index(name, num_rows, same_ratio, num_same, unique),
@@ -972,7 +972,7 @@ fn index_delete(c: &mut Criterion) {
same_ratio: f64,
) {
let make_row_move = &mut make_row;
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let (mut tbl, index_id, num_same, _) =
make_table_with_same_ratio::<R>(&pool, make_row_move, num_rows, same_ratio, false);

2 changes: 1 addition & 1 deletion crates/table/src/eq.rs
@@ -241,7 +241,7 @@ mod test {
AlgebraicType::product([AlgebraicType::U8, AlgebraicType::U32]), // xpppxxxx
])]);

- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let bs = &mut NullBlobStore;
let mut table_a = crate::table::test::table(ty.clone());
let mut table_b = crate::table::test::table(ty);
2 changes: 1 addition & 1 deletion crates/table/src/eq_to_pv.rs
@@ -237,7 +237,7 @@ mod tests {
// Turn `val` into a `RowRef`.
let mut table = crate::table::test::table(ty);
let blob_store = &mut HashMapBlobStore::default();
- let (_, row) = table.insert(&PagePool::default(), blob_store, &val).unwrap();
+ let (_, row) = table.insert(&PagePool::new_for_test(), blob_store, &val).unwrap();

// Check eq algo.
prop_assert_eq!(row, val);
2 changes: 1 addition & 1 deletion crates/table/src/page.rs
@@ -2493,7 +2493,7 @@ pub(crate) mod tests {

#[test]
fn serde_round_trip_whole_page() {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut page = Page::new(u64_row_size());

// Construct an empty page, ser/de it, and assert that it's still empty.
38 changes: 16 additions & 22 deletions crates/table/src/page_pool.rs
@@ -22,20 +22,26 @@ impl MemoryUsage for PagePool {
}
}

- /// The default page pool has a size of 8 GiB.
- impl Default for PagePool {
- fn default() -> Self {
- Self::new(None)
+ impl PagePool {
+ pub fn new_for_test() -> Self {
+ Self::new(Some(100 * size_of::<Page>()))
}
}

impl PagePool {
/// Returns a new page pool with `max_size` bytes rounded down to the nearest multiple of 64 KiB.
///
- /// if no size is provided, a default of 8 GiB is used.
+ /// if no size is provided, a default of 1 page is used.
pub fn new(max_size: Option<usize>) -> Self {
- const DEFAULT_MAX_SIZE: usize = 8 * (1 << 30); // 8 GiB
- const PAGE_SIZE: usize = 64 * (1 << 10); // 64 KiB, `size_of::<Page>()`
+ const PAGE_SIZE: usize = size_of::<Page>();
+ // TODO(centril): This effectively disables the page pool.
+ // Currently, we have a test `test_index_scans`.
+ // The test sets up a `Location` table, like in BitCraft, with a `chunk` field,
+ // and populates it with 1000 different chunks with 1200 rows each.
+ // Then it asserts that the cold latency of an index scan on `chunk` takes < 1 ms.
+ // However, for reasons currently unknown to us,
+ // a large page pool, with capacity `1 << 26` bytes, on i7-7700K, 64GB RAM,
+ // will turn the latency into 30-40 ms.
+ // As a precaution, we disable the page pool by default.
+ const DEFAULT_MAX_SIZE: usize = PAGE_SIZE; // 1 page

let queue_size = max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE;
let inner = Arc::new(PagePoolInner::new(queue_size));
@@ -163,18 +169,6 @@ impl MemoryUsage for PagePoolInner {
}
}

- impl Default for PagePoolInner {
- fn default() -> Self {
- const MAX_PAGE_MEM: usize = 8 * (1 << 30); // 8 GiB
-
- // 2 ^ 17 pages at most.
- // Each slot in the pool is `(AtomicCell, Box<Page>)` which takes up 16 bytes.
- // The pool will therefore have a fixed cost of 2^20 bytes, i.e., 2 MiB.
- const MAX_POOLED_PAGES: usize = MAX_PAGE_MEM / size_of::<Page>();
- Self::new(MAX_POOLED_PAGES)
- }
- }

#[inline]
fn inc(atomic: &AtomicUsize) {
atomic.fetch_add(1, Ordering::Relaxed);
@@ -246,7 +240,7 @@ mod tests {

#[test]
fn page_pool_returns_same_page() {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
assert_metrics(&pool, 0, 0, 0, 0);

// Create a page and put it back.
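For reference, the new constructor rounds any requested `max_size` down to a whole number of pages before sizing the pool. A standalone sketch of that arithmetic, assuming a 64 KiB `Page` as noted in the removed constant above (the 64 MiB example is illustrative, not from this PR):

```rust
// Mirrors the constants in the new `PagePool::new`, assuming size_of::<Page>() == 64 KiB.
const PAGE_SIZE: usize = 64 * 1024;
const DEFAULT_MAX_SIZE: usize = PAGE_SIZE; // new default: one page, i.e. pooling effectively disabled

/// Number of pooled-page slots for a given byte budget, as computed in `PagePool::new`.
fn queue_size(max_size: Option<usize>) -> usize {
    max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE // rounds down to whole pages
}

fn main() {
    assert_eq!(queue_size(None), 1);                    // PagePool::new(None)
    assert_eq!(queue_size(Some(100 * PAGE_SIZE)), 100); // PagePool::new_for_test()
    assert_eq!(queue_size(Some(1 << 26)), 1024);        // e.g. a 64 MiB cap yields 1024 slots
}
```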
10 changes: 5 additions & 5 deletions crates/table/src/read_column.rs
@@ -382,7 +382,7 @@ mod test {
/// inserting the row, then doing `AlgebraicValue::read_column` on each column of the row
/// returns the expected value.
fn read_column_same_value((ty, val) in generate_typed_row()) {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
let mut table = table(ty);

@@ -399,7 +399,7 @@
/// which does not match the actual column type
/// returns an appropriate error.
fn read_column_wrong_type((ty, val) in generate_typed_row()) {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
let mut table = table(ty.clone());

@@ -430,7 +430,7 @@ mod test {
/// i.e. with an out-of-bounds index,
/// returns an appropriate error.
fn read_column_out_of_bounds((ty, val) in generate_typed_row()) {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
let mut table = table(ty.clone());

@@ -488,7 +488,7 @@ mod test {
($name:ident { $algebraic_type:expr => $rust_type:ty = $val:expr }) => {
#[test]
fn $name() {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
let mut table = table(ProductType::from_iter([$algebraic_type]));

@@ -550,7 +550,7 @@ mod test {
fn read_sum_tag_from_sum_with_payload() {
let algebraic_type = AlgebraicType::sum([("a", AlgebraicType::U8), ("b", AlgebraicType::U16)]);

- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
let mut table = table(ProductType::from([algebraic_type]));

4 changes: 2 additions & 2 deletions crates/table/src/row_hash.rs
@@ -235,7 +235,7 @@ mod tests {
fn pv_row_ref_hash_same_std_random_state((ty, val) in generate_typed_row()) {
// Turn `val` into a `RowRef`.
let mut table = crate::table::test::table(ty);
- let pool = &PagePool::default();
+ let pool = &PagePool::new_for_test();
let blob_store = &mut HashMapBlobStore::default();
let (_, row) = table.insert(pool, blob_store, &val).unwrap();

@@ -247,7 +247,7 @@
#[test]
fn pv_row_ref_hash_same_ahash((ty, val) in generate_typed_row()) {
// Turn `val` into a `RowRef`.
- let pool = &PagePool::default();
+ let pool = &PagePool::new_for_test();
let blob_store = &mut HashMapBlobStore::default();
let mut table = crate::table::test::table(ty);
let (_, row) = table.insert(pool, blob_store, &val).unwrap();
4 changes: 2 additions & 2 deletions crates/table/src/static_layout.rs
@@ -654,7 +654,7 @@ mod test {

#[test]
fn known_bsatn_same_as_bflatn_from((ty, val) in generate_typed_row()) {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
let mut table = crate::table::test::table(ty);
let Some(static_layout) = table.static_layout().cloned() else {
@@ -683,7 +683,7 @@ mod test {

#[test]
fn known_bflatn_same_as_pv_from((ty, val) in generate_typed_row()) {
- let pool = PagePool::default();
+ let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
let mut table = crate::table::test::table(ty);
let Some(static_layout) = table.static_layout().cloned() else {