Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved several consensus fields to state struct. #4825

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions consensus/checks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package consensus

import (
"testing"

msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/stretchr/testify/require"
)

// verifyMessageSig modifies the message signature when returns error
func TestVerifyMessageSig(t *testing.T) {
message := &msg_pb.Message{
Signature: []byte("signature"),
}

err := verifyMessageSig(nil, message)
require.Error(t, err)
require.Empty(t, message.Signature)
}
26 changes: 6 additions & 20 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/harmony-one/abool"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
Expand Down Expand Up @@ -65,8 +64,6 @@ type Consensus struct {
decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process
fBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
phase FBFTPhase
// current indicates what state a node is in
current State
// isBackup declarative the node is in backup mode
Expand All @@ -89,15 +86,7 @@ type Consensus struct {
MinPeers int
// private/public keys of current node
priKey multibls.PrivateKeys
// the publickey of leader
leaderPubKey unsafe.Pointer //*bls.PublicKeyWrapper
// blockNum: the next blockNumber that FBFT is going to agree on,
// should be equal to the blockNumber of next block
blockNum uint64
// Blockhash - 32 byte
blockHash [32]byte
// Block to run consensus on
block []byte

// Shard Id which this node belongs to
ShardID uint32
// IgnoreViewIDCheck determines whether to ignore viewID check
Expand Down Expand Up @@ -241,21 +230,19 @@ func (consensus *Consensus) getPublicKeys() multibls.PublicKeys {
}

func (consensus *Consensus) GetLeaderPubKey() *bls_cosi.PublicKeyWrapper {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.getLeaderPubKey()
}

func (consensus *Consensus) getLeaderPubKey() *bls_cosi.PublicKeyWrapper {
return (*bls_cosi.PublicKeyWrapper)(atomic.LoadPointer(&consensus.leaderPubKey))
return consensus.current.getLeaderPubKey()
}

func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
consensus.setLeaderPubKey(pub)
}

func (consensus *Consensus) setLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
atomic.StorePointer(&consensus.leaderPubKey, unsafe.Pointer(pub))
consensus.current.setLeaderPubKey(pub)
}

func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys {
Expand Down Expand Up @@ -284,11 +271,11 @@ func (consensus *Consensus) IsBackup() bool {
}

func (consensus *Consensus) BlockNum() uint64 {
return atomic.LoadUint64(&consensus.blockNum)
return consensus.getBlockNum()
}

func (consensus *Consensus) getBlockNum() uint64 {
return atomic.LoadUint64(&consensus.blockNum)
return atomic.LoadUint64(&consensus.current.blockNum)
}

// New create a new Consensus record
Expand All @@ -301,8 +288,7 @@ func New(
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: NewState(Normal),
current: NewState(Normal, shard),
decider: Decider,
registry: registry,
MinPeers: minPeers,
Expand Down
42 changes: 18 additions & 24 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package consensus

import (
"math/big"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -65,7 +64,7 @@ var (
// Signs the consensus message and returns the marshaled message.
func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Message,
priKey *bls_core.SecretKey) ([]byte, error) {
if err := consensus.signConsensusMessage(message, priKey); err != nil {
if err := signConsensusMessage(message, priKey); err != nil {
return empty, err
}
marshaledMessage, err := protobuf.Marshal(message)
Expand Down Expand Up @@ -113,30 +112,30 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
}

// Sign on the hash of the message
func (consensus *Consensus) signMessage(message []byte, priKey *bls_core.SecretKey) []byte {
func signMessage(message []byte, priKey *bls_core.SecretKey) []byte {
hash := hash.Keccak256(message)
signature := priKey.SignHash(hash[:])
return signature.Serialize()
}

// Sign on the consensus message signature field.
func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message,
func signConsensusMessage(message *msg_pb.Message,
priKey *bls_core.SecretKey) error {
message.Signature = nil
marshaledMessage, err := protobuf.Marshal(message)
if err != nil {
return err
}
// 64 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage, priKey)
signature := signMessage(marshaledMessage, priKey)
message.Signature = signature
return nil
}

// UpdateBitmaps update the bitmaps for prepare and commit phase
func (consensus *Consensus) updateBitmaps() {
consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()).
Str("MessageType", consensus.current.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.decider.Participants()
prepareBitmap := bls_cosi.NewMask(members)
Expand Down Expand Up @@ -199,8 +198,8 @@ func (consensus *Consensus) sendLastSignPower() {
func (consensus *Consensus) resetState() {
consensus.switchPhase("ResetState", FBFTAnnounce)

consensus.blockHash = [32]byte{}
consensus.block = []byte{}
consensus.current.blockHash = [32]byte{}
consensus.current.block = []byte{}
consensus.decider.ResetPrepareAndCommitVotes()
if consensus.prepareBitmap != nil {
consensus.prepareBitmap.Clear()
Expand Down Expand Up @@ -295,23 +294,23 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {

// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
func (consensus *Consensus) SetBlockNum(blockNum uint64) {
atomic.StoreUint64(&consensus.blockNum, blockNum)
consensus.setBlockNum(blockNum)
}

// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
func (consensus *Consensus) setBlockNum(blockNum uint64) {
atomic.StoreUint64(&consensus.blockNum, blockNum)
consensus.current.setBlockNum(blockNum)
}

// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) {
consensus.mutex.RLock()
members := consensus.decider.Participants()
consensus.mutex.RUnlock()
return consensus.readSignatureBitmapPayload(recvPayload, offset, members)
return readSignatureBitmapPayload(recvPayload, offset, members)
}

func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) {
func readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) {
if offset+bls.BLSSignatureSizeInBytes > len(recvPayload) {
return nil, nil, errors.New("payload not have enough length")
}
Expand Down Expand Up @@ -596,12 +595,7 @@ func (consensus *Consensus) GetFinality() int64 {

// switchPhase will switch FBFTPhase to desired phase.
func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) {
consensus.getLogger().Info().
Str("from:", consensus.phase.String()).
Str("to:", desired.String()).
Str("switchPhase:", subject)

consensus.phase = desired
consensus.current.switchPhase(subject, desired)
}

var (
Expand All @@ -623,15 +617,15 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
return errGetPreparedBlock
}

aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
aggSig, mask, err := readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
if err != nil {
return errReadBitmapPayload
}

// Have to keep the block hash so the leader can finish the commit phase of prepared block
consensus.resetState()

copy(consensus.blockHash[:], blockHash[:])
copy(consensus.current.blockHash[:], blockHash[:])
consensus.switchPhase("selfCommit", FBFTCommit)
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
Expand All @@ -651,7 +645,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
quorum.Commit,
[]*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload),
common.BytesToHash(consensus.blockHash[:]),
common.BytesToHash(consensus.current.blockHash[:]),
block.NumberU64(),
block.Header().ViewID().Uint64(),
); err != nil {
Expand Down Expand Up @@ -697,9 +691,9 @@ func (consensus *Consensus) GetLogger() *zerolog.Logger {
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Uint32("shardID", consensus.ShardID).
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.getCurBlockViewID()).
Str("phase", consensus.phase.String()).
Uint64("myBlock", consensus.current.getBlockNum()).
Uint64("myViewID", consensus.current.getCurBlockViewID()).
Str("phase", consensus.current.phase.String()).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestSignAndMarshalConsensusMessage(t *testing.T) {
t.Fatalf("Cannot craeate consensus: %v", err)
}
consensus.SetCurBlockViewID(2)
consensus.blockHash = [32]byte{}
consensus.current.blockHash = [32]byte{}

msg := &msg_pb.Message{}
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(msg, blsPriKey)
Expand Down
4 changes: 2 additions & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.NoError(t, err)

messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
state := NewState(Normal)
state := NewState(Normal, consensus.ShardID)

timeouts := createTimeout()
expectedTimeouts := make(map[TimeoutType]time.Duration)
Expand All @@ -37,7 +37,7 @@ func TestConsensusInitialization(t *testing.T) {
// FBFTLog
assert.NotNil(t, consensus.FBFTLog())

assert.Equal(t, FBFTAnnounce, consensus.phase)
assert.Equal(t, FBFTAnnounce, consensus.current.phase)

// State / consensus.current
assert.Equal(t, state.mode, consensus.current.mode)
Expand Down
5 changes: 2 additions & 3 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/hex"
"math/big"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -179,7 +178,7 @@ func (consensus *Consensus) _finalCommit(isLeader bool) {
)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
// find correct block content
curBlockHash := consensus.blockHash
curBlockHash := consensus.current.blockHash
block := consensus.fBFTLog.GetBlockByHash(curBlockHash)
if block == nil {
consensus.getLogger().Warn().
Expand Down Expand Up @@ -841,7 +840,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK

// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
consensus.setBlockNum(blk.NumberU64() + 1)
consensus.setCurBlockViewID(committedMsg.ViewID + 1)
var epoch *big.Int
if blk.IsLastBlockInEpoch() {
Expand Down
30 changes: 15 additions & 15 deletions consensus/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,32 @@ type NetworkMessage struct {
}

// Populates the common basic fields for all consensus message.
func (consensus *Consensus) populateMessageFields(
func (pm *State) populateMessageFields(
request *msg_pb.ConsensusRequest, blockHash []byte,
) *msg_pb.ConsensusRequest {
request.ViewId = consensus.getCurBlockViewID()
request.BlockNum = consensus.getBlockNum()
request.ShardId = consensus.ShardID
request.ViewId = pm.getCurBlockViewID()
request.BlockNum = pm.getBlockNum()
request.ShardId = pm.ShardID
// 32 byte block hash
request.BlockHash = blockHash
return request
}

// Populates the common basic fields for the consensus message and senders bitmap.
func (consensus *Consensus) populateMessageFieldsAndSendersBitmap(
func (pm *State) populateMessageFieldsAndSendersBitmap(
request *msg_pb.ConsensusRequest, blockHash []byte, bitmap []byte,
) *msg_pb.ConsensusRequest {
consensus.populateMessageFields(request, blockHash)
pm.populateMessageFields(request, blockHash)
// sender address
request.SenderPubkeyBitmap = bitmap
return request
}

// Populates the common basic fields for the consensus message and single sender.
func (consensus *Consensus) populateMessageFieldsAndSender(
func (pm *State) populateMessageFieldsAndSender(
request *msg_pb.ConsensusRequest, blockHash []byte, pubKey bls.SerializedPublicKey,
) *msg_pb.ConsensusRequest {
consensus.populateMessageFields(request, blockHash)
pm.populateMessageFields(request, blockHash)
// sender address
request.SenderPubkey = pubKey[:]
return request
Expand All @@ -75,26 +75,26 @@ func (consensus *Consensus) construct(
)

if len(priKeys) == 1 {
consensusMsg = consensus.populateMessageFieldsAndSender(
message.GetConsensus(), consensus.blockHash[:], priKeys[0].Pub.Bytes,
consensusMsg = consensus.current.populateMessageFieldsAndSender(
message.GetConsensus(), consensus.current.blockHash[:], priKeys[0].Pub.Bytes,
)
} else {
// TODO: use a persistent bitmap to report bitmap
mask := bls.NewMask(consensus.decider.Participants())
for _, key := range priKeys {
mask.SetKey(key.Pub.Bytes, true)
}
consensusMsg = consensus.populateMessageFieldsAndSendersBitmap(
message.GetConsensus(), consensus.blockHash[:], mask.Bitmap,
consensusMsg = consensus.current.populateMessageFieldsAndSendersBitmap(
message.GetConsensus(), consensus.current.blockHash[:], mask.Bitmap,
)
}

// Do the signing, 96 byte of bls signature
needMsgSig := true
switch p {
case msg_pb.MessageType_ANNOUNCE:
consensusMsg.Block = consensus.block
consensusMsg.Payload = consensus.blockHash[:]
consensusMsg.Block = consensus.current.block
consensusMsg.Payload = consensus.current.blockHash[:]
case msg_pb.MessageType_PREPARE:
needMsgSig = false
sig := bls_core.Sign{}
Expand All @@ -114,7 +114,7 @@ func (consensus *Consensus) construct(
}
consensusMsg.Payload = sig.Serialize()
case msg_pb.MessageType_PREPARED:
consensusMsg.Block = consensus.block
consensusMsg.Block = consensus.current.block
consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Prepare)
case msg_pb.MessageType_COMMITTED:
consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Commit)
Expand Down
Loading