Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consensus): resubmit gc-transaction #5707

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 169 additions & 7 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ impl Core {
.with_label_values(&["Core::try_commit"])
.start_timer();

let mut committed_subdags = Vec::new();
let mut committed_sub_dags = Vec::new();
// TODO: Add optimization to abort early without quorum for a round.
loop {
// LeaderSchedule has a limit to how many sequenced leaders can be committed
Expand Down Expand Up @@ -652,10 +652,21 @@ impl Core {
// Try to unsuspend blocks if gc_round has advanced.
self.block_manager
.try_unsuspend_blocks_for_latest_gc_round();
committed_subdags.extend(subdags);
committed_sub_dags.extend(subdags);
}

Ok(committed_subdags)
// Notify about our own committed blocks
let committed_block_refs = committed_sub_dags
.iter()
.flat_map(|sub_dag| sub_dag.blocks.iter())
.filter_map(|block| {
(block.author() == self.context.own_index).then_some(block.reference())
})
.collect::<Vec<_>>();
self.transaction_consumer
.notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());

Ok(committed_sub_dags)
}

pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
Expand Down Expand Up @@ -1037,8 +1048,10 @@ mod test {
use std::{collections::BTreeSet, time::Duration};

use consensus_config::{AuthorityIndex, Parameters};
use futures::{StreamExt, stream::FuturesUnordered};
use iota_metrics::monitored_mpsc::unbounded_channel;
use iota_protocol_config::ProtocolConfig;
use rstest::rstest;
use tokio::time::sleep;

use super::*;
Expand All @@ -1050,7 +1063,8 @@ mod test {
leader_scoring::ReputationScores,
storage::{Store, WriteBatch, mem_store::MemStore},
test_dag_builder::DagBuilder,
transaction::TransactionClient,
test_dag_parser::parse_dag,
transaction::{BlockStatus, TransactionClient},
};

/// Recover Core and continue proposing from the last round which forms a
Expand All @@ -1063,6 +1077,7 @@ mod test {
let store = Arc::new(MemStore::new());
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let mut block_status_subscriptions = FuturesUnordered::new();

// Create test blocks for all the authorities for 4 rounds and populate them in
// store
Expand All @@ -1077,6 +1092,14 @@ mod test {
.build(),
);

// If it's round 1, that one will be committed later on, and it's our "own"
// block, then subscribe to listen for the block status.
if round == 1 && index == context.own_index {
let subscription =
transaction_consumer.subscribe_for_block_status_testing(block.reference());
block_status_subscriptions.push(subscription);
}

this_round_blocks.push(block);
}
all_blocks.extend(this_round_blocks.clone());
Expand Down Expand Up @@ -1117,7 +1140,7 @@ mod test {
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let mut core = Core::new(
let _core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
Expand Down Expand Up @@ -1148,8 +1171,6 @@ mod test {
assert_eq!(ancestor.round, 4);
}

// Run commit rule.
core.try_commit().ok();
let last_commit = store
.read_last_commit()
.unwrap()
Expand All @@ -1162,6 +1183,13 @@ mod test {
assert_eq!(dag_state.read().last_commit_index(), 2);
let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
assert_eq!(all_stored_commits.len(), 2);

// And ensure that our "own" block 1 sent to TransactionConsumer as notification
// alongside with gc_round
while let Some(result) = block_status_subscriptions.next().await {
let status = result.unwrap();
assert!(matches!(status, BlockStatus::Sequenced(_)));
}
}

/// Recover Core and continue proposing when having a partial last round
Expand Down Expand Up @@ -1489,6 +1517,140 @@ mod test {
assert_eq!(dag_state.read().last_commit_index(), 0);
}

#[rstest]
#[tokio::test]
async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
telemetry_subscribers::init_for_testing();
let (mut context, mut key_pairs) = Context::new_for_test(4);

if gc_depth > 0 {
context
.protocol_config
.set_consensus_gc_depth_for_testing(gc_depth);
}

let context = Arc::new(context);

let store = Arc::new(MemStore::new());
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let mut block_status_subscriptions = FuturesUnordered::new();

let dag_str = "DAG {
Round 0 : { 4 },
Round 1 : { * },
Round 2 : { * },
Round 3 : {
A -> [*],
B -> [-A2],
C -> [-A2],
D -> [-A2],
},
Round 4 : {
B -> [-A3],
C -> [-A3],
D -> [-A3],
},
Round 5 : {
A -> [A3, B4, C4, D4]
B -> [*],
C -> [*],
D -> [*],
},
Round 6 : { * },
Round 7 : { * },
Round 8 : { * },
}";

let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
dag_builder.print();

// Subscribe to all created "own" blocks. We know that for our node (A) we'll be
// able to commit up to round 5.
for block in dag_builder.blocks(1..=5) {
if block.author() == context.own_index {
let subscription =
transaction_consumer.subscribe_for_block_status_testing(block.reference());
block_status_subscriptions.push(subscription);
}
}

// write them in store
store
.write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
.expect("Storage error");

// create dag state after all blocks have been written to store
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(
context.clone(),
dag_state.clone(),
Arc::new(NoopBlockVerifier),
);
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));

let (sender, _receiver) = unbounded_channel("consensus_output");
let commit_consumer = CommitConsumer::new(sender, 0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
store.clone(),
leader_schedule.clone(),
);

// Check no commits have been persisted to dag_state or store.
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);

// Now spin up core
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let _block_receiver = signal_receivers.block_broadcast_receiver();
let _core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
false,
);

let last_commit = store
.read_last_commit()
.unwrap()
.expect("last commit should be set");

assert_eq!(last_commit.index(), 5);

while let Some(result) = block_status_subscriptions.next().await {
let status = result.unwrap();

// If gc is enabled, then we expect some blocks to be garbage collected.
if gc_depth > 0 {
match status {
BlockStatus::Sequenced(block_ref) => {
assert!(block_ref.round == 1 || block_ref.round == 5);
}
BlockStatus::GarbageCollected(block_ref) => {
assert!(block_ref.round == 2 || block_ref.round == 3);
}
}
} else {
// otherwise all of them should be committed
assert!(matches!(status, BlockStatus::Sequenced(_)));
}
}
}

#[tokio::test]
async fn test_core_set_min_propose_round() {
telemetry_subscribers::init_for_testing();
Expand Down
6 changes: 4 additions & 2 deletions consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ mod test_dag_parser;

/// Exported consensus API.
pub use authority_node::ConsensusAuthority;
pub use block::{BlockAPI, Round};
pub use block::{BlockAPI, BlockRef, Round};
/// Exported API for testing.
pub use block::{TestBlock, Transaction, VerifiedBlock};
pub use commit::{CommitDigest, CommitIndex, CommitRef, CommittedSubDag};
pub use commit_consumer::{CommitConsumer, CommitConsumerMonitor};
pub use transaction::{ClientError, TransactionClient, TransactionVerifier, ValidationError};
pub use transaction::{
BlockStatus, ClientError, TransactionClient, TransactionVerifier, ValidationError,
};
20 changes: 0 additions & 20 deletions consensus/core/src/test_dag_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,26 +346,6 @@ impl DagBuilder {
blocks
}

#[allow(dead_code)]
pub(crate) fn get_blocks(&self, block_refs: &[BlockRef]) -> Vec<VerifiedBlock> {
let mut blocks = vec![None; block_refs.len()];

for (index, block_ref) in block_refs.iter().enumerate() {
if block_ref.round == 0 {
if let Some(block) = self.genesis.get(block_ref) {
blocks[index] = Some(block.clone());
}
continue;
}
if let Some(block) = self.blocks.get(block_ref) {
blocks[index] = Some(block.clone());
continue;
}
}

blocks.into_iter().map(|x| x.unwrap()).collect()
}

pub(crate) fn genesis_block_refs(&self) -> Vec<BlockRef> {
self.genesis.keys().cloned().collect()
}
Expand Down
Loading
Loading