Skip to content

Commit

Permalink
refactor: simplify rebroadcast logic
Browse files Browse the repository at this point in the history
This keeps two message lists: one for the current round, one for the
previous. With separate fields for quality/decide. That way we don't
have to "garbage collect".
  • Loading branch information
Stebalien committed Sep 5, 2024
1 parent a4deda5 commit 72090ec
Showing 1 changed file with 54 additions and 56 deletions.
110 changes: 54 additions & 56 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ type instance struct {
// rebroadcast becomes necessary.
//
// See: broadcast, tryRebroadcast.
broadcasted *broadcastState
broadcasted broadcastState
// Supplemental data that all participants must agree on ahead of time. Messages that
// propose supplemental data that differs with our supplemental data will be discarded.
supplementalData *SupplementalData
Expand Down Expand Up @@ -237,7 +237,6 @@ func newInstance(
phase: INITIAL_PHASE,
supplementalData: data,
proposal: input,
broadcasted: newBroadcastState(),
value: ECChain{},
candidates: map[ChainKey]struct{}{
input.BaseChain().Key(): {},
Expand Down Expand Up @@ -940,33 +939,35 @@ func (i *instance) rebroadcastTimeoutElapsed() bool {
}

func (i *instance) rebroadcast() error {

// Rebroadcast all messages from the current and previous rounds, unless the
// Rebroadcast quality and all messages from the current and previous rounds, unless the
// instance has progressed to DECIDE phase. In which case, only DECIDE message is
// rebroadcasted.
//
// Note that the implementation here rebroadcasts more messages than FIP-0086
// strictly requires. Because, the cost of rebroadcasting additional messages is
// small compared to the reduction in need for rebroadcast.
//
// It is also not strictly required to sort the rebroadcasted messages. But we do
// so for deterministic behaviour, useful for testing.
var rounds []uint64
for round := range i.broadcasted.messagesByRound {
rounds = append(rounds, round)
}
slices.Sort(rounds)
for _, round := range rounds {
mbs := i.broadcasted.messagesByRound[round]
for _, mb := range mbs {
if err := i.participant.host.RequestBroadcast(mb); err != nil {
// Silently log the error and proceed. This is consistent with the behaviour of
// instance for regular broadcasts.
i.log("failed to request rebroadcast %s at round %d: %v", mb.Payload.Phase, mb.Payload.Round, err)
} else {
i.log("rebroadcasting %s at round %d for value %s", mb.Payload.Phase.String(), mb.Payload.Round, mb.Payload.Value)
metrics.reBroadcastCounter.Add(context.TODO(), 1)
}
var msgs []*MessageBuilder
if i.broadcasted.decide != nil {
msgs = append(msgs, i.broadcasted.decide)
} else {
// We rebroadcast messages newest to oldest, except quality which we always
// rebroadcast first. That way, if we try to rebroadcast too many at once and some
// get dropped, we've already rebroadcast the most useful messages.
msgs = append(msgs, i.broadcasted.previousRound...)
msgs = append(msgs, i.broadcasted.currentRound...)
if i.broadcasted.quality != nil {
msgs = append(msgs, i.broadcasted.quality)
}
slices.Reverse(msgs)
}
for _, mb := range msgs {
if err := i.participant.host.RequestBroadcast(mb); err != nil {
// Silently log the error and proceed. This is consistent with the behaviour of
// instance for regular broadcasts.
i.log("failed to request rebroadcast %s at round %d: %v", mb.Payload.Phase, mb.Payload.Round, err)
} else {
i.log("rebroadcasting %s at round %d for value %s", mb.Payload.Phase.String(), mb.Payload.Round, mb.Payload.Value)
metrics.reBroadcastCounter.Add(context.TODO(), 1)
}
}
return nil
Expand Down Expand Up @@ -1392,13 +1393,11 @@ func (c *convergeState) FindProposalFor(chain ECChain) ConvergeValue {
}

type broadcastState struct {
messagesByRound map[uint64][]*MessageBuilder
}

func newBroadcastState() *broadcastState {
return &broadcastState{
messagesByRound: make(map[uint64][]*MessageBuilder),
}
quality *MessageBuilder
decide *MessageBuilder
previousRound []*MessageBuilder
currentRound []*MessageBuilder
round uint64
}

// record stores messages that are required should rebroadcast becomes necessary.
Expand All @@ -1418,36 +1417,35 @@ func newBroadcastState() *broadcastState {
// - https://github.com/filecoin-project/FIPs/discussions/809#discussioncomment-10409902
// - https://github.com/filecoin-project/FIPs/discussions/809#discussioncomment-10424988
func (bs *broadcastState) record(mb *MessageBuilder) {
if bs.decide != nil {
return
}
switch mb.Payload.Phase {
case QUALITY_PHASE:
bs.quality = mb
case DECIDE_PHASE:
// Clear all previous messages, as only DECIDE message need to be rebroadcasted.
// Note that DECIDE message is not associated to any round, and is always
// broadcasted using round zero.
clear(bs.messagesByRound)
bs.messagesByRound[0] = []*MessageBuilder{mb}
case QUALITY_PHASE, CONVERGE_PHASE, PREPARE_PHASE, COMMIT_PHASE:
bs.messagesByRound[mb.Payload.Round] = append(bs.messagesByRound[mb.Payload.Round], mb)
// Remove all messages that are older than the latest two rounds, except QUALITY
// which only appears in round 0.
for round := int(mb.Payload.Round) - 2; round >= 0; round-- {
switch redundantRound := uint64(round); redundantRound {
case 0:
var found bool
for i, mb := range bs.messagesByRound[0] {
if mb.Payload.Phase == QUALITY_PHASE {
bs.messagesByRound[0] = bs.messagesByRound[0][i : i+1 : 1]
found = true
break
}
}
if !found {
log.Warn("No QUALITY message found for round 0 while trimming rebroadcast messages")
delete(bs.messagesByRound, redundantRound)
}
default:
delete(bs.messagesByRound, redundantRound)
}
// Note that DECIDE message is not associated to any round.
bs.currentRound = nil
bs.previousRound = nil
bs.quality = nil
bs.decide = mb
case CONVERGE_PHASE, PREPARE_PHASE, COMMIT_PHASE:
if mb.Payload.Round < bs.round {
log.Warnw("Unexpected broadcast of a message for a prior round")
return
}
switch mb.Payload.Round {
case bs.round:
bs.currentRound = append(bs.currentRound, mb)
case bs.round + 1:
bs.previousRound = bs.currentRound
bs.currentRound = []*MessageBuilder{mb}
default:
bs.previousRound = nil
bs.currentRound = []*MessageBuilder{mb}
}
bs.round = mb.Payload.Round
default:
// There should not be any message recorded with any other payload phase. Warn if
// we see any.
Expand Down

0 comments on commit 72090ec

Please sign in to comment.