refactor(sinktools): use pin_project in LazySinkSource#2579
refactor(sinktools): use pin_project in LazySinkSource#2579MingweiSamuel wants to merge 3 commits intomainfrom
pin_project in LazySinkSource#2579Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors sinktools::LazySinkSource to use pin_project with an internal pinned state machine and replaces the previous multi-waker approach with a dual AtomicWaker setup, while updating/expanding async unit tests around initialization driving.
Changes:
- Refactor
LazySinkSourceto a pinnedSharedStateenum usingpin_project_lite(removing the oldRc<RefCell<...>>split-halves implementation). - Replace
MultiWaker(mutex + vec) withDualWakerusing twoAtomicWakers (sink vs stream) and a shared wake fanout. - Update tests and add Tokio
timedev-dependency feature to support timing-based assertions.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
sinktools/src/lazy_sink_source.rs |
Reworks internal state/pinning and waker handling; updates and adds tests validating sink/stream-driven initialization. |
sinktools/Cargo.toml |
Enables Tokio time feature for dev-dependencies to support updated tests. |
Comments suppressed due to low confidence (1)
sinktools/src/lazy_sink_source.rs:229
poll_nextreturnsPoll::Ready(None)when the initialization future resolves toErr, but it leaves the state inThunkulating. If the stream is polled again after yieldingNone(whichStreamcontract allows), this will re-poll a completed future and may panic or otherwise violate the expectation that the stream remains terminated. Consider transitioning to a terminal state on init failure (e.g., anInitFailed/Terminatedvariant) so subsequentpoll_nextcalls reliably returnReady(None)without polling the init future again.
Poll::Ready(Err(_)) => {
return Poll::Ready(None);
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| tokio::time::sleep(std::time::Duration::from_millis(10)).await; | ||
|
|
There was a problem hiding this comment.
This test now relies on a fixed wall-clock sleep (sleep(Duration::from_millis(10))) to wait for task wakeups/propagation, which can be flaky under slow or loaded CI runners (and unnecessarily slows fast runs). Prefer an event-driven wait (e.g., loop with yield_now() until the condition is true with an overall timeout, or a oneshot/Notify that signals when initialization completes) so the test is deterministic.
c69bfa7 to
74e307b
Compare
Deploying hydro with
|
| Latest commit: |
74e307b
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://2499bde7.hydroflow.pages.dev |
| Branch Preview URL: | https://lazy-2.hydroflow.pages.dev |
Stack: #2556
Kiro-generated, requires close review