Skip to content

Commit 1661c23

Browse files
committed
[ENH][wal3]: add quorum_writer for parallel future coordination
Implement a quorum-based coordination mechanism that: - Runs futures in parallel and waits for a minimum count of Ok results - Starts a timeout after reaching the quorum threshold - Cancels remaining futures that exceed the timeout - Returns results in original order with None for cancelled futures This enables handling partial quorum failures where some writers may be slow or unresponsive, allowing the system to proceed once a quorum of successful writes is achieved while still attempting to maximize replication within a bounded time window. Co-authored-by: AI
1 parent 5ba5807 commit 1661c23

2 files changed

Lines changed: 336 additions & 0 deletions

File tree

rust/wal3/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod destroy;
1313
mod gc;
1414
mod interfaces;
1515
mod manifest;
16+
mod quorum_writer;
1617
mod reader;
1718
mod snapshot_cache;
1819
mod writer;
@@ -33,6 +34,7 @@ pub use interfaces::{
3334
pub use manifest::{
3435
unprefixed_snapshot_path, Manifest, ManifestAndETag, Snapshot, SnapshotPointer,
3536
};
37+
pub use quorum_writer::write_quorum;
3638
pub use reader::{checksum_parquet, scan_from_manifest, Limits, LogReader};
3739
pub use snapshot_cache::SnapshotCache;
3840
pub use writer::{LogWriter, MarkDirty};

rust/wal3/src/quorum_writer.rs

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
//! Imagine all the implications of writing your own quorum-based algorithm.
2+
//!
3+
//! The following problem would eventually dominate: How do you handle partial failure of the
4+
//! quorum writers? A down node that affirmatively fails is relatively easy, and naive mitigation
5+
//! of this will leave the system under-replicated.
6+
//!
7+
//! There's a need for a performant coordination mechanism that does the following:
8+
//! - Given as input a set of `futures` that return a Result.
9+
//! - Run all `futures` in parallel.
10+
//! - Collect the first `min_futures_to_wait_for` futures' results.
11+
//! - Start an N-second timer.
12+
//! - Try to collect the remaining `futures.len() - min_futures_to_wait_for` futures.
13+
//! - Stop on time-out and cancel remaining futures.
14+
15+
use std::future::Future;
16+
use std::pin::Pin;
17+
use std::time::Duration;
18+
19+
use futures::stream::FuturesUnordered;
20+
use futures::StreamExt;
21+
22+
/// Runs futures in parallel and waits for a quorum to complete.
23+
///
24+
/// This function executes all provided futures concurrently and:
25+
/// 1. Waits for at least `min_futures_to_wait_for` futures to complete successfully.
26+
/// 2. After reaching the minimum, starts a timer and attempts to collect remaining results.
27+
/// 3. Cancels any futures that haven't completed when the timeout expires.
28+
///
29+
/// Returns a vector of `Option<Result<S, E>>` where:
30+
/// - `Some(result)` indicates a future that completed (successfully or with error).
31+
/// - `None` indicates a future that was cancelled due to timeout.
32+
///
33+
/// The results are returned in the same order as the input futures.
34+
pub async fn write_quorum<S, E, F: Future<Output = Result<S, E>> + Send + 'static>(
35+
futures: Vec<F>,
36+
min_futures_to_wait_for: usize,
37+
timeout: Duration,
38+
) -> Vec<Option<Result<S, E>>>
39+
where
40+
S: Send + 'static,
41+
E: Send + 'static,
42+
{
43+
let num_futures = futures.len();
44+
45+
if num_futures == 0 {
46+
return Vec::new();
47+
}
48+
49+
let mut results: Vec<Option<Result<S, E>>> = (0..num_futures).map(|_| None).collect();
50+
51+
type IndexedFuture<S, E> = Pin<Box<dyn Future<Output = (usize, Result<S, E>)> + Send>>;
52+
let mut pending: FuturesUnordered<IndexedFuture<S, E>> = FuturesUnordered::new();
53+
54+
for (idx, fut) in futures.into_iter().enumerate() {
55+
pending.push(Box::pin(async move { (idx, fut.await) }));
56+
}
57+
58+
let mut ok_count = 0;
59+
60+
// Phase 1: Wait for the minimum number of Ok futures to complete.
61+
while ok_count < min_futures_to_wait_for {
62+
if let Some((idx, result)) = pending.next().await {
63+
if result.is_ok() {
64+
ok_count += 1;
65+
}
66+
results[idx] = Some(result);
67+
} else {
68+
// All futures have completed before reaching the minimum.
69+
break;
70+
}
71+
}
72+
73+
// Phase 2: Try to collect remaining futures within the timeout.
74+
if !pending.is_empty() {
75+
let deadline = tokio::time::sleep(timeout);
76+
tokio::pin!(deadline);
77+
78+
loop {
79+
tokio::select! {
80+
biased;
81+
82+
maybe_result = pending.next() => {
83+
match maybe_result {
84+
Some((idx, result)) => {
85+
results[idx] = Some(result);
86+
}
87+
None => {
88+
// All futures completed.
89+
break;
90+
}
91+
}
92+
}
93+
_ = &mut deadline => {
94+
// Timeout reached; remaining futures will be cancelled.
95+
break;
96+
}
97+
}
98+
}
99+
}
100+
101+
// Dropping pending cancels any remaining futures.
102+
results
103+
}
104+
105+
#[cfg(test)]
106+
mod tests {
107+
use std::time::Duration;
108+
109+
use futures::future::BoxFuture;
110+
111+
use super::write_quorum;
112+
113+
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
114+
115+
#[tokio::test]
116+
async fn empty_futures_returns_empty_vec() {
117+
let futures: Vec<BoxFuture<'static, Result<(), ()>>> = vec![];
118+
let results = write_quorum(futures, 0, TEST_TIMEOUT).await;
119+
assert!(results.is_empty());
120+
}
121+
122+
#[tokio::test]
123+
async fn all_futures_complete_immediately() {
124+
let futures: Vec<BoxFuture<'static, Result<i32, ()>>> = vec![
125+
Box::pin(async { Ok(1) }),
126+
Box::pin(async { Ok(2) }),
127+
Box::pin(async { Ok(3) }),
128+
];
129+
let results = write_quorum(futures, 2, TEST_TIMEOUT).await;
130+
assert_eq!(results.len(), 3);
131+
assert_eq!(results[0].as_ref().unwrap().as_ref().ok(), Some(&1));
132+
assert_eq!(results[1].as_ref().unwrap().as_ref().ok(), Some(&2));
133+
assert_eq!(results[2].as_ref().unwrap().as_ref().ok(), Some(&3));
134+
}
135+
136+
#[tokio::test]
137+
async fn mixed_success_and_error() {
138+
let futures: Vec<BoxFuture<'static, Result<i32, &'static str>>> = vec![
139+
Box::pin(async { Ok(1) }),
140+
Box::pin(async { Err("error") }),
141+
Box::pin(async { Ok(3) }),
142+
];
143+
let results = write_quorum(futures, 3, TEST_TIMEOUT).await;
144+
assert_eq!(results.len(), 3);
145+
assert!(results[0].as_ref().unwrap().is_ok());
146+
assert!(results[1].as_ref().unwrap().is_err());
147+
assert!(results[2].as_ref().unwrap().is_ok());
148+
}
149+
150+
#[tokio::test]
151+
async fn slow_future_cancelled_after_timeout() {
152+
let futures: Vec<BoxFuture<'static, Result<i32, ()>>> = vec![
153+
Box::pin(async {
154+
tokio::time::sleep(Duration::from_millis(10)).await;
155+
Ok(1)
156+
}),
157+
Box::pin(async {
158+
// This future takes much longer than the timeout.
159+
tokio::time::sleep(Duration::from_secs(60)).await;
160+
Ok(2)
161+
}),
162+
];
163+
let results = write_quorum(futures, 1, TEST_TIMEOUT).await;
164+
assert_eq!(results.len(), 2);
165+
// First future should complete.
166+
assert!(results[0].is_some());
167+
// Second future should be cancelled (None) because it exceeds the 5s timeout.
168+
assert!(results[1].is_none());
169+
}
170+
171+
#[tokio::test]
172+
async fn min_zero_starts_timeout_immediately() {
173+
let futures: Vec<BoxFuture<'static, Result<i32, ()>>> = vec![
174+
Box::pin(async {
175+
tokio::time::sleep(Duration::from_millis(100)).await;
176+
Ok(1)
177+
}),
178+
Box::pin(async {
179+
tokio::time::sleep(Duration::from_millis(200)).await;
180+
Ok(2)
181+
}),
182+
];
183+
let results = write_quorum(futures, 0, TEST_TIMEOUT).await;
184+
assert_eq!(results.len(), 2);
185+
// Both should complete within the 5s timeout.
186+
assert!(results[0].is_some());
187+
assert!(results[1].is_some());
188+
}
189+
190+
#[tokio::test]
191+
async fn preserves_order_of_results() {
192+
let futures: Vec<BoxFuture<'static, Result<i32, ()>>> = vec![
193+
Box::pin(async {
194+
tokio::time::sleep(Duration::from_millis(30)).await;
195+
Ok(100)
196+
}),
197+
Box::pin(async {
198+
tokio::time::sleep(Duration::from_millis(10)).await;
199+
Ok(200)
200+
}),
201+
Box::pin(async {
202+
tokio::time::sleep(Duration::from_millis(20)).await;
203+
Ok(300)
204+
}),
205+
];
206+
let results = write_quorum(futures, 3, TEST_TIMEOUT).await;
207+
assert_eq!(results.len(), 3);
208+
// Results should be in the original order, not completion order.
209+
assert_eq!(results[0].as_ref().unwrap().as_ref().ok(), Some(&100));
210+
assert_eq!(results[1].as_ref().unwrap().as_ref().ok(), Some(&200));
211+
assert_eq!(results[2].as_ref().unwrap().as_ref().ok(), Some(&300));
212+
}
213+
214+
#[tokio::test]
215+
async fn only_ok_responses_count_toward_min_futures() {
216+
// This test verifies that only Ok responses count toward min_futures_to_wait_for.
217+
// We have 3 futures: 2 fast errors and 1 slow success that takes longer than the
218+
// provided timeout (5 seconds).
219+
//
220+
// If errors counted toward the minimum (min=2), we'd hit the quorum after the 2
221+
// errors complete, then the 5-second timeout would start, and the slow success
222+
// would be cancelled.
223+
//
224+
// Since only Ok responses should count, the 2 errors shouldn't satisfy min=2,
225+
// and we must continue waiting in Phase 1 until the slow success completes.
226+
let futures: Vec<BoxFuture<'static, Result<i32, &'static str>>> = vec![
227+
Box::pin(async {
228+
tokio::time::sleep(Duration::from_millis(10)).await;
229+
Err("error1")
230+
}),
231+
Box::pin(async {
232+
tokio::time::sleep(Duration::from_millis(20)).await;
233+
Err("error2")
234+
}),
235+
Box::pin(async {
236+
// This takes longer than the timeout (5s).
237+
// If errors count toward min_futures_to_wait_for, this will be cancelled.
238+
tokio::time::sleep(Duration::from_secs(7)).await;
239+
Ok(42)
240+
}),
241+
];
242+
let results = write_quorum(futures, 2, TEST_TIMEOUT).await;
243+
assert_eq!(results.len(), 3);
244+
println!("results[0] = {:?}", results[0]);
245+
println!("results[1] = {:?}", results[1]);
246+
println!("results[2] = {:?}", results[2]);
247+
// The first two futures return errors.
248+
assert!(results[0].as_ref().unwrap().is_err());
249+
assert!(results[1].as_ref().unwrap().is_err());
250+
// The third future must complete with Ok(42) because only Ok responses count.
251+
// If this is None, it means the future was cancelled, indicating that errors
252+
// were incorrectly counted toward min_futures_to_wait_for.
253+
let third_result = results[2].as_ref().expect(
254+
"third future should complete; if None, errors are incorrectly counting toward quorum",
255+
);
256+
assert_eq!(
257+
third_result.as_ref().ok(),
258+
Some(&42),
259+
"only Ok responses should count toward min_futures_to_wait_for"
260+
);
261+
}
262+
263+
#[tokio::test]
264+
async fn five_futures_min_three_with_three_errors_does_not_block() {
265+
// This test verifies behavior when we have 5 futures with min_futures_to_wait_for=3,
266+
// and 3 of them error while 2 succeed.
267+
//
268+
// Since only Ok responses count toward the quorum, the 3 errors should not satisfy
269+
// min=3. However, once all 5 futures complete, we should get results for all of them
270+
// without blocking indefinitely.
271+
let futures: Vec<BoxFuture<'static, Result<i32, &'static str>>> = vec![
272+
Box::pin(async {
273+
tokio::time::sleep(Duration::from_millis(10)).await;
274+
Err("error1")
275+
}),
276+
Box::pin(async {
277+
tokio::time::sleep(Duration::from_millis(20)).await;
278+
Err("error2")
279+
}),
280+
Box::pin(async {
281+
tokio::time::sleep(Duration::from_millis(30)).await;
282+
Err("error3")
283+
}),
284+
Box::pin(async {
285+
tokio::time::sleep(Duration::from_millis(40)).await;
286+
Ok(1)
287+
}),
288+
Box::pin(async {
289+
tokio::time::sleep(Duration::from_millis(50)).await;
290+
Ok(2)
291+
}),
292+
];
293+
let results = write_quorum(futures, 3, TEST_TIMEOUT).await;
294+
assert_eq!(results.len(), 5);
295+
println!("results[0] = {:?}", results[0]);
296+
println!("results[1] = {:?}", results[1]);
297+
println!("results[2] = {:?}", results[2]);
298+
println!("results[3] = {:?}", results[3]);
299+
println!("results[4] = {:?}", results[4]);
300+
// All futures should have completed (no None values).
301+
assert!(results[0].is_some(), "future 0 should complete");
302+
assert!(results[1].is_some(), "future 1 should complete");
303+
assert!(results[2].is_some(), "future 2 should complete");
304+
assert!(results[3].is_some(), "future 3 should complete");
305+
assert!(results[4].is_some(), "future 4 should complete");
306+
// First three should be errors.
307+
assert!(results[0].as_ref().unwrap().is_err());
308+
assert!(results[1].as_ref().unwrap().is_err());
309+
assert!(results[2].as_ref().unwrap().is_err());
310+
// Last two should be Ok.
311+
assert_eq!(results[3].as_ref().unwrap().as_ref().ok(), Some(&1));
312+
assert_eq!(results[4].as_ref().unwrap().as_ref().ok(), Some(&2));
313+
}
314+
315+
#[tokio::test]
316+
async fn single_future_completes() {
317+
let futures: Vec<BoxFuture<'static, Result<i32, ()>>> = vec![Box::pin(async { Ok(42) })];
318+
let results = write_quorum(futures, 1, TEST_TIMEOUT).await;
319+
assert_eq!(results.len(), 1);
320+
assert_eq!(results[0].as_ref().unwrap().as_ref().ok(), Some(&42));
321+
}
322+
323+
#[tokio::test]
324+
async fn min_greater_than_futures_len_returns_all_available() {
325+
// When min_futures_to_wait_for exceeds the number of futures, the function should
326+
// still return all available results without hanging.
327+
let futures: Vec<BoxFuture<'static, Result<i32, ()>>> =
328+
vec![Box::pin(async { Ok(1) }), Box::pin(async { Ok(2) })];
329+
let results = write_quorum(futures, 5, TEST_TIMEOUT).await;
330+
assert_eq!(results.len(), 2);
331+
assert_eq!(results[0].as_ref().unwrap().as_ref().ok(), Some(&1));
332+
assert_eq!(results[1].as_ref().unwrap().as_ref().ok(), Some(&2));
333+
}
334+
}

0 commit comments

Comments
 (0)