Skip to content

Commit 63568f8

Browse files
authored
fix: send responses only after state machine commit (#19286)
Fix a consistency issue where responses were sent to clients before the state machine data was committed. Clients receiving early responses could immediately read the state machine and observe stale or missing data. Changes: - Collect pending responses during entry application - Send all responses only after `applier.commit()` succeeds - Add doc comment explaining the consistency requirement
1 parent 5c41a4d commit 63568f8

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

src/meta/raft-store/src/sm_v003/sm_v003.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,19 +242,31 @@ impl SMV003 {
242242
WriterAcquirer::new(self.write_semaphore.clone())
243243
}
244244

245+
/// Apply entries from the stream to the state machine.
246+
///
247+
/// Responses are only sent AFTER commit succeeds to ensure consistency.
248+
/// If we sent responses before commit, clients would consider their operations complete
249+
/// and might immediately read the state machine, but the data wouldn't be persisted yet.
250+
/// This could lead to clients observing stale or missing data.
245251
pub async fn apply_entries<S>(&self, mut entries: S) -> Result<(), io::Error>
246252
where S: Stream<Item = Result<EntryResponder, io::Error>> + Unpin {
247253
let mut applier = self.new_applier().await;
254+
let mut pending_responses = Vec::new();
248255

249256
while let Some(result) = entries.next().await {
250257
let (entry, responder) = result?;
251258
let applied = applier.apply(&entry).await?;
252259
if let Some(responder) = responder {
253-
responder.send(applied);
260+
pending_responses.push((responder, applied));
254261
}
255262
}
256263

257264
applier.commit().await?;
265+
266+
for (responder, applied) in pending_responses {
267+
responder.send(applied);
268+
}
269+
258270
Ok(())
259271
}
260272

0 commit comments

Comments
 (0)