diff --git a/mq/mq.go b/mq/mq.go index 9b0a970..7755ed6 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -33,22 +33,32 @@ func New(opts Options) MessageQueue { // have heights up to (and including) the given height. The appropriate callback // will be called for every message that is consumed. All consumed messages will // be dropped from the MessageQueue. -func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit)) (n int) { +func (mq *MessageQueue) Consume(h process.Height, propose func(process.Propose), prevote func(process.Prevote), precommit func(process.Precommit), procsAllowed map[id.Signatory]bool) (n int) { for from, q := range mq.queuesByPid { for len(q) > 0 { if q[0] == nil || height(q[0]) > h { break } - switch msg := q[0].(type) { - case process.Propose: - propose(msg) - case process.Prevote: - prevote(msg) - case process.Precommit: - precommit(msg) - } - n++ - q = q[1:] + + func() { + defer func() { + n++ + q = q[1:] + }() + + if ok := procsAllowed[from]; !ok { + return + } + + switch msg := q[0].(type) { + case process.Propose: + propose(msg) + case process.Prevote: + prevote(msg) + case process.Precommit: + precommit(msg) + } + }() } mq.queuesByPid[from] = q } diff --git a/mq/mq_test.go b/mq/mq_test.go index 4fdd9e5..ff77ff0 100644 --- a/mq/mq_test.go +++ b/mq/mq_test.go @@ -108,6 +108,7 @@ var _ = Describe("MQ", func() { proposeCallback, prevoteCallback, precommitCallback, + map[id.Signatory]bool{}, ) Expect(n).To(Equal(0)) @@ -115,6 +116,221 @@ var _ = Describe("MQ", func() { }) Context("when we can insert new messages", func() { + Context("when filtering the sender against the whitelist", func() { + Context("when the sender is whitelisted", func() { + It("should process the message", func() { + opts := mq.DefaultOptions() + queue := mq.New(opts) + + loop := func() bool { + sender := id.NewPrivKey().Signatory() + height := process.Height(r.Int63()) + round := process.Round(r.Int63()) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true + + msg := randomMsg(r, sender, height, round) + switch msg := msg.(type) { + case process.Propose: + queue.InsertPropose(msg) + case process.Prevote: + queue.InsertPrevote(msg) + case process.Precommit: + queue.InsertPrecommit(msg) + } + + processed := false + proposeCallback := func(propose process.Propose) { + processed = true + } + prevoteCallback := func(prevote process.Prevote) { + processed = true + } + precommitCallback := func(precommit process.Precommit) { + processed = true + } + + // cannot consume msgs of height less than lowerHeight + n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) + Expect(n).To(Equal(1)) + Expect(processed).Should(BeTrue()) + + return true + } + Expect(quick.Check(loop, nil)).To(Succeed()) + }) + }) + + Context("when the sender is not whitelisted", func() { + It("should reject the message", func() { + opts := mq.DefaultOptions() + queue := mq.New(opts) + + loop := func() bool { + sender := id.NewPrivKey().Signatory() + height := process.Height(r.Int63()) + round := process.Round(r.Int63()) + procsAllowed := map[id.Signatory]bool{} + + msg := randomMsg(r, sender, height, round) + switch msg := msg.(type) { + case process.Propose: + queue.InsertPropose(msg) + case process.Prevote: + queue.InsertPrevote(msg) + case process.Precommit: + queue.InsertPrecommit(msg) + } + + processed := false + proposeCallback := func(propose process.Propose) { + processed = true + } + prevoteCallback := func(prevote process.Prevote) { + processed = true + } + precommitCallback := func(precommit process.Precommit) { + processed = true + } + + // It should filter out the message + n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) + Expect(n).To(Equal(1)) + Expect(processed).Should(BeFalse()) + + return true + } + Expect(quick.Check(loop, nil)).To(Succeed()) + }) + }) + + Context("when sender is removed from the whitelist in the future", func() { + It("should process message from the sender until it's removed from the whitelist", func() { + opts := mq.DefaultOptions() + queue := mq.New(opts) + + loop := func() bool { + sender := id.NewPrivKey().Signatory() + height := process.Height(r.Int63()) + higherHeight := height + 1 + process.Height(r.Intn(100)) + round := process.Round(r.Int63()) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true + + msg := randomMsg(r, sender, height, round) + switch msg := msg.(type) { + case process.Propose: + queue.InsertPropose(msg) + case process.Prevote: + queue.InsertPrevote(msg) + case process.Precommit: + queue.InsertPrecommit(msg) + } + + msgWithHigherHeight := randomMsg(r, sender, higherHeight, round) + switch msgWithHigherHeight := msgWithHigherHeight.(type) { + case process.Propose: + queue.InsertPropose(msgWithHigherHeight) + case process.Prevote: + queue.InsertPrevote(msgWithHigherHeight) + case process.Precommit: + queue.InsertPrecommit(msgWithHigherHeight) + } + + processed := false + proposeCallback := func(propose process.Propose) { + processed = true + } + prevoteCallback := func(prevote process.Prevote) { + processed = true + } + precommitCallback := func(precommit process.Precommit) { + processed = true + } + + // It should process the message when consuming current height + n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) + Expect(n).To(Equal(1)) + Expect(processed).Should(BeTrue()) + + // Remove the sender from whitelist + delete(procsAllowed, sender) + processed = false + + // It should not process the message when consuming future height + n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) + Expect(n).To(Equal(1)) + Expect(processed).Should(BeFalse()) + + return true + } + Expect(quick.Check(loop, nil)).To(Succeed()) + }) + }) + + Context("when sender is added to the whitelist in the future", func() { + It("should not process message from the sender until it's added to the whitelist", func() { + opts := mq.DefaultOptions() + queue := mq.New(opts) + + loop := func() bool { + sender := id.NewPrivKey().Signatory() + height := process.Height(r.Int63()) + higherHeight := height + 1 + process.Height(r.Intn(100)) + round := process.Round(r.Int63()) + procsAllowed := map[id.Signatory]bool{} + + msg := randomMsg(r, sender, height, round) + switch msg := msg.(type) { + case process.Propose: + queue.InsertPropose(msg) + case process.Prevote: + queue.InsertPrevote(msg) + case process.Precommit: + queue.InsertPrecommit(msg) + } + + msgWithHigherHeight := randomMsg(r, sender, higherHeight, round) + switch msgWithHigherHeight := msgWithHigherHeight.(type) { + case process.Propose: + queue.InsertPropose(msgWithHigherHeight) + case process.Prevote: + queue.InsertPrevote(msgWithHigherHeight) + case process.Precommit: + queue.InsertPrecommit(msgWithHigherHeight) + } + + processed := false + proposeCallback := func(propose process.Propose) { + processed = true + } + prevoteCallback := func(prevote process.Prevote) { + processed = true + } + precommitCallback := func(precommit process.Precommit) { + processed = true + } + + // It should not process the message when consuming current height + n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) + Expect(n).To(Equal(1)) + Expect(processed).Should(BeFalse()) + + // Add the sender to whitelist + procsAllowed[sender] = true + + // It should process the message when consuming future height + n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) + Expect(n).To(Equal(1)) + Expect(processed).Should(BeTrue()) + + return true + } + Expect(quick.Check(loop, nil)).To(Succeed()) + }) + }) + }) + Context("when two messages have different heights", func() { It("should correctly sort the messages based on height", func() { opts := mq.DefaultOptions() @@ -124,6 +340,8 @@ var _ = Describe("MQ", func() { sender := id.NewPrivKey().Signatory() lowerHeight := process.Height(r.Int63()) higherHeight := lowerHeight + 1 + process.Height(r.Intn(100)) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true // send msg1 msg1 := randomMsg(r, sender, lowerHeight, processutil.RandomRound(r)) @@ -199,12 +417,12 @@ var _ = Describe("MQ", func() { // cannot consume msgs of height less than lowerHeight evenLowerHeight := lowerHeight - 1 - process.Height(r.Intn(100)) - n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(evenLowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(i).To(Equal(0)) // consume all messages - n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(higherHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(2)) Expect(i).To(Equal(2)) @@ -222,6 +440,9 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() height := process.Height(r.Int63()) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true + // at the most 20 rounds rounds := make([]process.Round, 1+r.Intn(20)) for t := 0; t < cap(rounds); t++ { @@ -280,12 +501,12 @@ var _ = Describe("MQ", func() { // cannot consume msgs of height less than lowerHeight lowerHeight := height - 1 - process.Height(r.Intn(100)) - n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(lowerHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(t).To(Equal(0)) // consume all messages - n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(cap(rounds))) Expect(t).To(Equal(cap(rounds))) @@ -303,6 +524,8 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() minHeight, maxHeight, msgsCount := insertRandomMessages(&queue, sender) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true // we should first consume msg1 and then msg2 prevHeight := process.Height(-1) @@ -369,12 +592,12 @@ var _ = Describe("MQ", func() { } // cannot consume msgs of height less than the min height - n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(minHeight-1, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(0)) Expect(i).To(Equal(0)) // consume all messages - n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback) + n = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(msgsCount)) Expect(i).To(Equal(msgsCount)) @@ -392,6 +615,8 @@ var _ = Describe("MQ", func() { loop := func() bool { sender := id.NewPrivKey().Signatory() + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true _, maxHeight, _ := insertRandomMessages(&queue, sender) thresholdHeight := process.Height(r.Intn(int(maxHeight))) queue.DropMessagesBelowHeight(thresholdHeight) @@ -406,7 +631,7 @@ var _ = Describe("MQ", func() { Expect(precommit.Height >= thresholdHeight).To(BeTrue()) } - _ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback) + _ = queue.Consume(maxHeight, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) return true } Expect(quick.Check(loop, nil)).To(Succeed()) @@ -418,6 +643,7 @@ var _ = Describe("MQ", func() { loop := func() bool { opts := mq.DefaultOptions().WithMaxCapacity(1) queue := mq.New(opts) + procsAllowed := map[id.Signatory]bool{} // insert a msg originalSender := id.NewPrivKey().Signatory() @@ -425,6 +651,7 @@ var _ = Describe("MQ", func() { originalMsg.From = originalSender originalMsg.Height = process.Height(1) originalMsg.Round = process.Round(1) + procsAllowed[originalMsg.From] = true queue.InsertPropose(originalMsg) // any message in height > 1 or (height = 1 || round > 1) will be dropped @@ -434,11 +661,12 @@ var _ = Describe("MQ", func() { msg.From = id.NewPrivKey().Signatory() msg.Height = process.Height(1) msg.Round = process.Round(2) + procsAllowed[msg.From] = true queue.InsertPropose(msg) // so consuming will only return the first msg proposeCallback := func(propose process.Propose) {} - n := queue.Consume(process.Height(1), proposeCallback, nil, nil) + n := queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(2)) // re-insert the original msg @@ -458,7 +686,7 @@ var _ = Describe("MQ", func() { Expect(propose.Round).To(Equal(originalMsg.Round)) Expect(propose.From).To(Equal(originalSender)) } - n = queue.Consume(process.Height(1), proposeCallback, nil, nil) + n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(1)) // re-insert the original msg @@ -477,7 +705,7 @@ var _ = Describe("MQ", func() { Expect(propose.Round).To(Equal(msg.Round)) Expect(propose.From).To(Equal(originalSender)) } - n = queue.Consume(process.Height(1), proposeCallback, nil, nil) + n = queue.Consume(process.Height(1), proposeCallback, nil, nil, procsAllowed) Expect(n).To(Equal(1)) return true @@ -497,6 +725,8 @@ var _ = Describe("MQ", func() { // msgsCount > c sender := id.NewPrivKey().Signatory() height := process.Height(1) + procsAllowed := map[id.Signatory]bool{} + procsAllowed[sender] = true msgsCount := c + 5 + r.Intn(20) rounds := make([]process.Round, msgsCount) msgs := make([]interface{}, msgsCount) @@ -553,7 +783,7 @@ var _ = Describe("MQ", func() { i++ } - n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback) + n := queue.Consume(height, proposeCallback, prevoteCallback, precommitCallback, procsAllowed) Expect(n).To(Equal(c)) Expect(i).To(Equal(c)) diff --git a/process/process.go b/process/process.go index b448de4..9dc05da 100644 --- a/process/process.go +++ b/process/process.go @@ -69,7 +69,7 @@ type Validator interface { // new Value implies that all correct Processes agree on this Value at this // Height, and will never revert. type Committer interface { - Commit(Height, Value) + Commit(Height, Value) (uint64, Scheduler) } // A Catcher is used to catch bad behaviour in other Processes. For example, @@ -278,6 +278,12 @@ func (p *Process) Start() { p.StartRound(0) } +func (p *Process) StartWithNewSignatories(f uint64, scheduler Scheduler) { + p.f = f + p.scheduler = scheduler + p.StartRound(0) +} + // StartRound will progress the Process to a new Round. It does not assume that // the Height has changed. Since this changes the current Round and the current // Step, most of the condition methods will be retried at the end (by way of @@ -694,7 +700,13 @@ func (p *Process) tryCommitUponSufficientPrecommits(round Round) { } } if precommitsForValue >= int(2*p.f+1) { - p.committer.Commit(p.CurrentHeight, propose.Value) + f, scheduler := p.committer.Commit(p.CurrentHeight, propose.Value) + if f != 0 { + p.f = f + } + if scheduler != nil { + p.scheduler = scheduler + } p.CurrentHeight++ // Reset lockedRound, lockedValue, validRound, and validValue to initial diff --git a/process/process_test.go b/process/process_test.go index c9aa184..573a9a5 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -820,9 +820,10 @@ var _ = Describe("Process", func() { BroadcastPrecommitCallback: nil, } committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { // we should not commit to the nil value proposal Expect(true).To(BeFalse()) + return 0, nil }, } p := process.New(whoami, f, nil, nil, nil, nil, broadcaster, committer, nil) @@ -2674,10 +2675,11 @@ var _ = Describe("Process", func() { f := 5 + (r.Int() % 10) acknowledge := false committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { Expect(height).To(Equal(currentHeight)) Expect(value).To(Equal(proposedValue)) acknowledge = true + return 0, nil }, } scheduledProposer := id.NewPrivKey().Signatory() @@ -2737,10 +2739,11 @@ var _ = Describe("Process", func() { f := 5 + (r.Int() % 10) acknowledge := false committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { Expect(height).To(Equal(currentHeight)) Expect(value).To(Equal(proposedValue)) acknowledge = true + return 0, nil }, } @@ -2786,6 +2789,223 @@ var _ = Describe("Process", func() { }) }) + Context("when the committer signals a signatories change", func() { + It("should update the f", func() { + loop := func() bool { + currentHeight := process.Height(r.Int63()) + currentRound := process.Round(r.Int63()) + proposedValue := processutil.RandomValue(r) + for proposedValue == process.NilValue { + proposedValue = processutil.RandomValue(r) + } + whoami := id.NewPrivKey().Signatory() + f := 5 + (r.Int() % 10) + acknowledge := false + newF := f + 1 + newsSheduledProposer := id.NewPrivKey().Signatory() + newScheduler := scheduler.NewRoundRobin([]id.Signatory{newsSheduledProposer}) + committer := processutil.CommitterCallback{ + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { + acknowledge = true + return uint64(newF), newScheduler + }, + } + scheduledProposer := id.NewPrivKey().Signatory() + scheduler := scheduler.NewRoundRobin([]id.Signatory{scheduledProposer}) + validator := processutil.MockValidator{MockValid: func(process.Height, process.Round, process.Value) bool { return true }} + + // instantiate a new process at the current round and height + // and at any valid step + p := process.New(whoami, f, nil, scheduler, nil, validator, nil, committer, nil) + p.StartRound(currentRound) + p.State.CurrentHeight = currentHeight + p.State.CurrentStep = process.Step(r.Int() % 3) + + // feed the process with 2f+1 precommit messages + // we expect nothing to happen + for t := 0; t < 2*f+1; t++ { + msg := randomValidPrecommitMsg(r, currentHeight, currentRound, proposedValue) + p.Precommit(msg) + } + Expect(p.State.CurrentHeight).To(Equal(currentHeight)) + Expect(p.State.CurrentRound).To(Equal(currentRound)) + Expect(acknowledge).ToNot(BeTrue()) + + // feed the process with a propose message + msg := process.Propose{ + Height: currentHeight, + Round: currentRound, + ValidRound: processutil.RandomRound(r), + Value: proposedValue, + From: scheduledProposer, + } + p.Propose(msg) + + defaultState := process.DefaultState() + Expect(p.State.CurrentHeight).To(Equal(currentHeight + 1)) + Expect(p.CurrentRound).To(Equal(process.Round(0))) + Expect(p.State.CurrentStep).To(Equal(defaultState.CurrentStep)) + Expect(p.State.LockedRound).To(Equal(defaultState.LockedRound)) + Expect(p.State.LockedValue).To(Equal(defaultState.LockedValue)) + Expect(p.State.ValidValue).To(Equal(defaultState.ValidValue)) + Expect(p.State.ValidRound).To(Equal(defaultState.ValidRound)) + Expect(acknowledge).To(BeTrue()) + + // To verify the change of f and scheduler, we simulate another height + acknowledge = false + + // feed the process with a propose message from the new scheduled proposer + proposeMsg := process.Propose{ + Height: currentHeight +1, + Round: 0, + ValidRound: processutil.RandomRound(r), + Value: proposedValue, + From: newsSheduledProposer, + } + p.Propose(proposeMsg) + + // feed the process with 2f+1 precommit messages + // we expect nothing to happen + for t := 0; t < 2*f+1; t++ { + msg := randomValidPrecommitMsg(r, currentHeight + 1, 0, proposedValue) + p.Precommit(msg) + } + Expect(p.State.CurrentHeight).To(Equal(currentHeight +1)) + Expect(p.State.CurrentRound).To(Equal(process.Round(0))) + Expect(acknowledge).ToNot(BeTrue()) + + // Feed two more precommit since newF = f + 1 + for i := 0 ;i <2 ; i ++ { + msg := randomValidPrecommitMsg(r, currentHeight + 1, 0, proposedValue) + p.Precommit(msg) + } + Expect(p.State.CurrentHeight).To(Equal(currentHeight + 2)) + Expect(p.CurrentRound).To(Equal(process.Round(0))) + Expect(p.State.CurrentStep).To(Equal(defaultState.CurrentStep)) + Expect(p.State.LockedRound).To(Equal(defaultState.LockedRound)) + Expect(p.State.LockedValue).To(Equal(defaultState.LockedValue)) + Expect(p.State.ValidValue).To(Equal(defaultState.ValidValue)) + Expect(p.State.ValidRound).To(Equal(defaultState.ValidRound)) + Expect(acknowledge).To(BeTrue()) + return true + } + Expect(quick.Check(loop, nil)).To(Succeed()) + }) + + It("should update the scheduler", func() { + loop := func() bool { + currentHeight := process.Height(r.Int63()) + currentRound := process.Round(r.Int63()) + proposedValue := processutil.RandomValue(r) + for proposedValue == process.NilValue { + proposedValue = processutil.RandomValue(r) + } + whoami := id.NewPrivKey().Signatory() + f := 5 + (r.Int() % 10) + acknowledge := false + newF := f + 1 + newsSheduledProposer := id.NewPrivKey().Signatory() + newScheduler := scheduler.NewRoundRobin([]id.Signatory{newsSheduledProposer}) + committer := processutil.CommitterCallback{ + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { + acknowledge = true + return uint64(newF), newScheduler + }, + } + scheduledProposer := id.NewPrivKey().Signatory() + scheduler := scheduler.NewRoundRobin([]id.Signatory{scheduledProposer}) + validator := processutil.MockValidator{MockValid: func(process.Height, process.Round, process.Value) bool { return true }} + + // instantiate a new process at the current round and height + // and at any valid step + p := process.New(whoami, f, nil, scheduler, nil, validator, nil, committer, nil) + p.StartRound(currentRound) + p.State.CurrentHeight = currentHeight + p.State.CurrentStep = process.Step(r.Int() % 3) + + // feed the process with 2f+1 precommit messages + // we expect nothing to happen + for t := 0; t < 2*f+1; t++ { + msg := randomValidPrecommitMsg(r, currentHeight, currentRound, proposedValue) + p.Precommit(msg) + } + Expect(p.State.CurrentHeight).To(Equal(currentHeight)) + Expect(p.State.CurrentRound).To(Equal(currentRound)) + Expect(acknowledge).ToNot(BeTrue()) + + // feed the process with a propose message + msg := process.Propose{ + Height: currentHeight, + Round: currentRound, + ValidRound: processutil.RandomRound(r), + Value: proposedValue, + From: scheduledProposer, + } + p.Propose(msg) + + defaultState := process.DefaultState() + Expect(p.State.CurrentHeight).To(Equal(currentHeight + 1)) + Expect(p.CurrentRound).To(Equal(process.Round(0))) + Expect(p.State.CurrentStep).To(Equal(defaultState.CurrentStep)) + Expect(p.State.LockedRound).To(Equal(defaultState.LockedRound)) + Expect(p.State.LockedValue).To(Equal(defaultState.LockedValue)) + Expect(p.State.ValidValue).To(Equal(defaultState.ValidValue)) + Expect(p.State.ValidRound).To(Equal(defaultState.ValidRound)) + Expect(acknowledge).To(BeTrue()) + + // To verify the change of f and scheduler, we simulate another height + acknowledge = false + + // feed the process with 2 newF+1 precommit messages + // we expect nothing to happen + for t := 0; t < 2*newF+1; t++ { + msg := randomValidPrecommitMsg(r, currentHeight + 1, 0, proposedValue) + p.Precommit(msg) + } + Expect(p.State.CurrentHeight).To(Equal(currentHeight +1)) + Expect(p.State.CurrentRound).To(Equal(process.Round(0))) + Expect(acknowledge).ToNot(BeTrue()) + + // feed the process with a propose message from the old scheduled proposer + proposeMsgFromOldProposer := process.Propose{ + Height: currentHeight +1, + Round: 0, + ValidRound: processutil.RandomRound(r), + Value: proposedValue, + From: scheduledProposer, + } + p.Propose(proposeMsgFromOldProposer) + + // Nothing should happen since the scheduler has been changed + Expect(p.State.CurrentHeight).To(Equal(currentHeight+1)) + Expect(p.State.CurrentRound).To(Equal(process.Round(0))) + Expect(acknowledge).ToNot(BeTrue()) + + // feed the process with a propose message from the new scheduled proposer + proposeMsgFromNewProposer := process.Propose{ + Height: currentHeight +1, + Round: 0, + ValidRound: processutil.RandomRound(r), + Value: proposedValue, + From: newsSheduledProposer, + } + p.Propose(proposeMsgFromNewProposer) + + // Expect the process to commit a new height + Expect(p.State.CurrentHeight).To(Equal(currentHeight + 2)) + Expect(p.CurrentRound).To(Equal(process.Round(0))) + Expect(p.State.CurrentStep).To(Equal(defaultState.CurrentStep)) + Expect(p.State.LockedRound).To(Equal(defaultState.LockedRound)) + Expect(p.State.LockedValue).To(Equal(defaultState.LockedValue)) + Expect(p.State.ValidValue).To(Equal(defaultState.ValidValue)) + Expect(p.State.ValidRound).To(Equal(defaultState.ValidRound)) + Expect(acknowledge).To(BeTrue()) + return true + } + Expect(quick.Check(loop, nil)).To(Succeed()) + }) + }) + Context("when the 2f+1 precommits are not all towards the same value", func() { It("should do nothing", func() { loop := func() bool { @@ -2795,8 +3015,9 @@ var _ = Describe("Process", func() { whoami := id.NewPrivKey().Signatory() f := 5 + (r.Int() % 10) committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { Fail("unexpectedly received a commit") + return 0, nil }, } broadcaster := processutil.BroadcasterCallbacks{ @@ -2860,8 +3081,9 @@ var _ = Describe("Process", func() { whoami := id.NewPrivKey().Signatory() f := 5 + (r.Int() % 10) committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { Fail("unexpectedly received a commit") + return 0, nil }, } scheduler := scheduler.NewRoundRobin([]id.Signatory{id.NewPrivKey().Signatory()}) @@ -2910,8 +3132,9 @@ var _ = Describe("Process", func() { whoami := id.NewPrivKey().Signatory() f := 5 + (r.Int() % 10) committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { Fail("unexpectedly received a commit") + return 0, nil }, } validator := processutil.MockValidator{MockValid: func(process.Height, process.Round, process.Value) bool { return false }} @@ -2962,10 +3185,11 @@ var _ = Describe("Process", func() { acknowledge := false proposedValue := processutil.RandomGoodValue(r) committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { Expect(height).To(Equal(currentHeight)) Expect(value).To(Equal(proposedValue)) acknowledge = true + return 0, nil }, } p := process.New(whoami, f, nil, nil, nil, nil, nil, committer, nil) @@ -3009,8 +3233,9 @@ var _ = Describe("Process", func() { f := 5 + (r.Int() % 10) proposedValue := processutil.RandomGoodValue(r) committer := processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { Fail("unexpectedly received a commit") + return 0, nil }, } p := process.New(whoami, f, nil, nil, nil, nil, nil, committer, nil) diff --git a/process/processutil/processutil.go b/process/processutil/processutil.go index 08cd9a5..5752b72 100644 --- a/process/processutil/processutil.go +++ b/process/processutil/processutil.go @@ -42,15 +42,15 @@ func (broadcaster BroadcasterCallbacks) BroadcastPrecommit(precommit process.Pre // CommitterCallback provides a callback function to test the Committer // behaviour required by a Process type CommitterCallback struct { - Callback func(process.Height, process.Value) + Callback func(process.Height, process.Value) (uint64, process.Scheduler) } // Commit passes the commitment parameters height and round to the commit callback, if present -func (committer CommitterCallback) Commit(height process.Height, value process.Value) { +func (committer CommitterCallback) Commit(height process.Height, value process.Value) (uint64, process.Scheduler) { if committer.Callback == nil { - return + return 0, nil } - committer.Callback(height, value) + return committer.Callback(height, value) } // MockProposer is a mock implementation of the Proposer interface diff --git a/replica/replica.go b/replica/replica.go index eebda6e..df9303f 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -118,29 +118,30 @@ func (replica *Replica) Run(ctx context.Context) { if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPropose(m) case process.Prevote: if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPrevote(m) case process.Precommit: if !replica.filterHeight(m.Height) { return } - if !replica.filterFrom(m.From) { - return - } replica.mq.InsertPrecommit(m) - case process.Height: - replica.proc.State = process.DefaultState().WithCurrentHeight(m) - replica.mq.DropMessagesBelowHeight(m) + case ResetHeightMessage: + replica.proc.State = process.DefaultState().WithCurrentHeight(m.height) + replica.mq.DropMessagesBelowHeight(m.height) + + // If the signatories change in the new height + if len(m.signatories) != 0 { + f := len(m.signatories) / 3 + replica.proc.StartWithNewSignatories(uint64(f), m.scheduler) + replica.procsAllowed = map[id.Signatory]bool{} + for _, sig := range m.signatories { + replica.procsAllowed[sig] = true + } + } case getState: m.responder <- getStateResponse{ height: replica.proc.CurrentHeight, @@ -224,13 +225,18 @@ func (replica *Replica) TimeoutPrecommit(ctx context.Context, timeout timer.Time // // NOTE: All messages that are currently in the message queue for heights less // than the given height will be dropped. -func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height) { +func (replica *Replica) ResetHeight(ctx context.Context, newHeight process.Height, signatories []id.Signatory) { if newHeight <= replica.proc.State.CurrentHeight { return } + message := ResetHeightMessage{ + height: newHeight, + signatories: signatories, + scheduler: scheduler.NewRoundRobin(signatories), + } select { case <-ctx.Done(): - case replica.mch <- newHeight: + case replica.mch <- message: } } @@ -266,10 +272,6 @@ func (replica *Replica) filterHeight(height process.Height) bool { return height >= replica.proc.CurrentHeight } -func (replica *Replica) filterFrom(from id.Signatory) bool { - return replica.procsAllowed[from] -} - func (replica *Replica) flush() { for { n := replica.mq.Consume( @@ -277,9 +279,16 @@ func (replica *Replica) flush() { replica.proc.Propose, replica.proc.Prevote, replica.proc.Precommit, + replica.procsAllowed, ) if n == 0 { return } } } + +type ResetHeightMessage struct { + height process.Height + signatories []id.Signatory + scheduler process.Scheduler +} diff --git a/replica/replica_test.go b/replica/replica_test.go index fcfe230..e814a6f 100644 --- a/replica/replica_test.go +++ b/replica/replica_test.go @@ -155,7 +155,7 @@ var _ = Describe("Replica", func() { }, // Committer processutil.CommitterCallback{ - Callback: func(height process.Height, value process.Value) { + Callback: func(height process.Height, value process.Value) (uint64, process.Scheduler) { // add to the map of commits mqMutex.Lock() (*commits)[replicaIndex][height] = value @@ -165,6 +165,7 @@ var _ = Describe("Replica", func() { if height == targetHeight { completionSignal <- true } + return 0, nil }, }, // Catcher