Skip to content

Commit 5a8e091

Browse files
committed
Implement prior commit replay and improve startup synchronization in Starfish.
Fix differences between Starfish and Mysticeti in handling of previously processed commits at startup by introducing a replay mechanism. Adjust `StarfishConsensusHandler` to manage commit ranges effectively and log replay progress.
1 parent 5b55be2 commit 5a8e091

File tree

3 files changed

+23
-9
lines changed

3 files changed

+23
-9
lines changed

crates/iota-core/src/consensus_handler.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ pub struct StarfishConsensusHandler {
517517

518518
impl StarfishConsensusHandler {
519519
pub fn new(
520+
last_processed_commit_at_startup: starfish_core::CommitIndex,
520521
mut consensus_handler: ConsensusHandler<CheckpointService>,
521522
mut receiver: UnboundedReceiver<starfish_core::CommittedSubDag>,
522523
commit_consumer_monitor: Arc<starfish_core::CommitConsumerMonitor>,
@@ -526,10 +527,14 @@ impl StarfishConsensusHandler {
526527
// backpressure.
527528
while let Some(consensus_output) = receiver.recv().await {
528529
let commit_index = consensus_output.commit_ref.index;
529-
consensus_handler
530-
.handle_consensus_output(consensus_output)
531-
.await;
532-
commit_consumer_monitor.set_highest_handled_commit(commit_index);
530+
if commit_index <= last_processed_commit_at_startup {
531+
consensus_handler.handle_prior_consensus_output(consensus_output);
532+
} else {
533+
consensus_handler
534+
.handle_consensus_output(consensus_output)
535+
.await;
536+
commit_consumer_monitor.set_highest_handled_commit(commit_index);
537+
}
533538
}
534539
});
535540
Self {

crates/iota-core/src/consensus_manager/starfish_manager.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,11 @@ impl ConsensusManagerTrait for StarfishManager {
139139
let (commit_sender, commit_receiver) = unbounded_channel("consensus_output");
140140

141141
let consensus_handler = consensus_handler_initializer.new_consensus_handler();
142-
let consumer = CommitConsumer::new(
143-
commit_sender,
144-
consensus_handler.last_processed_subdag_index() as CommitIndex,
145-
);
142+
143+
let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
144+
let last_processed_commit = consensus_handler.last_processed_subdag_index() as CommitIndex;
145+
let starting_commit = last_processed_commit.saturating_sub(num_prior_commits);
146+
let consumer = CommitConsumer::new(commit_sender, starting_commit);
146147
let monitor = consumer.monitor();
147148

148149
// If there is a previous consumer monitor, it indicates that the consensus
@@ -201,13 +202,20 @@ impl ConsensusManagerTrait for StarfishManager {
201202
self.client.set(client);
202203

203204
// spin up the new starfish consensus handler to listen for committed sub dags
204-
let handler = StarfishConsensusHandler::new(consensus_handler, commit_receiver, monitor);
205+
let handler = StarfishConsensusHandler::new(
206+
last_processed_commit,
207+
consensus_handler,
208+
commit_receiver,
209+
monitor,
210+
);
205211

206212
let mut consensus_handler = self.consensus_handler.lock().await;
207213
*consensus_handler = Some(handler);
208214

209215
// Wait until all locally available commits have been processed
216+
info!("replaying commits at startup");
210217
registered_authority.0.replay_complete().await;
218+
info!("Startup commit replay complete");
211219
}
212220

213221
async fn shutdown(&self) {

crates/iota-core/src/mysticeti_adapter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ impl ConsensusClient for LazyMysticetiClient {
9090
.iter()
9191
.map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"))
9292
.collect::<Vec<_>>();
93+
9394
let (block_ref, status_waiter) = client
9495
.as_ref()
9596
.expect("Client should always be returned")

0 commit comments

Comments
 (0)