This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Local Cluster Duplicate Switch Test #32614

Merged · 26 commits · Aug 8, 2023
13 changes: 13 additions & 0 deletions ledger/src/blockstore.rs
@@ -3220,6 +3220,10 @@ impl Blockstore {
self.dead_slots_cf.delete(slot)
}

pub fn remove_slot_duplicate_proof(&self, slot: Slot) -> Result<()> {
self.duplicate_slots_cf.delete(slot)
}

pub fn store_duplicate_if_not_existing(
&self,
slot: Slot,
@@ -3233,6 +3237,15 @@ impl Blockstore {
}
}

pub fn get_first_duplicate_proof(&self) -> Option<(Slot, DuplicateSlotProof)> {
let mut iter = self
.db
.iter::<cf::DuplicateSlots>(IteratorMode::From(0, IteratorDirection::Forward))
.unwrap();
iter.next()
.map(|(slot, proof_bytes)| (slot, deserialize(&proof_bytes).unwrap()))
}

pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2);
self.duplicate_slots_cf.put(slot, &duplicate_slot_proof)
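The two new helpers, together with the existing store_duplicate_slot shown above, give a test a small round trip for duplicate proofs: store a proof built from two conflicting shred payloads, read the lowest-slot proof back, and delete it again. A minimal sketch of that flow, assuming an already-opened Blockstore and payloads supplied by the surrounding test (the import paths are the expected ones, not taken from this diff):

use solana_ledger::{blockstore::Blockstore, blockstore_meta::DuplicateSlotProof};
use solana_sdk::clock::Slot;

// Sketch: store a duplicate proof, fetch the first stored proof, then clear it.
fn duplicate_proof_round_trip(
    blockstore: &Blockstore,
    slot: Slot,
    shred1_payload: Vec<u8>,
    shred2_payload: Vec<u8>,
) -> DuplicateSlotProof {
    blockstore
        .store_duplicate_slot(slot, shred1_payload, shred2_payload)
        .unwrap();
    // The iterator starts at slot 0, so this returns the lowest slot with a proof.
    let (dup_slot, proof) = blockstore.get_first_duplicate_proof().unwrap();
    assert_eq!(dup_slot, slot);
    // Remove the proof so later assertions start from a clean column family.
    blockstore.remove_slot_duplicate_proof(slot).unwrap();
    proof
}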
3 changes: 3 additions & 0 deletions local-cluster/src/cluster.rs
@@ -2,6 +2,7 @@ use {
solana_client::thin_client::ThinClient,
solana_core::validator::{Validator, ValidatorConfig},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::shred::Shred,
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::socket::SocketAddrSpace,
std::{path::PathBuf, sync::Arc},
@@ -62,4 +63,6 @@ pub trait Cluster {
config: ValidatorConfig,
socket_addr_space: SocketAddrSpace,
);
fn set_entry_point(&mut self, entry_point_info: ContactInfo);
fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey);
}
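The two new trait methods give duplicate-shred tests direct control over the cluster: set_entry_point repoints the cluster entry point (for example at a node that will stay up across restarts), and send_shreds_to_validator delivers shreds straight to a single validator instead of relying on the normal broadcast path. A hedged sketch of a caller; the helper name and arguments are illustrative, and the types are the ones already imported at the top of this file:

// Hypothetical test helper, not part of this PR.
fn inject_duplicate_shreds<C: Cluster>(
    cluster: &mut C,
    new_entry_point: ContactInfo,
    dup_shreds: &[Shred],
    target: &Pubkey,
) {
    // Point discovery and restarts at the chosen node.
    cluster.set_entry_point(new_entry_point);
    // Push the conflicting shreds directly at the target validator's TVU socket.
    cluster.send_shreds_to_validator(dup_shreds.iter().collect(), target);
}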
23 changes: 21 additions & 2 deletions local-cluster/src/local_cluster.rs
@@ -13,10 +13,10 @@ use {
},
solana_gossip::{
cluster_info::Node,
contact_info::{ContactInfo, LegacyContactInfo},
contact_info::{ContactInfo, LegacyContactInfo, Protocol},
gossip_service::discover_cluster,
},
solana_ledger::create_new_tmp_ledger,
solana_ledger::{create_new_tmp_ledger, shred::Shred},
solana_runtime::{
genesis_utils::{
create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
@@ -57,6 +57,7 @@ use {
collections::HashMap,
io::{Error, ErrorKind, Result},
iter,
net::UdpSocket,
path::{Path, PathBuf},
sync::{Arc, RwLock},
},
@@ -864,6 +865,10 @@ impl Cluster for LocalCluster {
(node, entry_point_info)
}

fn set_entry_point(&mut self, entry_point_info: ContactInfo) {
self.entry_point_info = entry_point_info;
}

fn restart_node(
&mut self,
pubkey: &Pubkey,
@@ -934,6 +939,20 @@
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> {
self.validators.get(pubkey).map(|v| &v.info.contact_info)
}

fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey) {
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let validator_tvu = self
.get_contact_info(validator_key)
.unwrap()
.tvu(Protocol::UDP)
.unwrap();
for shred in dup_shreds {
send_socket
.send_to(shred.payload().as_ref(), validator_tvu)
.unwrap();
}
}
}

impl Drop for LocalCluster {
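One intended use of this implementation is to replay a proof already stored in some node's blockstore against another validator. A sketch of that flow, under the assumption that DuplicateSlotProof exposes its two payloads as shred1 and shred2 and that Shred::new_from_serialized_shred can rebuild shreds from them (both are assumptions, not taken from this diff):

use {
    solana_ledger::{blockstore::Blockstore, shred::Shred},
    solana_sdk::pubkey::Pubkey,
};

// Hypothetical test helper: rebuild the shreds of the first stored proof and
// send both to the target validator over its TVU port.
fn replay_first_duplicate_proof(
    cluster: &LocalCluster,
    blockstore: &Blockstore,
    target: &Pubkey,
) {
    let (_slot, proof) = blockstore
        .get_first_duplicate_proof()
        .expect("blockstore should contain a duplicate proof");
    let dup_shreds = vec![
        Shred::new_from_serialized_shred(proof.shred1).unwrap(),
        Shred::new_from_serialized_shred(proof.shred2).unwrap(),
    ];
    cluster.send_shreds_to_validator(dup_shreds.iter().collect(), target);
}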
99 changes: 67 additions & 32 deletions local-cluster/tests/common/mod.rs
@@ -77,6 +77,14 @@ pub fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Tower> {
Tower::restore(&file_tower_storage, node_pubkey).ok()
}

pub fn remove_tower_if_exists(tower_path: &Path, node_pubkey: &Pubkey) {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
let filename = file_tower_storage.filename(node_pubkey);
if filename.exists() {
fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap();
}
}

pub fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap();
@@ -120,17 +128,18 @@ pub fn purge_slots_with_count(blockstore: &Blockstore, start_slot: Slot, slot_co
pub fn wait_for_last_vote_in_tower_to_land_in_ledger(
ledger_path: &Path,
node_pubkey: &Pubkey,
) -> Slot {
let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap();
loop {
// We reopen in a loop to make sure we get updates
let blockstore = open_blockstore(ledger_path);
if blockstore.is_full(last_vote) {
break;
) -> Option<Slot> {
last_vote_in_tower(ledger_path, node_pubkey).map(|(last_vote, _)| {
loop {
// We reopen in a loop to make sure we get updates
let blockstore = open_blockstore(ledger_path);
if blockstore.is_full(last_vote) {
break;
}
sleep(Duration::from_millis(100));
}
sleep(Duration::from_millis(100));
}
last_vote
last_vote
})
}
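With the Option return, callers no longer have to guarantee that a node has voted before calling this; None simply means no tower was found. Together with remove_tower_if_exists above, that makes restart paths safe for validators that never produced a tower. A sketch of how a test might combine the two (the helper itself is hypothetical; Path, Pubkey, and Slot are already in scope in this module):

// Hypothetical helper: wait for the last vote (if any) to land, then clear the
// tower file whether or not it exists.
fn reset_tower_state(ledger_path: &Path, tower_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
    let landed_vote = wait_for_last_vote_in_tower_to_land_in_ledger(ledger_path, node_pubkey);
    // Unlike remove_tower, this does not panic when the tower file is absent.
    remove_tower_if_exists(tower_path, node_pubkey);
    landed_vote
}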

pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
@@ -390,40 +399,66 @@ pub fn run_cluster_partition<C>(
on_partition_resolved(&mut cluster, &mut context);
}

pub struct ValidatorTestConfig {
pub validator_keypair: Arc<Keypair>,
pub validator_config: ValidatorConfig,
pub in_genesis: bool,
}

pub fn test_faulty_node(
faulty_node_type: BroadcastStageType,
node_stakes: Vec<u64>,
validator_test_configs: Option<Vec<ValidatorTestConfig>>,
custom_leader_schedule: Option<FixedSchedule>,
) -> (LocalCluster, Vec<Arc<Keypair>>) {
solana_logger::setup_with_default("solana_local_cluster=info");
let num_nodes = node_stakes.len();
let mut validator_keys = Vec::with_capacity(num_nodes);
validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true));
let validator_keys = validator_test_configs
.as_ref()
.map(|configs| {
configs
.iter()
.map(|config| (config.validator_keypair.clone(), config.in_genesis))
.collect()
})
.unwrap_or_else(|| {
let mut validator_keys = Vec::with_capacity(num_nodes);
validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true));
validator_keys
});

assert_eq!(node_stakes.len(), num_nodes);
assert_eq!(validator_keys.len(), num_nodes);

// Use a fixed leader schedule so that only the faulty node gets leader slots.
let validator_to_slots = vec![(
validator_keys[0].0.as_ref().pubkey(),
solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize,
)];
let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
let fixed_leader_schedule = Some(FixedSchedule {
leader_schedule: Arc::new(leader_schedule),
let fixed_leader_schedule = custom_leader_schedule.unwrap_or_else(|| {
// Use a fixed leader schedule so that only the faulty node gets leader slots.
let validator_to_slots = vec![(
validator_keys[0].0.as_ref().pubkey(),
solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize,
)];
let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
FixedSchedule {
leader_schedule: Arc::new(leader_schedule),
}
});

let error_validator_config = ValidatorConfig {
broadcast_stage_type: faulty_node_type,
fixed_leader_schedule: fixed_leader_schedule.clone(),
..ValidatorConfig::default_for_test()
};
let mut validator_configs = Vec::with_capacity(num_nodes);
let mut validator_configs = validator_test_configs
.map(|configs| {
configs
.into_iter()
.map(|config| config.validator_config)
.collect()
})
.unwrap_or_else(|| {
let mut configs = Vec::with_capacity(num_nodes);
configs.resize_with(num_nodes, ValidatorConfig::default_for_test);
configs
});

// First validator is the bootstrap leader with the malicious broadcast logic.
validator_configs.push(error_validator_config);
validator_configs.resize_with(num_nodes, || ValidatorConfig {
fixed_leader_schedule: fixed_leader_schedule.clone(),
..ValidatorConfig::default_for_test()
});
validator_configs[0].broadcast_stage_type = faulty_node_type;
for config in &mut validator_configs {
config.fixed_leader_schedule = Some(fixed_leader_schedule.clone());
}

let mut cluster_config = ClusterConfig {
cluster_lamports: 10_000,
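With the two new optional parameters, a caller can supply its own keypairs and per-validator configs and, if needed, its own leader schedule, instead of relying on the defaults. A hedged sketch of such a caller; the two-node stakes and the helper name are illustrative only:

use {
    solana_core::{broadcast_stage::BroadcastStageType, validator::ValidatorConfig},
    solana_sdk::signature::Keypair,
    std::sync::Arc,
};

// Hypothetical caller: explicit per-validator configs, default leader schedule.
fn run_faulty_node_case(faulty_node_type: BroadcastStageType) {
    let node_stakes = vec![100, 50];
    let validator_test_configs = node_stakes
        .iter()
        .map(|_| ValidatorTestConfig {
            validator_keypair: Arc::new(Keypair::new()),
            validator_config: ValidatorConfig::default_for_test(),
            in_genesis: true,
        })
        .collect::<Vec<_>>();
    // Passing None keeps the default schedule, which gives every leader slot to
    // the first (faulty) validator.
    let (_cluster, _validator_keys) = test_faulty_node(
        faulty_node_type,
        node_stakes,
        Some(validator_test_configs),
        None,
    );
}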