Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose node component management #6769

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/access/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

"github.com/onflow/flow-go/cmd"
nodebuilder "github.com/onflow/flow-go/cmd/access/node_builder"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -24,5 +26,5 @@ func main() {
if err != nil {
builder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
3 changes: 2 additions & 1 deletion cmd/collection/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -646,7 +647,7 @@ func main() {
if err != nil {
nodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}

// createQCContractClient creates QC contract client
Expand Down
3 changes: 2 additions & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -933,7 +934,7 @@ func main() {
if err != nil {
nodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}

func loadBeaconPrivateKey(dir string, myID flow.Identifier) (*encodable.RandomBeaconPrivKey, error) {
Expand Down
4 changes: 3 additions & 1 deletion cmd/execution/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/model/flow"
)
Expand All @@ -19,5 +21,5 @@ func main() {
if err != nil {
exeBuilder.FlowNodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
4 changes: 3 additions & 1 deletion cmd/ghost/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

"github.com/spf13/pflag"

"github.com/onflow/flow-go/cmd"
Expand Down Expand Up @@ -45,5 +47,5 @@ func main() {
if err != nil {
nodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
66 changes: 51 additions & 15 deletions cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,61 @@ type Node interface {
// Run initiates all common components (logger, database, protocol state etc.)
// then starts each component. It also sets up a channel to gracefully shut
// down each component if a SIGINT is received.
Run()
// The context can also be used to signal the node to shutdown.
Run(ctx context.Context)
}

// FlowNodeImp is created by the FlowNodeBuilder with all components ready to be started.
// The Run function starts all the components, and is blocked until either a termination
// signal is received or a irrecoverable error is encountered.
type FlowNodeImp struct {
component.Component
NodeImp
*NodeConfig
}

// NodeImp can be used to create a node instance from:
// - a logger: to be used during startup and shutdown
// - a component: that will be started with Run
// - a cleanup function: that will be called after the component has been stopped
// - a fatal error handler: to handle any error received from the component
type NodeImp struct {
component.Component
logger zerolog.Logger
postShutdown func() error
fatalHandler func(error)
}

// NewNode returns a new node instance
func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logger, cleanup func() error, handleFatal func(error)) Node {
func NewNode(
component component.Component,
cfg *NodeConfig,
logger zerolog.Logger,
cleanup func() error,
handleFatal func(error),
) Node {
return &FlowNodeImp{
NodeConfig: cfg,
NodeImp: NewBaseNode(
component,
logger.With().
Str("node_role", cfg.BaseConfig.NodeRole).
Hex("spork_id", logging.ID(cfg.SporkID)).
Comment on lines +62 to +63
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change would cause every single line of log to include this two fields, is it necessary?
To me, the node role and spork id does not change after startup, so they only need to be logged once, as long as they can be found easily, we don't have to include them in other logs.

I would suggest to move the role and spork id info back to node startup complete log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logger is used only in this file, which is like 4 log lines. I don't thing the impact is that big.

Logger(),
cleanup,
handleFatal,
),
}
}

// NewBaseNode returns a new base node instance
func NewBaseNode(
component component.Component,
logger zerolog.Logger,
cleanup func() error,
handleFatal func(error),
) NodeImp {
return NodeImp{
Component: component,
NodeConfig: cfg,
logger: logger,
postShutdown: cleanup,
fatalHandler: handleFatal,
Expand All @@ -51,13 +87,10 @@ func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logg
// which point it gracefully shuts down.
// Any unhandled irrecoverable errors thrown in child components will propagate up to here and
// result in a fatal error.
func (node *FlowNodeImp) Run() {
// Cancelling this context notifies all child components that it's time to shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func (node *NodeImp) Run(ctx context.Context) {

// Block until node is shutting down
err := node.run(ctx, cancel)
err := node.run(ctx)

// Any error received is considered fatal.
if err != nil {
Expand All @@ -73,14 +106,18 @@ func (node *FlowNodeImp) Run() {
node.logger.Error().Err(err).Msg("error encountered during cleanup")
}

node.logger.Info().Msgf("%s node shutdown complete", node.BaseConfig.NodeRole)
node.logger.Info().Msg("node shutdown complete")
}

// run starts the node and blocks until a SIGINT/SIGTERM is received or an error is encountered.
// It returns:
// - nil if a termination signal is received, and all components have been gracefully stopped.
// - error if a irrecoverable error is received
func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) error {
// - error if an irrecoverable error is received
func (node *NodeImp) run(ctx context.Context) error {
// Cancelling this context notifies all child components that it's time to shut down
ctx, shutdown := context.WithCancel(ctx)
defer shutdown()

// Components will pass unhandled irrecoverable errors to this channel via signalerCtx (or a
// child context). Any errors received on this channel should halt the node.
signalerCtx, errChan := irrecoverable.WithSignaler(ctx)
Expand All @@ -97,8 +134,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e
select {
case <-node.Ready():
node.logger.Info().
Hex("spork_id", logging.ID(node.SporkID)).
Msgf("%s node startup complete", node.BaseConfig.NodeRole)
Msg("node startup complete")
case <-ctx.Done():
}
}()
Expand All @@ -118,7 +154,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e

// 3: Shut down
// Send shutdown signal to components
node.logger.Info().Msgf("%s node shutting down", node.BaseConfig.NodeRole)
node.logger.Info().Msg("node shutting down")
shutdown()

// Block here until all components have stopped or an irrecoverable error is received.
Expand Down
17 changes: 10 additions & 7 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
const NotSet = "not set"

type BuilderFunc func(nodeConfig *NodeConfig) error
type ReadyDoneFactory func(node *NodeConfig) (module.ReadyDoneAware, error)

// ReadyDoneFactory is a function that returns a ReadyDoneAware component or an error if
// the factory cannot create the component
type ReadyDoneFactory[Input any] func(input Input) (module.ReadyDoneAware, error)

// NodeBuilder declares the initialization methods needed to bootstrap up a Flow node
type NodeBuilder interface {
Expand Down Expand Up @@ -73,7 +76,7 @@ type NodeBuilder interface {
// The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance.
// In both cases, the object is started according to its interface when the node is run,
// and the node will wait for the component to exit gracefully.
Component(name string, f ReadyDoneFactory) NodeBuilder
Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder

// DependableComponent adds a new component to the node that conforms to the ReadyDoneAware
// interface. The builder will wait until all of the components in the dependencies list are ready
Expand All @@ -86,15 +89,15 @@ type NodeBuilder interface {
// IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all
// dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method
// MUST be idempotent.
DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder
DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder

// RestartableComponent adds a new component to the node that conforms to the ReadyDoneAware
// interface, and calls the provided error handler when an irrecoverable error is encountered.
// Use RestartableComponent if the component is not critical to the node's safe operation and
// can/should be independently restarted when an irrecoverable error is encountered.
//
// Any irrecoverable errors thrown by the component will be passed to the provided error handler.
RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder
RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder

// ShutdownFunc adds a callback function that is called after all components have exited.
// All shutdown functions are called regardless of errors returned by previous callbacks. Any
Expand Down Expand Up @@ -299,16 +302,16 @@ func DefaultBaseConfig() *BaseConfig {
// DependencyList is a slice of ReadyDoneAware implementations that are used by DependableComponent
// to define the list of dependencies that must be ready before starting the component.
type DependencyList struct {
components []module.ReadyDoneAware
Components []module.ReadyDoneAware
}

func NewDependencyList(components ...module.ReadyDoneAware) *DependencyList {
return &DependencyList{
components: components,
Components: components,
}
}

// Add adds a new ReadyDoneAware implementation to the list of dependencies.
func (d *DependencyList) Add(component module.ReadyDoneAware) {
d.components = append(d.components, component)
d.Components = append(d.Components, component)
}
49 changes: 44 additions & 5 deletions cmd/node_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"errors"
"os"
"syscall"
Expand Down Expand Up @@ -42,7 +43,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand All @@ -62,6 +63,44 @@ func TestRunShutsDownCleanly(t *testing.T) {
}, testLogger.logs)
})

t.Run("Run shuts down gracefully on context cancel", func(t *testing.T) {
testLogger.Reset()
manager := component.NewComponentManagerBuilder().
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
testLogger.Log("worker starting up")
ready()
testLogger.Log("worker startup complete")

<-ctx.Done()
testLogger.Log("worker shutting down")
testLogger.Log("worker shutdown complete")
}).
Build()
node := NewNode(manager, nodeConfig, logger, postShutdown, fatalHandler)

ctx, cancel := context.WithCancel(context.Background())

finished := make(chan struct{})
go func() {
node.Run(ctx)
close(finished)
}()

<-node.Ready()

cancel()

<-finished

assert.Equal(t, []string{
"worker starting up",
"worker startup complete",
"worker shutting down",
"worker shutdown complete",
"running cleanup",
}, testLogger.logs)
})

t.Run("Run encounters error during postShutdown", func(t *testing.T) {
testLogger.Reset()
manager := component.NewComponentManagerBuilder().
Expand All @@ -82,7 +121,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down Expand Up @@ -123,7 +162,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down Expand Up @@ -157,7 +196,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down Expand Up @@ -191,7 +230,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

nodebuilder "github.com/onflow/flow-go/cmd/observer/node_builder"
)

Expand All @@ -22,5 +24,5 @@ func main() {
if err != nil {
anb.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
Loading
Loading