diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go
index 42e295573e..5da9383c1c 100644
--- a/consensus/consensus_service.go
+++ b/consensus/consensus_service.go
@@ -556,11 +556,6 @@ func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
 	return consensus.setCurBlockViewID(viewID)
 }
 
-// SetCurBlockViewID set the current view ID
-func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
-	return consensus.current.SetCurBlockViewID(viewID)
-}
-
 // SetViewChangingID set the current view change ID
 func (consensus *Consensus) SetViewChangingID(viewID uint64) {
 	consensus.current.SetViewChangingID(viewID)
diff --git a/consensus/quorum/quorom_test.go b/consensus/quorum/quorom_test.go
index 7622f5aac3..ff020f64f4 100644
--- a/consensus/quorum/quorom_test.go
+++ b/consensus/quorum/quorom_test.go
@@ -629,7 +629,7 @@ func TestCIdentities_NthNextValidatorFailedEdgeCase2(t *testing.T) {
 
 	case <-done:
 		t.Error("Expected a timeout, but successfully calculated next leader")
-	case <-time.After(5 * time.Second):
+	case <-time.After(1 * time.Second):
 		t.Log("Test timed out, possible infinite loop")
 	}
 }
diff --git a/consensus/quorum/quorum.go b/consensus/quorum/quorum.go
index 888b2c3511..c8d07c8642 100644
--- a/consensus/quorum/quorum.go
+++ b/consensus/quorum/quorum.go
@@ -79,7 +79,8 @@ type ParticipantTracker interface {
 	NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
 	NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
 	NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
-	FirstParticipant(shardingconfig.Instance) *bls.PublicKeyWrapper
+	NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error)
+	FirstParticipant() *bls.PublicKeyWrapper
 	UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper)
 }
 
@@ -202,12 +203,10 @@ func (s *cIdentities) IndexOf(pubKey bls.SerializedPublicKey) int {
 }
 
 // NthNext return the Nth next pubkey, next can be negative number
-func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
-	found := false
-
+func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error) {
 	idx := s.IndexOf(pubKey.Bytes)
-	if idx != -1 {
-		found = true
+	if idx == -1 {
+		return nil, errors.Errorf("pubKey not found %x", pubKey.Bytes)
 	}
 	numNodes := int(s.ParticipantsCount())
 	// sanity check to avoid out of bound access
@@ -215,7 +214,7 @@ func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bl
 		numNodes = len(s.publicKeys)
 	}
 	idx = (idx + next) % numNodes
-	return found, &s.publicKeys[idx]
+	return &s.publicKeys[idx], nil
 }
 
 // NthNextValidatorV2 returns the Nth next pubkey nodes, but from another validator.
@@ -314,7 +313,7 @@ func (s *cIdentities) NthNextHmy(instance shardingconfig.Instance, pubKey *bls.P
 }
 
 // FirstParticipant returns the first participant of the shard
-func (s *cIdentities) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
+func (s *cIdentities) FirstParticipant() *bls.PublicKeyWrapper {
 	return &s.publicKeys[0]
 }
 
diff --git a/consensus/quorum/thread_safe_decider.go b/consensus/quorum/thread_safe_decider.go
index 2cc877448a..ca69232787 100644
--- a/consensus/quorum/thread_safe_decider.go
+++ b/consensus/quorum/thread_safe_decider.go
@@ -56,6 +56,12 @@ func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey
 	return a.decider.NthNextValidator(slotList, pubKey, next)
 }
 
+func (a threadSafeDeciderImpl) NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	return a.decider.NthNext(pubKey, next)
+}
+
 func (a threadSafeDeciderImpl) NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
 	a.mu.Lock()
 	defer a.mu.Unlock()
@@ -68,10 +74,10 @@ func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubk
 	return a.decider.NthNextHmy(instance, pubkey, next)
 }
 
-func (a threadSafeDeciderImpl) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
+func (a threadSafeDeciderImpl) FirstParticipant() *bls.PublicKeyWrapper {
 	a.mu.Lock()
 	defer a.mu.Unlock()
-	return a.decider.FirstParticipant(instance)
+	return a.decider.FirstParticipant()
 }
 
 func (a threadSafeDeciderImpl) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper) {
diff --git a/consensus/view_change.go b/consensus/view_change.go
index 6fb6def705..5d1792f069 100644
--- a/consensus/view_change.go
+++ b/consensus/view_change.go
@@ -61,6 +61,11 @@ func (pm *State) SetCurBlockViewID(viewID uint64) uint64 {
 	return viewID
 }
 
+// SetCurBlockViewID set the current view ID
+func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
+	return consensus.current.SetCurBlockViewID(viewID)
+}
+
 // GetViewChangingID return the current view changing id
 // It is meaningful during view change mode
 func (pm *State) GetViewChangingID() uint64 {
@@ -140,10 +145,27 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
 	return nextViewID, viewChangeDuration
 }
 
-// getNextLeaderKey uniquely determine who is the leader for given viewID
-// It reads the current leader's pubkey based on the blockchain data and returns
+// getNextLeaderKeySkipSameAddress uniquely determine who is the leader for given viewID
+// It receives the committee and returns
 // the next leader based on the gap of the viewID of the view change and the last
 // know view id of the block.
+func (consensus *Consensus) getNextLeaderKeySkipSameAddress(viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper {
+	gap := 1
+
+	cur := consensus.getCurBlockViewID()
+	if viewID > cur {
+		gap = int(viewID - cur)
+	}
+	// use pubkey as default key as well
+	leaderPubKey := consensus.getLeaderPubKey()
+	rs, err := viewChangeNextValidator(consensus.decider, gap, committee.Slots, leaderPubKey)
+	if err != nil {
+		consensus.getLogger().Error().Err(err).Msg("[getNextLeaderKeySkipSameAddress] viewChangeNextValidator failed")
+		return leaderPubKey
+	}
+	return rs
+}
+
 func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper {
 	gap := 1
 
@@ -182,7 +204,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
 			// it can still sync with other validators.
 			if curHeader.IsLastBlockInEpoch() {
 				consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch")
-				lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch))
+				lastLeaderPubKey = consensus.decider.FirstParticipant()
 			}
 		}
 	}
@@ -231,6 +253,46 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
 	return next
 }
 
+type nthNext interface {
+	NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error)
+}
+
+func viewChangeNextValidator(decider nthNext, gap int, slots shard.SlotList, lastLeaderPubKey *bls.PublicKeyWrapper) (*bls.PublicKeyWrapper, error) {
+	if gap > 1 {
+		current, err := decider.NthNext(
+			lastLeaderPubKey,
+			gap-1)
+		if err != nil {
+			return nil, errors.WithMessagef(err, "NthNext failed, gap %d", gap)
+		}
+
+		publicToAddress := make(map[bls.SerializedPublicKey]common.Address)
+		for _, slot := range slots {
+			publicToAddress[slot.BLSPublicKey] = slot.EcdsaAddress
+		}
+
+		for i := 0; i < len(slots); i++ {
+			gap = gap + i
+			next, err := decider.NthNext(
+				lastLeaderPubKey,
+				gap)
+			if err != nil {
+				return nil, errors.New("current leader not found")
+			}
+
+			if publicToAddress[current.Bytes] != publicToAddress[next.Bytes] {
+				return next, nil
+			}
+		}
+	} else {
+		next, err := decider.NthNext(
+			lastLeaderPubKey,
+			gap)
+		return next, errors.WithMessagef(err, "NthNext failed, gap %d", gap)
+	}
+	return nil, errors.New("current leader not found")
+}
+
 func createTimeout() map[TimeoutType]*utils.Timeout {
 	timeouts := make(map[TimeoutType]*utils.Timeout)
 	timeouts[timeoutConsensus] = utils.NewTimeout(phaseDuration)
@@ -250,7 +312,8 @@ func (consensus *Consensus) startViewChange() {
 	consensus.current.SetMode(ViewChanging)
 	nextViewID, duration := consensus.getNextViewID()
 	consensus.setViewChangingID(nextViewID)
-	epoch := consensus.Blockchain().CurrentHeader().Epoch()
+	currentHeader := consensus.Blockchain().CurrentHeader()
+	epoch := currentHeader.Epoch()
 	ss, err := consensus.Blockchain().ReadShardState(epoch)
 	if err != nil {
 		utils.Logger().Error().Err(err).Msg("Failed to read shard state")
@@ -267,7 +330,12 @@
 	// aganist the consensus.LeaderPubKey variable.
 	// Ideally, we shall use another variable to keep track of the
 	// leader pubkey in viewchange mode
-	consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee))
+	c := consensus.Blockchain().Config()
+	if c.IsLeaderRotationV2Epoch(currentHeader.Epoch()) {
+		consensus.setLeaderPubKey(consensus.getNextLeaderKeySkipSameAddress(nextViewID, committee))
+	} else {
+		consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee))
+	}
 
 	consensus.getLogger().Warn().
 		Uint64("nextViewID", nextViewID).
diff --git a/consensus/view_change_test.go b/consensus/view_change_test.go
index 42b44f7c78..ee599ca3da 100644
--- a/consensus/view_change_test.go
+++ b/consensus/view_change_test.go
@@ -1,13 +1,17 @@
 package consensus
 
 import (
+	"math/big"
 	"testing"
 
-	"github.com/harmony-one/harmony/crypto/bls"
-
+	"github.com/ethereum/go-ethereum/common"
 	bls_core "github.com/harmony-one/bls/ffi/go/bls"
+	"github.com/harmony-one/harmony/consensus/quorum"
+	"github.com/harmony-one/harmony/crypto/bls"
 	harmony_bls "github.com/harmony-one/harmony/crypto/bls"
+	"github.com/harmony-one/harmony/shard"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func TestBasicViewChanging(t *testing.T) {
@@ -118,3 +122,135 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {
 
 	assert.Equal(t, nextKey, &wrappedBLSKeys[1])
 }
+
+func TestViewChangeNextValidator(t *testing.T) {
+	decider := quorum.NewDecider(quorum.SuperMajorityVote, shard.BeaconChainShardID)
+	assert.Equal(t, int64(0), decider.ParticipantsCount())
+	wrappedBLSKeys := []bls.PublicKeyWrapper{}
+
+	const keyCount = 5
+	for i := 0; i < keyCount; i++ {
+		blsKey := harmony_bls.RandPrivateKey()
+		blsPubKey := harmony_bls.WrapperFromPrivateKey(blsKey)
+		wrappedBLSKeys = append(wrappedBLSKeys, *blsPubKey.Pub)
+	}
+
+	decider.UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{})
+	assert.EqualValues(t, keyCount, decider.ParticipantsCount())
+
+	t.Run("check_different_address_for_validators", func(t *testing.T) {
+		var (
+			rs    *bls.PublicKeyWrapper
+			err   error
+			slots []shard.Slot
+		)
+		for i := 0; i < keyCount; i++ {
+			slot := shard.Slot{
+				EcdsaAddress: common.BigToAddress(big.NewInt(int64(i))),
+				BLSPublicKey: wrappedBLSKeys[i].Bytes,
+			}
+			slots = append(slots, slot)
+		}
+
+		rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[0], rs)
+
+		rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[1], rs)
+
+		rs, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[2], rs)
+
+		// and no panic or error for future 1k gaps
+		for i := 0; i < 1000; i++ {
+			_, err = viewChangeNextValidator(decider, i, slots, &wrappedBLSKeys[0])
+			require.NoError(t, err)
+		}
+	})
+
+	// we can't find next validator, because all validators have the same address
+	t.Run("same_address_for_all_validators", func(t *testing.T) {
+		var (
+			rs    *bls.PublicKeyWrapper
+			err   error
+			slots []shard.Slot
+		)
+		for i := 0; i < keyCount; i++ {
+			slot := shard.Slot{
+				EcdsaAddress: common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z3")),
+				BLSPublicKey: wrappedBLSKeys[i].Bytes,
+			}
+			slots = append(slots, slot)
+		}
+
+		rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[0], rs)
+
+		rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[1], rs)
+
+		// error because all validators belong same address
+		_, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
+		require.Error(t, err)
+
+		// all of them return error, no way to recover
+		for i := 2; i < 1000; i++ {
+			_, err = viewChangeNextValidator(decider, i, slots, &wrappedBLSKeys[0])
+			require.Errorf(t, err, "error because all validators belong same address %d", i)
+		}
+	})
+
+	// we can't find next validator, because all validators have the same address
+	t.Run("check_5_validators_2_addrs", func(t *testing.T) {
+		// Slot represents node id (BLS address)
+		var (
+			addr1 = common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z3"))
+			addr2 = common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z4"))
+			rs    *bls.PublicKeyWrapper
+			err   error
+		)
+		slots := []shard.Slot{
+			{
+				EcdsaAddress: addr1,
+				BLSPublicKey: wrappedBLSKeys[0].Bytes,
+			},
+			{
+				EcdsaAddress: addr1,
+				BLSPublicKey: wrappedBLSKeys[1].Bytes,
+			},
+			{
+				EcdsaAddress: addr2,
+				BLSPublicKey: wrappedBLSKeys[2].Bytes,
+			},
+			{
+				EcdsaAddress: addr2,
+				BLSPublicKey: wrappedBLSKeys[3].Bytes,
+			},
+			{
+				EcdsaAddress: addr2,
+				BLSPublicKey: wrappedBLSKeys[4].Bytes,
+			},
+		}
+
+		rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[0], rs)
+
+		rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[1], rs)
+
+		rs, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[2], rs)
+
+		rs, err = viewChangeNextValidator(decider, 3, slots, &wrappedBLSKeys[0])
+		require.NoError(t, err)
+		require.Equal(t, &wrappedBLSKeys[1], rs)
+	})
+}
diff --git a/internal/chain/engine.go b/internal/chain/engine.go
index d5866e3887..07a458ad7a 100644
--- a/internal/chain/engine.go
+++ b/internal/chain/engine.go
@@ -6,11 +6,11 @@ import (
 	"sort"
 	"time"
 
+	bls2 "github.com/harmony-one/bls/ffi/go/bls"
 	"github.com/harmony-one/harmony/common/denominations"
 	"github.com/harmony-one/harmony/internal/params"
 	"github.com/harmony-one/harmony/numeric"
 
-	bls2 "github.com/harmony-one/bls/ffi/go/bls"
 	blsvrf "github.com/harmony-one/harmony/crypto/vrf/bls"
 
 	"github.com/ethereum/go-ethereum/common"
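
A minimal standalone sketch of the leader-selection rule that viewChangeNextValidator introduces above (not part of the patch). It assumes the decider's participant order matches the slot list, as the new unit tests do, and substitutes plain string stand-ins for harmony's shard.Slot and BLS key wrapper types; the accumulating gap step mirrors the patched loop.

package main

import (
	"errors"
	"fmt"
)

// slot is a simplified stand-in for harmony's shard.Slot: one BLS key
// operated by one ECDSA address.
type slot struct {
	address string
	blsKey  string
}

// nextLeaderSkipSameAddress mirrors the stepping done by the patched
// viewChangeNextValidator: for gap <= 1 it simply advances gap positions from
// the last leader; for larger gaps it treats the slot at gap-1 as "current"
// and keeps advancing (gap grows by the loop index, as in the patch) until it
// reaches a slot owned by a different address, so repeated view changes do
// not keep handing leadership to keys of the same operator.
func nextLeaderSkipSameAddress(slots []slot, lastLeaderIdx, gap int) (slot, error) {
	n := len(slots)
	if n == 0 {
		return slot{}, errors.New("empty slot list")
	}
	nth := func(steps int) slot { return slots[((lastLeaderIdx+steps)%n+n)%n] }
	if gap <= 1 {
		return nth(gap), nil
	}
	current := nth(gap - 1)
	for i := 0; i < n; i++ {
		gap = gap + i // accumulating step, same as the patched loop
		next := nth(gap)
		if next.address != current.address {
			return next, nil
		}
	}
	return slot{}, errors.New("no candidate with a different address")
}

func main() {
	// Two operators and five keys: the layout used by the
	// check_5_validators_2_addrs subtest above.
	slots := []slot{
		{"addr1", "key0"}, {"addr1", "key1"},
		{"addr2", "key2"}, {"addr2", "key3"}, {"addr2", "key4"},
	}
	for gap := 0; gap <= 3; gap++ {
		next, err := nextLeaderSkipSameAddress(slots, 0, gap)
		fmt.Println(gap, next.blsKey, err) // gaps 0..3 -> key0, key1, key2, key1
	}
}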