Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus] Add MinBlockTime to delay mempool reaping #924

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8608bce
[WIP] Add MinBlockTime
red-0ne Jul 21, 2023
4a25b62
[Consensus] Feat: Configurable min block production time
red-0ne Jul 25, 2023
1028ae9
[Consensus] Refactor: decouple timer registration from subscription
red-0ne Jul 26, 2023
a7fd743
address review comments
red-0ne Jul 31, 2023
867e841
[Docs] Update development docs to warn to not use the changelog hook …
h5law Jul 24, 2023
0d448c9
[IBC] chore: Rename FlushAllEntries => FlushCachesToStore (#934)
h5law Jul 24, 2023
accccfc
[Utility] Feat: add client-side session cache (#888)
adshmh Jul 25, 2023
3165b8d
[IBC] Clone `cosmos/ics23` protobuf definitions into IBC repo (#922)
h5law Jul 26, 2023
990321e
[CLI] Consistent config/flag parsing & common helpers (#891)
bryanchriswhite Jul 26, 2023
21d7024
[IBC] Change Events to not have a Height field and use uint64 in quer…
h5law Jul 26, 2023
c67fa14
[IBC] Add ICS-02 Client Interfaces (#932)
h5law Jul 26, 2023
db8d8d6
[Persistence] Adds `node` subcommand to CLI (#935)
dylanlott Jul 26, 2023
74a5816
[IBC] chore: enable IBC module in k8s validators (#941)
h5law Jul 27, 2023
950ccc3
[Utility] Use TreeStore as source of truth (#937)
h5law Jul 27, 2023
d3bf0ad
[IBC] Enable validators and thus IBC host creation in K8s (#942)
h5law Jul 28, 2023
c903ca1
[Utility] Create trustless_relay_validation.md (#938)
adshmh Jul 31, 2023
298b08f
[Persistence] Adds atomic Update for TreeStore (#861)
dylanlott Jul 31, 2023
a68af5c
[chore] Replaces multierr usage with go native errors package (#939)
dylanlott Jul 31, 2023
0941549
hack: 😴 sleep enough for cli debug subcommands to broadcast (#954)
0xBigBoss Jul 31, 2023
50f8846
DevLog 12 (#957)
Olshansk Aug 1, 2023
e0e9fd4
[Utility] servicer signs relays (#952)
adshmh Aug 1, 2023
2a226cc
[LocalNet] Fix metrics scraping (#940)
okdas Aug 1, 2023
6c7599e
prevent sending to closed channels
red-0ne Aug 2, 2023
92ece19
disable block preparation delay when manual mode is on
red-0ne Aug 4, 2023
fef4217
[E2E Test] Utilities for State Sync Test (#874)
Olshansk Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions consensus/e2e_tests/pacemaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,113 @@ func forcePacemakerTimeout(t *testing.T, clockMock *clock.Mock, paceMakerTimeout
advanceTime(t, clockMock, paceMakerTimeout+10*time.Millisecond)
}

func TestPacemaker_MinBlockTime_DelayBlockPrep(t *testing.T) {
// Test preparation
clockMock := clock.NewMock()
timeReminder(t, clockMock, time.Second)

// UnitTestNet configs
paceMakerTimeoutMsec := uint64(300000)
consensusMessageTimeout := time.Duration(paceMakerTimeoutMsec / 5) // Must be smaller than pacemaker timeout because we expect a deterministic number of consensus messages.
paceMakerMinBlockTimeMsec := uint64(paceMakerTimeoutMsec / 6)
runtimeMgrs := GenerateNodeRuntimeMgrs(t, numValidators, clockMock)
for _, runtimeConfig := range runtimeMgrs {
consCfg := runtimeConfig.GetConfig().Consensus.PacemakerConfig
consCfg.TimeoutMsec = paceMakerTimeoutMsec
consCfg.MinBlockTimeMsec = paceMakerMinBlockTimeMsec
}
buses := GenerateBuses(t, runtimeMgrs)

// Create & start test pocket nodes
eventsChannel := make(modules.EventsChannel, 100)
pocketNodes := CreateTestConsensusPocketNodes(t, buses, eventsChannel)
err := StartAllTestPocketNodes(t, pocketNodes)
require.NoError(t, err)

// First round ever has leaderId=2 ((height+round+step-1)%numValidators)+1
// See: consensus/leader_election/module.go#electNextLeaderDeterministicRoundRobin
leaderId := typesCons.NodeId(2)
numReplicas := len(pocketNodes) - 1

// Debug message to start consensus by triggering next view
// Get leader out of replica set
for _, pocketNode := range pocketNodes {
TriggerNextView(t, pocketNode)

// Right after triggering the next view
// Consensus started and all nodes are at NewRound step
step := typesCons.HotstuffStep(pocketNode.GetBus().GetConsensusModule().CurrentStep())
require.Equal(t, consensus.NewRound, step)
}

newRoundMessages, err := WaitForNetworkConsensusEvents(
t, clockMock, eventsChannel, typesCons.HotstuffStep(consensus.NewRound), typesCons.HotstuffMessageType(consensus.NewRound),
numReplicas, // We want new round messages from replicas only
consensusMessageTimeout, false,
)
require.NoError(t, err)

// Broadcast new round messages to leader to build a block
broadcastMessages(t, newRoundMessages, IdToNodeMapping{leaderId: pocketNodes[leaderId]})

var step typesCons.HotstuffStep
minTimeIncrement := 1 * time.Millisecond // Min time it takes to switch from NewRound to Prepare step

// Give go routines time to trigger
advanceTime(t, clockMock, 0)

// We get consensus module from leader to get its POV
leaderConsensusModule := pocketNodes[leaderId].GetBus().GetConsensusModule()

// Make sure all nodes are aligned to the same leader
for _, pocketNode := range pocketNodes {
nodeLeader := pocketNode.GetBus().GetConsensusModule().GetLeaderForView(1, 0, uint8(consensus.NewRound))
require.Equal(t, typesCons.NodeId(nodeLeader), leaderId)
}

// Timer is blocking the proposal step
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())
require.Equal(t, consensus.NewRound, step)

// Advance time right before minBlockTime triggers
paceMakerPreActivationTime := time.Duration(paceMakerMinBlockTimeMsec*uint64(time.Millisecond)) - minTimeIncrement
advanceTime(t, clockMock, paceMakerPreActivationTime)

// Should still be blocking proposal step
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())
require.Equal(t, consensus.NewRound, step)

// Advance time just enough to trigger minBlockTime
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
advanceTime(t, clockMock, minTimeIncrement)
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())

// Time advanced by minBlockTime
require.Equal(t, uint64(clockMock.Now().UnixMilli()), paceMakerMinBlockTimeMsec)
// Leader is at proposal step
require.Equal(t, consensus.Prepare, step)
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: Block preparation triggers ASAP if conditions are met AFTER minBlockTime has triggered.
func TestPacemaker_MinBlockTime_BlockPrepAsapAfterTrigger(t *testing.T) {
t.Skip()
}

// TODO: Block preparation is always discarded if a new one with better QC is received within minBlockTime.
func TestPacemaker_MinBlockTime_AllowOnlyLatestBlockPrep(t *testing.T) {
t.Skip()
}

// TODO: Mempool reaped is the one present at minBlockTime or later.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// Since leader could just reap an earlier mempool and just blocks broadcasting the proposal
func TestPacemaker_MinBlockTime_DelayReapMempool(t *testing.T) {
t.Skip()
}

// TODO: Successive blocks timings are at least minBlockTime apart.
func TestPacemaker_MinBlockTime_ConsecutiveBlocksAtLeastMinBlockTimeApart(t *testing.T) {
t.Skip()
}

// TODO: Implement these tests and use them as a starting point for new ones. Consider using ChatGPT to help you out :)

func TestPacemakerDifferentHeightsCatchup(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions consensus/hotstuff_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
return
}

// DISCUSS: Do we need to pause for `MinBlockFreqMSec` here to let more transactions or should we stick with optimistic responsiveness?

if err := m.didReceiveEnoughMessageForStep(NewRound); err != nil {
m.logger.Info().Fields(hotstuffMsgToLoggingFields(msg)).Msgf("⏳ Waiting ⏳for more messages; %s", err.Error())
return
Expand Down Expand Up @@ -64,6 +62,14 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
// TODO: Add test to make sure same block is not applied twice if round is interrupted after being 'Applied'.
// TODO: Add more unit tests for these checks...
if m.shouldPrepareNewBlock(highPrepareQC) {
// Leader should prepare a new block. Introducing a delay based on configurations.
m.paceMaker.StartMinBlockTimeDelay()

// This function delays block preparation and returns false if a concurrent preparation request with higher QC is available
if shouldPrepareBlock := m.paceMaker.DelayBlockPreparation(); !shouldPrepareBlock {
m.logger.Info().Msg("skip prepare new block, a candidate with higher QC is available")
return
}
block, err := m.prepareBlock(highPrepareQC)
if err != nil {
m.logger.Error().Err(err).Msg(typesCons.ErrPrepareBlock.Error())
Expand Down
109 changes: 106 additions & 3 deletions consensus/pacemaker/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pacemaker
import (
"context"
"fmt"
"sync"
"time"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
Expand Down Expand Up @@ -38,6 +39,8 @@ type Pacemaker interface {
PacemakerDebug

ShouldHandleMessage(message *typesCons.HotstuffMessage) (bool, error)
StartMinBlockTimeDelay()
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
DelayBlockPreparation() bool
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

RestartTimer()
NewHeight()
Expand All @@ -48,9 +51,10 @@ type pacemaker struct {
base_modules.IntegrableModule
base_modules.InterruptableModule

pacemakerCfg *configs.PacemakerConfig
roundTimeout time.Duration
roundCancelFunc context.CancelFunc
pacemakerCfg *configs.PacemakerConfig
roundTimeout time.Duration
roundCancelFunc context.CancelFunc
prepareStepDelayer prepareStepDelayer

// Only used for development and debugging.
debug pacemakerDebug
Expand All @@ -60,6 +64,17 @@ type pacemaker struct {
logPrefix string
}

// prepareStepDelayer delays block preparation (mempool reaping)
// by adding a delay before the next prepare request to prevent block creation
type prepareStepDelayer struct {
m sync.Mutex // mutex locking access to this structure and prevent inconsistent state
ch chan bool // whenever there is a block proposal request arriving before the timeout, this channel is used to signal to it to whether build the block or not (if better candidate request with higher QC)
cancelFunc context.CancelFunc // cancels an ongoing timeout. It should not happen, but future code changes may no longer preserve this guarantee, so this feature maintain its own cancellation logic
shouldProposeBlock bool // a flag to capture whether a block was proposed, so later calls will skip building the block
delayExhausted bool // the delay for this step/round has already passed, so should create the block ASAP
minBlockTime time.Duration // the minimum time to wait before trying to propose a block
}

func CreatePacemaker(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(pacemaker).Create(bus, options...)
}
Expand All @@ -85,6 +100,14 @@ func (*pacemaker) Create(bus modules.Bus, options ...modules.ModuleOption) (modu
debugTimeBetweenStepsMsec: m.pacemakerCfg.GetDebugTimeBetweenStepsMsec(),
quorumCertificate: nil,
}
m.prepareStepDelayer = prepareStepDelayer{
m: sync.Mutex{},
ch: nil,
cancelFunc: nil,
shouldProposeBlock: false,
delayExhausted: false,
minBlockTime: time.Duration(m.pacemakerCfg.MinBlockTimeMsec * uint64(time.Millisecond)),
}

return m, nil
}
Expand Down Expand Up @@ -243,6 +266,86 @@ func (m *pacemaker) NewHeight() {
)
}

// StartMinBlockTimeDelay should be called when a delay should be introduced into proposing a new block
func (m *pacemaker) StartMinBlockTimeDelay() {
// Discard any previous timer if one exists
if m.prepareStepDelayer.cancelFunc != nil {
m.logger.Warn().Msg("RegisterMinBlockTimeDelay has an existing timer which should not happen. Releasing for now...")
m.prepareStepDelayer.cancelFunc()
}

m.prepareStepDelayer.shouldProposeBlock = false
m.prepareStepDelayer.delayExhausted = false

ctx, cancel := context.WithCancel(context.TODO())
m.prepareStepDelayer.cancelFunc = cancel

// Start a timer to wait for the MinBlockTimeMsec delay
// If a channel is provided, signal when the timer expires to it
go func() {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
select {
// only called if the delay timer is explicitly cancelled
case <-ctx.Done():
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
return
// called after the minimum block delay is exhausted meaning it is time to propose a new block
case <-m.GetBus().GetRuntimeMgr().GetClock().After(m.prepareStepDelayer.minBlockTime):
m.prepareStepDelayer.m.Lock()
defer m.prepareStepDelayer.m.Unlock()

// After the timeout, if there was any `HotstuffLeaderMessageHandler.HandleNewRoundMessage()` call delayed to propose a block, unblock it by emitting true
if m.prepareStepDelayer.ch != nil {
m.prepareStepDelayer.ch <- true
close(m.prepareStepDelayer.ch)
m.prepareStepDelayer.shouldProposeBlock = true
}

// From now on, build the block ASAP
m.prepareStepDelayer.delayExhausted = true

// No need to cancel the context anymore
m.prepareStepDelayer.cancelFunc = nil
}
}()
}

// DelayBlockPreparation is called when conditions are met by the leader to build a block but still needs to wait for the MinBlockTimeMsec delay before reaping the mempool.
// With MinBlockTimeMsec delay, multiple concurrent calls may happen
// DelayBlockPreparation is a synchronous blocking function that waits for channel to emit whether to propose a block or not given multiple HotstuffLeaderMessageHandler.HandleNewRoundMessage calling this function concurrently.
// It makes sure that:
// - Block proposal is made by only one of the possible `HotstuffLeaderMessageHandler.HandleNewRoundMessage()` concurrent (because delayed) calls
// - If the timer expires, the first call to this method will trigger the block proposal
// - If a late message is received AFTER the a block is marked as proposed by another call, the late message is discarded
// - Reads and assignments to pacemaker.prepareStepDelayer state are protected by a mutex
func (m *pacemaker) DelayBlockPreparation() bool {
m.prepareStepDelayer.m.Lock()

red-0ne marked this conversation as resolved.
Show resolved Hide resolved
if m.prepareStepDelayer.shouldProposeBlock {
m.prepareStepDelayer.m.Unlock()
return false
}

// If there already is a channel signaling block proposal, make sure it does not propose a block
if m.prepareStepDelayer.ch != nil {
m.prepareStepDelayer.ch <- false
close(m.prepareStepDelayer.ch)
}

// Deadline has passed, no need to have a channel, propose a block now
if m.prepareStepDelayer.delayExhausted {
m.prepareStepDelayer.shouldProposeBlock = true
m.prepareStepDelayer.m.Unlock()
return true
}

// We still need to wait, create a channel and discard the old candidate if any
ch := make(chan bool)
m.prepareStepDelayer.ch = ch
// We cannot defer the unlock here because the channel read is blocking
m.prepareStepDelayer.m.Unlock()

return <-ch
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *pacemaker) startNextView(qc *typesCons.QuorumCertificate, forceNextView bool) {
defer m.RestartTimer()

Expand Down
1 change: 1 addition & 0 deletions runtime/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewDefaultConfig(options ...func(*Config)) *Config {
TimeoutMsec: defaults.DefaultPacemakerTimeoutMsec,
Manual: defaults.DefaultPacemakerManual,
DebugTimeBetweenStepsMsec: defaults.DefaultPacemakerDebugTimeBetweenStepsMsec,
MinBlockTimeMsec: defaults.DefaultPacemakerMinBlockTimeMsec,
},
},
Utility: &UtilityConfig{
Expand Down
3 changes: 3 additions & 0 deletions runtime/configs/proto/consensus_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ message PacemakerConfig {
uint64 timeout_msec = 1;
bool manual = 2;
uint64 debug_time_between_steps_msec = 3;
// consenus could produce blocks as soon as a quorum is reached; responsivness per the Hotstuff whitepaper.
// This option allows to set min time between blocks and gives more time to the mempool to fill up; similar to timeout_propose in Tendermint.
uint64 min_block_time_msec = 4;
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions runtime/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
DefaultPacemakerTimeoutMsec = uint64(10000)
DefaultPacemakerManual = true
DefaultPacemakerDebugTimeBetweenStepsMsec = uint64(1000)
DefaultPacemakerMinBlockTimeMsec = uint64(5000) // 5 seconds
// utility
DefaultUtilityMaxMempoolTransactionBytes = uint64(1024 ^ 3) // 1GB V0 defaults
DefaultUtilityMaxMempoolTransactions = uint32(9000)
Expand Down
1 change: 1 addition & 0 deletions runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,7 @@ func TestNewManagerFromReaders(t *testing.T) {
TimeoutMsec: 10000,
Manual: true,
DebugTimeBetweenStepsMsec: 1000,
MinBlockTimeMsec: 5000,
},
ServerModeEnabled: true,
},
Expand Down