Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Bullshark Fallback #30

Open
wants to merge 7 commits into
base: bullshark
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 314 additions & 7 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright(C) Facebook, Inc. and its affiliates.
use config::{Committee, Stake};
use core::panic;
use crypto::Hash as _;
use crypto::{Digest, PublicKey};
use log::{debug, info, log_enabled, warn};
use primary::{Certificate, Round};
use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use tokio::sync::mpsc::{Receiver, Sender};

#[cfg(test)]
Expand All @@ -25,6 +26,11 @@ struct State {
/// Keeps the latest committed certificate (and its parents) for every authority. Anything older
/// must be regularly cleaned up through the function `update`.
dag: Dag,
// Keeps track of which parties are in steady state waves
ss_validator_sets: HashMap<u64, BTreeSet<PublicKey>>,

// Keeps track of which parties are in fallback waves
fb_validator_sets: HashMap<u64, BTreeSet<PublicKey>>,
}

impl State {
Expand All @@ -34,10 +40,16 @@ impl State {
.map(|x| (x.origin(), (x.digest(), x)))
.collect::<HashMap<_, _>>();

let mut ss_sets = HashMap::new();
let ss_validators = genesis.iter().map(|(x, (_, _))| *x).collect();
ss_sets.insert(1, ss_validators);

Self {
last_committed_round: 0,
last_committed: genesis.iter().map(|(x, (_, y))| (*x, y.round())).collect(),
dag: [(0, genesis)].iter().cloned().collect(),
ss_validator_sets: ss_sets,
fb_validator_sets: HashMap::new(),
}
}

Expand Down Expand Up @@ -110,14 +122,13 @@ impl Consensus {
let round = certificate.round();

// Add the new certificate to the local storage.
state
.dag
.entry(round)
.or_insert_with(HashMap::new)
.insert(certificate.origin(), (certificate.digest(), certificate));
state.dag.entry(round).or_insert_with(HashMap::new).insert(
certificate.origin(),
(certificate.digest(), certificate.clone()),
);

// Try to order the dag to commit. Start from the previous round and check if it is a leader round.
let r = round - 1;
/*let r = round - 1;

// We only elect leaders for even round numbers.
if r % 2 != 0 || r < 2 {
Expand All @@ -144,6 +155,7 @@ impl Consensus {
.map(|(_, x)| self.committee.stake(&x.origin()))
.sum();


// If it is the case, we can commit the leader. But first, we need to recursively go back to
// the last committed leader, and commit all preceding leaders in the right order. Committing
// a leader block means committing all its dependencies.
Expand All @@ -164,6 +176,40 @@ impl Consensus {
// Add the certificate to the sequence.
sequence.push(x);
}
}*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code


let leader = match self.update_validator_mode(&certificate, &mut state) {
Some(x) => x.clone(),
None => continue,
};

let r = round - 1;

// Elect leaders every even round
if r % 2 != 0 || r < 2 {
continue;
}

// Get the certificate's digest of the leader. If we already ordered this leader, there is nothing to do.
let leader_round = r - 1;

if leader_round <= state.last_committed_round {
continue;
}

// Get an ordered list of past leaders that are linked to the current leader.
debug!("Leader {:?} has enough support", leader);
//println!("Committed leader {}", &leader.origin());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code

let mut sequence = Vec::new();
for leader in self.order_wave_leaders(&leader, &mut state).iter().rev() {
// Starting from the oldest leader, flatten the sub-dag referenced by the leader.
for x in self.order_dag(leader, &state) {
// Update and clean up internal state.
state.update(&x, self.gc_depth);

// Add the certificate to the sequence.
sequence.push(x);
}
}

// Log the latest committed round of every authority (for debug).
Expand Down Expand Up @@ -196,6 +242,168 @@ impl Consensus {
}
}

// Updates the mode of a certificate if necessary
fn update_validator_mode(
&self,
certificate: &Certificate,
state: &mut State,
) -> Option<Certificate> {
let ss_wave = ((certificate.round() as f64) / 2.0).ceil() as u64;
let fb_wave = ((certificate.round() as f64) / 4.0).ceil() as u64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid float arithmetic, we should be able to work with u64

//println!("fbwave {}", fb_wave);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code


// If the mode of a certificate is already determined to be steady state then return
if state
.ss_validator_sets
.entry(ss_wave)
.or_insert(BTreeSet::new())
.contains(&certificate.origin())
{
//println!("In the steady state {}", &certificate.origin());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code

return None;
}

// If the mode of the certificate is already determined to be fallback then return
if state
.fb_validator_sets
.entry(fb_wave)
.or_insert(BTreeSet::new())
.contains(&certificate.origin())
{
//println!("In the fallback {}", &certificate.origin());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code


return None;
}

// If certificate was in the previous steady state wave and there was a commit, then it is
// in the steady state
let ss_leader = self.try_steady_commit(&certificate, ss_wave - 1, state);
if state
.ss_validator_sets
.entry(ss_wave - 1)
.or_insert(BTreeSet::new())
.contains(&certificate.origin())
&& ss_leader.is_some()
{
state
.ss_validator_sets
.entry(ss_wave)
.or_insert(BTreeSet::new())
.insert(certificate.origin());
return ss_leader;
}

// If the certificate was in the previous fallback wave and there was a commit, then it is
// in the fallback
let fb_leader = self.try_fallback_commit(&certificate, max(fb_wave - 1, 1), state);

if state
.fb_validator_sets
.entry(max(fb_wave - 1, 1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume fallback waves start from 1, so I did the max in case fb_wave - 1 = 0. I will change so this explicit check is done beforehand.

.or_insert(BTreeSet::new())
.contains(&certificate.origin())
&& fb_leader.is_some()
{
state
.ss_validator_sets
.entry(ss_wave)
.or_insert(BTreeSet::new())
.insert(certificate.origin());
return fb_leader;
}

// Otherwise the certificate is in fallback mode
state
.fb_validator_sets
.entry(fb_wave)
.or_insert(BTreeSet::new())
.insert(certificate.origin());
//println!("fb_size round {} wave {} size {}", certificate.round(), fb_wave, state.fb_validator_sets.entry(fb_wave).or_insert(BTreeSet::new()).len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code


return None;
}

// Checks whether there is a steady state wave commit
fn try_steady_commit(
&self,
certificate: &Certificate,
ss_wave: u64,
state: &mut State,
) -> Option<Certificate> {
let dag = &state.dag;

// Get the steady state leader of the current ss_wave
//println!("sswave {}", ss_wave);
let ss_leader_round = 2 * ss_wave - 1;
let (_, leader) = match self.leader(ss_leader_round, &state.dag) {
Some(x) => x,
None => return None,
};

let ss_sets = state
.ss_validator_sets
.entry(ss_wave)
.or_insert(BTreeSet::new());

// Find the potential votes of certificates in steady state mode
let stake: Stake = state
.dag
.get(&(certificate.round() - 1))
.expect("Should have previous round certificates")
.values()
.filter(|(d, x)| certificate.header.parents.contains(d) && self.linked(x, leader, dag))
.filter(|(_, x)| ss_sets.contains(&x.origin()))
.map(|(_, x)| self.committee.stake(&x.origin()))
.sum();
//println!("ss stake {} round {} sswave {} size {}", stake, certificate.round(), ss_wave, ss_sets.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code

// Commit if there is at least 2f+1 steady state votes
if stake >= self.committee.quorum_threshold() {
return Some(leader.clone());
}
return None;
}

// Checks whether there is a fallback wave commit
fn try_fallback_commit(
&self,
certificate: &Certificate,
fb_wave: u64,
state: &mut State,
) -> Option<Certificate> {
let dag = &state.dag;

// Get the current fallback leader of the current fb_wave
let fb_leader_round = 4 * fb_wave - 3;
let (_, leader) = match self.fb_leader(fb_leader_round, &state.dag) {
Some(x) => x,
None => return None,
};

let fb_sets = state
.fb_validator_sets
.entry(fb_wave)
.or_insert(BTreeSet::new());
//println!("commit round {} fbwave {} size {}", certificate.round(), fb_wave, fb_sets.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code


// Find the potential votes of certificates in fallback mode
let stake: Stake = state
.dag
.get(&(certificate.round() - 1))
.expect("Should have previous round certificates")
.values()
.filter(|(d, x)| certificate.header.parents.contains(d) && self.linked(x, leader, dag))
.filter(|(_, x)| fb_sets.contains(&x.origin()))
.map(|(_, x)| self.committee.stake(&x.origin()))
.sum();

//println!("fb stake {}", stake);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code

// Commit if there is at least 2f+1 fallback votes
if stake >= self.committee.quorum_threshold() {
return Some(leader.clone());
}
return None;
}

/// Returns the certificate (and the certificate's digest) originated by the leader of the
/// specified round (if any).
fn leader<'a>(&self, round: Round, dag: &'a Dag) -> Option<&'a (Digest, Certificate)> {
Expand All @@ -214,6 +422,105 @@ impl Consensus {
dag.get(&round).map(|x| x.get(&leader)).flatten()
}

/// Returns the certificate (and the certificate's digest) originated by the fallback leader of the
/// specified round (if any).
fn fb_leader<'a>(&self, round: Round, dag: &'a Dag) -> Option<&'a (Digest, Certificate)> {
// TODO: We should elect the leader of round r-2 using the common coin revealed at round r.
// At this stage, we are guaranteed to have 2f+1 certificates from round r (which is enough to
// compute the coin). We currently just use round-robin.
#[cfg(test)]
let seed = 1;
#[cfg(not(test))]
let seed = round + 1;

// Elect the leader.
let leader = self.committee.leader(seed as usize);

// Return its certificate and the certificate's digest.
dag.get(&round).map(|x| x.get(&leader)).flatten()
}

/// Order the past leaders that we didn't already commit.
fn order_wave_leaders(&self, leader: &Certificate, state: &mut State) -> Vec<Certificate> {
let mut to_commit = vec![leader.clone()];
let mut leader = leader;
let dag = &state.dag;

for r in (state.last_committed_round + 2..leader.round())
.rev()
.step_by(2)
{
// Get the current steady state wave number
let wave = ((r as f64) / 2.0).ceil() as u64;

let ss_sets = state
.ss_validator_sets
.entry(wave)
.or_insert(BTreeSet::new());

// Get the certificate proposed by the previous leader.
let (_, prev_ss_leader) = match self.leader(r, &state.dag) {
Some(x) => x,
None => continue,
};

let ss_stake: Stake = state
.dag
.get(&(2 * wave))
.expect("Should have previous round certificates")
.values()
.filter(|(d, x)| {
leader.header.parents.contains(d) && self.linked(x, prev_ss_leader, dag)
})
.filter(|(_, x)| ss_sets.contains(&x.origin()))
.map(|(_, x)| self.committee.stake(&x.origin()))
.sum();

let fb_stake: Stake;
let fb_sets = state
.fb_validator_sets
.entry(wave / 2)
.or_insert(BTreeSet::new());

if wave % 2 == 0 {
// Get the certificate proposed by the previous leader.
let (_, prev_fb_leader) = match self.fb_leader(r - 2, &state.dag) {
Some(x) => x,
None => continue,
};

fb_stake = state
.dag
.get(&(2 * wave))
.expect("Should have previous round certificates")
.values()
.filter(|(d, x)| {
leader.header.parents.contains(d) && self.linked(x, prev_fb_leader, dag)
})
.filter(|(_, x)| fb_sets.contains(&x.origin()))
.map(|(_, x)| self.committee.stake(&x.origin()))
.sum();

if ss_stake < self.committee.validity_threshold()
&& fb_stake >= self.committee.validity_threshold()
{
to_commit.push(prev_fb_leader.clone());
leader = prev_fb_leader;
}
} else {
fb_stake = 0;
}

if ss_stake >= self.committee.validity_threshold()
&& fb_stake < self.committee.validity_threshold()
{
to_commit.push(prev_ss_leader.clone());
leader = prev_ss_leader;
}
}
to_commit
}

/// Order the past leaders that we didn't already commit.
fn order_leaders(&self, leader: &Certificate, state: &State) -> Vec<Certificate> {
let mut to_commit = vec![leader.clone()];
Expand Down
Loading