
[VQueues] Introducing inbox caching and read in batches#4674

Merged
AhmedSoliman merged 1 commit into main from pr4674 on May 8, 2026

Conversation

@AhmedSoliman
Contributor

@AhmedSoliman AhmedSoliman commented Apr 29, 2026

Replaces the single-head cache with a sorted 24-entry per-queue cache.
Refills run via tokio::task::spawn_blocking when data isn't in the
block cache; in-flight notify_enqueued / notify_removed events are
buffered as an overlay (Add / Tombstone) and merged on completion.

On overlay overflow we set a horizon (exclusive upper bound) instead
of back-eviction: a popped tombstone could otherwise re-admit a
deleted row. Storage rows at or above the horizon are dropped from
the merge and rediscovered on the next refill.

Drops the tailing iterator and its workarounds. Splits VQueueCursor
into inbox (returns CursorError; WouldBlock under non-blocking opts)
and VQueueRunningCursor (sync).
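The overlay-plus-horizon behavior described above can be sketched in miniature. All names here (`Overlay`, `Entry`, `push_added`, `push_removed`) are hypothetical stand-ins, not the PR's actual types, and the capacity is shrunk far below 24 for the demo:

```rust
use std::collections::BTreeMap;

// Buffered event kinds, mirroring the PR description's Add / Tombstone overlay.
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    Add(u64), // stand-in payload
    Tombstone,
}

// Minimal overlay with a horizon (exclusive upper bound).
struct Overlay {
    cap: usize,
    entries: BTreeMap<u64, Entry>, // key -> buffered event, kept sorted
    horizon: Option<u64>,
}

impl Overlay {
    fn new(cap: usize) -> Self {
        Overlay { cap, entries: BTreeMap::new(), horizon: None }
    }

    fn push_added(&mut self, key: u64, value: u64) {
        if self.entries.len() >= self.cap && !self.entries.contains_key(&key) {
            // Overflow: instead of evicting the back entry (a popped
            // tombstone could re-admit a deleted row), set the horizon.
            self.horizon = Some(self.horizon.map_or(key, |h| h.min(key)));
            return;
        }
        self.entries.insert(key, Entry::Add(value));
    }

    fn push_removed(&mut self, key: u64) {
        if self.entries.len() >= self.cap && !self.entries.contains_key(&key) {
            self.horizon = Some(self.horizon.map_or(key, |h| h.min(key)));
            return;
        }
        self.entries.insert(key, Entry::Tombstone);
    }

    // Merge the buffered events over a sorted storage snapshot. Storage
    // rows at or above the horizon are dropped from the merge and get
    // rediscovered on the next refill.
    fn merge(&self, storage: &[(u64, u64)]) -> Vec<(u64, u64)> {
        let mut out: BTreeMap<u64, u64> = storage
            .iter()
            .copied()
            .filter(|&(k, _)| self.horizon.map_or(true, |h| k < h))
            .collect();
        for (k, e) in &self.entries {
            match e {
                Entry::Add(v) => { out.insert(*k, *v); }
                Entry::Tombstone => { out.remove(k); }
            }
        }
        out.into_iter().collect()
    }
}
```

Note how overflow only ever tightens the horizon (via `min`), so a dropped event can never silently widen what the merge admits from storage.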


Stack created with Sapling. Best reviewed with ReviewStack.

@github-actions

github-actions Bot commented Apr 29, 2026

Test Results

  8 files  + 1    8 suites  +1   4m 47s ⏱️ + 2m 0s
 50 tests + 3   50 ✅ + 3  0 💤 ±0  0 ❌ ±0 
218 runs  +18  218 ✅ +18  0 💤 ±0  0 ❌ ±0 

Results for commit 186893a. ± Comparison against base commit b128276.

This pull request removes 4 and adds 7 tests. Note that renamed tests count towards both.
dev.restate.sdktesting.tests.KafkaIngress ‑ handleEventInCounterService(URI, int, Client)
dev.restate.sdktesting.tests.KafkaIngress ‑ handleEventInEventHandler(URI, int, Client)
dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation ‑ inFlightInvocation(Client, URI)
dev.restate.sdktesting.tests.UpgradeWithNewInvocation ‑ executesNewInvocationWithLatestServiceRevisions(Client, URI)
dev.restate.sdktesting.tests.Combinators ‑ awakeableOrTimeoutUsingAwaitAny(Client)
dev.restate.sdktesting.tests.Combinators ‑ firstSuccessfulCompletedAwakeable(Client)
dev.restate.sdktesting.tests.Custom ‑ run(CustomTestConfig, URI, URI)[1]
dev.restate.sdktesting.tests.UserErrors ‑ failSeveralTimesWithMetadata(URI)
dev.restate.sdktesting.tests.UserErrors ‑ internalCallFailurePropagationWithMetadata(URI)
dev.restate.sdktesting.tests.UserErrors ‑ invokeTerminallyFailingCallWithMetadata(URI)
dev.restate.sdktesting.tests.UserErrors ‑ sideEffectWithTerminalErrorWithMetadata(URI)

♻️ This comment has been updated with latest results.

@AhmedSoliman
Contributor Author

AhmedSoliman commented Apr 30, 2026

Note to reviewers. This is currently being extended to run async refills. I will merge the two PRs into one when ready.

@tillrohrmann
Contributor

Thanks for the heads-up, Ahmed. I'll wait for that to happen before reviewing.

@tillrohrmann
Contributor

Note to reviewers. This is currently being extended to run async refills. I will merge the two PRs into one when ready.

@AhmedSoliman did this change already land in this PR?

@AhmedSoliman
Contributor Author

Yes. This has been updated.

@AhmedSoliman
Contributor Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b6fb18d9b1


Comment thread crates/vqueues/src/scheduler/queue.rs Outdated
Comment thread crates/vqueues/src/scheduler/queue.rs
Contributor

@tillrohrmann tillrohrmann left a comment


Awesome work @AhmedSoliman 🚀 The new asynchronous queue iterator looks really great. As far as I can tell, the logic looks really solid. So +1 for merging :-)

Out of curiosity: did you measure the impact of this change compared to the previous attempt using tailing iterators? I know that tailing iterators have correctness problems, which is why we needed to get rid of them.

Comment on lines +45 to +47
// We re-create this reader on every refill, so a fresh snapshot is what
// we want. A tailing iterator would see new writes but is unsafe across
// memtable flushes.
Contributor


Maybe add readopts.set_tailing(false) to give context to the comment.
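In context, the suggestion would look something like this. This is a sketch against the `rocksdb` crate's `ReadOptions` API; the surrounding reader setup is assumed, not taken from the PR:

```rust
use rocksdb::ReadOptions;

let mut readopts = ReadOptions::default();
// We re-create this reader on every refill, so a fresh snapshot is what
// we want. A tailing iterator would see new writes but is unsafe across
// memtable flushes.
readopts.set_tailing(false);
```

`false` is already the default; the explicit call exists purely to anchor the comment to a line of code.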

Comment on lines +32 to +35
fn new_inbox_reader(&self, qid: &VQueueId, opts: Options) -> Self::InboxReader;
}

/// Iterator over vqueue entries
/// Iterator over "waiting inbox" vqueue entries
Contributor


By "inbox", the inbox stage of the vqueue is meant, right?

Contributor Author


Yes, I will rename the waiting reader to be the inbox reader, but in a separate commit so as not to mess up the PR stack.

Comment thread crates/storage-api/src/vqueue_table/store.rs Outdated
Comment on lines +220 to +222
RefillState::Standby { refill_anchor } => {
*refill_anchor = new_anchor;
}
Contributor


Can the new anchor be larger than the existing one or can it only shrink?

Contributor Author

@AhmedSoliman AhmedSoliman May 8, 2026


Both. update_anchor can grow (a refill task completed and discovered new keys), shrink (cache eviction in enqueue resets it to items.back()), or stay equal. The method is intentionally unconstrained.
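A minimal sketch of the two directions described in this reply. The `Cache` struct and `update_anchor` signature here are hypothetical stand-ins, not the PR's actual code:

```rust
// Hypothetical stand-in for the per-queue cache.
struct Cache {
    items: Vec<u64>,            // sorted cached keys
    refill_anchor: Option<u64>, // where the next refill resumes from
}

impl Cache {
    // Intentionally unconstrained: callers may grow, shrink,
    // or leave the anchor unchanged.
    fn update_anchor(&mut self, new_anchor: Option<u64>) {
        self.refill_anchor = new_anchor;
    }
}
```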

Comment thread crates/vqueues/src/scheduler/queue.rs Outdated
// This branch handles when we don't have the item in cache.
//
// The removed item can be:
// - Cached
Contributor


Didn't we just say that it's not in the cache in the line above?

Comment thread crates/vqueues/src/scheduler/queue.rs Outdated
self.items.pop_back();
self.items.insert(pos, (key, value));
refill_anchor = self.items.back().map(|(k, _)| *k);
break;
Contributor


Why is it ok to break here if we insert the sorted item at pos < self.items.len()? When we start the algorithm, self.items contains items that are strictly smaller than what we read from RocksDB and what we have in the overlay, right? When merge-sorting into self.items we will probably only push to the end of self.items, since items as well as the overlay are both sorted, right? So could we enforce the invariant assert_eq!(pos, self.items.len())?

Contributor Author


Fixed

Comment thread crates/vqueues/src/scheduler/queue.rs Outdated
// Insert sorted in cache and ignore it if we already have it.
// If this item pushes us over the cache capacity, then we ignore it and reset
// the refill anchor to it.
let pos = match self.items.binary_search_by_key(&key, |&(k, _)| k) {
Contributor


Why is a binary search needed here? Wouldn't we always push to the end of self.items what comes out of items.into_iter().merge_join_by(overlay)? Asked differently: what's the scenario where we wouldn't push to the end?

Contributor Author


The binary_search is redundant in poll_refill_task. Will rewrite.

Comment thread crates/vqueues/src/scheduler/queue.rs Outdated
let head_key = match queue.head() {
Some(QueueItem::Inbox { key, .. }) => *key,
_ => panic!("expected inbox head"),
// It's very important is that we must reset the task to standby
Contributor


Suggested change
// It's very important is that we must reset the task to standby
// It's very important that we must reset the task to standby

Comment on lines +309 to +341
/// **Bug demonstration.** When the overlay is at capacity and the back
/// entry is a tombstone, `push_added_item`'s `pop_back` silently drops
/// that tombstone. The merge then lets the (already-deleted) row into
/// the cache.
///
/// Layout at the moment of overflow:
///
/// ```text
/// overlay[0] = Tombstone(seq=50) // pre-tombstone
/// overlay[1] = Tombstone(seq=100) // pre-tombstone
/// overlay[2..CAPACITY-1] = Add(seq=150..) // CAPACITY-3 adds
/// overlay[CAPACITY-1] = Tombstone(seq=500) // *r_target*
/// ```
///
/// The trigger add (seq=400) sorts at `pos = CAPACITY-1`, which is
/// `< overlay.len() == CAPACITY`, so `push_added_item` evicts the back
/// (`Tombstone(500)`) and inserts the trigger. After this, the overlay
/// holds only `[T(50), T(100), Add(150..), Add(400)]` — the tombstone
/// for `r_target` is gone.
///
/// Storage is `[r_target=seq500]` (a single row that's been deleted but
/// is still in the in-flight task's snapshot, faithful to the race the
/// invariant doc on `RefillTask` describes). The merge produces:
///
/// ```text
/// [T(50), T(100), Add(150..), Add(400), Left(seq500)]
/// ```
///
/// With the tombstone evicted there's nothing to suppress `Left(seq500)`,
/// so the merge inserts it into the cache. The drain then sees `seq500`,
/// which is the bug.
#[restate_core::test]
async fn tombstone_evicted_on_overlay_overflow_leaks_deleted_row() {
Contributor


The description and the test method read as if the bug still exists but I guess that the horizon filters out the problematic entry. Maybe add a clarification that this problem is gone.

Contributor Author


Will clarify.

@AhmedSoliman
Contributor Author

@tillrohrmann To answer your main question: no, I've not run a comparative analysis against tailing iterators, but we're steering away from them for correctness reasons, so we have no real alternative.

@AhmedSoliman AhmedSoliman merged commit 672f6d2 into main May 8, 2026
40 checks passed
@AhmedSoliman AhmedSoliman deleted the pr4674 branch May 8, 2026 13:13
@github-actions github-actions Bot locked and limited conversation to collaborators May 8, 2026
