From 8b0dff95cbc05ee61212055091c94deb0c5824d3 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 27 Nov 2024 21:05:47 +0100 Subject: [PATCH 1/3] Expose node component management --- cmd/node.go | 64 +++++++++++++++----- cmd/node_builder.go | 17 +++--- cmd/scaffold.go | 137 +++++++++++++++++++++++-------------------- cmd/scaffold_test.go | 2 +- 4 files changed, 133 insertions(+), 87 deletions(-) diff --git a/cmd/node.go b/cmd/node.go index f17b8181f5c..02ff4aad9ef 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -29,18 +29,53 @@ type Node interface { // The Run function starts all the components, and is blocked until either a termination // signal is received or a irrecoverable error is encountered. type FlowNodeImp struct { - component.Component + NodeImp *NodeConfig +} + +// NodeImp can be used to create a node instance from: +// - a logger: to be used during startup and shutdown +// - a component: that will be started with Run +// - a cleanup function: that will be called after the component has been stopped +// - a fatal error handler: to handle any error received from the component +type NodeImp struct { + component.Component logger zerolog.Logger postShutdown func() error fatalHandler func(error) } // NewNode returns a new node instance -func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logger, cleanup func() error, handleFatal func(error)) Node { +func NewNode( + component component.Component, + cfg *NodeConfig, + logger zerolog.Logger, + cleanup func() error, + handleFatal func(error), +) Node { return &FlowNodeImp{ + NodeConfig: cfg, + NodeImp: NewBaseNode( + component, + logger.With(). + Str("node_role", cfg.BaseConfig.NodeRole). + Hex("spork_id", logging.ID(cfg.SporkID)). 
+ Logger(), + cleanup, + handleFatal, + ), + } +} + +// NewBaseNode returns a new base node instance +func NewBaseNode( + component component.Component, + logger zerolog.Logger, + cleanup func() error, + handleFatal func(error), +) NodeImp { + return NodeImp{ Component: component, - NodeConfig: cfg, logger: logger, postShutdown: cleanup, fatalHandler: handleFatal, @@ -51,13 +86,11 @@ func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logg // which point it gracefully shuts down. // Any unhandled irrecoverable errors thrown in child components will propagate up to here and // result in a fatal error. -func (node *FlowNodeImp) Run() { - // Cancelling this context notifies all child components that it's time to shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func (node *NodeImp) Run() { + ctx := context.Background() // Block until node is shutting down - err := node.run(ctx, cancel) + err := node.run(ctx) // Any error received is considered fatal. if err != nil { @@ -73,14 +106,18 @@ func (node *FlowNodeImp) Run() { node.logger.Error().Err(err).Msg("error encountered during cleanup") } - node.logger.Info().Msgf("%s node shutdown complete", node.BaseConfig.NodeRole) + node.logger.Info().Msg("node shutdown complete") } // run starts the node and blocks until a SIGINT/SIGTERM is received or an error is encountered. // It returns: // - nil if a termination signal is received, and all components have been gracefully stopped. 
-// - error if a irrecoverable error is received -func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) error { +// - error if an irrecoverable error is received +func (node *NodeImp) run(ctx context.Context) error { + // Cancelling this context notifies all child components that it's time to shut down + ctx, shutdown := context.WithCancel(ctx) + defer shutdown() + // Components will pass unhandled irrecoverable errors to this channel via signalerCtx (or a // child context). Any errors received on this channel should halt the node. signalerCtx, errChan := irrecoverable.WithSignaler(ctx) @@ -97,8 +134,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e select { case <-node.Ready(): node.logger.Info(). - Hex("spork_id", logging.ID(node.SporkID)). - Msgf("%s node startup complete", node.BaseConfig.NodeRole) + Msg("node startup complete") case <-ctx.Done(): } }() @@ -118,7 +154,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e // 3: Shut down // Send shutdown signal to components - node.logger.Info().Msgf("%s node shutting down", node.BaseConfig.NodeRole) + node.logger.Info().Msg("node shutting down") shutdown() // Block here until all components have stopped or an irrecoverable error is received. 
diff --git a/cmd/node_builder.go b/cmd/node_builder.go index 4c8aeaeb263..dc203f66ad4 100644 --- a/cmd/node_builder.go +++ b/cmd/node_builder.go @@ -33,7 +33,10 @@ import ( const NotSet = "not set" type BuilderFunc func(nodeConfig *NodeConfig) error -type ReadyDoneFactory func(node *NodeConfig) (module.ReadyDoneAware, error) + +// ReadyDoneFactory is a function that returns a ReadyDoneAware component or an error if +// the factory cannot create the component +type ReadyDoneFactory[Input any] func(input Input) (module.ReadyDoneAware, error) // NodeBuilder declares the initialization methods needed to bootstrap up a Flow node type NodeBuilder interface { @@ -73,7 +76,7 @@ type NodeBuilder interface { // The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance. // In both cases, the object is started according to its interface when the node is run, // and the node will wait for the component to exit gracefully. - Component(name string, f ReadyDoneFactory) NodeBuilder + Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder // DependableComponent adds a new component to the node that conforms to the ReadyDoneAware // interface. The builder will wait until all of the components in the dependencies list are ready @@ -86,7 +89,7 @@ type NodeBuilder interface { // IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all // dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method // MUST be idempotent. - DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder + DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder // RestartableComponent adds a new component to the node that conforms to the ReadyDoneAware // interface, and calls the provided error handler when an irrecoverable error is encountered. 
@@ -94,7 +97,7 @@ type NodeBuilder interface { // can/should be independently restarted when an irrecoverable error is encountered. // // Any irrecoverable errors thrown by the component will be passed to the provided error handler. - RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder + RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder // ShutdownFunc adds a callback function that is called after all components have exited. // All shutdown functions are called regardless of errors returned by previous callbacks. Any @@ -299,16 +302,16 @@ func DefaultBaseConfig() *BaseConfig { // DependencyList is a slice of ReadyDoneAware implementations that are used by DependableComponent // to define the list of dependencies that must be ready before starting the component. type DependencyList struct { - components []module.ReadyDoneAware + Components []module.ReadyDoneAware } func NewDependencyList(components ...module.ReadyDoneAware) *DependencyList { return &DependencyList{ - components: components, + Components: components, } } // Add adds a new ReadyDoneAware implementation to the list of dependencies. 
func (d *DependencyList) Add(component module.ReadyDoneAware) { - d.components = append(d.components, component) + d.Components = append(d.Components, component) } diff --git a/cmd/scaffold.go b/cmd/scaffold.go index be175a0fd12..34405e08b6d 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -107,12 +107,17 @@ type namedModuleFunc struct { name string } -type namedComponentFunc struct { - fn ReadyDoneFactory - name string +// NamedComponentFunc is wrapper for ReadyDoneFactory with additional fields: +// Name - name of the component +// ErrorHandler - error handler for the component +// Dependencies - list of dependencies for the component that should be ready before +// the component is started +type NamedComponentFunc[Input any] struct { + FN ReadyDoneFactory[Input] + Name string - errorHandler component.OnError - dependencies *DependencyList + ErrorHandler component.OnError + Dependencies *DependencyList } // FlowNodeBuilder is the default builder struct used for all flow nodes @@ -128,7 +133,7 @@ type FlowNodeBuilder struct { *NodeConfig flags *pflag.FlagSet modules []namedModuleFunc - components []namedComponentFunc + components []NamedComponentFunc[*NodeConfig] postShutdownFns []func() error preInitFns []BuilderFunc postInitFns []BuilderFunc @@ -1549,10 +1554,20 @@ func (fnb *FlowNodeBuilder) handleModules() error { return nil } -// handleComponents registers the component's factory method with the ComponentManager to be run +func (fnb *FlowNodeBuilder) handleComponents() error { + AddWorkersFromComponents(fnb.Logger, fnb.NodeConfig, fnb.componentBuilder, fnb.components) + return nil +} + +// AddWorkersFromComponents registers the component's factory method with the ComponentManager to be run // when the node starts. // It uses signal channels to ensure that components are started serially. 
-func (fnb *FlowNodeBuilder) handleComponents() error { +func AddWorkersFromComponents[Input any]( + log zerolog.Logger, + input Input, + componentBuilder component.ComponentManagerBuilder, + components []NamedComponentFunc[Input], +) { // The parent/started channels are used to enforce serial startup. // - parent is the started channel of the previous component. // - when a component is ready, it closes its started channel by calling the provided callback. @@ -1563,27 +1578,22 @@ func (fnb *FlowNodeBuilder) handleComponents() error { parent := make(chan struct{}) close(parent) - var err error - asyncComponents := []namedComponentFunc{} + asyncComponents := []NamedComponentFunc[Input]{} // Run all components - for _, f := range fnb.components { + for _, f := range components { // Components with explicit dependencies are not started serially - if f.dependencies != nil { + if f.Dependencies != nil { asyncComponents = append(asyncComponents, f) continue } started := make(chan struct{}) - if f.errorHandler != nil { - err = fnb.handleRestartableComponent(f, parent, func() { close(started) }) + if f.ErrorHandler != nil { + componentBuilder.AddWorker(WorkerFromRestartableComponent(log, input, f, parent, func() { close(started) })) } else { - err = fnb.handleComponent(f, parent, func() { close(started) }) - } - - if err != nil { - return fmt.Errorf("could not handle component %s: %w", f.name, err) + componentBuilder.AddWorker(WorkerFromComponent(log, input, f, parent, func() { close(started) })) } parent = started @@ -1592,17 +1602,12 @@ func (fnb *FlowNodeBuilder) handleComponents() error { // Components with explicit dependencies are run asynchronously, which means dependencies in // the dependency list must be initialized outside of the component factory. 
for _, f := range asyncComponents { - fnb.Logger.Debug().Str("component", f.name).Int("dependencies", len(f.dependencies.components)).Msg("handling component asynchronously") - err = fnb.handleComponent(f, util.AllReady(f.dependencies.components...), func() {}) - if err != nil { - return fmt.Errorf("could not handle dependable component %s: %w", f.name, err) - } + log.Debug().Str("component", f.Name).Int("dependencies", len(f.Dependencies.Components)).Msg("handling component asynchronously") + componentBuilder.AddWorker(WorkerFromComponent(log, input, f, util.AllReady(f.Dependencies.Components...), func() {})) } - - return nil } -// handleComponent constructs a component using the provided ReadyDoneFactory, and registers a +// WorkerFromComponent constructs a component using the provided ReadyDoneFactory, and registers a // worker with the ComponentManager to be run when the node is started. // // The ComponentManager starts all workers in parallel. Since some components have non-idempotent @@ -1615,27 +1620,27 @@ func (fnb *FlowNodeBuilder) handleComponents() error { // using their ReadyDoneAware interface. After components are updated to use the idempotent // ReadyDoneAware interface and explicitly wait for their dependencies to be ready, we can remove // this channel chaining. -func (fnb *FlowNodeBuilder) handleComponent(v namedComponentFunc, dependencies <-chan struct{}, started func()) error { +func WorkerFromComponent[Input any](log zerolog.Logger, input Input, v NamedComponentFunc[Input], dependencies <-chan struct{}, started func()) component.ComponentWorker { // Add a closure that starts the component when the node is started, and then waits for it to exit // gracefully. // Startup for all components will happen in parallel, and components can use their dependencies' // ReadyDoneAware interface to wait until they are ready. 
- fnb.componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + return func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { // wait for the dependencies to be ready before starting if err := util.WaitClosed(ctx, dependencies); err != nil { return } - logger := fnb.Logger.With().Str("component", v.name).Logger() + logger := log.With().Str("component", v.Name).Logger() logger.Info().Msg("component initialization started") // First, build the component using the factory method. - readyAware, err := v.fn(fnb.NodeConfig) + readyAware, err := v.FN(input) if err != nil { - ctx.Throw(fmt.Errorf("component %s initialization failed: %w", v.name, err)) + ctx.Throw(fmt.Errorf("component %s initialization failed: %w", v.Name, err)) } if readyAware == nil { - ctx.Throw(fmt.Errorf("component %s initialization failed: nil component", v.name)) + ctx.Throw(fmt.Errorf("component %s initialization failed: nil component", v.Name)) } logger.Info().Msg("component initialization complete") @@ -1671,20 +1676,24 @@ func (fnb *FlowNodeBuilder) handleComponent(v namedComponentFunc, dependencies < // Finally, wait until component has finished shutting down. <-readyAware.Done() logger.Info().Msg("component shutdown complete") - }) - - return nil + } } -// handleRestartableComponent constructs a component using the provided ReadyDoneFactory, and +// WorkerFromRestartableComponent constructs a component using the provided ReadyDoneFactory, and // registers a worker with the ComponentManager to be run when the node is started. // // Restartable Components are components that can be restarted after successfully handling // an irrecoverable error. // // Any irrecoverable errors thrown by the component will be passed to the provided error handler. 
-func (fnb *FlowNodeBuilder) handleRestartableComponent(v namedComponentFunc, parentReady <-chan struct{}, started func()) error { - fnb.componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { +func WorkerFromRestartableComponent[Input any]( + log zerolog.Logger, + input Input, + v NamedComponentFunc[Input], + parentReady <-chan struct{}, + started func(), +) component.ComponentWorker { + return func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { // wait for the previous component to be ready before starting if err := util.WaitClosed(ctx, parentReady); err != nil { return @@ -1699,12 +1708,12 @@ func (fnb *FlowNodeBuilder) handleRestartableComponent(v namedComponentFunc, par // from within the componentFactory started() - log := fnb.Logger.With().Str("component", v.name).Logger() + log := log.With().Str("component", v.Name).Logger() // This may be called multiple times if the component is restarted componentFactory := func() (component.Component, error) { log.Info().Msg("component initialization started") - c, err := v.fn(fnb.NodeConfig) + c, err := v.FN(input) if err != nil { return nil, err } @@ -1723,15 +1732,13 @@ func (fnb *FlowNodeBuilder) handleRestartableComponent(v namedComponentFunc, par return c.(component.Component), nil } - err := component.RunComponent(ctx, componentFactory, v.errorHandler) + err := component.RunComponent(ctx, componentFactory, v.ErrorHandler) if err != nil && !errors.Is(err, ctx.Err()) { - ctx.Throw(fmt.Errorf("component %s encountered an unhandled irrecoverable error: %w", v.name, err)) + ctx.Throw(fmt.Errorf("component %s encountered an unhandled irrecoverable error: %w", v.Name, err)) } log.Info().Msg("component shutdown complete") - }) - - return nil + } } // ExtraFlags enables binding additional flags beyond those defined in BaseConfig. 
@@ -1766,10 +1773,10 @@ func (fnb *FlowNodeBuilder) AdminCommand(command string, f func(config *NodeConf // The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance. // In both cases, the object is started when the node is run, and the node will wait for the // component to exit gracefully. -func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory) NodeBuilder { - fnb.components = append(fnb.components, namedComponentFunc{ - fn: f, - name: name, +func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder { + fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, }) return fnb } @@ -1785,26 +1792,26 @@ func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory) NodeBuild // IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all // dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method // MUST be idempotent. -func (fnb *FlowNodeBuilder) DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder { +func (fnb *FlowNodeBuilder) DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder { // Note: dependencies are passed as a struct to allow updating the list after calling this method. // Passing a slice instead would result in out of sync metadata since slices are passed by reference - fnb.components = append(fnb.components, namedComponentFunc{ - fn: f, - name: name, - dependencies: dependencies, + fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, + Dependencies: dependencies, }) return fnb } // OverrideComponent adds given builder function to the components set of the node builder. If a builder function with that name // already exists, it will be overridden. 
-func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory) NodeBuilder { +func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder { for i := 0; i < len(fnb.components); i++ { - if fnb.components[i].name == name { + if fnb.components[i].Name == name { // found component with the name, override it. - fnb.components[i] = namedComponentFunc{ - fn: f, - name: name, + fnb.components[i] = NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, } return fnb @@ -1828,11 +1835,11 @@ func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory) N // Note: The ReadyDoneFactory method may be called multiple times if the component is restarted. // // Any irrecoverable errors thrown by the component will be passed to the provided error handler. -func (fnb *FlowNodeBuilder) RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder { - fnb.components = append(fnb.components, namedComponentFunc{ - fn: f, - name: name, - errorHandler: errorHandler, +func (fnb *FlowNodeBuilder) RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder { + fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, + ErrorHandler: errorHandler, }) return fnb } diff --git a/cmd/scaffold_test.go b/cmd/scaffold_test.go index a37994356cc..d23663ec3b3 100644 --- a/cmd/scaffold_test.go +++ b/cmd/scaffold_test.go @@ -284,7 +284,7 @@ func TestOverrideModules(t *testing.T) { type testComponentDefinition struct { name string - factory ReadyDoneFactory + factory ReadyDoneFactory[*NodeConfig] errorHandler component.OnError } From 759daa544649692b60ccb5cfb6d625d24019076c Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 11 Dec 2024 22:10:09 +0100 Subject: [PATCH 2/3] rename --- cmd/scaffold.go | 50 ++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 
deletions(-) diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 34405e08b6d..ad94c641f3f 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -107,14 +107,14 @@ type namedModuleFunc struct { name string } -// NamedComponentFunc is wrapper for ReadyDoneFactory with additional fields: +// NamedComponentFactory is a wrapper for ReadyDoneFactory with additional fields: // Name - name of the component // ErrorHandler - error handler for the component // Dependencies - list of dependencies for the component that should be ready before // the component is started -type NamedComponentFunc[Input any] struct { - FN ReadyDoneFactory[Input] - Name string +type NamedComponentFactory[Input any] struct { + ComponentFactory ReadyDoneFactory[Input] + Name string ErrorHandler component.OnError Dependencies *DependencyList @@ -133,7 +133,7 @@ type FlowNodeBuilder struct { *NodeConfig flags *pflag.FlagSet modules []namedModuleFunc - components []NamedComponentFunc[*NodeConfig] + components []NamedComponentFactory[*NodeConfig] postShutdownFns []func() error preInitFns []BuilderFunc postInitFns []BuilderFunc @@ -1566,7 +1566,7 @@ func AddWorkersFromComponents[Input any]( log zerolog.Logger, input Input, componentBuilder component.ComponentManagerBuilder, - components []NamedComponentFunc[Input], + components []NamedComponentFactory[Input], ) { // The parent/started channels are used to enforce serial startup. // - parent is the started channel of the previous component. @@ -1578,7 +1578,7 @@ func AddWorkersFromComponents[Input any]( parent := make(chan struct{}) close(parent) - asyncComponents := []NamedComponentFunc[Input]{} + asyncComponents := []NamedComponentFactory[Input]{} // Run all components for _, f := range components { @@ -1620,7 +1620,7 @@ func AddWorkersFromComponents[Input any]( // using their ReadyDoneAware interface. 
After components are updated to use the idempotent // ReadyDoneAware interface and explicitly wait for their dependencies to be ready, we can remove // this channel chaining. -func WorkerFromComponent[Input any](log zerolog.Logger, input Input, v NamedComponentFunc[Input], dependencies <-chan struct{}, started func()) component.ComponentWorker { +func WorkerFromComponent[Input any](log zerolog.Logger, input Input, v NamedComponentFactory[Input], dependencies <-chan struct{}, started func()) component.ComponentWorker { // Add a closure that starts the component when the node is started, and then waits for it to exit // gracefully. // Startup for all components will happen in parallel, and components can use their dependencies' @@ -1635,7 +1635,7 @@ func WorkerFromComponent[Input any](log zerolog.Logger, input Input, v NamedComp logger.Info().Msg("component initialization started") // First, build the component using the factory method. - readyAware, err := v.FN(input) + readyAware, err := v.ComponentFactory(input) if err != nil { ctx.Throw(fmt.Errorf("component %s initialization failed: %w", v.Name, err)) } @@ -1689,7 +1689,7 @@ func WorkerFromComponent[Input any](log zerolog.Logger, input Input, v NamedComp func WorkerFromRestartableComponent[Input any]( log zerolog.Logger, input Input, - v NamedComponentFunc[Input], + v NamedComponentFactory[Input], parentReady <-chan struct{}, started func(), ) component.ComponentWorker { @@ -1713,7 +1713,7 @@ func WorkerFromRestartableComponent[Input any]( // This may be called multiple times if the component is restarted componentFactory := func() (component.Component, error) { log.Info().Msg("component initialization started") - c, err := v.FN(input) + c, err := v.ComponentFactory(input) if err != nil { return nil, err } @@ -1774,9 +1774,9 @@ func (fnb *FlowNodeBuilder) AdminCommand(command string, f func(config *NodeConf // In both cases, the object is started when the node is run, and the node will wait for the // component 
to exit gracefully. func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder { - fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ - FN: f, - Name: name, + fnb.components = append(fnb.components, NamedComponentFactory[*NodeConfig]{ + ComponentFactory: f, + Name: name, }) return fnb } @@ -1795,10 +1795,10 @@ func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory[*NodeConfi func (fnb *FlowNodeBuilder) DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder { // Note: dependencies are passed as a struct to allow updating the list after calling this method. // Passing a slice instead would result in out of sync metadata since slices are passed by reference - fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ - FN: f, - Name: name, - Dependencies: dependencies, + fnb.components = append(fnb.components, NamedComponentFactory[*NodeConfig]{ + ComponentFactory: f, + Name: name, + Dependencies: dependencies, }) return fnb } @@ -1809,9 +1809,9 @@ func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory[*N for i := 0; i < len(fnb.components); i++ { if fnb.components[i].Name == name { // found component with the name, override it. - fnb.components[i] = NamedComponentFunc[*NodeConfig]{ - FN: f, - Name: name, + fnb.components[i] = NamedComponentFactory[*NodeConfig]{ + ComponentFactory: f, + Name: name, } return fnb @@ -1836,10 +1836,10 @@ func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory[*N // // Any irrecoverable errors thrown by the component will be passed to the provided error handler. 
func (fnb *FlowNodeBuilder) RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder { - fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ - FN: f, - Name: name, - ErrorHandler: errorHandler, + fnb.components = append(fnb.components, NamedComponentFactory[*NodeConfig]{ + ComponentFactory: f, + Name: name, + ErrorHandler: errorHandler, }) return fnb } From 85fd812a7e49c921277160d8d31b2eba0a436a57 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Fri, 13 Dec 2024 16:06:09 +0100 Subject: [PATCH 3/3] add context to node run --- cmd/access/main.go | 4 ++- cmd/collection/main.go | 3 +- cmd/consensus/main.go | 3 +- cmd/execution/main.go | 4 ++- cmd/ghost/main.go | 4 ++- cmd/node.go | 6 ++-- cmd/node_test.go | 49 +++++++++++++++++++++++++++---- cmd/observer/main.go | 4 ++- cmd/verification/main.go | 4 ++- insecure/cmd/access/main.go | 4 ++- insecure/cmd/execution/main.go | 4 ++- insecure/cmd/verification/main.go | 4 ++- 12 files changed, 75 insertions(+), 18 deletions(-) diff --git a/cmd/access/main.go b/cmd/access/main.go index 1deac0311ce..6b0dfa9e02c 100644 --- a/cmd/access/main.go +++ b/cmd/access/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "github.com/onflow/flow-go/cmd" nodebuilder "github.com/onflow/flow-go/cmd/access/node_builder" "github.com/onflow/flow-go/model/flow" @@ -24,5 +26,5 @@ func main() { if err != nil { builder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 1a241ba703b..8229a1d6bf0 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "time" @@ -646,7 +647,7 @@ func main() { if err != nil { nodeBuilder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } // createQCContractClient creates QC contract client diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go 
index 616e7657d10..7da905963ea 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "errors" "fmt" @@ -933,7 +934,7 @@ func main() { if err != nil { nodeBuilder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } func loadBeaconPrivateKey(dir string, myID flow.Identifier) (*encodable.RandomBeaconPrivKey, error) { diff --git a/cmd/execution/main.go b/cmd/execution/main.go index 58f10f7051c..c435823b029 100644 --- a/cmd/execution/main.go +++ b/cmd/execution/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "github.com/onflow/flow-go/cmd" "github.com/onflow/flow-go/model/flow" ) @@ -19,5 +21,5 @@ func main() { if err != nil { exeBuilder.FlowNodeBuilder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } diff --git a/cmd/ghost/main.go b/cmd/ghost/main.go index eade979e506..fa4b4887cd3 100644 --- a/cmd/ghost/main.go +++ b/cmd/ghost/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "github.com/spf13/pflag" "github.com/onflow/flow-go/cmd" @@ -45,5 +47,5 @@ func main() { if err != nil { nodeBuilder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } diff --git a/cmd/node.go b/cmd/node.go index 02ff4aad9ef..3bccb2fd345 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -22,7 +22,8 @@ type Node interface { // Run initiates all common components (logger, database, protocol state etc.) // then starts each component. It also sets up a channel to gracefully shut // down each component if a SIGINT is received. - Run() + // The context can also be used to signal the node to shut down. + Run(ctx context.Context) } // FlowNodeImp is created by the FlowNodeBuilder with all components ready to be started. @@ -86,8 +87,7 @@ func NewBaseNode( // which point it gracefully shuts down. // Any unhandled irrecoverable errors thrown in child components will propagate up to here and // result in a fatal error. 
-func (node *NodeImp) Run() { - ctx := context.Background() +func (node *NodeImp) Run(ctx context.Context) { // Block until node is shutting down err := node.run(ctx) diff --git a/cmd/node_test.go b/cmd/node_test.go index a42de1f28db..e8fb48248f6 100644 --- a/cmd/node_test.go +++ b/cmd/node_test.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "errors" "os" "syscall" @@ -42,7 +43,7 @@ func TestRunShutsDownCleanly(t *testing.T) { finished := make(chan struct{}) go func() { - node.Run() + node.Run(context.Background()) close(finished) }() @@ -62,6 +63,44 @@ func TestRunShutsDownCleanly(t *testing.T) { }, testLogger.logs) }) + t.Run("Run shuts down gracefully on context cancel", func(t *testing.T) { + testLogger.Reset() + manager := component.NewComponentManagerBuilder(). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + testLogger.Log("worker starting up") + ready() + testLogger.Log("worker startup complete") + + <-ctx.Done() + testLogger.Log("worker shutting down") + testLogger.Log("worker shutdown complete") + }). + Build() + node := NewNode(manager, nodeConfig, logger, postShutdown, fatalHandler) + + ctx, cancel := context.WithCancel(context.Background()) + + finished := make(chan struct{}) + go func() { + node.Run(ctx) + close(finished) + }() + + <-node.Ready() + + cancel() + + <-finished + + assert.Equal(t, []string{ + "worker starting up", + "worker startup complete", + "worker shutting down", + "worker shutdown complete", + "running cleanup", + }, testLogger.logs) + }) + t.Run("Run encounters error during postShutdown", func(t *testing.T) { testLogger.Reset() manager := component.NewComponentManagerBuilder(). 
@@ -82,7 +121,7 @@ func TestRunShutsDownCleanly(t *testing.T) { finished := make(chan struct{}) go func() { - node.Run() + node.Run(context.Background()) close(finished) }() @@ -123,7 +162,7 @@ func TestRunShutsDownCleanly(t *testing.T) { finished := make(chan struct{}) go func() { - node.Run() + node.Run(context.Background()) close(finished) }() @@ -157,7 +196,7 @@ func TestRunShutsDownCleanly(t *testing.T) { finished := make(chan struct{}) go func() { - node.Run() + node.Run(context.Background()) close(finished) }() @@ -191,7 +230,7 @@ func TestRunShutsDownCleanly(t *testing.T) { finished := make(chan struct{}) go func() { - node.Run() + node.Run(context.Background()) close(finished) }() diff --git a/cmd/observer/main.go b/cmd/observer/main.go index 96ec27bc5cc..bb84036fa9f 100644 --- a/cmd/observer/main.go +++ b/cmd/observer/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + nodebuilder "github.com/onflow/flow-go/cmd/observer/node_builder" ) @@ -22,5 +24,5 @@ func main() { if err != nil { anb.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } diff --git a/cmd/verification/main.go b/cmd/verification/main.go index 6c9fbdd50e3..10ba2ae2670 100644 --- a/cmd/verification/main.go +++ b/cmd/verification/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "github.com/onflow/flow-go/cmd" "github.com/onflow/flow-go/model/flow" ) @@ -20,5 +22,5 @@ func main() { if err != nil { verificationBuilder.FlowNodeBuilder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } diff --git a/insecure/cmd/access/main.go b/insecure/cmd/access/main.go index 836f35cbf67..bd59326c769 100644 --- a/insecure/cmd/access/main.go +++ b/insecure/cmd/access/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + nodebuilder "github.com/onflow/flow-go/cmd/access/node_builder" insecmd "github.com/onflow/flow-go/insecure/cmd" "github.com/onflow/flow-go/model/flow" @@ -31,5 +33,5 @@ func main() { 
builder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } diff --git a/insecure/cmd/execution/main.go b/insecure/cmd/execution/main.go index 1a998fd8351..0bba64f5364 100644 --- a/insecure/cmd/execution/main.go +++ b/insecure/cmd/execution/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "github.com/onflow/flow-go/cmd" insecmd "github.com/onflow/flow-go/insecure/cmd" "github.com/onflow/flow-go/model/flow" @@ -23,5 +25,5 @@ func main() { if err != nil { corruptedExecutionBuilder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) } diff --git a/insecure/cmd/verification/main.go b/insecure/cmd/verification/main.go index 91c876dde5a..1a6db4adc51 100644 --- a/insecure/cmd/verification/main.go +++ b/insecure/cmd/verification/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "github.com/onflow/flow-go/cmd" insecmd "github.com/onflow/flow-go/insecure/cmd" "github.com/onflow/flow-go/model/flow" @@ -23,5 +25,5 @@ func main() { if err != nil { corruptedVerificationBuilder.Logger.Fatal().Err(err).Send() } - node.Run() + node.Run(context.Background()) }