diff --git a/config/config.go b/config/config.go index ac43d0441..191f84359 100644 --- a/config/config.go +++ b/config/config.go @@ -234,6 +234,13 @@ type BaseConfig struct { //nolint: maligned // Default: 0 DeadlockDetection time.Duration `mapstructure:"deadlock-detection"` + // SyncTimeout is the timeout for the initial sync process, before switching to consensus. + // If zero or empty, the default value is used. + // + // Default: 60s + SyncTimeout time.Duration `mapstructure:"sync-timeout"` + + // Other options should be empty Other map[string]interface{} `mapstructure:",remain"` } @@ -250,6 +257,7 @@ func DefaultBaseConfig() BaseConfig { DBBackend: "goleveldb", DBPath: "data", DeadlockDetection: 0, + SyncTimeout: 60 * time.Second, } } diff --git a/config/config_test.go b/config/config_test.go index 03c70fe81..a667aabd3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -24,6 +24,8 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, "/foo/bar", cfg.GenesisFile()) assert.Equal(t, "/opt/data", cfg.DBDir()) + + assert.Equal(t, 60*time.Second, cfg.BaseConfig.SyncTimeout) } func TestConfigValidateBasic(t *testing.T) { diff --git a/config/toml.go b/config/toml.go index b6be127b2..a7183369c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -137,6 +137,12 @@ filter-peers = {{ .BaseConfig.FilterPeers }} # Default: 0 deadlock-detection = "{{ .BaseConfig.DeadlockDetection }}" +# Timeout for the initial sync process, before switching to consensus. +# If zero or empty, the default value is used. +# +# Default: 60s +sync-timeout = "{{ .BaseConfig.SyncTimeout }}" + ####################################################### ### ABCI App Connection Options ### ####################################################### diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 822550f16..3c83fe665 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -30,7 +30,7 @@ const ( switchToConsensusIntervalSeconds = 1 // switch to consensus after this duration of inactivity - syncTimeout = 60 * time.Second + defaultSyncTimeout = 60 * time.Second ) type consensusReactor interface { @@ -62,6 +62,8 @@ type Reactor struct { eventBus *eventbus.EventBus syncStartTime time.Time + // syncTimeout defines how much time we will try to start sync before switching to consensus + syncTimeout time.Duration nodeProTxHash types.ProTxHash @@ -94,6 +96,7 @@ func NewReactor( metrics: metrics, eventBus: eventBus, nodeProTxHash: nodeProTxHash, + syncTimeout: defaultSyncTimeout, executor: newBlockApplier( blockExec, store, @@ -106,6 +109,12 @@ func NewReactor( return r } +func (r *Reactor) WithSyncTimeout(timeout time.Duration) *Reactor { + r.syncTimeout = timeout + + return r +} + // OnStart starts separate go routines for each p2p Channel and listens for // envelopes on each. In addition, it also listens for peer updates and handles // messages on that p2p channel accordingly. The caller must be sure to execute @@ -130,7 +139,10 @@ func (r *Reactor) OnStart(ctx context.Context) error { startHeight = state.InitialHeight } - r.synchronizer = NewSynchronizer(startHeight, r.p2pClient, r.executor, WithLogger(r.logger)) + r.synchronizer = NewSynchronizer(startHeight, r.p2pClient, r.executor, + WithLogger(r.logger), + WithSyncTimeout(r.syncTimeout), + ) if r.blockSyncFlag.Load() { if err := r.synchronizer.Start(ctx); err != nil { return err diff --git a/internal/blocksync/synchronizer.go b/internal/blocksync/synchronizer.go index d4812731c..aed6670ad 100644 --- a/internal/blocksync/synchronizer.go +++ b/internal/blocksync/synchronizer.go @@ -70,6 +70,8 @@ type ( logger log.Logger lastAdvance time.Time + // syncTimeout defines how much time we will try to sync; defaults to 60 seconds + syncTimeout time.Duration mtx sync.RWMutex @@ -112,6 +114,12 @@ func WithClock(clock clockwork.Clock) OptionFunc { } } +func WithSyncTimeout(timeout time.Duration) OptionFunc { + return func(v *Synchronizer) { + v.syncTimeout = timeout + } +} + // NewSynchronizer returns a new Synchronizer with the height equal to start func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApplier, opts ...OptionFunc) *Synchronizer { peerStore := NewInMemPeerStore() @@ -127,6 +135,7 @@ func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApp height: start, workerPool: workerpool.New(poolWorkerSize, workerpool.WithLogger(logger)), pendingToApply: map[int64]BlockResponse{}, + syncTimeout: defaultSyncTimeout, } for _, opt := range opts { opt(bp) @@ -239,14 +248,14 @@ func (s *Synchronizer) WaitForSync(ctx context.Context) { lastAdvance = s.LastAdvance() isCaughtUp = s.IsCaughtUp() ) - if isCaughtUp || time.Since(lastAdvance) > syncTimeout { + if isCaughtUp || time.Since(lastAdvance) > s.syncTimeout { return } s.logger.Info( "not caught up yet", "height", height, "max_peer_height", s.MaxPeerHeight(), - "timeout_in", syncTimeout-time.Since(lastAdvance), + "timeout_in", s.syncTimeout-time.Since(lastAdvance), ) } } diff --git a/node/node.go b/node/node.go index f69b7d196..83ef4c8ba 100644 --- a/node/node.go +++ b/node/node.go @@ -367,7 +367,7 @@ func makeNode( blockSync && !stateSync, nodeMetrics.consensus, eventBus, - ) + ).WithSyncTimeout(cfg.SyncTimeout) node.services = append(node.services, bcReactor) node.rpcEnv.BlockSyncReactor = bcReactor