diff --git a/Cargo.lock b/Cargo.lock
index 1a428930b8..be450cc379 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8631,9 +8631,11 @@ dependencies = [
  "restate-limiter",
  "restate-memory",
  "restate-partition-store",
+ "restate-platform",
  "restate-rocksdb",
  "restate-serde-util",
  "restate-storage-api",
+ "restate-time-util",
  "restate-types",
  "restate-util-string",
  "restate-worker-api",
diff --git a/crates/partition-store/src/tests/vqueue_table_test/mod.rs b/crates/partition-store/src/tests/vqueue_table_test/mod.rs
index be7d060657..1830213153 100644
--- a/crates/partition-store/src/tests/vqueue_table_test/mod.rs
+++ b/crates/partition-store/src/tests/vqueue_table_test/mod.rs
@@ -20,14 +20,16 @@
 //! Invariants:
 //! - The waiting cursor must not cross the boundary to adjacent vqueue ids.
 //! - The waiting cursor must not cross partition-key boundaries either.
-//! - When the waiting cursor is "seeked" to to first after a higher-order item has been inserted
-//!   (i.e. item with has_lock=true, or with older run_at), the cursor must show this added item.
+//! - The waiting cursor uses snapshot semantics: it captures a consistent
+//!   view of storage at creation time. Writes/deletes that happen after the
+//!   cursor is created are NOT visible to that cursor — callers must create
+//!   a fresh cursor to observe them.
 
 use restate_clock::time::MillisSinceEpoch;
 use restate_storage_api::Transaction;
 use restate_storage_api::vqueue_table::{
-    EntryKey, EntryMetadata, EntryValue, Stage, Status, VQueueCursor, VQueueStore,
-    WriteVQueueTable, stats::EntryStatistics,
+    EntryKey, EntryMetadata, EntryValue, Options, Stage, Status, VQueueCursor, VQueueRunningCursor,
+    VQueueStore, WriteVQueueTable, stats::EntryStatistics,
 };
 use restate_types::clock::UniqueTimestamp;
 use restate_types::identifiers::PartitionKey;
@@ -90,7 +92,16 @@
 fn default_entry(id: u8) -> (EntryKey, EntryValue) {
     entry(id, false, 0, id as u64)
 }
 
-/// Collects all items from a cursor into a Vec
+fn inbox_reader(db: &crate::PartitionDb, qid: &VQueueId) -> impl VQueueCursor + use<> {
+    db.new_inbox_reader(
+        qid,
+        Options {
+            allow_blocking_io: true,
+        },
+    )
+}
+
+/// Collects all items from an inbox cursor.
 fn collect_cursor<C: VQueueCursor>(cursor: &mut C) -> Vec<(EntryKey, EntryValue)> {
     let mut items = Vec::new();
     cursor.seek_to_first();
@@ -101,6 +112,17 @@ fn collect_cursor<C: VQueueCursor>(cursor: &mut C) -> Vec<(EntryKey, EntryValue)
     items
 }
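Aside (reviewer sketch, not part of the patch): under the new API every inbox reader is constructed with explicit `Options`. A minimal caller-side drain loop, mirroring `collect_cursor` above; the error handling is illustrative only:

use restate_storage_api::vqueue_table::{EntryKey, EntryValue, Options, VQueueCursor, VQueueStore};
use restate_types::vqueues::VQueueId;

fn drain_inbox<S: VQueueStore>(store: &S, qid: &VQueueId) -> Vec<(EntryKey, EntryValue)> {
    // allow_blocking_io = true means peek() will not surface WouldBlock;
    // run this on an IO thread, not on the scheduler loop.
    let mut cursor = store.new_inbox_reader(
        qid,
        Options {
            allow_blocking_io: true,
        },
    );
    let mut items = Vec::new();
    cursor.seek_to_first();
    while let Ok(Some(item)) = cursor.peek() {
        items.push(item);
        cursor.advance();
    }
    items
}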
+/// Collects all items from a running cursor (different error type than inbox).
+fn collect_running<C: VQueueRunningCursor>(cursor: &mut C) -> Vec<(EntryKey, EntryValue)> {
+    let mut items = Vec::new();
+    cursor.seek_to_first();
+    while let Ok(Some(item)) = cursor.peek() {
+        items.push(item);
+        cursor.advance();
+    }
+    items
+}
+
 fn collect_ids(items: &[(EntryKey, EntryValue)]) -> Vec<UniqueTimestamp> {
     items.iter().map(|(key, _)| *key.entry_id()).collect()
 }
@@ -125,7 +147,7 @@ async fn key_ordering_by_has_lock_run_at_seq<W: WriteVQueueTable>(txn: &mut W) {
 fn verify_key_ordering_by_has_lock_run_at_seq(db: &crate::PartitionDb) {
     let qid = test_qid();
 
-    let mut reader = db.new_inbox_reader(&qid);
+    let mut reader = inbox_reader(db, &qid);
     let items = collect_cursor(&mut reader);
 
     assert_eq!(items.len(), 5, "Expected 5 items in inbox");
@@ -165,7 +187,7 @@ async fn ordering_within_same_lock_domain<W: WriteVQueueTable>(txn: &mut W) {
 fn verify_ordering_within_same_lock_domain(db: &crate::PartitionDb) {
     let qid = VQueueId::custom(2000, "1");
 
-    let mut reader = db.new_inbox_reader(&qid);
+    let mut reader = inbox_reader(db, &qid);
     let items = collect_cursor(&mut reader);
 
     assert_eq!(items.len(), 4, "Expected 4 items");
@@ -199,12 +221,12 @@ fn verify_running_and_inbox_are_separate(db: &crate::PartitionDb) {
 
     // Running reader should only see the Run stage entry
     let mut run_reader = db.new_run_reader(&qid);
-    let run_items = collect_cursor(&mut run_reader);
+    let run_items = collect_running(&mut run_reader);
     assert_eq!(run_items.len(), 1, "Running reader should see 1 item");
     assert_eq!(*run_items[0].0.entry_id(), entry_id(10));
 
     // Inbox reader should only see the Inbox stage entries
-    let mut inbox_reader = db.new_inbox_reader(&qid);
+    let mut inbox_reader = inbox_reader(db, &qid);
     let inbox_items = collect_cursor(&mut inbox_reader);
     assert_eq!(inbox_items.len(), 2, "Inbox reader should see 2 items");
     assert_eq!(collect_ids(&inbox_items), vec![entry_id(20), entry_id(21)]);
@@ -226,10 +248,10 @@ fn verify_seek_after_works(db: &crate::PartitionDb) {
 
     let entries: Vec<_> = (1..=5).map(default_entry).collect();
 
-    let mut reader = db.new_inbox_reader(&qid);
+    let mut reader = inbox_reader(db, &qid);
 
     // Seek after the 3rd entry (id=3)
-    reader.seek_after(&qid, &entries[2].0);
+    reader.seek_after(&entries[2].0);
     let item = reader.peek().unwrap();
     assert!(item.is_some(), "Should have items after seek_after");
     // Next item should be entry 4 (strictly after entry 3)
@@ -251,7 +273,7 @@ fn verify_empty_queue_returns_none(db: &crate::PartitionDb) {
         "Empty running queue should return None"
     );
 
-    let mut inbox_reader = db.new_inbox_reader(&qid);
+    let mut inbox_reader = inbox_reader(db, &qid);
     inbox_reader.seek_to_first();
     assert!(
         inbox_reader.peek().unwrap().is_none(),
@@ -283,17 +305,17 @@ fn verify_vqueue_isolation(db: &crate::PartitionDb) {
     let qid3 = VQueueId::custom(pkey, "3");
 
     // Each queue should only see its own entry
-    let mut reader1 = db.new_inbox_reader(&qid1);
+    let mut reader1 = inbox_reader(db, &qid1);
     let items1 = collect_cursor(&mut reader1);
     assert_eq!(items1.len(), 1);
     assert_eq!(*items1[0].0.entry_id(), entry_id(1));
 
-    let mut reader2 = db.new_inbox_reader(&qid2);
+    let mut reader2 = inbox_reader(db, &qid2);
     let items2 = collect_cursor(&mut reader2);
     assert_eq!(items2.len(), 1);
     assert_eq!(*items2[0].0.entry_id(), entry_id(2));
 
-    let mut reader3 = db.new_inbox_reader(&qid3);
+    let mut reader3 = inbox_reader(db, &qid3);
     let items3 = collect_cursor(&mut reader3);
     assert_eq!(items3.len(), 1);
     assert_eq!(*items3[0].0.entry_id(), entry_id(3));
@@ -320,7 +342,7 @@ fn verify_waiting_cursor_boundary_is_respected(db: &crate::PartitionDb) {
     let qid_b =
VQueueId::custom(pkey, "b"); let a2 = entry(12, false, 10, 2); - let mut reader_a = db.new_inbox_reader(&qid_a); + let mut reader_a = inbox_reader(db, &qid_a); reader_a.seek_to_first(); assert_eq!( @@ -339,13 +361,13 @@ fn verify_waiting_cursor_boundary_is_respected(db: &crate::PartitionDb) { "Reader for qid_a must stop before qid_b entries" ); - reader_a.seek_after(&qid_a, &a2.0); + reader_a.seek_after(&a2.0); assert!( reader_a.peek().unwrap().is_none(), "seek_after(last_item) must not cross into the next vqueue" ); - let mut reader_b = db.new_inbox_reader(&qid_b); + let mut reader_b = inbox_reader(db, &qid_b); let items_b = collect_cursor(&mut reader_b); assert_eq!(collect_ids(&items_b), vec![entry_id(21)]); } @@ -376,7 +398,7 @@ fn verify_waiting_cursor_partition_prefix_boundary_is_respected(db: &crate::Part let qid_next_partition = VQueueId::custom(PartitionKey::from(5_201u64), "shared-boundary"); let target2 = entry(42, false, 10, 2); - let mut reader_target = db.new_inbox_reader(&qid_target_partition); + let mut reader_target = inbox_reader(db, &qid_target_partition); reader_target.seek_to_first(); assert_eq!( @@ -403,33 +425,33 @@ fn verify_waiting_cursor_partition_prefix_boundary_is_respected(db: &crate::Part "Reader for target qid must stop before adjacent partition keys" ); - reader_target.seek_after(&qid_target_partition, &target2.0); + reader_target.seek_after(&target2.0); assert!( reader_target.peek().unwrap().is_none(), "seek_after(last_item) must not cross into adjacent partition-key prefixes" ); - let mut reader_prev = db.new_inbox_reader(&qid_prev_partition); + let mut reader_prev = inbox_reader(db, &qid_prev_partition); assert_eq!( collect_ids(&collect_cursor(&mut reader_prev)), vec![entry_id(31)] ); - let mut reader_next = db.new_inbox_reader(&qid_next_partition); + let mut reader_next = inbox_reader(db, &qid_next_partition); assert_eq!( collect_ids(&collect_cursor(&mut reader_next)), vec![entry_id(51)] ); } -/// Test: Tailing iterator sees newly enqueued items after seek_to_first. +/// Test: a freshly created reader sees the current state of storage, +/// including writes that landed after a previous reader was created. /// -/// This verifies that the inbox reader (which uses a tailing iterator) can see -/// items that were added after the reader was created, when re-seeking. -async fn tailing_iterator_sees_new_items_on_reseek(rocksdb: &mut PartitionStore) { +/// This is the snapshot-semantics counterpart to the previous tailing-iterator +/// tests: callers must construct a new reader to observe post-creation writes. 
+async fn fresh_reader_sees_current_state(rocksdb: &mut PartitionStore) { let qid = VQueueId::custom(6000, "1"); - // Insert initial entries let entry1 = default_entry(1); let entry2 = default_entry(2); { @@ -439,586 +461,60 @@ async fn tailing_iterator_sees_new_items_on_reseek(rocksdb: &mut PartitionStore) txn.commit().await.expect("commit should succeed"); } - // Create reader and verify initial state - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - let item = reader.peek().unwrap(); - assert_eq!(item.as_ref().map(|e| *e.0.entry_id()), Some(entry_id(1))); - reader.advance(); - - let item = reader.peek().unwrap(); - assert_eq!(item.as_ref().map(|e| *e.0.entry_id()), Some(entry_id(2))); - reader.advance(); - - // Should be empty now - assert!(reader.peek().unwrap().is_none()); - - // Now add a new entry while the reader is still open - let entry3 = default_entry(3); + // Create the original reader and drain it. { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.commit().await.expect("commit should succeed"); - } - - // Re-seek to first - should now see all 3 entries - reader.seek_to_first(); - let items = { - let mut items = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - items.push(item); - reader.advance(); - } - items - }; - - assert_eq!(items.len(), 3, "Should see all 3 items after reseek"); - assert_eq!( - collect_ids(&items), - vec![entry_id(1), entry_id(2), entry_id(3)] - ); -} - -/// Test: Re-seek to first sees newly inserted higher-order items. -/// -/// This covers both invariants mentioned in the module docs: -/// - new item with older `run_at` appears first after re-seek -/// - new item with `has_lock=true` appears first after re-seek -async fn reseek_shows_new_higher_order_items(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(6_500, "1"); - - let base1 = entry(1, false, 100, 1); - let base2 = entry(2, false, 200, 2); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &base1.0, &base1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &base2.0, &base2.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - - // Add an item with older run_at (higher order among unlocked entries) - let older_run_at = entry(3, false, 50, 3); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &older_run_at.0, &older_run_at.1); - txn.commit().await.expect("commit should succeed"); - } - - reader.seek_to_first(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - // Add an item that has a lock (always higher order than unlocked entries) - let locked = entry(4, true, 300, 4); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &locked.0, &locked.1); - txn.commit().await.expect("commit should succeed"); - } - - reader.seek_to_first(); - let items = { - let mut items = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - items.push(item); - reader.advance(); - } - items - }; - - assert_eq!( - collect_ids(&items), - vec![entry_id(4), entry_id(3), entry_id(1), entry_id(2)] - ); -} - -/// Test: Tailing iterator sees newly enqueued items via seek_after. 
-/// -/// This verifies that when using seek_after to resume iteration, newly added -/// items that sort after the seek position are visible. -async fn tailing_iterator_sees_new_items_on_seek_after(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7000, "1"); - - // Insert initial entries with different key prefixes - let entry_first = entry(1, true, 10, 1); - let entry_second = entry(2, false, 10, 2); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_first.0, &entry_first.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_second.0, &entry_second.1); - txn.commit().await.expect("commit should succeed"); - } - - // Create reader and read the first item - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - let first = reader.peek().unwrap().unwrap(); - assert_eq!(*first.0.entry_id(), entry_id(1)); - - // Now add a new entry that sorts after the seek position - let entry_third = entry(3, false, 10, 3); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_third.0, &entry_third.1); - txn.commit().await.expect("commit should succeed"); - } - - // Seek after the first item - should see entry_second and entry_third - reader.seek_after(&qid, &first.0); - - let mut remaining = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - remaining.push(item); - reader.advance(); - } - - assert_eq!(remaining.len(), 2, "Should see 2 items after seek_after"); - assert_eq!(collect_ids(&remaining), vec![entry_id(2), entry_id(3)]); -} - -/// Test: Tailing iterator sees items inserted ahead of current position without re-seek. -/// -/// Scenario (1): after the cursor has advanced at least once, if a new item is inserted -/// at a key greater than the current position, it should become visible by continuing to -/// call `advance()`/`peek()` without any seek. -async fn tailing_iterator_sees_inserted_ahead_without_reseek(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7_200, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry5 = entry(5, false, 50, 5); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry5.0, &entry5.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - reader.advance(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - let entry4 = entry(4, false, 40, 4); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry4.0, &entry4.1); - txn.commit().await.expect("commit should succeed"); - } - - // Continue from current position without seek. - reader.advance(); - let maybe_inserted = reader.peek().unwrap().as_ref().map(|e| *e.0.entry_id()); - assert_eq!( - maybe_inserted, - Some(entry_id(4)), - "Inserted item ahead of current position should be visible without re-seek" - ); - reader.advance(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(5)); -} - -/// Test: Tailing iterator sees flushed insertions while mid-iteration without re-seek. -/// -/// Scenario: the cursor advances a couple of times and still has items to read. 
-/// New entries are inserted ahead of the current position, but before the next -/// existing item (splicing), and memtables are flushed. -/// Continuing with `advance()`/`peek()` (without seek) should surface the flushed items. -async fn tailing_iterator_sees_flushed_insertions_mid_iteration(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7_250, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry6 = entry(6, false, 60, 6); - let entry9 = entry(9, false, 90, 9); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry6.0, &entry6.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry9.0, &entry9.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - reader.advance(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - // Splice entries between current position (3) and next existing item (6). - let entry4 = entry(4, false, 40, 4); - let entry5 = entry(5, false, 50, 5); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry4.0, &entry4.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry5.0, &entry5.1); - txn.commit().await.expect("commit should succeed"); + let db = rocksdb.partition_db(); + let mut reader = inbox_reader(db, &qid); + let items = collect_cursor(&mut reader); + assert_eq!(collect_ids(&items), vec![entry_id(1), entry_id(2)]); } - rocksdb - .partition_db() - .flush_memtables(true) - .await - .expect("flush memtables should succeed"); - - // Continue from current position without seek. - reader.advance(); - let mut remaining_ids = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - remaining_ids.push(*item.0.entry_id()); - reader.advance(); - } - - assert_eq!( - remaining_ids, - vec![entry_id(4), entry_id(5), entry_id(6), entry_id(9)], - "Spliced flushed items should be visible and not skipped without re-seek" - ); -} - -/// Test: Seeked tailing iterator sees spliced insertions after a pre-insert flush. -/// -/// Scenario: perform one seek to position the cursor on the item immediately -/// before the splice point, flush memtables, then insert a new item in the middle. -/// Advancing from that seeked position should return the new spliced item first. -async fn seeked_tailing_iterator_sees_spliced_insertions_after_preflush( - rocksdb: &mut PartitionStore, -) { - let qid = VQueueId::custom(7_255, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry6 = entry(6, false, 60, 6); - let entry9 = entry(9, false, 90, 9); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry6.0, &entry6.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry9.0, &entry9.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - - rocksdb - .partition_db() - .flush_memtables(true) - .await - .expect("flush memtables should succeed"); - - // Single seek before inserting new items: land on entry3. 
- reader.seek_after(&qid, &entry1.0); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - // Splice one entry between current position (3) and next existing item (6). - let entry4 = entry(4, false, 40, 4); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry4.0, &entry4.1); - txn.commit().await.expect("commit should succeed"); - } - - // Continue from the seeked position without another seek. - reader.advance(); - - // This is the proof that we need to re-seek because the iterator will be blind - // to the newly added item if the seek happened after the mutable memtable flush. - assert_ne!( - *reader.peek().unwrap().unwrap().0.entry_id(), - entry_id(4), - "advance from seeked predecessor should surface the spliced item first" - ); - - // Re-seeking should make it visible again. - reader.seek_after(&qid, &entry1.0); - reader.advance(); - assert_eq!( - *reader.peek().unwrap().unwrap().0.entry_id(), - entry_id(4), - "advance from seeked predecessor should surface the spliced item first" - ); - - reader.advance(); - let mut tail_ids = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - tail_ids.push(*item.0.entry_id()); - reader.advance(); - } - - assert_eq!( - tail_ids, - vec![entry_id(6), entry_id(9)], - "Remaining tail after the spliced item should keep original order" - ); -} - -/// Test: Seeked tailing iterator sees appended insertions after a pre-insert flush. -/// -/// Scenario: perform one seek, flush memtables, then insert new items that are strictly -/// after the existing tail. Continuing with `advance()`/`peek()` from that seeked -/// position should eventually surface the appended items. -async fn seeked_tailing_iterator_sees_appended_insertions_after_preflush( - rocksdb: &mut PartitionStore, -) { - let qid = VQueueId::custom(7_256, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry6 = entry(6, false, 60, 6); - let entry9 = entry(9, false, 90, 9); + // Append a new item. + let entry3 = default_entry(3); { let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry6.0, &entry6.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry9.0, &entry9.1); txn.commit().await.expect("commit should succeed"); } + // A fresh reader sees all three items. let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - - // Single seek before appending new tail items: land on entry3. - reader.seek_after(&qid, &entry1.0); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - rocksdb - .partition_db() - .flush_memtables(true) - .await - .expect("flush memtables should succeed"); - - let entry10 = entry(10, false, 100, 10); - let entry11 = entry(11, false, 110, 11); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry10.0, &entry10.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry11.0, &entry11.1); - txn.commit().await.expect("commit should succeed"); - } - - // Continue from the seeked position without another seek. 
- reader.advance(); - let mut remaining_ids = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - remaining_ids.push(*item.0.entry_id()); - reader.advance(); - } - + let mut reader = inbox_reader(db, &qid); + let items = collect_cursor(&mut reader); assert_eq!( - remaining_ids, - vec![entry_id(6), entry_id(9), entry_id(10), entry_id(11)], - "Appended flushed items should be visible after traversing existing tail" - ); -} - -/// Test: Tailing iterator sees items inserted after reaching the end without re-seek. -/// -/// Scenario (2): after the cursor reaches end-of-iteration, if a new item is inserted, -/// continuing with `advance()`/`peek()` (without seek) should surface the new item. -async fn tailing_iterator_sees_item_added_after_end_without_reseek(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7_300, "1"); - - let entry1 = entry(1, false, 10, 1); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - reader.advance(); - assert!(reader.peek().unwrap().is_none(), "Reader should be at end"); - - let entry2 = entry(2, false, 20, 2); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry2.0, &entry2.1); - txn.commit().await.expect("commit should succeed"); - } - - // Stay in the same iterator path, no re-seek. - reader.advance(); - assert!( - reader.peek().unwrap().is_none(), - "Inserted item after end is not visible without re-seek" + collect_ids(&items), + vec![entry_id(1), entry_id(2), entry_id(3)] ); - - // Re-seeking makes the newly inserted item visible. - reader.seek_after(&qid, &entry1.0); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(2)); } -/// Test: Deleted items don't appear after reseek. -/// -/// This verifies that when an item is deleted while the reader is open, -/// re-seeking will not return the deleted item. -async fn deleted_items_not_visible_after_reseek(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(8000, "1"); +/// Test: an existing reader holds a snapshot — writes after creation are not +/// observable, even after `seek_to_first` or `seek_after`. 
+async fn existing_reader_does_not_see_post_snapshot_writes(rocksdb: &mut PartitionStore) { + let qid = VQueueId::custom(6_100, "1"); - // Insert initial entries let entry1 = default_entry(1); let entry2 = default_entry(2); - let entry3 = default_entry(3); { let mut txn = rocksdb.transaction(); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry2.0, &entry2.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); txn.commit().await.expect("commit should succeed"); } - // Create reader and verify we see all 3 let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - let items = collect_cursor(&mut reader); - assert_eq!(items.len(), 3); - - // Delete the middle entry while the reader is still open - { - let mut txn = rocksdb.transaction(); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry2.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry2.0); - txn.commit().await.expect("commit should succeed"); - } - - // Re-seek and verify we only see entries 1 and 3 + let mut reader = inbox_reader(db, &qid); reader.seek_to_first(); - let items = { - let mut items = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - items.push(item); - reader.advance(); - } - items - }; - - assert_eq!(items.len(), 2, "Should only see 2 items after deletion"); - assert_eq!(collect_ids(&items), vec![entry_id(1), entry_id(3)]); -} - -/// Test: Deleted items don't appear after seek_after. -/// -/// This verifies that deleted items are not returned when using seek_after -/// to resume iteration past a certain point. -async fn deleted_items_not_visible_after_seek_after(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(8500, "1"); + assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - // Insert entries - let entry1 = default_entry(1); - let entry2 = default_entry(2); + // Insert a new item after the reader has taken its snapshot. let entry3 = default_entry(3); { let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry2.0, &entry2.1); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); txn.commit().await.expect("commit should succeed"); } - // Create reader - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - // Read first item - let first = reader.peek().unwrap().unwrap(); - assert_eq!(*first.0.entry_id(), entry_id(1)); - - // Delete entries 2 and 3 while reader is open - { - let mut txn = rocksdb.transaction(); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry2.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry2.0); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry3.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry3.0); - txn.commit().await.expect("commit should succeed"); - } - - // Seek after first - should see nothing since 2 and 3 are deleted - reader.seek_after(&qid, &first.0); - assert!( - reader.peek().unwrap().is_none(), - "Should see no items after seek_after when remaining items are deleted" - ); -} - -/// Test: Concurrent enqueue and delete operations are handled correctly. -/// -/// This tests a more complex scenario where items are both added and removed -/// while the reader is open. 
-async fn concurrent_enqueue_and_delete(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(9000, "1"); - - // Insert initial entries in deterministic key order - let entry_high = entry(10, true, 10, 10); - let entry_mid = entry(20, false, 20, 20); - let entry_low = entry(30, false, 30, 30); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_high.0, &entry_high.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_mid.0, &entry_mid.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_low.0, &entry_low.1); - txn.commit().await.expect("commit should succeed"); - } - - // Create reader and read first item - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - let first = reader.peek().unwrap().unwrap(); - assert_eq!(*first.0.entry_id(), entry_id(10)); - - // Simultaneously: delete entry_mid, add a new entry that sorts first - let entry_new_first = entry(5, true, 5, 5); - { - let mut txn = rocksdb.transaction(); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry_mid.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry_mid.0); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_new_first.0, &entry_new_first.1); - txn.commit().await.expect("commit should succeed"); - } - - // Re-seek from start - should see: id=5, id=10, id=30 - // (id=20 is deleted, id=5 is added) + // Re-seeking the same reader still only shows the snapshot's two items. reader.seek_to_first(); let items = { let mut items = Vec::new(); @@ -1028,12 +524,7 @@ async fn concurrent_enqueue_and_delete(rocksdb: &mut PartitionStore) { } items }; - - assert_eq!(items.len(), 3, "Should see 3 items"); - assert_eq!( - collect_ids(&items), - vec![entry_id(5), entry_id(10), entry_id(30)] - ); + assert_eq!(collect_ids(&items), vec![entry_id(1), entry_id(2)]); } pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { @@ -1062,16 +553,8 @@ pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { verify_waiting_cursor_boundary_is_respected(db); verify_waiting_cursor_partition_prefix_boundary_is_respected(db); - // Tailing iterator tests - these need mutable access to rocksdb for writes - tailing_iterator_sees_new_items_on_reseek(&mut rocksdb).await; - reseek_shows_new_higher_order_items(&mut rocksdb).await; - tailing_iterator_sees_new_items_on_seek_after(&mut rocksdb).await; - tailing_iterator_sees_inserted_ahead_without_reseek(&mut rocksdb).await; - tailing_iterator_sees_flushed_insertions_mid_iteration(&mut rocksdb).await; - seeked_tailing_iterator_sees_spliced_insertions_after_preflush(&mut rocksdb).await; - seeked_tailing_iterator_sees_appended_insertions_after_preflush(&mut rocksdb).await; - tailing_iterator_sees_item_added_after_end_without_reseek(&mut rocksdb).await; - deleted_items_not_visible_after_reseek(&mut rocksdb).await; - deleted_items_not_visible_after_seek_after(&mut rocksdb).await; - concurrent_enqueue_and_delete(&mut rocksdb).await; + // Snapshot-iterator tests — exercise the contract that a fresh reader + // sees current storage and that an existing reader holds a fixed view. 
+    fresh_reader_sees_current_state(&mut rocksdb).await;
+    existing_reader_does_not_see_post_snapshot_writes(&mut rocksdb).await;
 }
diff --git a/crates/partition-store/src/vqueue_table/reader.rs b/crates/partition-store/src/vqueue_table/reader.rs
index 99a1870f0a..ab8bfface0 100644
--- a/crates/partition-store/src/vqueue_table/reader.rs
+++ b/crates/partition-store/src/vqueue_table/reader.rs
@@ -8,7 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use restate_storage_api::vqueue_table::VQueueStore;
+use restate_storage_api::vqueue_table::{Options, VQueueStore};
 use restate_types::vqueues::VQueueId;
 
 use crate::PartitionDb;
@@ -24,7 +24,7 @@ impl VQueueStore for PartitionDb {
         VQueueRunningReader::new(self, qid)
     }
 
-    fn new_inbox_reader(&self, qid: &VQueueId) -> Self::InboxReader {
-        VQueueWaitingReader::new(self, qid)
+    fn new_inbox_reader(&self, qid: &VQueueId, opts: Options) -> Self::InboxReader {
+        VQueueWaitingReader::new(self, qid, opts)
     }
 }
diff --git a/crates/partition-store/src/vqueue_table/running_reader.rs b/crates/partition-store/src/vqueue_table/running_reader.rs
index cc21bfba67..f8d44aa5a5 100644
--- a/crates/partition-store/src/vqueue_table/running_reader.rs
+++ b/crates/partition-store/src/vqueue_table/running_reader.rs
@@ -13,7 +13,7 @@ use bilrost::OwnedMessage;
 use rocksdb::DBRawIteratorWithThreadMode;
 
 use restate_storage_api::StorageError;
-use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueCursor};
+use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueRunningCursor};
 use restate_types::vqueues::VQueueId;
 
 use crate::PartitionDb;
@@ -60,57 +60,15 @@ impl VQueueRunningReader {
     }
 }
 
-impl VQueueCursor for VQueueRunningReader {
+impl VQueueRunningCursor for VQueueRunningReader {
     fn seek_to_first(&mut self) {
         self.it.seek_to_first();
     }
 
-    fn seek_after(&mut self, _qid: &VQueueId, _key: &EntryKey) {
-        panic!("seek_after is not supported for running snapshot reader");
-    }
-
     fn advance(&mut self) {
         self.it.next();
     }
 
-    /// Returns the current key under cursor
-    fn current_key(&mut self) -> Result<Option<EntryKey>, StorageError> {
-        if let Some(key) = self.it.key() {
-            debug_assert_eq!(key.len(), RunningKey::serialized_length_fixed());
-
-            // The portion we are interested in is everything that represents the EntryKey
-            let entry_key =
-                <EntryKey as OwnedMessage>::decode(&mut &key[RunningKey::offset_of_entry_key()..])?;
-            Ok(Some(entry_key))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
-    /// Returns the current value under cursor
-    fn current_value(&mut self) -> Result<Option<EntryValue>, StorageError> {
-        if let Some(mut value) = self.it.value() {
-            let value = EntryValue::decode(&mut value)?;
-            Ok(Some(value))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
     fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, StorageError> {
         if let Some((key, mut value)) = self.it.item() {
             debug_assert_eq!(key.len(), RunningKey::serialized_length_fixed());
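Aside (reviewer sketch, not part of the patch): with the trait split, the running cursor is a pure snapshot drain (no `seek_after`, errors stay `StorageError`), while the inbox cursor can additionally report `CursorError::WouldBlock`. A caller that drains the running stage before touching the inbox might look roughly like this; the re-arm step is elided:

use restate_storage_api::StorageError;
use restate_storage_api::vqueue_table::{Options, VQueueRunningCursor, VQueueStore};
use restate_types::vqueues::VQueueId;

fn drain_running<S: VQueueStore>(store: &S, qid: &VQueueId) -> Result<u32, StorageError> {
    let mut running = store.new_run_reader(qid);
    running.seek_to_first();
    let mut drained = 0;
    while let Some((_key, _value)) = running.peek()? {
        // ... re-arm the running entry with the scheduler ...
        drained += 1;
        running.advance();
    }
    // Once the running snapshot is exhausted, the queue moves on to the
    // inbox stage; a fresh inbox reader is created per refill (see below).
    let _inbox = store.new_inbox_reader(qid, Options { allow_blocking_io: false });
    Ok(drained)
}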
diff --git a/crates/partition-store/src/vqueue_table/waiting_reader.rs b/crates/partition-store/src/vqueue_table/waiting_reader.rs
index 48a91b4a35..e1bfedef95 100644
--- a/crates/partition-store/src/vqueue_table/waiting_reader.rs
+++ b/crates/partition-store/src/vqueue_table/waiting_reader.rs
@@ -8,12 +8,16 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use anyhow::Context;
+use std::sync::Arc;
+
 use bilrost::OwnedMessage;
 use rocksdb::DBRawIteratorWithThreadMode;
 
+use restate_rocksdb::RocksDb;
 use restate_storage_api::StorageError;
-use restate_storage_api::vqueue_table::{EntryKey, EntryValue, Stage, VQueueCursor};
+use restate_storage_api::vqueue_table::{
+    CursorError, EntryKey, EntryValue, Options, Stage, VQueueCursor,
+};
 use restate_types::vqueues::VQueueId;
 
 use crate::PartitionDb;
@@ -22,20 +26,35 @@ use crate::vqueue_table::InboxKey;
 use crate::vqueue_table::inbox::InboxKeyRef;
 
 pub struct VQueueWaitingReader {
+    qid: VQueueId,
     it: DBRawIteratorWithThreadMode<'static, rocksdb::DB>,
+    // Safety: This must be dropped last since the iterator references memory allocated by it.
+    // This is only set to Some if the iterator is configured to run with blocking IO. The
+    // assumption is that blocking iterators will run in background threads, so we need to pin
+    // the database until the iterator is dropped.
+    _db: Option<Arc<RocksDb>>,
 }
 
 impl VQueueWaitingReader {
-    pub(crate) fn new(db: &PartitionDb, qid: &VQueueId) -> Self {
+    pub(crate) fn new(db: &PartitionDb, qid: &VQueueId, opts: Options) -> Self {
         let mut readopts = rocksdb::ReadOptions::default();
         readopts.set_async_io(true);
         // this is not the place to be concerned about corruption, we favor speed
         // over safety for this particular use-case.
         readopts.set_verify_checksums(false);
-        readopts.set_tailing(true);
+        // We re-create this reader on every refill, so a fresh snapshot is what
+        // we want. A tailing iterator would see new writes but is unsafe across
+        // memtable flushes.
+        readopts.set_tailing(false);
         // use prefix extractors for efficient filtering.
         readopts.set_prefix_same_as_start(true);
+        if opts.allow_blocking_io {
+            readopts.set_read_tier(rocksdb::ReadTier::All);
+        } else {
+            readopts.set_read_tier(rocksdb::ReadTier::BlockCache);
+        }
+
         // we know how big the prefix is
         let mut key_buf = [0u8; InboxKey::by_qid_prefix_len()];
         InboxKeyRef::builder()
             .raw_iterator_cf_opt(db.cf_handle(), readopts);
 
         Self {
+            qid: qid.clone(),
             // Safety:
             // The iterator is guaranteed to be dropped before the database is dropped, we hold to the
             // PartitionDb in the scheduler which pins the database and the column family.
+            //
+            // We also pin the database if blocking IO is configured.
             it: unsafe { super::ignore_iterator_lifetime(it) },
+            _db: opts.allow_blocking_io.then(|| db.rocksdb().clone()),
         }
     }
 }
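Aside (reviewer sketch, not part of the patch): the `_db` field relies on Rust's guaranteed drop order: struct fields are dropped in declaration order, so the iterator above (declared before `_db`) is destroyed before the pinned database handle. A self-contained illustration:

struct DropOrder(&'static str);

impl Drop for DropOrder {
    fn drop(&mut self) {
        println!("dropping {}", self.0);
    }
}

struct Pinned {
    iterator: DropOrder, // declared first, dropped first
    _db: DropOrder,      // declared last, dropped last, outliving `iterator`
}

fn main() {
    let _pinned = Pinned {
        iterator: DropOrder("iterator"),
        _db: DropOrder("db"),
    };
    // prints "dropping iterator" then "dropping db"
}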
@@ -67,9 +90,8 @@ impl VQueueCursor for VQueueWaitingReader {
         self.it.seek_to_first();
     }
 
-    fn seek_after(&mut self, qid: &VQueueId, key: &EntryKey) {
-        tracing::trace!("Seeking after {key:?}");
-        let mut key_buf = super::inbox::encode_stage_key(Stage::Inbox, qid, key);
+    fn seek_after(&mut self, key: &EntryKey) {
+        let mut key_buf = super::inbox::encode_stage_key(Stage::Inbox, &self.qid, key);
         let success = crate::convert_to_upper_bound(&mut key_buf);
         debug_assert!(success);
         self.it.seek(key_buf);
@@ -79,57 +101,20 @@
         self.it.next();
     }
 
-    /// Returns the current key under cursor
-    fn current_key(&mut self) -> Result<Option<EntryKey>, StorageError> {
-        if let Some(key) = self.it.key() {
-            debug_assert_eq!(key.len(), InboxKey::serialized_length_fixed());
-
-            // The portion we are interested in is everything that represents the EntryKey
-            let entry_key =
-                <EntryKey as OwnedMessage>::decode(&mut &key[InboxKey::offset_of_entry_key()..])?;
-            Ok(Some(entry_key))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
-    /// Returns the current value under cursor
-    fn current_value(&mut self) -> Result<Option<EntryValue>, StorageError> {
-        if let Some(mut value) = self.it.value() {
-            let value = EntryValue::decode(&mut value)?;
-            Ok(Some(value))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
-    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, StorageError> {
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, CursorError> {
         if let Some((key, mut value)) = self.it.item() {
             debug_assert_eq!(key.len(), InboxKey::serialized_length_fixed());
             let entry_key =
                 <EntryKey as OwnedMessage>::decode(&mut &key[InboxKey::offset_of_entry_key()..])?;
-            let value = EntryValue::decode(&mut value)?;
+
+            let value = EntryValue::decode(&mut value).map_err(StorageError::BilrostDecode)?;
             Ok(Some((entry_key, value)))
         } else {
-            // We reached the end (or an error). We cannot recover from this without seek.
-            self.it
-                .status()
-                .context("peek into vqueue iterator")
-                .map_err(StorageError::Generic)?;
+            self.it.status().map_err(|err| match err.kind() {
+                rocksdb::ErrorKind::Incomplete => CursorError::WouldBlock,
+                _ => CursorError::Other(StorageError::Generic(err.into())),
            })?;
             // iterator is beyond the end, we can't peek
             Ok(None)
         }
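Aside (reviewer sketch, not part of the patch): `convert_to_upper_bound` itself is not shown in this diff. For intuition, a typical way to turn a fixed-length encoded key into a "strictly after" seek target is to take its lexicographic successor. Hypothetical helper, assuming fixed-length keys:

fn to_exclusive_seek_target(key: &mut [u8]) -> bool {
    // Increment the last non-0xFF byte and zero everything after it. For
    // fixed-length keys, seeking to this successor lands on the first key
    // strictly greater than the original.
    for i in (0..key.len()).rev() {
        if key[i] != u8::MAX {
            key[i] += 1;
            for b in &mut key[i + 1..] {
                *b = 0;
            }
            return true;
        }
    }
    false // key was all 0xFF; no same-length successor exists
}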
diff --git a/crates/rocksdb/src/background.rs b/crates/rocksdb/src/background.rs
index 5252543a07..6584d70fa0 100644
--- a/crates/rocksdb/src/background.rs
+++ b/crates/rocksdb/src/background.rs
@@ -64,7 +64,7 @@ where
         )
         .increment(1);
 
-        let span = tracing::Span::current().clone();
+        let span = tracing::Span::current();
 
         move || span.in_scope(|| self.run())
     }
@@ -75,7 +75,7 @@ where
             OP_TYPE => self.kind.as_static_str(),
         )
         .increment(1);
-        let span = tracing::Span::current().clone();
+        let span = tracing::Span::current();
 
         move || {
             span.in_scope(|| {
diff --git a/crates/storage-api/src/vqueue_table/store.rs b/crates/storage-api/src/vqueue_table/store.rs
index d882e6103f..24df3d1f86 100644
--- a/crates/storage-api/src/vqueue_table/store.rs
+++ b/crates/storage-api/src/vqueue_table/store.rs
@@ -10,30 +10,54 @@
 
 use restate_types::vqueues::VQueueId;
 
+use crate::StorageError;
+
 use super::{EntryKey, EntryValue};
-use crate::Result;
+
+pub struct Options {
+    /// Allows blocking IO when we need to read from storage. Operations
+    /// that block should be performed on IO threads.
+    ///
+    /// When set to false, seek and read operations may return `WouldBlock` if
+    /// the operation cannot be performed without blocking.
+    pub allow_blocking_io: bool,
+}
 
 /// Storage for vqueue heads (e.g., RocksDB `ready_idx`).
 pub trait VQueueStore {
-    type RunningReader: VQueueCursor + Send + Unpin;
-    type InboxReader: VQueueCursor + Send + Unpin;
+    type RunningReader: VQueueRunningCursor + Send + Unpin;
+    type InboxReader: VQueueCursor + Send + Unpin + 'static;
 
     fn new_run_reader(&self, qid: &VQueueId) -> Self::RunningReader;
 
-    fn new_inbox_reader(&self, qid: &VQueueId) -> Self::InboxReader;
+    fn new_inbox_reader(&self, qid: &VQueueId, opts: Options) -> Self::InboxReader;
 }
 
-/// Iterator over vqueue entries
+/// Iterator over inbox-stage vqueue entries
 pub trait VQueueCursor {
     /// Moves the cursor to the beginning (min key) of the vqueue
     fn seek_to_first(&mut self);
     /// Moves the cursor to point strictly after `item`
-    fn seek_after(&mut self, qid: &VQueueId, item: &EntryKey);
-    /// Returns the current key under cursor
-    fn current_key(&mut self) -> Result<Option<EntryKey>>;
-    /// Returns the current value under cursor
-    fn current_value(&mut self) -> Result<Option<EntryValue>>;
+    fn seek_after(&mut self, item: &EntryKey);
+    /// Advances the cursor.
+    fn advance(&mut self);
     /// Peek item without advancing the cursor
-    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>>;
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, CursorError>;
+}
+
+/// Iterator over already running vqueue entries
+pub trait VQueueRunningCursor {
+    /// Moves the cursor to the beginning (min key) of the vqueue
+    fn seek_to_first(&mut self);
+    /// Peek item without advancing the cursor
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, StorageError>;
     /// Advancing the cursor. If this fails, the error is returned on the next call to peek()
     fn advance(&mut self);
 }
+
+#[derive(Debug, thiserror::Error)]
+pub enum CursorError {
+    #[error("operation cannot be completed without blocking this thread")]
+    WouldBlock,
+    #[error(transparent)]
+    Other(#[from] StorageError),
+}
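Aside (reviewer sketch, not part of the patch): what the split error type buys the scheduler: `WouldBlock` can be routed to a blocking retry path instead of being treated as a storage failure. The retry strategy shown is illustrative, not what the scheduler does verbatim:

use restate_storage_api::vqueue_table::{CursorError, VQueueCursor};

fn classify_peek<C: VQueueCursor>(cursor: &mut C) {
    match cursor.peek() {
        Ok(Some((_key, _value))) => {
            // Head is cache-resident: dispatch it.
        }
        Ok(None) => {
            // Queue drained under this reader's snapshot.
        }
        Err(CursorError::WouldBlock) => {
            // Not in the block cache: hand the refill to an IO thread with
            // allow_blocking_io = true (e.g. via tokio::task::spawn_blocking)
            // and poll again when it completes.
        }
        Err(CursorError::Other(_err)) => {
            // Genuine storage error; propagate or fail the partition.
        }
    }
}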
diff --git a/crates/vqueues/Cargo.toml b/crates/vqueues/Cargo.toml
index 6ea4b67b02..7394eb4291 100644
--- a/crates/vqueues/Cargo.toml
+++ b/crates/vqueues/Cargo.toml
@@ -18,9 +18,11 @@ restate-clock = { workspace = true }
 restate-futures-util = { workspace = true }
 restate-limiter = { workspace = true }
 restate-memory = { workspace = true }
+restate-platform = { workspace = true }
 restate-rocksdb = { workspace = true }
 restate-serde-util = { workspace = true }
 restate-storage-api = { workspace = true }
+restate-time-util = { workspace = true }
 restate-types = { workspace = true }
 restate-util-string = { workspace = true }
 restate-worker-api = { workspace = true }
diff --git a/crates/vqueues/src/lib.rs b/crates/vqueues/src/lib.rs
index 94ff262348..6cb8507e21 100644
--- a/crates/vqueues/src/lib.rs
+++ b/crates/vqueues/src/lib.rs
@@ -675,7 +675,7 @@ where
         debug!(
             header = ?header,
-            "[yield] entry: {},  next_stage: '{}', new_status: {new_status}",
+            "[yield] entry: {}, next_stage: '{}', new_status: {new_status}",
             Stage::Inbox,
             header.display_entry_id()
         );
diff --git a/crates/vqueues/src/scheduler/drr.rs b/crates/vqueues/src/scheduler/drr.rs
index cc52639a78..1fdc46b33c 100644
--- a/crates/vqueues/src/scheduler/drr.rs
+++ b/crates/vqueues/src/scheduler/drr.rs
@@ -68,7 +68,6 @@ pub struct DRRScheduler {
 }
 
 impl DRRScheduler {
-    #[allow(clippy::too_many_arguments)]
     pub fn new(
         limit_qid_per_poll: NonZeroU16,
         max_items_per_decision: NonZeroU16,
@@ -92,6 +91,7 @@ impl DRRScheduler {
                 qid.clone(),
                 handle,
                 VQueueMetaLite::new(meta),
+                &storage,
                 meta.num_running(),
             )
         });
@@ -156,13 +156,13 @@
             break;
         }
 
-        let Some(handle) = this.eligible.next_eligible(this.storage, this.q)? else {
+        let Some(handle) = this.eligible.next_eligible(cx, this.storage, this.q)? else {
             break;
         };
 
         let qstate = this.q.get_mut(handle).unwrap();
 
-        match qstate.try_pop(cx, this.storage, this.resource_manager)? {
+        match qstate.try_pop(cx, this.resource_manager)? {
             Pop::NeedsCredit => {
                 this.eligible.rotate_one();
             }
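Aside (reviewer sketch, not part of the patch): threading `cx` into `next_eligible` (below) turns eligibility checks into poll-style calls. The general shape, reduced to a toy poll function; the waker handling here is deliberately simplified, real code would stash the waker with the pending IO:

use std::task::{Context, Poll};

fn poll_eligibility_shape(cx: &mut Context<'_>, cache_resident: bool) -> Poll<bool> {
    if cache_resident {
        Poll::Ready(true)
    } else {
        // Arrange a wakeup once the background read completes, then yield.
        cx.waker().wake_by_ref(); // simplified: wake immediately for the sketch
        Poll::Pending
    }
}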
diff --git a/crates/vqueues/src/scheduler/eligible.rs b/crates/vqueues/src/scheduler/eligible.rs
index dc4d281212..817b67ad25 100644
--- a/crates/vqueues/src/scheduler/eligible.rs
+++ b/crates/vqueues/src/scheduler/eligible.rs
@@ -110,10 +110,13 @@ impl EligibilityTracker {
 
     pub fn next_eligible(
         &mut self,
+        cx: &mut std::task::Context<'_>,
         storage: &S,
         vqueues: &mut SlotMap<VQueueHandle, Queue<S>>,
     ) -> Result<Option<VQueueHandle>, StorageError> {
-        loop {
+        let n = self.ready_ring.len();
+        // avoid rescanning the ready ring multiple rounds
+        for _ in 0..n {
             // what is my current status
             let Some(handle) = self.ready_ring.front().copied() else {
                 return Ok(None);
             };
@@ -137,12 +140,12 @@
             match current_state {
                 State::NeedsPoll => {
                     // update the state based on eligibility.
-                    match qstate.poll_eligibility(storage)?.as_compact() {
-                        Eligibility::Eligible => {
+                    match qstate.poll_eligibility(cx, storage) {
+                        Poll::Ready(Ok(Eligibility::Eligible)) => {
                             *current_state = State::Ready;
                             return Ok(Some(handle));
                         }
-                        Eligibility::EligibleAt(ts) => {
+                        Poll::Ready(Ok(Eligibility::EligibleAt(ts))) => {
                             let ts = ts.as_unix_millis();
                             let duration = ts.duration_since(SchedulerClock.now_millis());
                             let timer_key = self.delayed_eligibility.insert(handle, duration);
@@ -152,24 +155,30 @@
                             self.ready_ring.pop_front();
                             continue;
                         }
-                        Eligibility::NotEligible => {
+                        Poll::Ready(Ok(Eligibility::NotEligible)) => {
                             self.ready_ring.pop_front();
                             self.remove(handle);
                             continue;
                         }
+                        Poll::Ready(Err(err)) => {
+                            return Err(err);
+                        }
+                        Poll::Pending => {
+                            self.ready_ring.rotate_left(1);
+                            continue;
+                        }
                     }
                 }
-                State::BlockedOn(_) => {
+                State::BlockedOn(_) | State::Scheduled { .. } => {
                     self.ready_ring.pop_front();
                 }
                 State::Ready => {
                     return Ok(Some(handle));
                 }
-                State::Scheduled { .. } => {
-                    self.ready_ring.pop_front();
-                }
             }
         }
+
+        Ok(None)
     }
 
     pub fn remove(&mut self, handle: VQueueHandle) {
diff --git a/crates/vqueues/src/scheduler/queue.rs b/crates/vqueues/src/scheduler/queue.rs
index f381f3f5ac..3bdc4556fa 100644
--- a/crates/vqueues/src/scheduler/queue.rs
+++ b/crates/vqueues/src/scheduler/queue.rs
@@ -8,20 +8,236 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use restate_storage_api::StorageError;
-use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueCursor, VQueueStore};
+use std::collections::VecDeque;
+use std::num::NonZeroU32;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::task::{Poll, ready};
+
+use itertools::{EitherOrBoth, Itertools as _};
+use tokio::time::Instant;
+use tracing::trace;
+
+use restate_storage_api::vqueue_table::CursorError;
+use restate_storage_api::vqueue_table::{
+    EntryKey, EntryValue, VQueueCursor, VQueueRunningCursor, VQueueStore,
+};
+use restate_storage_api::{StorageError, vqueue_table};
+use restate_time_util::DurationExt;
 use restate_types::vqueues::VQueueId;
 
 use super::UnconfirmedAssignments;
 
-#[derive(Debug)]
-enum Head {
-    /// We need a seek+read to know the head.
-    Unknown,
-    /// The current cursor's head
-    Known { key: EntryKey, value: EntryValue },
-    /// We know that we've reached the end of the vqueue
-    Empty,
+/// The number of entries we are willing to keep in cache
+const INBOX_CACHE_CAPACITY: usize = 24;
+
+enum Overlay {
+    Add(EntryValue),
+    Tombstone,
+}
+
+/// In-flight async refill, plus an overlay of `notify_enqueued` /
+/// `notify_removed` events that arrived while the task was running.
+///
+/// # Storage-write-before-notify invariant
+///
+/// The partition processor commits inbox writes/deletes to RocksDB *before*
+/// dispatching `EnqueuedToInbox` / `RemovedFromInbox` events to the
+/// scheduler. This ordering rule underpins the following overlay reasoning:
+///
+/// 1. **Why tombstones are needed at all.** The refill task's RocksDB
+///    snapshot is fixed at task-start time. A `notify_removed` that arrives
+///    while the task is in flight may correspond to a delete that committed
+///    *before* the task started its scan (snapshot still sees the row) or
+///    *after* (snapshot doesn't). We can't tell from the notification
+///    alone, so we record a tombstone and let `merge_join_by` suppress the
+///    row if it shows up in the result.
+///
+/// 2. **Why `push_added_item` upgrades a tombstone in place.** An
+///    Inbox→Inbox yield emits `RemovedFromInbox` immediately followed by
+///    `EnqueuedToInbox` for the same key (when `set_run_at(None)` leaves
+///    the key unchanged). The storage commits land in the same order
+///    *before* the notifications dispatch, so the row is present in the
+///    final state. The overlay therefore replaces the tombstone with an
+///    `Add(value)`. A second add without an intervening tombstone is
+///    still treated as an invariant violation and panics.
+///
+/// 3. **Why cancelling an in-flight task discards the overlay safely** (see
+///    [`RefillState::update_anchor`]). When cancellation kicks in, every
+///    pending notification has already had its storage write committed.
+///    The next fresh refill takes a new snapshot and sees post-commit
+///    state directly, so the overlay would be redundant.
+struct RefillTask {
+    started_at: Instant,
+    refill_anchor: Option<EntryKey>,
+    /// Items that are known to be added or removed while the refill was in-flight.
+    /// Sorted ascending by EntryKey.
+    overlay: VecDeque<(EntryKey, Overlay)>,
+    /// Cooperative cancellation flag shared with the spawned blocking task.
+    /// `tokio::task::spawn_blocking` tasks are not abortable, so dropping the
+    /// `JoinHandle` only detaches the work. We instead set this flag in
+    /// `Drop` and have the worker check it between rows so it exits promptly
+    /// and frees its blocking-pool slot.
+    cancel: Arc<AtomicBool>,
+    handle: tokio::task::JoinHandle<Result<Vec<(EntryKey, EntryValue)>, StorageError>>,
+    /// Exclusive upper bound on the overlay's coverage. `None` until the
+    /// first at-capacity event; once set, only shrinks.
+    ///
+    /// On overflow we set this (exclusive upper bound) instead of
+    /// evicting from the overlay. Overlay events and merge-time storage rows
+    /// with `key >= horizon` are then dropped; the next refill rediscovers
+    /// them via `seek_after(cache.back)` (always below `horizon`). Avoids
+    /// the silent-drop hazard of back-eviction — a popped tombstone could
+    /// re-admit a deleted row — at the cost of re-fetching some
+    /// already-read storage rows.
+    horizon: Option<EntryKey>,
+}
+
+impl Drop for RefillTask {
+    fn drop(&mut self) {
+        self.cancel.store(true, Ordering::Relaxed);
+    }
+}
+
+impl RefillTask {
+    /// Capacity/horizon handling shared by the `push_*` methods. Caller resolves
+    /// duplicates first; `pos` is the insertion position.
+    ///
+    /// Returns `Some(pos)` to insert there, or `None` if the event was dropped.
+    fn prepare_overlay_insert(&mut self, key: EntryKey, pos: usize) -> Option<usize> {
+        if self.horizon.is_some_and(|h| key >= h) {
+            return None;
+        }
+        if self.overlay.len() < INBOX_CACHE_CAPACITY {
+            return Some(pos);
+        }
+        // At capacity: shrink the horizon to seal the affected range.
+        if pos == self.overlay.len() {
+            // New key sorts past every overlay entry — it becomes the
+            // horizon itself and is not tracked.
+            self.horizon = Some(self.horizon.map_or(key, |h| h.min(key)));
+            None
+        } else {
+            // New key displaces the back; the back's key becomes the
+            // horizon (anything at-or-above it is now uncertain).
+            let back_key = self
+                .overlay
+                .back()
+                .expect("overlay at capacity must have a back")
+                .0;
+            self.horizon = Some(self.horizon.map_or(back_key, |h| h.min(back_key)));
+            self.overlay.pop_back();
+            Some(pos)
+        }
+    }
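Aside (worked example, not part of the patch) for the horizon rule in `prepare_overlay_insert`, with the capacity shrunk to 2 for illustration:
- The overlay holds keys {10, 20}, horizon = None. A notification for key 30 arrives: 30 sorts past the back (`pos == len`), so it becomes the horizon and is dropped; overlay stays {10, 20}, horizon = Some(30).
- A notification for key 15 arrives next: 15 is below the horizon, and the overlay is at capacity with `pos != len`, so the back's key 20 becomes the new horizon (min(30, 20) = 20), 20 is popped, and 15 is inserted; overlay = {10, 15}, horizon = Some(20).
- Keys at or above the horizon are rediscovered by the next refill via `seek_after(cache.back)`, so nothing is silently lost.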
+    fn push_tombstone(&mut self, key_to_remove: &EntryKey) {
+        let pos = match self
+            .overlay
+            .binary_search_by_key(key_to_remove, |&(k, _)| k)
+        {
+            Ok(pos) => {
+                // Existing overlay entry → upgrade to Tombstone.
+                self.overlay.get_mut(pos).unwrap().1 = Overlay::Tombstone;
+                return;
+            }
+            Err(pos) => pos,
+        };
+        if let Some(pos) = self.prepare_overlay_insert(*key_to_remove, pos) {
+            self.overlay
+                .insert(pos, (*key_to_remove, Overlay::Tombstone));
+        }
+    }
+
+    fn push_added_item(&mut self, key_to_add: &EntryKey, value: &EntryValue) {
+        let pos = match self.overlay.binary_search_by_key(key_to_add, |&(k, _)| k) {
+            Ok(pos) => {
+                let entry = self.overlay.get_mut(pos).unwrap();
+                match entry.1 {
+                    // Tombstone → Add: an Inbox→Inbox yield emits
+                    // `RemovedFromInbox(k)` immediately followed by
+                    // `EnqueuedToInbox { key: k, .. }` whenever
+                    // `set_run_at(None)` leaves the key unchanged
+                    // (see `yield_entry` in `lib.rs`). The final storage
+                    // state has the row, so the overlay must reflect that.
+                    Overlay::Tombstone => {
+                        entry.1 = Overlay::Add(value.clone());
+                    }
+                    // Two adds for the same key without an intervening
+                    // tombstone is a real invariant violation: the partition
+                    // processor never re-enqueues a still-present key.
+                    Overlay::Add(_) => panic!(
+                        "duplicate enqueue for the same key without an intervening removal: {key_to_add:?}"
+                    ),
+                }
+                return;
+            }
+            Err(pos) => pos,
+        };
+        if let Some(pos) = self.prepare_overlay_insert(*key_to_add, pos) {
+            self.overlay
+                .insert(pos, (*key_to_add, Overlay::Add(value.clone())));
+        }
+    }
+}
+
+#[derive(derive_more::Debug)]
+enum RefillState {
+    #[debug("standby")]
+    Standby {
+        /// `seek_after` anchor for the next refill, and the upper bound used to
+        /// decide whether to accept a `notify_enqueued`. Items currently in the
+        /// cache, items previously consumed from the cache, and items in the
+        /// caller's skip set together account for every inbox key
+        /// `<= refill_anchor`. Keys `> refill_anchor` are undiscovered and will
+        /// appear on the next refill via `seek_after(refill_anchor)`.
+        refill_anchor: Option<EntryKey>,
+    },
+    #[debug("in-flight (age: {})", _0.started_at.elapsed().friendly())]
+    InFlight(Box<RefillTask>),
+}
+
+impl Default for RefillState {
+    fn default() -> Self {
+        Self::Standby {
+            refill_anchor: None,
+        }
+    }
+}
+
+impl RefillState {
+    /// Updates the standby anchor, or cancels an in-flight task and resets
+    /// to standby with the given anchor.
+    ///
+    /// Dropping an in-flight `RefillTask` here also drops its overlay
+    /// (pending `Add` / `Tombstone` records). This is safe per the
+    /// storage-write-before-notify invariant documented on [`RefillTask`]:
+    /// every notification we have observed (and thus every overlay entry)
+    /// has its storage write already committed, so the next fresh refill's
+    /// snapshot reflects them directly without needing the overlay.
+    fn update_anchor(&mut self, new_anchor: Option<EntryKey>) {
+        match self {
+            RefillState::Standby { refill_anchor } => {
+                *refill_anchor = new_anchor;
+            }
+            RefillState::InFlight(_) => {
+                trace!("Refill task was in-flight but no longer needed, cancelling it");
+                // Dropping the `RefillTask` flips its cancel flag (see
+                // `Drop for RefillTask`); the worker observes it between
+                // rows and exits promptly. The `JoinHandle` itself only
+                // detaches on drop — `spawn_blocking` tasks are not
+                // abortable. See the `RefillTask` doc for why discarding the
+                // overlay is safe here.
+                *self = RefillState::Standby {
+                    refill_anchor: new_anchor,
+                }
+            }
+        }
+    }
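Aside (reviewer sketch, not part of the patch): the cancel-flag pattern used by `RefillTask`, reduced to its essentials: `spawn_blocking` work cannot be aborted, so the worker polls a shared flag between rows.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

fn spawn_cancellable_scan(cancel: Arc<AtomicBool>) -> tokio::task::JoinHandle<Vec<u64>> {
    tokio::task::spawn_blocking(move || {
        let mut rows = Vec::new();
        for row in 0..1_000_000u64 {
            if cancel.load(Ordering::Relaxed) {
                break; // observed promptly; frees the blocking-pool slot
            }
            rows.push(row);
        }
        rows
    })
}
// The owner cancels by flipping the flag (cf. `Drop for RefillTask`):
// cancel.store(true, Ordering::Relaxed);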
+
+    fn is_in_flight(&self) -> bool {
+        matches!(self, Self::InFlight(_))
+    }
+}
 
 #[derive(Debug)]
@@ -38,683 +254,632 @@ pub enum QueueItem<'a> {
 }
 
 #[derive(derive_more::Debug)]
-pub(crate) enum Reader<S: VQueueStore> {
-    /// Reader was never opened and might need to scan running items
-    New { already_running: u32 },
+enum Stage<S: VQueueStore> {
+    /// Brand-new queue; running items still need to be drained first.
+    New {
+        already_running: NonZeroU32,
+        reader: Option<S::RunningReader>,
+    },
+    /// In running stage. Single-item head, single-shot reader.
     #[debug("Running")]
     Running {
-        remaining: u32,
+        head: (EntryKey, EntryValue),
         reader: S::RunningReader,
+        remaining: u32,
     },
-    #[debug("Inbox")]
-    Inbox(S::InboxReader),
-    // We can transition back to Reader::Inbox if new items have been added to the inbox
-    // but we should never return to `Running`.
-    #[debug("Closed")]
-    Closed,
+    /// In inbox stage. The queue's `inbox_cache` is the source of truth
+    /// between refills.
+    Inbox,
+    /// Inbox is fully drained.
+    Empty,
 }
 
 pub(crate) struct Queue<S: VQueueStore> {
-    head: Head,
-    reader: Reader<S>,
+    stage: Stage<S>,
+    /// Backing cache for the inbox stage.
+    /// Meaningful only when `stage` is `Inbox` or `Empty`; for
+    /// other stages it must be empty (invariant maintained by `advance` /
+    /// `remove` / `enqueue`).
+    ///
+    /// Sorted ascending by EntryKey. Front = current head.
+    items: VecDeque<(EntryKey, EntryValue)>,
+    refill_state: RefillState,
 }
 
 impl<S: VQueueStore> Queue<S> {
     /// Creates a new queue that must first go through the given number of running items
     /// before it switches to reading the waiting inbox.
-    pub fn new(num_running: u32) -> Self {
+    pub fn new(num_running: u32, storage: &S, qid: &VQueueId) -> Self {
+        let stage = if num_running > 0 {
+            Stage::New {
+                already_running: NonZeroU32::new(num_running).unwrap(),
+                reader: Some(storage.new_run_reader(qid)),
+            }
+        } else {
+            Stage::Inbox
+        };
+
         Self {
-            head: Head::Unknown,
-            reader: Reader::New {
-                already_running: num_running,
-            },
+            stage,
+            items: VecDeque::with_capacity(INBOX_CACHE_CAPACITY),
+            refill_state: Default::default(),
         }
     }
 
     /// Creates an empty queue
     pub fn new_closed() -> Self {
         Self {
-            head: Head::Empty,
-            reader: Reader::Closed,
+            stage: Stage::Empty,
+            items: VecDeque::with_capacity(INBOX_CACHE_CAPACITY),
+            refill_state: Default::default(),
         }
     }
 
     /// If the queue is known to be empty (no more items to dequeue)
     pub fn is_empty(&self) -> bool {
-        matches!(self.head, Head::Empty)
+        matches!(self.stage, Stage::Empty)
     }
 
+    /// Returns `true` iff the removed key was the head of the queue.
+    ///
+    /// While the queue is still in the running stage, this is a no-op: the
+    /// scheduler may still yield a running item after the state machine has
+    /// declared it removed, and the state machine must ignore that yield.
     pub fn remove(&mut self, key_to_remove: &EntryKey) -> bool {
-        // Can this be the known head?
-        // Yes. Perhaps it expired/ended externally.
-        // We do not do anything if the reader is still at the running stage,
-        //
-        // This means that the scheduler might still yield the "running" item after
-        // the state machine has declared it as completed/removed. The state machine
-        // must be able to handle this case and "ignore" the yield command of this item.
-        if matches!(self.reader, Reader::Closed | Reader::Inbox(..))
-            && let Head::Known { ref key, ..
} = self.head - && key == key_to_remove - { - self.head = Head::Unknown; - // Ensure that next advance would re-seek to the newly added item - self.reader = Reader::Closed; - true - } else { - false + if matches!( + self.stage, + Stage::New { .. } | Stage::Running { .. } | Stage::Empty + ) { + return false; } + + // We are in Inbox/Waiting stage. + let Ok(pos) = self.items.binary_search_by_key(key_to_remove, |&(k, _)| k) else { + // Not in cache. The key is either: + // - already shipped (in the unconfirmed-assignments set) or never + // in the inbox to start with — ignore; + // - within the in-flight refill's coverage zone (`<= refill_anchor`) + // — also ignore, the refill's snapshot will reflect the deletion + // directly; + // - above the in-flight refill's coverage zone — record as a + // tombstone in the overlay so the merge step can suppress it. + match self.refill_state { + RefillState::Standby { .. } => {} + RefillState::InFlight(ref mut task) + if task + .refill_anchor + .is_none_or(|ref refill| key_to_remove > refill) => + { + // The refill task is in flight. We cannot determine if the key will impact + // its result or not, we'll keep at most INBOX_CACHE_CAPACITY tombstones to + // use as overlay when the refill is complete. + task.push_tombstone(key_to_remove); + } + RefillState::InFlight(_) => { + // Ignore it. This item has either been shipped (unconfirmed assignment) + // or was never in the inbox to start with. + } + } + + return false; + }; + self.items.remove(pos); + // The anchor stays put — even if we removed the back, the entry was + // also removed from storage, so `seek_after(anchor)` will skip it + // naturally on the next refill. + pos == 0 } - /// Returns true if the head was changed + /// Returns `true` iff the new item became the head of the queue. pub fn enqueue(&mut self, key: &EntryKey, value: &EntryValue) -> bool { - match (&self.head, &self.reader) { - // we are only unknown if we are new and didn't read the running list yet, - // we might also be in a limbo state if advance() failed. - (_, Reader::New { .. } | Reader::Running { .. }) => { /* do nothing */ } - (Head::Unknown, _) => { /* do nothing */ } - (Head::Empty, _) => { - self.reader = Reader::Closed; - self.head = Head::Known { - key: *key, - value: value.clone(), - }; + match self.stage { + Stage::New { .. } | Stage::Running { .. } => return false, + Stage::Empty => { + // The cache is already allocated and empty (kept around + // across the previous `Inbox -> Empty` transition). Just + // re-seed it and flip the marker. + assert!(self.items.is_empty()); + assert!(!self.refill_state.is_in_flight()); + self.items.push_back((*key, value.clone())); + self.refill_state.update_anchor(Some(*key)); + self.stage = Stage::Inbox; return true; } - ( - Head::Known { - key: current_key, .. - }, - Reader::Inbox(_) | Reader::Closed, - ) => { - if key < current_key { - self.head = Head::Known { - key: *key, - value: value.clone(), - }; - // Ensure that next advance would re-seek to the newly added item - self.reader = Reader::Closed; - return true; - } else { - // This is a temporary fix to ensure that we perform a re-seek - // to fix the issue where the iterator wouldn't see the newly added - // items if the memtable was flushed prior the seek. - self.reader = Reader::Closed; - } + Stage::Inbox => { /* fall-through */ } + } + + // Insert `(key, value)` into the sorted cache. + // + // Returns `true` iff the item became the new front (head). 
Returns
+        // `false` otherwise: the item is a duplicate, was inserted at a
+        // non-front position, or was rejected because its key falls strictly
+        // above the cache's coverage zone (`> refill_anchor`). A rejected item
+        // will be discovered on the next refill.
+
+        // Priority-queue rule: ignore items strictly above our coverage zone.
+        // The bound is `refill_anchor`, NOT `cache.back`. After consuming the
+        // back of the cache, `cache.back` can be lower than `refill_anchor`,
+        // and items in `(cache.back, refill_anchor]` would be lost (the next
+        // refill's `seek_after(refill_anchor)` only returns keys strictly
+        // greater than the anchor).
+        match self.refill_state {
+            RefillState::Standby { refill_anchor, .. }
+                if refill_anchor.is_none_or(|ref refill| key > refill) =>
+            {
+                // We cannot accept items beyond our coverage zone.
+                return false;
+            }
+            RefillState::Standby { .. } => { /* in coverage zone */ }
+            // This enqueue may or may not appear in the in-flight operation,
+            // and there is no way to determine which. So we stage it on the
+            // in-flight task's overlay until we receive the result.
+            RefillState::InFlight(ref mut task)
+                if task.refill_anchor.is_none_or(|ref refill| key > refill) =>
+            {
+                task.push_added_item(key, value);
+                return false;
+            }
+            RefillState::InFlight(ref task) => {
+                // The new item sorts below the task's anchor, so the task can
+                // never return it. We can safely add it right now.
+                assert!(task.refill_anchor.is_some_and(|ref refill| key <= refill));
+            }
+        }
-        false
-    }
 
-    /// Returns the head if known, or None if the queue needs advancing
-    pub fn head(&self) -> Option<QueueItem<'_>> {
-        match (&self.head, &self.reader) {
-            (Head::Unknown, _) => None,
-            (_, Reader::New { .. }) => None,
-            (Head::Known { key, value }, Reader::Running { .. }) => {
-                Some(QueueItem::Running { key, value })
+        let pos = match self.items.binary_search_by_key(key, |&(k, _)| k) {
+            Ok(_) => {
+                // We already know about this key, so the head has not changed
+                // as a result of this enqueue.
+                return false;
             }
-            (Head::Known { key, value }, Reader::Inbox(_) | Reader::Closed) => {
-                Some(QueueItem::Inbox { key, value })
+            Err(pos) => pos,
+        };
+
+        // We need to be careful about moving the refill anchor if there is an
+        // in-flight refill task.
+        //
+        // Say a refill task is in flight and we are enqueueing a key smaller
+        // than its anchor point: we know the task will never return this item.
+        // The tricky case is when this insert pushes the cache over capacity.
+        //
+        // In that case we must cancel the in-flight refill and reset the
+        // anchor to the back of the cache so that the next refill can
+        // re-discover whatever we evict. This is an acceptable trade-off:
+        // we only get here when the cache is already well populated with
+        // recently enqueued items, so the refill is not needed immediately
+        // anyway.
+
+        // At-cap fast path: avoid the VecDeque grow/shrink that would happen
+        // if we inserted first and evicted afterwards.
+        if self.items.len() >= INBOX_CACHE_CAPACITY {
+            if pos == self.items.len() {
+                // The new key would land at the back and be evicted on the
+                // very next step. Skip the insert, but still lower the anchor
+                // to the current back so the next refill can re-discover `key`
+                // via `seek_after(anchor)`.
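+                // Concrete example (illustrative): with CAPACITY = 4 and a
+                // cache of [2, 4, 6, 8], enqueueing key 9 yields pos == len.
+                // We skip the insert and lower the anchor to 8; the next
+                // refill's `seek_after(8)` rediscovers 9 from storage.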
+                //
+                // Cancels the in-flight task if one exists.
+                self.refill_state
+                    .update_anchor(self.items.back().map(|(k, _)| *k));
+                return false;
+            }
+            // Make room first; `pos` is unaffected because pos < old_len in
+            // this branch (we shift elements right of `pos` regardless).
+            debug_assert!(pos < self.items.len());
+            self.items.pop_back();
+            self.items.insert(pos, (*key, value.clone()));
+
+            // Cancels the in-flight task if one exists.
+            self.refill_state
+                .update_anchor(self.items.back().map(|(k, _)| *k));
+            return pos == 0;
+        }
+
+        // Normal path: cache below capacity.
+        self.items.insert(pos, (*key, value.clone()));
+        // Anchor maintenance: if it was None (very first insert), set it to
+        // this key. Otherwise the precondition above guarantees
+        // `key <= anchor`, so the anchor stays put.
+        if let RefillState::Standby { refill_anchor } = &mut self.refill_state
+            && refill_anchor.is_none()
+        {
+            *refill_anchor = Some(*key);
+        }
+        pos == 0
+    }
 
-    pub fn advance_if_needed(
+    /// Returns the head if known, or `None` if the queue needs advancing.
+    pub fn head(&self) -> Option<QueueItem<'_>> {
+        match &self.stage {
+            Stage::New { .. } => None,
+            Stage::Running { head: (k, v), .. } => Some(QueueItem::Running { key: k, value: v }),
+            Stage::Inbox => self
+                .items
+                .front()
+                .map(|(k, v)| QueueItem::Inbox { key: k, value: v }),
+            Stage::Empty => Some(QueueItem::None),
+        }
+    }
+
+    pub fn poll_advance_if_needed(
         &mut self,
+        cx: &mut std::task::Context<'_>,
         storage: &S,
         skip: &UnconfirmedAssignments,
         qid: &VQueueId,
-    ) -> Result<QueueItem<'_>, StorageError> {
-        // Keep advancing until the head is known
-        while matches!(self.head, Head::Unknown) {
-            self.advance(storage, skip, qid)?;
-        }
-
-        match (&self.head, &self.reader) {
-            (Head::Unknown, _) => unreachable!("head must be known"),
-            (_, Reader::New { .. }) => unreachable!("reader cannot be new after poll"),
-            (Head::Known { key, value }, Reader::Running { .. }) => {
-                Ok(QueueItem::Running { key, value })
+        effectively_empty: bool,
+        allow_blocking_io: bool,
+    ) -> Poll<Result<QueueItem<'_>, StorageError>> {
+        loop {
+            let needs_advance = match self.stage {
+                Stage::New { .. } => true,
+                Stage::Inbox => self.items.is_empty(),
+                _ => false,
+            };
+            if !needs_advance {
+                break;
             }
-            (Head::Known { key, value }, Reader::Inbox(_) | Reader::Closed) => {
-                Ok(QueueItem::Inbox { key, value })
+            if !self.try_advance()? {
+                // We cannot advance without a refill.
+                if self.refill_state.is_in_flight() {
+                    ready!(self.poll_refill_task(cx, skip));
+                    // If the cache is still empty after the task completed and
+                    // we cannot yet tell whether the queue is exhausted, a new
+                    // refill task must be started and we stay Pending. That
+                    // happens automatically on the next loop iteration.
+                } else {
+                    // Do we need to refill at all? Not if we are in the Inbox
+                    // stage and we know no more inbox entries are available.
+                    if effectively_empty && matches!(self.stage, Stage::Inbox) {
+                        self.stage = Stage::Empty;
+                        break;
+                    }
+                    // A) try an immediate (inline) refill if allowed
+                    match self.try_refill(storage, qid, skip, allow_blocking_io) {
+                        Ok(_) => {}
+                        Err(CursorError::WouldBlock) => {
+                            // B) start an async refill task
+                            self.start_refill_task(storage, qid);
+                        }
+                        Err(CursorError::Other(e)) => {
+                            // C) fail miserably
+                            return Poll::Ready(Err(e));
+                        }
+                    }
+                }
             }
-            (Head::Empty, _) => Ok(QueueItem::None),
         }
+
+        Poll::Ready(Ok(match &self.stage {
+            Stage::New { .. } => unreachable!("head must be resolved after advance_if_needed"),
+            Stage::Running { head: (k, v), ..
} => QueueItem::Running { key: k, value: v }, + Stage::Inbox => { + let (k, v) = self + .items + .front() + .expect("inbox cache must have a head after advance_if_needed"); + QueueItem::Inbox { key: k, value: v } + } + Stage::Empty => QueueItem::None, + })) } /// Advances the queue to the next item. /// - /// The queue reader will skip over items in `skip` when reading the inbox stage. When reading - /// the running stage, the `skip` set is ignored. - pub fn advance( - &mut self, - storage: &S, - skip: &UnconfirmedAssignments, - qid: &VQueueId, - ) -> Result<(), StorageError> { + /// In the inbox stage this consumes the current head (if any) and exposes + /// the next cached item from the in-memory cache. Refilling the cache from + /// storage is the responsibility of `poll_advance_if_needed` / + /// `try_refill`, not this method. + /// + /// Returns false if advance was not possible and we need to perform a refill. + pub fn try_advance(&mut self) -> Result { + // Split into disjoint borrows so the `Stage::Inbox` arm below can + // mutate both `stage` and `inbox_cache` without fighting the + // borrow checker. + let Self { stage, items, .. } = self; loop { - match self.reader { - Reader::New { already_running } if already_running > 0 => { - let mut reader = storage.new_run_reader(qid); + match stage { + Stage::New { + already_running, + reader, + } => { + let mut reader = reader.take().unwrap(); reader.seek_to_first(); - let item = reader.peek()?; - if let Some((key, value)) = item { - self.head = Head::Known { key, value }; - self.reader = Reader::Running { - remaining: already_running, + if let Some((key, value)) = reader.peek()? { + *stage = Stage::Running { + head: (key, value), reader, + remaining: already_running.get(), }; - break; - } else { - assert!( - already_running > 0, - "vqueue {qid:?} has no running items but its metadata says that it has {already_running} running items", - ); - // move to inbox reading - self.head = Head::Unknown; - self.reader = Reader::Closed; + return Ok(true); } + panic!( + "vqueue has no running items but its metadata says it has {}", + already_running.get(), + ); } - Reader::New { .. 
} => { - // create new inbox reader - self.reader = Reader::Closed; - } - Reader::Running { - ref mut reader, - ref mut remaining, + Stage::Running { + reader, + remaining, + head, } => { reader.advance(); *remaining = remaining.saturating_sub(1); - let item = reader.peek()?; - if let Some((key, value)) = item { - debug_assert!(*remaining > 0); - self.head = Head::Known { key, value }; - break; - } else { - debug_assert_eq!(0, *remaining); - // move to inbox reading - self.head = Head::Unknown; - self.reader = Reader::Closed; - } - } - Reader::Inbox(ref mut reader) => { - reader.advance(); - let key = reader.current_key()?; - if let Some(key) = key { - if skip.contains_key(&key) { - continue; - } - self.head = Head::Known { - key, - value: reader.current_value()?.unwrap(), - }; - break; - } else { - // we are done reading inbox - self.head = Head::Empty; - self.reader = Reader::Closed; - break; - } - } - Reader::Closed => { - match self.head { - Head::Unknown => { - let mut reader = storage.new_inbox_reader(qid); - reader.seek_to_first(); - let key = reader.current_key()?; - if let Some(key) = key { - if skip.contains_key(&key) { - self.reader = Reader::Inbox(reader); - continue; - } - self.head = Head::Known { - key, - value: reader.current_value()?.unwrap(), - }; - self.reader = Reader::Inbox(reader); - break; - } else { - self.head = Head::Empty; - self.reader = Reader::Closed; - } + match reader.peek()? { + Some(next) => { + debug_assert!(*remaining > 0); + *head = next; + return Ok(true); } - Head::Known { ref key, .. } => { - // seek to known head first, then advance. - let mut reader = storage.new_inbox_reader(qid); - reader.seek_after(qid, key); - let next_key = reader.current_key()?; - if let Some(next_key) = next_key { - if skip.contains_key(&next_key) { - self.reader = Reader::Inbox(reader); - continue; - } - self.head = Head::Known { - key: next_key, - value: reader.current_value()?.unwrap(), - }; - self.reader = Reader::Inbox(reader); - break; - } else { - self.head = Head::Empty; - self.reader = Reader::Closed; - } - } - Head::Empty => { - // do nothing. - return Ok(()); + None => { + debug_assert_eq!(0, *remaining); + *stage = Stage::Inbox; } } } + Stage::Inbox => return Ok(items.pop_front().is_some()), + Stage::Empty => return Ok(true), } } - Ok(()) } - pub(crate) fn remaining_in_running_stage(&self) -> u32 { - match self.reader { - Reader::New { already_running } => already_running, - Reader::Running { remaining, .. } => remaining, - Reader::Inbox(..) 
=> 0, - Reader::Closed => 0, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use restate_clock::RoughTimestamp; - use restate_clock::time::MillisSinceEpoch; - use restate_core::TaskCenter; - use restate_partition_store::{PartitionDb, PartitionStore, PartitionStoreManager}; - use restate_rocksdb::RocksDbManager; - use restate_storage_api::Transaction; - use restate_storage_api::vqueue_table::{ - EntryId, EntryKey, EntryKind, EntryMetadata, EntryValue, Stage, Status, WriteVQueueTable, - stats::EntryStatistics, - }; - use restate_types::clock::UniqueTimestamp; - use restate_types::identifiers::{PartitionId, PartitionKey}; - use restate_types::partitions::Partition; - use restate_types::sharding::KeyRange; - use restate_types::vqueues::VQueueId; - - const BASE_RUN_AT_MS: u64 = 1_744_000_000_000; - - fn test_entry_at(id: u8, has_lock: bool, run_at: MillisSinceEpoch) -> (EntryKey, EntryValue) { - let run_at = RoughTimestamp::from_unix_millis_clamped(run_at); - let created_at = UniqueTimestamp::try_from(1_000u64 + id as u64).unwrap(); - let entry_id = EntryId::new(EntryKind::Invocation, [id; EntryId::REMAINDER_LEN]); - let key = EntryKey::new(has_lock, run_at, id as u64, entry_id); - let stats = EntryStatistics::new(created_at, run_at); - let value = EntryValue { - status: if stats.first_runnable_at > created_at.to_unix_millis() { - Status::Scheduled - } else { - Status::New - }, - metadata: EntryMetadata::default(), - stats, + /// Refills `cache` with up to [`INBOX_CACHE_CAPACITY`] items from storage if + /// it's possible to do so without blocking the thread. + /// + /// Starting at the standby `refill_anchor` (or the very first key if no anchor + /// is set yet). Returns true if items were added. Items in the `skip` set are + /// not added to the cache, but the anchor advances past them so they are not + /// reconsidered on the next refill. + /// + /// # Panics + /// + /// Panics if `refill_state` is not [`RefillState::Standby`]. Callers must + /// ensure no refill task is already in flight. 
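+    ///
+    /// Sketch of the intended call pattern (hypothetical driver code; the
+    /// real call site is `poll_advance_if_needed`):
+    ///
+    /// ```ignore
+    /// match queue.try_refill(storage, qid, skip, /* allow_blocking_io */ false) {
+    ///     Ok(()) => { /* cache refilled inline from a fresh snapshot */ }
+    ///     Err(CursorError::WouldBlock) => queue.start_refill_task(storage, qid),
+    ///     Err(CursorError::Other(err)) => return Poll::Ready(Err(err)),
+    /// }
+    /// ```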
+ fn try_refill( + &mut self, + storage: &S, + qid: &VQueueId, + skip: &UnconfirmedAssignments, + allow_blocking_io: bool, + ) -> Result<(), CursorError> { + let start = Instant::now(); + let mut reader = storage.new_inbox_reader(qid, vqueue_table::Options { allow_blocking_io }); + let RefillState::Standby { refill_anchor } = &mut self.refill_state else { + panic!("refill state must be standby"); }; - (key, value) - } - - fn test_entry(id: u8, has_lock: bool, run_at_ms: u64) -> (EntryKey, EntryValue) { - test_entry_at( - id, - has_lock, - MillisSinceEpoch::new(BASE_RUN_AT_MS + run_at_ms), - ) - } - - fn default_entry(id: u8) -> (EntryKey, EntryValue) { - test_entry(id, false, 0) - } - - fn test_qid(partition_key: u64) -> VQueueId { - VQueueId::custom(partition_key, "1") - } - - async fn storage_test_environment() -> PartitionStore { - let rocksdb_manager = RocksDbManager::init(); - TaskCenter::set_on_shutdown(Box::pin(async { - rocksdb_manager.shutdown().await; - })); - - let manager = PartitionStoreManager::create() - .await - .expect("DB storage creation succeeds"); - manager - .open( - &Partition::new(PartitionId::MIN, KeyRange::new(0, PartitionKey::MAX - 1)), - None, - ) - .await - .expect("DB storage creation succeeds") - } - - async fn insert_entries( - rocksdb: &mut PartitionStore, - qid: &VQueueId, - stage: Stage, - entries: &[(EntryKey, EntryValue)], - ) { - let mut txn = rocksdb.transaction(); - for (key, value) in entries { - txn.put_vqueue_inbox(qid, stage, key, value); + match refill_anchor { + Some(anchor) => reader.seek_after(anchor), + None => reader.seek_to_first(), } - txn.commit().await.expect("commit should succeed"); - } - #[restate_core::test] - async fn test_queue_running_to_inbox_to_empty() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(1000); - let running_entry = default_entry(1); - let inbox_entry = default_entry(2); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Running, - std::slice::from_ref(&running_entry), - ) - .await; - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - std::slice::from_ref(&inbox_entry), - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(1); - let mut skip = UnconfirmedAssignments::new(); - - assert!(!queue.is_empty()); - - let head = queue.advance_if_needed(db, &skip, &qid).unwrap(); - assert!(matches!(head, QueueItem::Running { key, .. } if *key == running_entry.0)); - assert!( - matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running_entry.0) - ); + while self.items.len() < INBOX_CACHE_CAPACITY { + let Some((key, value)) = reader.peek()? else { + if self.items.is_empty() { + self.stage = Stage::Empty; + } + break; + }; - queue.advance(db, &skip, &qid).unwrap(); - assert!( - matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == inbox_entry.0) + if !skip.contains_key(&key) { + self.items.push_back((key, value)); + } + *refill_anchor = Some(refill_anchor.map_or(key, |a| a.max(key))); + reader.advance(); + } + tracing::debug!( + "non-blocking refill finished in {}, cache has {} items", + start.elapsed().friendly(), + self.items.len() ); - let Some(QueueItem::Inbox { key, .. 
}) = queue.head() else { - panic!("expected inbox head"); - }; - skip.insert(*key, Default::default()); - assert!(!queue.is_empty()); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(queue.is_empty()); - - let higher = default_entry(0); - assert!(queue.enqueue(&higher.0, &higher.1)); - let head = queue.advance_if_needed(db, &skip, &qid).unwrap(); - assert!(matches!(head, QueueItem::Inbox { key, .. } if *key == higher.0)); + Ok(()) } - #[restate_core::test] - async fn test_entry_key_ordering() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(2000); - let low = test_entry(1, false, 3_000); - let high = test_entry(2, false, 2_000); - let highest = test_entry(3, true, 9_000); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - &[low.clone(), high.clone(), highest.clone()], - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == highest.0)); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == high.0)); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == low.0)); - } + fn start_refill_task(&mut self, storage: &S, qid: &VQueueId) { + assert!(!self.refill_state.is_in_flight()); + let RefillState::Standby { refill_anchor } = self.refill_state else { + panic!("refill state must be standby"); + }; - #[restate_core::test] - async fn test_run_at_below_now_bumps_entry_higher_in_inbox() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(2_500); - let now = MillisSinceEpoch::now().as_u64(); - let future_entry = - test_entry_at(1, false, MillisSinceEpoch::new(now.saturating_add(60_000))); - let overdue_entry = - test_entry_at(2, false, MillisSinceEpoch::new(now.saturating_sub(1_000))); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - &[future_entry.clone(), overdue_entry.clone()], - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!( - matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == overdue_entry.0) + let mut reader = storage.new_inbox_reader( + qid, + vqueue_table::Options { + allow_blocking_io: true, + }, ); - queue.advance(db, &skip, &qid).unwrap(); - assert!( - matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == future_entry.0) - ); - } + let cancel = Arc::new(AtomicBool::new(false)); + let task_cancel = Arc::clone(&cancel); - #[restate_core::test] - async fn test_enqueue_replaces_head_on_smaller_key() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(3000); - let initial = test_entry(1, false, 3_000); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - std::slice::from_ref(&initial), - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == initial.0)); - - let higher = test_entry(2, false, 2_000); - assert!(queue.enqueue(&higher.0, &higher.1)); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. 
}) if *key == higher.0)); - assert!( - matches!(queue.advance_if_needed(db, &skip, &qid).unwrap(), QueueItem::Inbox { key, .. } if *key == higher.0) - ); + let handle = tokio::task::spawn_blocking(move || { + // collect and send the results at the end + let mut results = Vec::with_capacity(INBOX_CACHE_CAPACITY); + + match refill_anchor { + Some(anchor) => reader.seek_after(&anchor), + None => reader.seek_to_first(), + } - let lower = test_entry(3, false, 4_000); - assert!(!queue.enqueue(&lower.0, &lower.1)); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == higher.0)); + while results.len() < INBOX_CACHE_CAPACITY { + if task_cancel.load(Ordering::Relaxed) { + // Caller dropped the `RefillTask`; bail early so we + // don't keep the blocking-pool slot scanning rows + // nobody will read. + return Ok(results); + } + // In this mode, we don't expect to see WouldBlock + match reader.peek() { + Ok(Some((key, value))) => { + results.push((key, value)); + reader.advance(); + } + Ok(None) => { + // no more items + break; + } + Err(CursorError::WouldBlock) => { + unreachable!("background refill task should never see WouldBlock"); + } + Err(CursorError::Other(e)) => { + tracing::error!("refill task failed: {e}"); + return Err(e); + } + } + } + Ok(results) + }); + + let task = Box::new(RefillTask { + started_at: Instant::now(), + refill_anchor, + horizon: None, + overlay: VecDeque::with_capacity(INBOX_CACHE_CAPACITY), + cancel, + handle, + }); + + self.refill_state = RefillState::InFlight(task); } - #[restate_core::test] - async fn test_remove() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(4000); - let entry = default_entry(1); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - std::slice::from_ref(&entry), - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let mut skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - let head_key = match queue.head() { - Some(QueueItem::Inbox { key, .. }) => *key, - _ => panic!("expected inbox head"), + fn poll_refill_task( + &mut self, + cx: &mut std::task::Context<'_>, + skip: &UnconfirmedAssignments, + ) -> Poll<()> { + let (items, overlay, mut refill_anchor, horizon) = match self.refill_state { + RefillState::Standby { .. } => return Poll::Ready(()), + RefillState::InFlight(ref mut task) => { + match ready!(Pin::new(&mut task.handle).poll(cx)) { + Err(join_err) => { + tracing::error!("refill task panicked: {join_err}"); + self.refill_state = RefillState::Standby { + refill_anchor: task.refill_anchor.take(), + }; + return Poll::Ready(()); + } + Ok(Err(err)) => { + tracing::error!("refill task failed: {err}"); + self.refill_state = RefillState::Standby { + refill_anchor: task.refill_anchor.take(), + }; + return Poll::Ready(()); + } + Ok(Ok(result)) => { + tracing::debug!( + "refill task finished with {} items in {}", + result.len(), + task.started_at.elapsed().friendly() + ); + ( + result, + std::mem::take(&mut task.overlay), + task.refill_anchor.take(), + task.horizon.take(), + ) + } + } + } }; + // If we got less items than what we asked, then this must have been the end of the queue + // at the time of the refill. + let end_of_queue = items.len() < INBOX_CACHE_CAPACITY; - assert!(!queue.remove(&default_entry(99).0)); - assert!(queue.remove(&head_key)); - assert!(queue.head().is_none()); - assert!( - matches!(queue.advance_if_needed(db, &skip, &qid).unwrap(), QueueItem::Inbox { key, .. 
} if *key == entry.0)
-        );
+        // We now need to merge the items we received, apply the overlay, and
+        // deduplicate against the entries already in the cache. At the end,
+        // we figure out the next anchor to use for the next refill.
+        //
+        // The strategy here is more complex than the blocking/inline version
+        // because concurrency is hard, who knew! We also need to do all of
+        // this while keeping the cache capacity in check; we don't want to
+        // re-allocate/resize the cache.
+        //
+        // We walk the overlay and the fetched items together in lockstep.
+        // Technically, this is an LSM-style read/compaction algorithm.
+        for either in items
+            .into_iter()
+            .merge_join_by(overlay, |(key, _), (overlay_key, _)| key.cmp(overlay_key))
+        {
+            // First key at-or-above the horizon ends the merge: overlay
+            // entries are all below it by construction, and the merge
+            // walks ascending, so the rest is storage that the next
+            // refill will rediscover via `seek_after`.
+            let key = match &either {
+                EitherOrBoth::Left((k, _))
+                | EitherOrBoth::Right((k, _))
+                | EitherOrBoth::Both((k, _), _) => *k,
+            };
+            if horizon.is_some_and(|h| key >= h) {
+                break;
+            }
+            // Left is the item from the db, right is the overlay.
+            match either {
+                // Somewhat similar to a normal enqueue.
+                EitherOrBoth::Left((key, value))
+                | EitherOrBoth::Right((key, Overlay::Add(value)))
+                // The overlay always wins.
+                | EitherOrBoth::Both((key, _), (_, Overlay::Add(value))) => {
+                    if skip.contains_key(&key) {
+                        // The key was already dispatched, skip it.
+                        refill_anchor = Some(refill_anchor.map_or(key, |a| a.max(key)));
+                        continue;
+                    }
+                    // The merge stream is strictly above every key currently
+                    // in `self.items`: storage rows come from
+                    // `seek_after(refill_anchor)`, the overlay only stores
+                    // keys `> refill_anchor` (see `push_added_item` /
+                    // `push_tombstone`), and `self.items` only holds keys
+                    // `<= refill_anchor`. Therefore every accepted key
+                    // appends at the back.
+                    debug_assert!(
+                        self.items.back().is_none_or(|(k, _)| *k < key),
+                        "merge output must sort strictly above existing cache items"
+                    );
+                    if self.items.len() >= INBOX_CACHE_CAPACITY {
+                        // Cache is full; drop this row and let the next refill
+                        // rediscover it via `seek_after(items.back())`.
+                        refill_anchor = self.items.back().map(|(k, _)| *k);
+                        break;
+                    }
+                    self.items.push_back((key, value));
+                    refill_anchor = Some(key);
+                }
+                EitherOrBoth::Right((key, Overlay::Tombstone))
+                // The overlay always wins.
+                | EitherOrBoth::Both((key, _), (_, Overlay::Tombstone)) => {
+                    // In theory, we should never see a tombstone that impacts
+                    // the existing cache.
+                    debug_assert!(self.items.binary_search_by_key(&key, |&(k, _)| k).is_err());
+                    // Push the anchor up to this key so the next refill's
+                    // `seek_after` does not rediscover the deleted row.
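+                    // Example (illustrative): the snapshot returns row {5}
+                    // while the overlay holds [Tombstone(5)]. The merge
+                    // yields `Both((5, v), (5, Tombstone))`, we land in this
+                    // arm, the row is suppressed, and the anchor moves to 5.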
+ refill_anchor = Some(key); + } + } + } - skip.insert(entry.0, Default::default()); - assert!(queue.remove(&head_key)); - assert!(queue.head().is_none()); - assert!(matches!( - queue.advance_if_needed(db, &skip, &qid).unwrap(), - QueueItem::None - )); - } + // It's very important that we must reset the task to standby + self.refill_state = RefillState::Standby { + refill_anchor: refill_anchor.or(self.items.back().map(|(k, _)| *k)), + }; - #[restate_core::test] - async fn test_skip_set() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(5000); - let entry1 = default_entry(1); - let entry2 = default_entry(2); - let entry3 = default_entry(3); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - &[entry1.clone(), entry2.clone(), entry3.clone()], - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let mut skip = UnconfirmedAssignments::new(); - skip.insert(entry1.0, Default::default()); - skip.insert(entry2.0, Default::default()); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == entry3.0)); - } + // at the end, if the cache is empty and end_of_queue is true, then we must + // have exhausted the inbox. + if self.items.is_empty() && end_of_queue { + self.stage = Stage::Empty; + } - #[restate_core::test] - async fn test_running_before_inbox_regardless_of_key() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(6000); - let running_entry = default_entry(1); - let inbox_entry = test_entry(2, true, 0); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Running, - std::slice::from_ref(&running_entry), - ) - .await; - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - std::slice::from_ref(&inbox_entry), - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(1); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Running { .. }))); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { .. }))); + Poll::Ready(()) } - #[restate_core::test] - async fn test_enqueue_and_remove_ignored_during_running_stage() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(7000); - let running1 = default_entry(1); - let running2 = default_entry(2); - let inbox_entry = test_entry(10, true, 0); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Running, - &[running1.clone(), running2.clone()], - ) - .await; - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - std::slice::from_ref(&inbox_entry), - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(2); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0)); - - let even_higher = test_entry(11, true, 0); - assert!(!queue.enqueue(&even_higher.0, &even_higher.1)); - assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0)); - - assert!(!queue.remove(&running2.0)); - assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0)); - - assert!(!queue.remove(&running1.0)); - assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0)); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. 
}) if *key == running2.0)); - - queue.advance(db, &skip, &qid).unwrap(); - assert!( - matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == inbox_entry.0) - ); - - assert!(queue.remove(&inbox_entry.0)); - assert!(queue.head().is_none()); + pub(crate) fn remaining_in_running_stage(&self) -> u32 { + match &self.stage { + Stage::New { + already_running, .. + } => already_running.get(), + Stage::Running { remaining, .. } => *remaining, + Stage::Inbox | Stage::Empty => 0, + } } } + +#[cfg(test)] +#[path = "queue_test.rs"] +mod queue_test; diff --git a/crates/vqueues/src/scheduler/queue_test.rs b/crates/vqueues/src/scheduler/queue_test.rs new file mode 100644 index 0000000000..7b3dc0433f --- /dev/null +++ b/crates/vqueues/src/scheduler/queue_test.rs @@ -0,0 +1,437 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Test harness for the vqueue refill machinery. +//! +//! These tests use a fake [`VQueueStore`] (`GatedStore`) that lets the +//! caller freeze the snapshot a refill task sees and interleave +//! `notify_enqueued` / `notify_removed` events deterministically against +//! the in-flight refill thread. That's what makes overlay and cancellation +//! behaviour testable without a real RocksDB. +//! +//! Each [`GatedReader`] is created with `Options::allow_blocking_io` set +//! by the queue itself: +//! - The first attempt (`try_refill`) passes `allow_blocking_io: false`; +//! the fake responds with [`CursorError::WouldBlock`] which drives the +//! queue down the async refill path. +//! - The async path then constructs a second reader with +//! `allow_blocking_io: true`; that reader parks on its first `peek` +//! (signalling the test that the snapshot is frozen) and resumes once +//! the test releases the latch. + +use std::sync::{Arc, Condvar, Mutex}; +use std::task::{Context, Poll, Waker}; + +use restate_clock::RoughTimestamp; +use restate_clock::time::MillisSinceEpoch; +use restate_storage_api::StorageError; +use restate_storage_api::vqueue_table::stats::EntryStatistics; +use restate_storage_api::vqueue_table::{ + CursorError, EntryId, EntryKey, EntryKind, EntryMetadata, EntryValue, Options, Status, + VQueueCursor, VQueueRunningCursor, VQueueStore, +}; +use restate_types::clock::UniqueTimestamp; +use restate_types::vqueues::VQueueId; + +use super::*; + +// ---------- helpers ---------- + +const BASE_RUN_AT_MS: u64 = 1_744_000_000_000; + +/// Builds an entry whose sort position is determined entirely by `seq` +/// (other `EntryKey` components are fixed). +fn entry_at_seq(seq: u64) -> (EntryKey, EntryValue) { + let run_at = RoughTimestamp::from_unix_millis_clamped(MillisSinceEpoch::new(BASE_RUN_AT_MS)); + let created_at = UniqueTimestamp::try_from(1_000u64 + seq).unwrap(); + let entry_id = EntryId::new(EntryKind::Invocation, [0u8; EntryId::REMAINDER_LEN]); + let key = EntryKey::new(false, run_at, seq, entry_id); + let stats = EntryStatistics::new(created_at, run_at); + let value = EntryValue { + status: Status::New, + metadata: EntryMetadata::default(), + stats, + }; + (key, value) +} + +fn test_qid(partition_key: u64) -> VQueueId { + VQueueId::custom(partition_key, "1") +} + +/// Polls the queue once and asserts it returned `Pending`. 
Used to kick +/// off an in-flight refill task so the test can interleave events while +/// the refill thread is parked at its gate. +fn poll_once_expect_pending( + queue: &mut Queue, + storage: &S, + skip: &UnconfirmedAssignments, + qid: &VQueueId, +) { + let mut cx = Context::from_waker(Waker::noop()); + match queue.poll_advance_if_needed(&mut cx, storage, skip, qid, false, false) { + Poll::Pending => {} + Poll::Ready(Ok(item)) => panic!("expected Pending, got Ready({item:?})"), + Poll::Ready(Err(e)) => panic!("expected Pending, got error: {e:?}"), + } +} + +/// Drives `poll_advance_if_needed` until it returns `Ready`. Yields between +/// Pending iterations so the `spawn_blocking` refill task can make progress. +async fn drive_until_ready( + queue: &mut Queue, + storage: &S, + skip: &UnconfirmedAssignments, + qid: &VQueueId, +) { + let mut cx = Context::from_waker(Waker::noop()); + loop { + match queue.poll_advance_if_needed(&mut cx, storage, skip, qid, false, false) { + Poll::Ready(Ok(_)) => return, + Poll::Ready(Err(e)) => panic!("queue error: {e:?}"), + Poll::Pending => tokio::time::sleep(std::time::Duration::from_millis(1)).await, + } + } +} + +/// Pops everything currently in the cache (no refill, no I/O) and returns +/// the keys in head-to-tail order. +fn drain_cache(queue: &mut Queue) -> Vec { + let mut keys = vec![]; + while let Some(QueueItem::Inbox { key, .. }) = queue.head() { + keys.push(*key); + queue.try_advance().unwrap(); + } + keys +} + +// ---------- fake VQueueStore ---------- + +/// In-memory storage with hooks to gate the refill thread at a precise +/// moment, so tests can deterministically interleave queue events against +/// an in-flight refill. +struct GatedStore { + /// Source of truth for what `new_inbox_reader` will snapshot. Sorted + /// ascending by `EntryKey`. + entries: Mutex>, + /// Latch released by the test when it wants the parked refill thread + /// to proceed past its first `peek`. + release: Arc<(Mutex, Condvar)>, + /// Signalled by the reader the moment it parks. The test waits on + /// this to know the snapshot is frozen and queue events can now be + /// safely interleaved. + parked: Arc<(Mutex, Condvar)>, +} + +impl GatedStore { + fn new(entries: Vec<(EntryKey, EntryValue)>) -> Self { + Self { + entries: Mutex::new(entries), + release: Arc::new((Mutex::new(false), Condvar::new())), + parked: Arc::new((Mutex::new(false), Condvar::new())), + } + } + + fn release_refill_thread(&self) { + let (lock, cv) = &*self.release; + *lock.lock().unwrap() = true; + cv.notify_all(); + } + + fn wait_until_parked(&self) { + let (lock, cv) = &*self.parked; + let mut parked = lock.lock().unwrap(); + while !*parked { + parked = cv.wait(parked).unwrap(); + } + } +} + +struct GatedReader { + /// Frozen at construction time (matches RocksDB snapshot semantics). + /// Sorted ascending by `EntryKey`. + snapshot: Vec<(EntryKey, EntryValue)>, + cursor: usize, + /// `true` for blocking readers (those constructed via the async refill + /// path with `allow_blocking_io: true`); they park at the gate on the + /// first `peek` so the test can pin the snapshot moment. Non-blocking + /// readers always return `WouldBlock` instead, driving the queue down + /// the async refill path. + blocking: bool, + release: Arc<(Mutex, Condvar)>, + parked: Arc<(Mutex, Condvar)>, + /// Whether this reader has parked yet. Each reader parks at most + /// once, on its first peek/key/value call. 
+ has_parked: bool, +} + +impl GatedReader { + fn block_until_released(&mut self) { + if self.has_parked { + return; + } + self.has_parked = true; + // Tell the test we're parked. + let (lock, cv) = &*self.parked; + *lock.lock().unwrap() = true; + cv.notify_all(); + // Wait for the test to release us. + let (lock, cv) = &*self.release; + let mut released = lock.lock().unwrap(); + while !*released { + released = cv.wait(released).unwrap(); + } + } +} + +impl VQueueCursor for GatedReader { + fn seek_to_first(&mut self) { + self.cursor = 0; + } + + fn seek_after(&mut self, anchor: &EntryKey) { + self.cursor = self + .snapshot + .iter() + .position(|(k, _)| k > anchor) + .unwrap_or(self.snapshot.len()); + } + + fn advance(&mut self) { + if self.cursor < self.snapshot.len() { + self.cursor += 1; + } + } + + fn peek(&mut self) -> Result, CursorError> { + if !self.blocking { + return Err(CursorError::WouldBlock); + } + self.block_until_released(); + Ok(self.snapshot.get(self.cursor).cloned()) + } +} + +/// Stub for the running stage. Tests construct queues with +/// `num_running = 0`, so this is never actually exercised; it only exists +/// to satisfy the [`VQueueStore`] trait bound. +struct StubRunningReader; + +impl VQueueRunningCursor for StubRunningReader { + fn seek_to_first(&mut self) {} + fn peek(&mut self) -> Result, StorageError> { + Ok(None) + } + fn advance(&mut self) {} +} + +impl VQueueStore for GatedStore { + type RunningReader = StubRunningReader; + type InboxReader = GatedReader; + + fn new_run_reader(&self, _qid: &VQueueId) -> Self::RunningReader { + StubRunningReader + } + + fn new_inbox_reader(&self, _qid: &VQueueId, opts: Options) -> Self::InboxReader { + // Snapshot freezes here. + let snapshot = self.entries.lock().unwrap().clone(); + GatedReader { + snapshot, + cursor: 0, + blocking: opts.allow_blocking_io, + release: self.release.clone(), + parked: self.parked.clone(), + has_parked: false, + } + } +} + +// ---------- tests ---------- + +/// Sanity: a single in-flight refill with no concurrent events surfaces +/// every storage row in the cache, in `EntryKey` order. +#[restate_core::test] +async fn refill_without_overlay_activity() { + let entries: Vec<_> = (1..=5).map(entry_at_seq).collect(); + let storage = GatedStore::new(entries.clone()); + let qid = test_qid(1); + let mut queue: Queue = Queue::new(0, &storage, &qid); + let skip = UnconfirmedAssignments::new(); + + poll_once_expect_pending(&mut queue, &storage, &skip, &qid); + storage.wait_until_parked(); + storage.release_refill_thread(); + drive_until_ready(&mut queue, &storage, &skip, &qid).await; + + let drained = drain_cache(&mut queue); + assert_eq!( + drained, + entries.iter().map(|(k, _)| *k).collect::>(), + "cache should contain every storage row in order", + ); +} + +/// Tombstones in the overlay correctly suppress the matching storage row +/// during merge — no overflow happens here, so the overlay's information +/// is fully preserved. This is the "happy path" for the +/// commit-before-notify invariant. 
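+///
+/// Timeline exercised here:
+///
+/// ```text
+/// t0  refill task starts; snapshot freezes while r_target is still present
+/// t1  the delete of r_target commits in storage (after the snapshot, so
+///     the frozen snapshot keeps seeing the row)
+/// t2  notify_removed(r_target) arrives → overlay records a Tombstone
+/// t3  snapshot released; merge sees Both(row, Tombstone) → row suppressed
+/// ```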
+#[restate_core::test] +async fn tombstone_in_overlay_suppresses_storage_row() { + let r_target = entry_at_seq(100); + let storage = GatedStore::new(vec![r_target.clone()]); + let qid = test_qid(2); + let mut queue: Queue = Queue::new(0, &storage, &qid); + let skip = UnconfirmedAssignments::new(); + + poll_once_expect_pending(&mut queue, &storage, &skip, &qid); + storage.wait_until_parked(); + // Storage commit (modelled here as the snapshot being frozen with + // r_target visible) lands first; the notify_removed lands second. + queue.remove(&r_target.0); + storage.release_refill_thread(); + drive_until_ready(&mut queue, &storage, &skip, &qid).await; + + let drained = drain_cache(&mut queue); + assert!( + !drained.contains(&r_target.0), + "deleted row should be suppressed by overlay tombstone, got: {drained:?}", + ); +} + +/// **Regression guard.** Reproduces the overlay-at-capacity scenario where +/// the back entry is a tombstone and a new add with `pos < overlay.len()` +/// would historically `pop_back` and silently drop that tombstone — letting +/// the (already-deleted) row leak into the cache. +/// +/// The bug is now fixed by the `horizon` mechanism in +/// `RefillTask::prepare_overlay_insert`: on overflow we shrink the horizon +/// (an exclusive upper bound) to the back's key instead of evicting, so any +/// merge row at or above the horizon is dropped and rediscovered on the +/// next refill via `seek_after`. Pre-fix, this test would expose the leak; +/// it is kept to guard against regressions in the overflow path. +/// +/// Layout at the moment of overflow: +/// +/// ```text +/// overlay[0] = Tombstone(seq=50) // pre-tombstone +/// overlay[1] = Tombstone(seq=100) // pre-tombstone +/// overlay[2..CAPACITY-1] = Add(seq=150..) // CAPACITY-3 adds +/// overlay[CAPACITY-1] = Tombstone(seq=500) // *r_target* +/// ``` +/// +/// The trigger add (seq=400) sorts at `pos = CAPACITY-1`. With the horizon +/// fix, the overflow path sets `horizon = 500` (the back's key) instead of +/// evicting the tombstone. The merge then drops `Left(seq500)` because +/// `seq500 >= horizon`, so the deleted row never enters the cache. +#[restate_core::test] +async fn tombstone_evicted_on_overlay_overflow_leaks_deleted_row() { + // Overlay layout requires CAPACITY >= 4 (2 front tombstones + at + // least one add + 1 back tombstone). Bail loudly if someone shrinks + // the cache below that. + const { assert!(INBOX_CACHE_CAPACITY >= 4) }; + + // Front-anchor tombstones: low seqs so they sort to the front of the + // overlay and don't influence the eviction we want to trigger. + let pre_tombstone_a = entry_at_seq(50); + let pre_tombstone_b = entry_at_seq(100); + + // The deleted row that the in-flight task's snapshot still sees. + let r_target = entry_at_seq(500); + + // Adds that fit between the front tombstones and r_target's + // tombstone. Together with the three tombstones the overlay reaches + // exactly CAPACITY. We use a contiguous seq range starting at 150. + let n_adds = INBOX_CACHE_CAPACITY - 3; + let pre_adds: Vec<_> = (150..150 + n_adds as u64).map(entry_at_seq).collect(); + + // The trigger: an add that sorts BEFORE r_target's tombstone but + // after every other overlay entry. With `pos < overlay.len()` and + // the overlay at cap, the overflow path sets the horizon to the + // back's key (seq=500) — sealing r_target from the merge instead of + // evicting its tombstone (which was the original bug). 
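+    // Ordering note: `entry_at_seq` pins `has_lock` and `run_at`, so these
+    // keys compare by `seq` alone (EntryKey orders by has_lock, then run_at,
+    // then seq). Hence 400 sorts after every pre_add (150..) and strictly
+    // before r_target (500).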
+ let trigger_add = entry_at_seq(400); + + let storage = GatedStore::new(vec![r_target.clone()]); + let qid = test_qid(3); + let mut queue: Queue = Queue::new(0, &storage, &qid); + let skip = UnconfirmedAssignments::new(); + + // Kick off the in-flight refill so subsequent enqueue/remove events + // route through the overlay rather than the cache. + poll_once_expect_pending(&mut queue, &storage, &skip, &qid); + storage.wait_until_parked(); + + // Three tombstones; only Tombstone(r_target) matters for the bug. + queue.remove(&pre_tombstone_a.0); + queue.remove(&pre_tombstone_b.0); + queue.remove(&r_target.0); + + // Fill the overlay up to CAPACITY with the front-of-r_target adds. + for (k, v) in &pre_adds { + queue.enqueue(k, v); + } + + // Trigger the overflow path. With the fix, this sets the horizon + // and Tombstone(r_target) is preserved (the back becomes the new + // horizon, sealing r_target from the merge). + queue.enqueue(&trigger_add.0, &trigger_add.1); + + // Release the refill thread; it returns the snapshot ([r_target]). + storage.release_refill_thread(); + drive_until_ready(&mut queue, &storage, &skip, &qid).await; + + let drained = drain_cache(&mut queue); + + // Regression: r_target must NOT appear in the cache. The horizon + // mechanism seals it from the merge so the deleted row stays out + // even when the overlay would otherwise overflow. + assert!( + !drained.contains(&r_target.0), + "deleted row leaked into cache: {drained:?}", + ); +} + +/// `yield_entry` emits `RemovedFromInbox(k)` immediately followed by +/// `EnqueuedToInbox { key: k, .. }` when the modified key equals the +/// original (i.e. `set_run_at(None)` is a no-op). If a refill is in +/// flight and `k` is in the overlay's coverage zone, the overlay must +/// upgrade the tombstone to an `Add` rather than panicking — and the +/// merge result must include the re-enqueued row with its (possibly +/// updated) value. +#[restate_core::test] +async fn overlay_tombstone_then_add_resurrects_row() { + let target = entry_at_seq(100); + + let storage = GatedStore::new(vec![target.clone()]); + let qid = test_qid(4); + let mut queue: Queue = Queue::new(0, &storage, &qid); + let skip = UnconfirmedAssignments::new(); + + poll_once_expect_pending(&mut queue, &storage, &skip, &qid); + storage.wait_until_parked(); + + // Simulate the yield_entry sequence: tombstone followed by an + // enqueue of the same key with a new value. + queue.remove(&target.0); + let mut new_value = target.1.clone(); + new_value.status = Status::Scheduled; + queue.enqueue(&target.0, &new_value); + + storage.release_refill_thread(); + drive_until_ready(&mut queue, &storage, &skip, &qid).await; + + let drained = drain_cache(&mut queue); + assert_eq!( + drained, + vec![target.0], + "re-enqueued row should be present after tombstone→add upgrade", + ); +} diff --git a/crates/vqueues/src/scheduler/vqueue_state.rs b/crates/vqueues/src/scheduler/vqueue_state.rs index b83fc73696..2264ddf0cc 100644 --- a/crates/vqueues/src/scheduler/vqueue_state.rs +++ b/crates/vqueues/src/scheduler/vqueue_state.rs @@ -8,14 +8,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::task::{Poll, ready}; use std::time::Duration; use enum_map::{Enum, EnumMap}; use metrics::counter; +use restate_storage_api::StorageError; use tokio::time::Instant; use restate_clock::RoughTimestamp; -use restate_storage_api::StorageError; use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueStore, stats::WaitStats}; use restate_types::vqueues::{EntryId, VQueueId}; use restate_worker_api::ResourceKind; @@ -228,9 +229,10 @@ impl VQueueState { qid: VQueueId, handle: VQueueHandle, meta: VQueueMetaLite, + storage: &S, num_running: u32, ) -> Self { - let queue = Queue::new(num_running); + let queue = Queue::new(num_running, storage, &qid); Self { handle, qid, @@ -263,7 +265,6 @@ impl VQueueState { pub fn try_pop( &mut self, cx: &mut std::task::Context<'_>, - storage: &S, resources: &mut ResourceManager, ) -> Result { let (inbox_head_key, inbox_head_value, is_running) = match self.queue.head() { @@ -291,8 +292,7 @@ impl VQueueState { key: *inbox_head_key, next_run_at: None, }; - self.queue - .advance(storage, &self.unconfirmed_assignments, &self.qid)?; + self.queue.try_advance()?; return Ok(Pop::Yield(action)); } @@ -313,8 +313,7 @@ impl VQueueState { wait_stats: self.head_stats.finalize(), }; - self.queue - .advance(storage, &self.unconfirmed_assignments, &self.qid)?; + self.queue.try_advance()?; Ok(Pop::Run(action)) } AcquireOutcome::BlockedOn(resource) => { @@ -335,11 +334,21 @@ impl VQueueState { && self.unconfirmed_assignments.is_empty() } - pub fn poll_eligibility(&mut self, storage: &S) -> Result { - self.queue - .advance_if_needed(storage, &self.unconfirmed_assignments, &self.qid)?; - - Ok(self.check_eligibility()) + pub fn poll_eligibility( + &mut self, + cx: &mut std::task::Context<'_>, + storage: &S, + ) -> Poll> { + ready!(self.queue.poll_advance_if_needed( + cx, + storage, + &self.unconfirmed_assignments, + &self.qid, + self.num_waiting_inbox() == 0, + false, + ))?; + + Poll::Ready(Ok(self.check_eligibility().as_compact())) } pub fn check_eligibility(&self) -> DetailedEligibility {