Skip to content

Commit

Permalink
Propagate DECIDE messages from the previous instance (#607)
Browse files Browse the repository at this point in the history
* Propagate DECIDE messages from the previous instance

fixes #579

* don't assume that the tracer is non-nil

* test dropping old messages

* review nits
  • Loading branch information
Stebalien authored Aug 30, 2024
1 parent 7e9ab6e commit 094e430
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 15 deletions.
101 changes: 101 additions & 0 deletions gpbft/gpbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,104 @@ func TestGPBFT_Validation(t *testing.T) {
})
}
}

func TestGPBFT_DropOld(t *testing.T) {
t.Parallel()
participants := gpbft.PowerEntries{
gpbft.PowerEntry{
ID: 0,
Power: gpbft.NewStoragePower(1),
},
gpbft.PowerEntry{
ID: 1,
Power: gpbft.NewStoragePower(1),
},
}

// We define 3 instances, old, last, and new.

driver := emulator.NewDriver(t)
oldInstance := emulator.NewInstance(t, 0, participants, tipset0, tipSet1)
lastInstance := emulator.NewInstance(t, 1, participants, tipSet1, tipSet2)
newInstance := emulator.NewInstance(t, 2, participants, tipSet2)
driver.AddInstance(oldInstance)
driver.AddInstance(lastInstance)
driver.AddInstance(newInstance)

// We immediately skip to the "new" instance.
driver.StartInstance(2)

// All messages from the old instance should be dropped.

oldDecide := &gpbft.GMessage{
Sender: 1,
Vote: oldInstance.NewDecide(0, oldInstance.Proposal()),
Ticket: emulator.ValidTicket,
}
driver.RequireErrOnDeliverMessage(oldDecide, gpbft.ErrValidationTooOld, "message is for prior instance")

// Everything except decides from the last instance should be dropped.

lastCommit := &gpbft.GMessage{
Sender: 1,
Vote: lastInstance.NewCommit(3, lastInstance.Proposal()),
Ticket: emulator.ValidTicket,
}
driver.RequireErrOnDeliverMessage(lastCommit, gpbft.ErrValidationTooOld, "message is for prior instance")

lastDecide := &gpbft.GMessage{
Sender: 1,
Vote: lastInstance.NewDecide(0, lastInstance.Proposal()),
Justification: lastInstance.NewJustification(0, gpbft.COMMIT_PHASE, lastInstance.Proposal(), 0, 1),
Ticket: emulator.ValidTicket,
}
driver.RequireDeliverMessage(lastDecide)

// Everything should be delivered for the new instance.

newQuality := &gpbft.GMessage{
Sender: 1,
Vote: newInstance.NewQuality(newInstance.Proposal()),
Ticket: emulator.ValidTicket,
}
newCommit0 := &gpbft.GMessage{
Sender: 0,
Vote: newInstance.NewCommit(0, newInstance.Proposal()),
Justification: newInstance.NewJustification(0, gpbft.PREPARE_PHASE, newInstance.Proposal(), 0, 1),
Ticket: emulator.ValidTicket,
}
newCommit1 := &gpbft.GMessage{
Sender: 1,
Vote: newInstance.NewCommit(0, newInstance.Proposal()),
Justification: newInstance.NewJustification(0, gpbft.PREPARE_PHASE, newInstance.Proposal(), 0, 1),
Ticket: emulator.ValidTicket,
}
newDecide0 := &gpbft.GMessage{
Sender: 0,
Vote: newInstance.NewDecide(0, newInstance.Proposal()),

Justification: newInstance.NewJustification(0, gpbft.COMMIT_PHASE, newInstance.Proposal(), 0, 1),
Ticket: emulator.ValidTicket,
}
newDecide1 := &gpbft.GMessage{
Sender: 1,
Vote: newInstance.NewDecide(0, newInstance.Proposal()),

Justification: newInstance.NewJustification(0, gpbft.COMMIT_PHASE, newInstance.Proposal(), 0, 1),
Ticket: emulator.ValidTicket,
}
driver.RequireDeliverMessage(newQuality)
driver.RequireDeliverMessage(newDecide0)
driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it
driver.RequireDeliverMessage(newDecide1)

// Once we've received two decides, we should reject messages from the "new" instance.

driver.RequireErrOnDeliverMessage(newCommit1, gpbft.ErrValidationTooOld, "message is for prior instance")

// And we should now reject decides from the "last" instance.
driver.RequireErrOnDeliverMessage(lastDecide, gpbft.ErrValidationTooOld, "message is for prior instance")

// But we should still accept decides from the latest instance.
driver.RequireDeliverMessage(newDecide0)
}
50 changes: 35 additions & 15 deletions gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
}
}()

comt, err := p.fetchCommittee(msg.Vote.Instance)
comt, err := p.fetchCommittee(msg.Vote.Instance, msg.Vote.Step)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) {

// Drop messages for past instances.
if msg.Vote.Instance < p.currentInstance {
p.tracer.Log("dropping message from old instance %d while received in instance %d",
p.trace("dropping message from old instance %d while received in instance %d",
msg.Vote.Instance, p.currentInstance)
return nil
}
Expand Down Expand Up @@ -441,7 +441,7 @@ func (p *Participant) beginInstance() error {
return fmt.Errorf("invalid canonical chain: %w", err)
}

comt, err := p.fetchCommittee(p.currentInstance)
comt, err := p.fetchCommittee(p.currentInstance, INITIAL_PHASE)
if err != nil {
return err
}
Expand All @@ -453,9 +453,9 @@ func (p *Participant) beginInstance() error {
}
// Deliver any queued messages for the new instance.
queued := p.mqueue.Drain(p.gpbft.instanceID)
if p.tracer != nil {
if p.tracingEnabled() {
for _, msg := range queued {
p.tracer.Log("Delivering queued {%d} ← P%d: %v", p.gpbft.instanceID, msg.Sender, msg)
p.trace("Delivering queued {%d} ← P%d: %v", p.gpbft.instanceID, msg.Sender, msg)
}
}
if err := p.gpbft.ReceiveMany(queued); err != nil {
Expand All @@ -466,12 +466,17 @@ func (p *Participant) beginInstance() error {
}

// Fetches the committee against which to validate messages for some instance.
func (p *Participant) fetchCommittee(instance uint64) (*committee, error) {
func (p *Participant) fetchCommittee(instance uint64, phase Phase) (*committee, error) {
p.instanceMutex.Lock()
defer p.instanceMutex.Unlock()

// Reject messages for past instances.
if instance < p.currentInstance {
switch {
// Accept all messages from the current and future instances.
case instance >= p.currentInstance:
// Accept messages from the previous instance, but only for decide messages.
case instance == p.currentInstance-1 && phase == DECIDE_PHASE:
// Reject all others as too old.
default:
return nil, fmt.Errorf("instance %d, current %d: %w",
instance, p.currentInstance, ErrValidationTooOld)
}
Expand All @@ -498,7 +503,7 @@ func (p *Participant) handleDecision() {
decision := p.finishCurrentInstance()
nextStart, err := p.host.ReceiveDecision(decision)
if err != nil {
p.tracer.Log("failed to receive decision: %+v", err)
p.trace("failed to receive decision: %+v", err)
p.host.SetAlarm(time.Time{})
} else {
p.beginNextInstance(p.currentInstance + 1)
Expand All @@ -520,12 +525,17 @@ func (p *Participant) finishCurrentInstance() *Justification {
func (p *Participant) beginNextInstance(nextInstance uint64) {
p.instanceMutex.Lock()
defer p.instanceMutex.Unlock()
// Clean all messages queued and old committees for instances below the next one.
// Skip if there are none to avoid iterating from instance zero when starting up.
if len(p.mqueue.messages) > 0 || len(p.committees) > 0 {
for i := p.currentInstance; i < nextInstance; i++ {
delete(p.mqueue.messages, i)
delete(p.committees, i)
// Clean all messages queued and for instances below the next one.
for inst := range p.mqueue.messages {
if inst < nextInstance {
delete(p.mqueue.messages, inst)
}
}
// Clean committees from instances below the previous one. We keep the last committee so we
// can continue to validate and propagate DECIDE messages.
for inst := range p.committees {
if inst+1 < nextInstance {
delete(p.mqueue.messages, inst)
}
}
p.currentInstance = nextInstance
Expand All @@ -542,6 +552,16 @@ func (p *Participant) Describe() string {
return p.gpbft.Describe()
}

func (p *Participant) tracingEnabled() bool {
return p.tracer != nil
}

func (p *Participant) trace(format string, args ...any) {
if p.tracingEnabled() {
p.tracer.Log(format, args...)
}
}

// A power table and beacon value used as the committee inputs to an instance.
type committee struct {
power *PowerTable
Expand Down

0 comments on commit 094e430

Please sign in to comment.