From 3552f0617823914d585a8cc3af9aa7a09055a1dd Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 20 Nov 2024 16:02:47 -0800 Subject: [PATCH 01/35] replace engine.Unit with ComponentManager in Pusher Engine Using https://github.com/onflow/flow-go/pull/4219 as an example. Instead of starting new goroutines or directly processing messages in a blocking way, messages are added to a queue that a worker pulls from. The Pusher engine still currently implements network.Engine rather than network.MessageProcessor. --- engine/collection/pusher/engine.go | 117 ++++++++++++++++++++++------- 1 file changed, 89 insertions(+), 28 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 317729108dd..67655449c90 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -4,15 +4,20 @@ package pusher import ( + "context" + "errors" "fmt" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" @@ -24,7 +29,6 @@ import ( // Engine is the collection pusher engine, which provides access to resources // held by the collection node. 
type Engine struct { - unit *engine.Unit log zerolog.Logger engMetrics module.EngineMetrics colMetrics module.CollectionMetrics @@ -33,11 +37,33 @@ type Engine struct { state protocol.State collections storage.Collections transactions storage.Transactions + + messageHandler *engine.MessageHandler + notifier engine.Notifier + inbound *fifoqueue.FifoQueue + + component.Component + cm *component.ComponentManager } +// TODO convert to network.MessageProcessor +var _ network.Engine = (*Engine)(nil) +var _ component.Component = (*Engine)(nil) + func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) { + // TODO length observer metrics + inbound, err := fifoqueue.NewFifoQueue(1000) + if err != nil { + return nil, fmt.Errorf("could not create inbound fifoqueue: %w", err) + } + + notifier := engine.NewNotifier() + messageHandler := engine.NewMessageHandler(log, notifier, engine.Pattern{ + Match: engine.MatchType[*messages.SubmitCollectionGuarantee], + Store: &engine.FifoMessageStore{FifoQueue: inbound}, + }) + e := &Engine{ - unit: engine.NewUnit(), log: log.With().Str("engine", "pusher").Logger(), engMetrics: engMetrics, colMetrics: colMetrics, @@ -45,6 +71,10 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e state: state, collections: collections, transactions: transactions, + + messageHandler: messageHandler, + notifier: notifier, + inbound: inbound, } conduit, err := net.Register(channels.PushGuarantees, e) @@ -53,55 +83,86 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e } e.conduit = conduit + e.cm = component.NewComponentManagerBuilder(). + AddWorker(e.inboundMessageWorker). 
+ Build() + e.Component = e.cm + return e, nil } -// Ready returns a ready channel that is closed once the engine has fully -// started. -func (e *Engine) Ready() <-chan struct{} { - return e.unit.Ready() +func (e *Engine) inboundMessageWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + + done := ctx.Done() + wake := e.notifier.Channel() + for { + select { + case <-done: + return + case <-wake: + e.processInboundMessages(ctx) + } + } } -// Done returns a done channel that is closed once the engine has fully stopped. -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() +func (e *Engine) processInboundMessages(ctx context.Context) { + for { + nextMessage, ok := e.inbound.Pop() + if !ok { + return + } + + asEngineWrapper := nextMessage.(*engine.Message) + asSCGMsg := asEngineWrapper.Payload.(*messages.SubmitCollectionGuarantee) + originID := asEngineWrapper.OriginID + + _ = e.process(originID, asSCGMsg) + + select { + case <-ctx.Done(): + return + default: + } + } } // SubmitLocal submits an event originating on the local node. func (e *Engine) SubmitLocal(event interface{}) { - e.unit.Launch(func() { - err := e.process(e.me.NodeID(), event) - if err != nil { - engine.LogError(e.log, err) - } - }) + err := e.messageHandler.Process(e.me.NodeID(), event) + if err != nil { + engine.LogError(e.log, err) + } } // Submit submits the given event from the node with the given origin ID // for processing in a non-blocking manner. It returns instantly and logs // a potential processing error internally when done. func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - e.unit.Launch(func() { - err := e.process(originID, event) - if err != nil { - engine.LogError(e.log, err) - } - }) + err := e.messageHandler.Process(originID, event) + if err != nil { + engine.LogError(e.log, err) + } } // ProcessLocal processes an event originating on the local node. 
func (e *Engine) ProcessLocal(event interface{}) error { - return e.unit.Do(func() error { - return e.process(e.me.NodeID(), event) - }) + return e.messageHandler.Process(e.me.NodeID(), event) } // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. -func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { - return e.unit.Do(func() error { - return e.process(originID, event) - }) +func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { + err := e.messageHandler.Process(originID, message) + if err != nil { + if errors.Is(err, engine.IncompatibleInputTypeError) { + e.log.Warn().Bool(logging.KeySuspicious, true).Msgf("%v delivered unsupported message %T through %v", originID, message, channel) + return nil + } + // TODO add comment about Process errors... + return fmt.Errorf("unexpected failure to process inbound pusher message") + } + return nil } // process processes events for the pusher engine on the collection node. 
From 6d3f4620a3a2b0156d5955d55c0f316159253559 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 21 Nov 2024 10:48:56 -0800 Subject: [PATCH 02/35] pusher engine test: update positive test --- engine/collection/pusher/engine_test.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index fde6d9696dc..9f59f79f666 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -1,8 +1,10 @@ package pusher_test import ( + "context" "io" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -12,6 +14,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/messages" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" @@ -82,12 +85,17 @@ func TestPusherEngine(t *testing.T) { // should be able to submit collection guarantees to consensus nodes func (suite *Suite) TestSubmitCollectionGuarantee() { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background()) + suite.engine.Start(ctx) + defer cancel() + done := make(chan struct{}) guarantee := unittest.CollectionGuaranteeFixture() // should submit the collection to consensus nodes consensus := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleConsensus)) - suite.conduit.On("Publish", guarantee, consensus[0].NodeID).Return(nil) + suite.conduit.On("Publish", guarantee, consensus[0].NodeID). 
+ Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once() msg := &messages.SubmitCollectionGuarantee{ Guarantee: *guarantee, @@ -95,6 +103,8 @@ func (suite *Suite) TestSubmitCollectionGuarantee() { err := suite.engine.ProcessLocal(msg) suite.Require().Nil(err) + unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent") + suite.conduit.AssertExpectations(suite.T()) } From 0aadc1315ec3935b811496e9d4ab7bbb592c994a Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 21 Nov 2024 10:57:29 -0800 Subject: [PATCH 03/35] Pusher engine test: update negative test Because the event processing now happens in a worker, any errors raised within it are no longer visible to the caller of Process(). Because the test checked for error status, moved the tests to the same package and call the internal processing function directly. --- engine/collection/pusher/engine_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index 9f59f79f666..bedc17f16e0 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -1,4 +1,4 @@ -package pusher_test +package pusher import ( "context" @@ -10,14 +10,12 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" - "github.com/onflow/flow-go/engine/collection/pusher" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" - "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/mocknetwork" protocol "github.com/onflow/flow-go/state/protocol/mock" storage "github.com/onflow/flow-go/storage/mock" @@ -35,7 +33,7 @@ type Suite struct { collections *storage.Collections transactions *storage.Transactions - engine 
*pusher.Engine + engine *Engine } func (suite *Suite) SetupTest() { @@ -66,7 +64,7 @@ func (suite *Suite) SetupTest() { suite.collections = new(storage.Collections) suite.transactions = new(storage.Transactions) - suite.engine, err = pusher.New( + suite.engine, err = New( zerolog.New(io.Discard), net, suite.state, @@ -119,7 +117,7 @@ func (suite *Suite) TestSubmitCollectionGuaranteeNonLocal() { msg := &messages.SubmitCollectionGuarantee{ Guarantee: *guarantee, } - err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg) + err := suite.engine.process(sender.NodeID, msg) suite.Require().Error(err) suite.conduit.AssertNumberOfCalls(suite.T(), "Multicast", 0) From dec0d586bc1b1a7587e73e7fd39fde451f4b117e Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 21 Nov 2024 11:52:42 -0800 Subject: [PATCH 04/35] Start pusher engine in mocks When using Unit, calling Ready would also start the engine. With ComponentManager, we additionally need to invoke Start. --- engine/testutil/mock/nodes.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go index 8a2ea4fed1a..aae7383ada0 100644 --- a/engine/testutil/mock/nodes.go +++ b/engine/testutil/mock/nodes.go @@ -140,6 +140,7 @@ func (n CollectionNode) Start(t *testing.T) { n.IngestionEngine.Start(n.Ctx) n.EpochManagerEngine.Start(n.Ctx) n.ProviderEngine.Start(n.Ctx) + n.PusherEngine.Start(n.Ctx) } func (n CollectionNode) Ready() <-chan struct{} { From b7166f39929683517ac9b37f605e219353ae6923 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 26 Nov 2024 16:36:41 -0800 Subject: [PATCH 05/35] Refactor pusher engine: merge function with no callers --- engine/collection/pusher/engine.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 67655449c90..7ed4668da93 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ 
-171,22 +171,15 @@ func (e *Engine) process(originID flow.Identifier, event interface{}) error { case *messages.SubmitCollectionGuarantee: e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) - return e.onSubmitCollectionGuarantee(originID, ev) + if originID != e.me.NodeID() { + return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID) + } + return e.SubmitCollectionGuarantee(&ev.Guarantee) default: return fmt.Errorf("invalid event type (%T)", event) } } -// onSubmitCollectionGuarantee handles submitting the given collection guarantee -// to consensus nodes. -func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error { - if originID != e.me.NodeID() { - return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID) - } - - return e.SubmitCollectionGuarantee(&req.Guarantee) -} - // SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes. func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) From e25cba77b2c6424fca936f546f998e0cec016ddc Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 26 Nov 2024 16:41:26 -0800 Subject: [PATCH 06/35] Refactor pusher engine: error on non-local messages --- engine/collection/pusher/engine.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 7ed4668da93..178a42be22c 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -139,10 +139,7 @@ func (e *Engine) SubmitLocal(event interface{}) { // for processing in a non-blocking manner. 
It returns instantly and logs // a potential processing error internally when done. func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := e.messageHandler.Process(originID, event) - if err != nil { - engine.LogError(e.log, err) - } + engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node")) } // ProcessLocal processes an event originating on the local node. @@ -153,16 +150,7 @@ func (e *Engine) ProcessLocal(event interface{}) error { // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { - err := e.messageHandler.Process(originID, message) - if err != nil { - if errors.Is(err, engine.IncompatibleInputTypeError) { - e.log.Warn().Bool(logging.KeySuspicious, true).Msgf("%v delivered unsupported message %T through %v", originID, message, channel) - return nil - } - // TODO add comment about Process errors... - return fmt.Errorf("unexpected failure to process inbound pusher message") - } - return nil + return fmt.Errorf("pusher engine should only receive local messages on the same node") } // process processes events for the pusher engine on the collection node. From f2f53a896507243b899e180fe5942d215a84f6af Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 26 Nov 2024 16:51:25 -0800 Subject: [PATCH 07/35] Refactor pusher engine: rename and propagate error Rename `inboundMessageWorker` and `processInboundMessages` to `outbound` and also propagate errors to the top level of the worker where they can be thrown. 
--- engine/collection/pusher/engine.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 178a42be22c..ccdd4090684 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -84,14 +84,15 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e e.conduit = conduit e.cm = component.NewComponentManagerBuilder(). - AddWorker(e.inboundMessageWorker). + AddWorker(e.outboundQueueWorker). Build() e.Component = e.cm return e, nil } -func (e *Engine) inboundMessageWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { +// Worker to process SubmitCollectionGuarantee messages coming from the Finalizer. +func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() done := ctx.Done() @@ -101,27 +102,35 @@ func (e *Engine) inboundMessageWorker(ctx irrecoverable.SignalerContext, ready c case <-done: return case <-wake: - e.processInboundMessages(ctx) + err := e.processOutboundMessages(ctx) + if err != nil { + ctx.Throw(err) + } } } } -func (e *Engine) processInboundMessages(ctx context.Context) { +// processOutboundMessages processes any available messages from the queue. +// Only returns when the queue is empty (or the engine is terminated). 
+func (e *Engine) processOutboundMessages(ctx context.Context) error { for { nextMessage, ok := e.inbound.Pop() if !ok { - return + return nil } asEngineWrapper := nextMessage.(*engine.Message) asSCGMsg := asEngineWrapper.Payload.(*messages.SubmitCollectionGuarantee) originID := asEngineWrapper.OriginID - _ = e.process(originID, asSCGMsg) + err := e.process(originID, asSCGMsg) + if err != nil { + return err + } select { case <-ctx.Done(): - return + return nil default: } } From f66ba69f012d29842c3b3a9f631b7a43d83088a1 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 27 Nov 2024 09:34:09 -0800 Subject: [PATCH 08/35] Refactor pusher engine - Partially implement suggestion https://github.com/onflow/flow-go/pull/6747#discussion_r1857672727 - Make `SubmitCollectionGuarantee` non-exported and rename to `publishCollectionGuarantee` - Add new `SubmitCollectionGuarantee` exported function that just adds to the queue - Remove `messageHandler` field, instead directly add to queue from review: https://github.com/onflow/flow-go/pull/6747#discussion_r1857531386 - `OriginID`s no longer included in messages in the queue, and therefore not checked by the worker - if necessary they should be checked when Submitting --- engine/collection/pusher/engine.go | 57 ++++++++++++++++++------------ 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index ccdd4090684..21becc5b6d7 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -5,7 +5,6 @@ package pusher import ( "context" - "errors" "fmt" "github.com/rs/zerolog" @@ -38,9 +37,8 @@ type Engine struct { collections storage.Collections transactions storage.Transactions - messageHandler *engine.MessageHandler - notifier engine.Notifier - inbound *fifoqueue.FifoQueue + notifier engine.Notifier + inbound *fifoqueue.FifoQueue component.Component cm *component.ComponentManager @@ -58,10 +56,6 @@ func New(log 
zerolog.Logger, net network.EngineRegistry, state protocol.State, e } notifier := engine.NewNotifier() - messageHandler := engine.NewMessageHandler(log, notifier, engine.Pattern{ - Match: engine.MatchType[*messages.SubmitCollectionGuarantee], - Store: &engine.FifoMessageStore{FifoQueue: inbound}, - }) e := &Engine{ log: log.With().Str("engine", "pusher").Logger(), @@ -72,9 +66,8 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e collections: collections, transactions: transactions, - messageHandler: messageHandler, - notifier: notifier, - inbound: inbound, + notifier: notifier, + inbound: inbound, } conduit, err := net.Register(channels.PushGuarantees, e) @@ -119,11 +112,12 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error { return nil } - asEngineWrapper := nextMessage.(*engine.Message) - asSCGMsg := asEngineWrapper.Payload.(*messages.SubmitCollectionGuarantee) - originID := asEngineWrapper.OriginID + asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee) + if !ok { + return fmt.Errorf("invalid message type in pusher engine queue") + } - err := e.process(originID, asSCGMsg) + err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee) if err != nil { return err } @@ -138,9 +132,11 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error { // SubmitLocal submits an event originating on the local node. func (e *Engine) SubmitLocal(event interface{}) { - err := e.messageHandler.Process(e.me.NodeID(), event) - if err != nil { - engine.LogError(e.log, err) + ev, ok := event.(*messages.SubmitCollectionGuarantee) + if ok { + e.SubmitCollectionGuarantee(ev) + } else { + engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine")) } } @@ -153,7 +149,13 @@ func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, even // ProcessLocal processes an event originating on the local node. 
func (e *Engine) ProcessLocal(event interface{}) error { - return e.messageHandler.Process(e.me.NodeID(), event) + ev, ok := event.(*messages.SubmitCollectionGuarantee) + if ok { + e.SubmitCollectionGuarantee(ev) + return nil + } else { + return fmt.Errorf("invalid message argument to pusher engine") + } } // Process processes the given event from the node with the given origin ID in @@ -162,6 +164,17 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, mes return fmt.Errorf("pusher engine should only receive local messages on the same node") } +// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue +// to later be published to consensus nodes. +func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) { + ok := e.inbound.Push(msg) + if !ok { + e.log.Err(fmt.Errorf("failed to store collection guarantee in queue")) + return + } + e.notifier.Notify() +} + // process processes events for the pusher engine on the collection node. func (e *Engine) process(originID flow.Identifier, event interface{}) error { switch ev := event.(type) { @@ -171,14 +184,14 @@ func (e *Engine) process(originID flow.Identifier, event interface{}) error { if originID != e.me.NodeID() { return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID) } - return e.SubmitCollectionGuarantee(&ev.Guarantee) + return e.publishCollectionGuarantee(&ev.Guarantee) default: return fmt.Errorf("invalid event type (%T)", event) } } -// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes. -func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { +// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes. 
+func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) if err != nil { return fmt.Errorf("could not get consensus nodes: %w", err) From 1c80949eda91fb50463f80932abf513663a6b361 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 27 Nov 2024 10:02:26 -0800 Subject: [PATCH 09/35] Revert "Pusher engine test: update negative test" This reverts commit 0aadc1315ec3935b811496e9d4ab7bbb592c994a. Instead of testing the internals, test the exported interface. --- engine/collection/pusher/engine_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index bedc17f16e0..9f59f79f666 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -1,4 +1,4 @@ -package pusher +package pusher_test import ( "context" @@ -10,12 +10,14 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/onflow/flow-go/engine/collection/pusher" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" + "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/mocknetwork" protocol "github.com/onflow/flow-go/state/protocol/mock" storage "github.com/onflow/flow-go/storage/mock" @@ -33,7 +35,7 @@ type Suite struct { collections *storage.Collections transactions *storage.Transactions - engine *Engine + engine *pusher.Engine } func (suite *Suite) SetupTest() { @@ -64,7 +66,7 @@ func (suite *Suite) SetupTest() { suite.collections = new(storage.Collections) suite.transactions = new(storage.Transactions) - suite.engine, err = New( + suite.engine, err = 
pusher.New( zerolog.New(io.Discard), net, suite.state, @@ -117,7 +119,7 @@ func (suite *Suite) TestSubmitCollectionGuaranteeNonLocal() { msg := &messages.SubmitCollectionGuarantee{ Guarantee: *guarantee, } - err := suite.engine.process(sender.NodeID, msg) + err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg) suite.Require().Error(err) suite.conduit.AssertNumberOfCalls(suite.T(), "Multicast", 0) From fbed8e70139e5a0ccb2142430617199414c0b24c Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 27 Nov 2024 10:24:15 -0800 Subject: [PATCH 10/35] Refactor pusher engine: (lint) remove unused code --- engine/collection/pusher/engine.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 21becc5b6d7..996fb82b29d 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -175,21 +175,6 @@ func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarant e.notifier.Notify() } -// process processes events for the pusher engine on the collection node. -func (e *Engine) process(originID flow.Identifier, event interface{}) error { - switch ev := event.(type) { - case *messages.SubmitCollectionGuarantee: - e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) - defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) - if originID != e.me.NodeID() { - return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID) - } - return e.publishCollectionGuarantee(&ev.Guarantee) - default: - return fmt.Errorf("invalid event type (%T)", event) - } -} - // publishCollectionGuarantee publishes the collection guarantee to all consensus nodes. 
func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) From ddb0cb7761a23efad3d17b7ad71aac64a29019d8 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 29 Nov 2024 09:40:45 -0800 Subject: [PATCH 11/35] Refactor pusher engine: queue length metrics Rename queue and add length metrics for it, updating creation sites. --- cmd/collection/main.go | 1 + engine/collection/pusher/engine.go | 33 ++++++++++++++++++------- engine/collection/pusher/engine_test.go | 1 + engine/testutil/nodes.go | 2 +- module/metrics/labels.go | 1 + 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 1a241ba703b..fad3548d071 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -480,6 +480,7 @@ func main() { node.EngineRegistry, node.State, node.Metrics.Engine, + node.Metrics.Mempool, colMetrics, node.Me, node.Storage.Collections, diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 996fb82b29d..a750cd7842b 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -38,7 +38,7 @@ type Engine struct { transactions storage.Transactions notifier engine.Notifier - inbound *fifoqueue.FifoQueue + queue *fifoqueue.FifoQueue component.Component cm *component.ComponentManager @@ -48,11 +48,26 @@ type Engine struct { var _ network.Engine = (*Engine)(nil) var _ component.Component = (*Engine)(nil) -func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) { - // TODO length observer metrics - inbound, err := fifoqueue.NewFifoQueue(1000) +// New creates a new pusher engine. 
+func New( + log zerolog.Logger, + net network.EngineRegistry, + state protocol.State, + engMetrics module.EngineMetrics, + mempoolMetrics module.MempoolMetrics, + colMetrics module.CollectionMetrics, + me module.Local, + collections storage.Collections, + transactions storage.Transactions, +) (*Engine, error) { + queue, err := fifoqueue.NewFifoQueue( + 1000, + fifoqueue.WithLengthObserver(func(len int) { + mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len)) + }), + ) if err != nil { - return nil, fmt.Errorf("could not create inbound fifoqueue: %w", err) + return nil, fmt.Errorf("could not create fifoqueue: %w", err) } notifier := engine.NewNotifier() @@ -67,7 +82,7 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e transactions: transactions, notifier: notifier, - inbound: inbound, + queue: queue, } conduit, err := net.Register(channels.PushGuarantees, e) @@ -107,7 +122,7 @@ func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready co // Only returns when the queue is empty (or the engine is terminated). func (e *Engine) processOutboundMessages(ctx context.Context) error { for { - nextMessage, ok := e.inbound.Pop() + nextMessage, ok := e.queue.Pop() if !ok { return nil } @@ -167,9 +182,9 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, mes // SubmitCollectionGuarantee adds a collection guarantee to the engine's queue // to later be published to consensus nodes. 
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) { - ok := e.inbound.Push(msg) + ok := e.queue.Push(msg) if !ok { - e.log.Err(fmt.Errorf("failed to store collection guarantee in queue")) + engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue")) return } e.notifier.Notify() diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index 9f59f79f666..29c4d41a1b5 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -72,6 +72,7 @@ func (suite *Suite) SetupTest() { suite.state, metrics, metrics, + metrics, suite.me, suite.collections, suite.transactions, diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index a602e628565..ecab659bbfe 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -313,7 +313,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro retrieve) require.NoError(t, err) - pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Me, collections, transactions) + pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Metrics, node.Me, collections, transactions) require.NoError(t, err) clusterStateFactory, err := factories.NewClusterStateFactory( diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 20b66ad7d68..70520a61836 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -113,6 +113,7 @@ const ( ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine + ResourceSubmitCollectionGuaranteesQueue = "submit_col_guarantee_queue" // collection node, pusher 
engine ResourceBeaconKey = "beacon-key" // consensus node, DKG engine ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine From 17a9a2bb0e0aec2acd8751ac99fe90be5f36711a Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 29 Nov 2024 10:30:32 -0800 Subject: [PATCH 12/35] Update pusher engine doc comment Co-authored-by: Alexander Hentschel --- engine/collection/pusher/engine.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index a750cd7842b..03f09673a53 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -25,8 +25,8 @@ import ( "github.com/onflow/flow-go/utils/logging" ) -// Engine is the collection pusher engine, which provides access to resources -// held by the collection node. +// Engine is part of the Collection Nodes. It broadcasts finalized collections +// that the cluster generates to the broader network. type Engine struct { log zerolog.Logger engMetrics module.EngineMetrics From aad332cfa934357ac42d0a95204e861b2ff2eaf8 Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 29 Nov 2024 16:10:46 -0800 Subject: [PATCH 13/35] Apply suggestions from code review Doc comment changes, metrics naming, and queue length. 
For reasoning behind chosen queue length, see https://github.com/onflow/flow-go/pull/6747#discussion_r1863943732 Co-authored-by: Jordan Schalm --- engine/collection/pusher/engine.go | 10 ++++++---- module/metrics/labels.go | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 03f09673a53..bdd72bd3008 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -25,8 +25,9 @@ import ( "github.com/onflow/flow-go/utils/logging" ) -// Engine is part of the Collection Nodes. It broadcasts finalized collections -// that the cluster generates to the broader network. +// Engine is part of the Collection Node. It broadcasts finalized collections +// ("collection guarantees") that the cluster generates to Consensus Nodes +// for inclusion in blocks. type Engine struct { log zerolog.Logger engMetrics module.EngineMetrics @@ -61,7 +62,7 @@ func New( transactions storage.Transactions, ) (*Engine, error) { queue, err := fifoqueue.NewFifoQueue( - 1000, + 200, // roughly 1 minute of collections, at 3BPS fifoqueue.WithLengthObserver(func(len int) { mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len)) }), @@ -99,7 +100,8 @@ func New( return e, nil } -// Worker to process SubmitCollectionGuarantee messages coming from the Finalizer. +// outboundQueueWorker implements a component worker which broadcasts collection guarantees, +// enqueued by the Finalizer upon finalization, to Consensus Nodes. 
func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 70520a61836..1e5b6601a2e 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -113,7 +113,7 @@ const ( ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine - ResourceSubmitCollectionGuaranteesQueue = "submit_col_guarantee_queue" // collection node, pusher engine + ResourceSubmitCollectionGuaranteesQueue = "pusher_col_guarantee_queue" // collection node, pusher engine ResourceBeaconKey = "beacon-key" // consensus node, DKG engine ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine From ad04f27443363e8c6cfb63ec9b30a2ce9e0c63a4 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 29 Nov 2024 16:38:30 -0800 Subject: [PATCH 14/35] Pusher engine refactor: remove unused collection metrics --- cmd/collection/main.go | 1 - engine/collection/pusher/engine.go | 5 +---- engine/collection/pusher/engine_test.go | 1 - engine/testutil/nodes.go | 2 +- 4 files changed, 2 insertions(+), 7 deletions(-) diff --git a/cmd/collection/main.go b/cmd/collection/main.go index fad3548d071..2319c5141b2 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -481,7 +481,6 @@ func main() { node.State, node.Metrics.Engine, node.Metrics.Mempool, - colMetrics, node.Me, node.Storage.Collections, node.Storage.Transactions, diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index bdd72bd3008..c2570fb17c8 100644 --- a/engine/collection/pusher/engine.go +++ 
b/engine/collection/pusher/engine.go @@ -31,7 +31,6 @@ import ( type Engine struct { log zerolog.Logger engMetrics module.EngineMetrics - colMetrics module.CollectionMetrics conduit network.Conduit me module.Local state protocol.State @@ -56,7 +55,6 @@ func New( state protocol.State, engMetrics module.EngineMetrics, mempoolMetrics module.MempoolMetrics, - colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions, @@ -76,7 +74,6 @@ func New( e := &Engine{ log: log.With().Str("engine", "pusher").Logger(), engMetrics: engMetrics, - colMetrics: colMetrics, me: me, state: state, collections: collections, @@ -100,7 +97,7 @@ func New( return e, nil } -// outboundQueueWorker implements a component worker which broadcasts collection guarantees, +// outboundQueueWorker implements a component worker which broadcasts collection guarantees, // enqueued by the Finalizer upon finalization, to Consensus Nodes. func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index 29c4d41a1b5..9f59f79f666 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -72,7 +72,6 @@ func (suite *Suite) SetupTest() { suite.state, metrics, metrics, - metrics, suite.me, suite.collections, suite.transactions, diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index ecab659bbfe..a602e628565 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -313,7 +313,7 @@ func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ro retrieve) require.NoError(t, err) - pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Metrics, node.Me, collections, transactions) + pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Me, 
collections, transactions) require.NoError(t, err) clusterStateFactory, err := factories.NewClusterStateFactory( From 88a7d450ba3549565886e1ef8e28da57d1c96cc9 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 2 Dec 2024 04:07:19 -0800 Subject: [PATCH 15/35] Refactor pusher engine: add error return doc comments --- engine/collection/pusher/engine.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index c2570fb17c8..30c79b84f7e 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -119,6 +119,7 @@ func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready co // processOutboundMessages processes any available messages from the queue. // Only returns when the queue is empty (or the engine is terminated). +// No errors expected during normal operations. func (e *Engine) processOutboundMessages(ctx context.Context) error { for { nextMessage, ok := e.queue.Pop() @@ -190,6 +191,7 @@ func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarant } // publishCollectionGuarantee publishes the collection guarantee to all consensus nodes. +// No errors expected during normal operation. func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) if err != nil { From cebf10001dbe74aa8f3ee16aefaadf798d2c1463 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 2 Dec 2024 16:04:28 -0800 Subject: [PATCH 16/35] Refactor pusher engine: add metrics for dropped messages Instead of logging an error, add to the metrics when the queue is full and a message needs to be dropped instead of sent. 
https://github.com/onflow/flow-go/blob/85913ad09e605fb5234155301bb0d517946a75a5/engine/collection/compliance/engine.go#L139-L146 --- engine/collection/pusher/engine.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 30c79b84f7e..c395f4debc1 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -182,12 +182,11 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, mes // SubmitCollectionGuarantee adds a collection guarantee to the engine's queue // to later be published to consensus nodes. func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) { - ok := e.queue.Push(msg) - if !ok { - engine.LogError(e.log, fmt.Errorf("failed to store collection guarantee in queue")) - return + if e.queue.Push(msg) { + e.notifier.Notify() + } else { + e.engMetrics.OutboundMessageDropped(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee) } - e.notifier.Notify() } // publishCollectionGuarantee publishes the collection guarantee to all consensus nodes. 
From 2048b4aec6fae1b628a6ac1fffd0762be02585a2 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 3 Dec 2024 15:15:50 -0800 Subject: [PATCH 17/35] Apply suggestions from code review --- engine/collection/pusher/engine.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index c395f4debc1..6e34e850dc9 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -69,8 +69,6 @@ func New( return nil, fmt.Errorf("could not create fifoqueue: %w", err) } - notifier := engine.NewNotifier() - e := &Engine{ log: log.With().Str("engine", "pusher").Logger(), engMetrics: engMetrics, @@ -79,7 +77,7 @@ func New( collections: collections, transactions: transactions, - notifier: notifier, + notifier: engine.NewNotifier(), queue: queue, } @@ -194,11 +192,12 @@ func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarant func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) if err != nil { - return fmt.Errorf("could not get consensus nodes: %w", err) + return fmt.Errorf("could not get consensus nodes' identities: %w", err) } - // NOTE: Consensus nodes do not broadcast guarantees among themselves, so it needs that - // at least one collection node make a publish to all of them. + // NOTE: Consensus nodes do not broadcast guarantees among themselves. So for the collection to be included, + // at least one collector has to successfully broadcast the collection to consensus nodes. Otherwise, the + // collection is lost, which is acceptable as long as we only lose a small fraction of collections. err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...) 
if err != nil { return fmt.Errorf("could not submit collection guarantee: %w", err) From 1fc3a1951baabc90fc8f1d457c20dbd23d499a3f Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 2 Dec 2024 13:10:31 -0800 Subject: [PATCH 18/35] Refactor pusher engine: change interface - Remove pusher engine implementation of network.Engine - Replace with network.MessageProcessor - See: https://github.com/onflow/flow-go/pull/6747#discussion_r1857479984 - Remove SubmitCollectionGuarantee message type - Was only used between Finalizer and Pusher engine - New interface passes and stores collection guarantees directly, instead of wrapping and then unwrapping them - See: https://github.com/onflow/flow-go/pull/6747#discussion_r1863953280 - Add GuaranteedCollectionPublisher interface, implemented by pusher engine - Only used by the Finalizer (and intermediate constructors) - Mocks are generated for it, used in Finalizer unit tests - See: https://github.com/onflow/flow-go/pull/6747#discussion_r1857672727 --- .../collection/epochmgr/factories/builder.go | 6 +- .../guaranteed_collection_publisher.go | 15 +++ .../mock/guaranteed_collection_publisher.go | 32 +++++ engine/collection/pusher/engine.go | 48 ++------ engine/collection/pusher/engine_test.go | 14 +-- model/messages/collection.go | 6 - module/finalizer/collection/finalizer.go | 21 ++-- module/finalizer/collection/finalizer_test.go | 113 ++++++++---------- 8 files changed, 124 insertions(+), 131 deletions(-) create mode 100644 engine/collection/guaranteed_collection_publisher.go create mode 100644 engine/collection/mock/guaranteed_collection_publisher.go diff --git a/engine/collection/epochmgr/factories/builder.go b/engine/collection/epochmgr/factories/builder.go index a00a73ac97e..a588e52a080 100644 --- a/engine/collection/epochmgr/factories/builder.go +++ b/engine/collection/epochmgr/factories/builder.go @@ -6,11 +6,11 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" + 
"github.com/onflow/flow-go/engine/collection" "github.com/onflow/flow-go/module" builder "github.com/onflow/flow-go/module/builder/collection" finalizer "github.com/onflow/flow-go/module/finalizer/collection" "github.com/onflow/flow-go/module/mempool" - "github.com/onflow/flow-go/network" clusterstate "github.com/onflow/flow-go/state/cluster" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" @@ -23,7 +23,7 @@ type BuilderFactory struct { trace module.Tracer opts []builder.Opt metrics module.CollectionMetrics - pusher network.Engine // engine for pushing finalized collection to consensus committee + pusher collection.GuaranteedCollectionPublisher // engine for pushing finalized collection to consensus committee log zerolog.Logger } @@ -33,7 +33,7 @@ func NewBuilderFactory( mainChainHeaders storage.Headers, trace module.Tracer, metrics module.CollectionMetrics, - pusher network.Engine, + pusher collection.GuaranteedCollectionPublisher, log zerolog.Logger, opts ...builder.Opt, ) (*BuilderFactory, error) { diff --git a/engine/collection/guaranteed_collection_publisher.go b/engine/collection/guaranteed_collection_publisher.go new file mode 100644 index 00000000000..cb7cdd4c746 --- /dev/null +++ b/engine/collection/guaranteed_collection_publisher.go @@ -0,0 +1,15 @@ +package collection + +import ( + "github.com/onflow/flow-go/model/flow" +) + +// GuaranteedCollectionPublisher defines the interface to send collection guarantees +// from a collection node to consensus nodes. Collection guarantees are broadcast on a best-effort basis, +// and it is acceptable to discard some guarantees (especially those that are out of date). +// Implementation is non-blocking and concurrency safe. +type GuaranteedCollectionPublisher interface { + // SubmitCollectionGuarantee adds a guarantee to an internal queue + // to be published to consensus nodes. 
+ SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) +} diff --git a/engine/collection/mock/guaranteed_collection_publisher.go b/engine/collection/mock/guaranteed_collection_publisher.go new file mode 100644 index 00000000000..9d58eb85560 --- /dev/null +++ b/engine/collection/mock/guaranteed_collection_publisher.go @@ -0,0 +1,32 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" +) + +// GuaranteedCollectionPublisher is an autogenerated mock type for the GuaranteedCollectionPublisher type +type GuaranteedCollectionPublisher struct { + mock.Mock +} + +// SubmitCollectionGuarantee provides a mock function with given fields: guarantee +func (_m *GuaranteedCollectionPublisher) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) { + _m.Called(guarantee) +} + +// NewGuaranteedCollectionPublisher creates a new instance of GuaranteedCollectionPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewGuaranteedCollectionPublisher(t interface { + mock.TestingT + Cleanup(func()) +}) *GuaranteedCollectionPublisher { + mock := &GuaranteedCollectionPublisher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 6e34e850dc9..1a20cb3b00b 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -13,7 +13,6 @@ import ( "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" - "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" @@ -44,8 +43,7 @@ type Engine struct { cm *component.ComponentManager } -// TODO convert to network.MessageProcessor -var _ network.Engine = (*Engine)(nil) +var _ network.MessageProcessor = (*Engine)(nil) var _ component.Component = (*Engine)(nil) // New creates a new pusher engine. @@ -120,17 +118,17 @@ func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready co // No errors expected during normal operations. func (e *Engine) processOutboundMessages(ctx context.Context) error { for { - nextMessage, ok := e.queue.Pop() + item, ok := e.queue.Pop() if !ok { return nil } - asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee) + guarantee, ok := item.(*flow.CollectionGuarantee) if !ok { - return fmt.Errorf("invalid message type in pusher engine queue") + return fmt.Errorf("invalid type in pusher engine queue") } - err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee) + err := e.publishCollectionGuarantee(guarantee) if err != nil { return err } @@ -143,44 +141,18 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error { } } -// SubmitLocal submits an event originating on the local node. 
-func (e *Engine) SubmitLocal(event interface{}) { - ev, ok := event.(*messages.SubmitCollectionGuarantee) - if ok { - e.SubmitCollectionGuarantee(ev) - } else { - engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine")) - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node")) -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - ev, ok := event.(*messages.SubmitCollectionGuarantee) - if ok { - e.SubmitCollectionGuarantee(ev) - return nil - } else { - return fmt.Errorf("invalid message argument to pusher engine") - } -} - // Process processes the given event from the node with the given origin ID in -// a blocking manner. It returns the potential processing error when done. +// a non-blocking manner. It returns the potential processing error when done. +// Because the pusher engine does not accept inputs from the network, +// always drop any messages and return an error. func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { return fmt.Errorf("pusher engine should only receive local messages on the same node") } // SubmitCollectionGuarantee adds a collection guarantee to the engine's queue // to later be published to consensus nodes. 
-func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) { - if e.queue.Push(msg) { +func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) { + if e.queue.Push(guarantee) { e.notifier.Notify() } else { e.engMetrics.OutboundMessageDropped(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee) diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index 9f59f79f666..0375d72bda5 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -13,7 +13,6 @@ import ( "github.com/onflow/flow-go/engine/collection/pusher" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" - "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" @@ -97,11 +96,9 @@ func (suite *Suite) TestSubmitCollectionGuarantee() { suite.conduit.On("Publish", guarantee, consensus[0].NodeID). Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once() - msg := &messages.SubmitCollectionGuarantee{ - Guarantee: *guarantee, - } - err := suite.engine.ProcessLocal(msg) - suite.Require().Nil(err) + suite.engine.SubmitCollectionGuarantee(guarantee) + // TODO signature? 
+ //suite.Require().Nil(err) unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent") @@ -116,10 +113,7 @@ func (suite *Suite) TestSubmitCollectionGuaranteeNonLocal() { // send from a non-allowed role sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0] - msg := &messages.SubmitCollectionGuarantee{ - Guarantee: *guarantee, - } - err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg) + err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee) suite.Require().Error(err) suite.conduit.AssertNumberOfCalls(suite.T(), "Multicast", 0) diff --git a/model/messages/collection.go b/model/messages/collection.go index 3ef3251698b..2b4273c900d 100644 --- a/model/messages/collection.go +++ b/model/messages/collection.go @@ -5,12 +5,6 @@ import ( "github.com/onflow/flow-go/model/flow" ) -// SubmitCollectionGuarantee is a request to submit the given collection -// guarantee to consensus nodes. Only valid as a node-local message. -type SubmitCollectionGuarantee struct { - Guarantee flow.CollectionGuarantee -} - // CollectionRequest request all transactions from a collection with the given // fingerprint. 
type CollectionRequest struct { diff --git a/module/finalizer/collection/finalizer.go b/module/finalizer/collection/finalizer.go index bfe1d76ae4f..c6fd7cf8d77 100644 --- a/module/finalizer/collection/finalizer.go +++ b/module/finalizer/collection/finalizer.go @@ -5,12 +5,11 @@ import ( "github.com/dgraph-io/badger/v2" + "github.com/onflow/flow-go/engine/collection" "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/mempool" - "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" ) @@ -22,7 +21,7 @@ import ( type Finalizer struct { db *badger.DB transactions mempool.Transactions - prov network.Engine + prov collection.GuaranteedCollectionPublisher metrics module.CollectionMetrics } @@ -30,7 +29,7 @@ type Finalizer struct { func NewFinalizer( db *badger.DB, transactions mempool.Transactions, - prov network.Engine, + prov collection.GuaranteedCollectionPublisher, metrics module.CollectionMetrics, ) *Finalizer { f := &Finalizer{ @@ -160,14 +159,12 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error { // collection. 
// TODO add real signatures here (2711) - f.prov.SubmitLocal(&messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: payload.Collection.ID(), - ReferenceBlockID: payload.ReferenceBlockID, - ChainID: header.ChainID, - SignerIndices: step.ParentVoterIndices, - Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes - }, + f.prov.SubmitCollectionGuarantee(&flow.CollectionGuarantee{ + CollectionID: payload.Collection.ID(), + ReferenceBlockID: payload.ReferenceBlockID, + ChainID: header.ChainID, + SignerIndices: step.ParentVoterIndices, + Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes }) } diff --git a/module/finalizer/collection/finalizer_test.go b/module/finalizer/collection/finalizer_test.go index fa92d3eeafe..21cce0a509a 100644 --- a/module/finalizer/collection/finalizer_test.go +++ b/module/finalizer/collection/finalizer_test.go @@ -8,13 +8,12 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + collectionmock "github.com/onflow/flow-go/engine/collection/mock" model "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module/finalizer/collection" "github.com/onflow/flow-go/module/mempool/herocache" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/network/mocknetwork" cluster "github.com/onflow/flow-go/state/cluster/badger" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" @@ -65,8 +64,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) + prov := new(collectionmock.GuaranteedCollectionPublisher) + prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) fakeBlockID := unittest.IdentifierFixture() @@ 
-78,8 +77,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) + prov := new(collectionmock.GuaranteedCollectionPublisher) + prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the finalized block @@ -104,8 +103,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) + prov := new(collectionmock.GuaranteedCollectionPublisher) + prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // create a new block that isn't connected to a parent @@ -123,7 +122,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) + prov := new(collectionmock.GuaranteedCollectionPublisher) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // create a block with empty payload on genesis @@ -141,15 +140,15 @@ func TestFinalizer(t *testing.T) { assert.Equal(t, block.ID(), final.ID()) // collection should not have been propagated - prov.AssertNotCalled(t, "SubmitLocal", mock.Anything) + prov.AssertNotCalled(t, "SubmitCollectionGuarantee", mock.Anything) }) t.Run("finalize single block", func(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) + prov := new(collectionmock.GuaranteedCollectionPublisher) + prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the finalized block and mempool @@ -180,15 +179,13 @@ func TestFinalizer(t *testing.T) { assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, final.ID()) // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 1) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ 
- Guarantee: flow.CollectionGuarantee{ - CollectionID: block.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block.Header.ChainID, - SignerIndices: block.Header.ParentVoterIndices, - Signature: nil, - }, + prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1) + prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block.Header.ChainID, + SignerIndices: block.Header.ParentVoterIndices, + Signature: nil, }) }) @@ -197,8 +194,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) + prov := new(collectionmock.GuaranteedCollectionPublisher) + prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the first finalized block and mempool @@ -233,24 +230,20 @@ func TestFinalizer(t *testing.T) { assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID(), block2.ID()) // both blocks should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 2) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }, + prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 2) + prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, }) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block2.Payload.Collection.ID(), - 
ReferenceBlockID: refBlock.ID(), - ChainID: block2.Header.ChainID, - SignerIndices: block2.Header.ParentVoterIndices, - Signature: nil, - }, + prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block2.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block2.Header.ChainID, + SignerIndices: block2.Header.ParentVoterIndices, + Signature: nil, }) }) @@ -258,8 +251,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) + prov := new(collectionmock.GuaranteedCollectionPublisher) + prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the finalized parent block and mempool @@ -295,15 +288,13 @@ func TestFinalizer(t *testing.T) { assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID()) // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 1) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }, + prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1) + prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, }) }) @@ -312,8 +303,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) + prov := new(collectionmock.GuaranteedCollectionPublisher) + prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, 
metrics) // tx1 is included in the finalized block and mempool @@ -349,15 +340,13 @@ func TestFinalizer(t *testing.T) { assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID()) // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 1) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }, + prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1) + prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, }) }) }) From 9e37f141a26972c434ef7315149331c740415ec6 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 5 Dec 2024 16:03:19 -0800 Subject: [PATCH 19/35] Apply suggestions from code review --- engine/collection/pusher/engine.go | 2 +- engine/collection/pusher/engine_test.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 1a20cb3b00b..dada7adec72 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -146,7 +146,7 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error { // Because the pusher engine does not accept inputs from the network, // always drop any messages and return an error. 
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { - return fmt.Errorf("pusher engine should only receive local messages on the same node") + return fmt.Errorf("pusher engine should only receive local messages on the same node: got message %T on channel %v from origin %v", message, channel, originID) } // SubmitCollectionGuarantee adds a collection guarantee to the engine's queue diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index 0375d72bda5..e7cf8831dcc 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -97,8 +97,6 @@ func (suite *Suite) TestSubmitCollectionGuarantee() { Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once() suite.engine.SubmitCollectionGuarantee(guarantee) - // TODO signature? - //suite.Require().Nil(err) unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent") From d157f489b4bc68713b46269a90be534c99a7ee0c Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 5 Dec 2024 16:16:25 -0800 Subject: [PATCH 20/35] Update mock checks - Construct the mock objects with their `.New___()` method, which enables automatic cleanup, instead of using golang's built-in `new` function. - Replace explicit AssertCalled checks at the end of tests with .On() and .Once() expectations, which are automatically checked at the end of the test.
--- module/finalizer/collection/finalizer_test.go | 110 ++++++++---------- 1 file changed, 51 insertions(+), 59 deletions(-) diff --git a/module/finalizer/collection/finalizer_test.go b/module/finalizer/collection/finalizer_test.go index 21cce0a509a..648b303521c 100644 --- a/module/finalizer/collection/finalizer_test.go +++ b/module/finalizer/collection/finalizer_test.go @@ -64,7 +64,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) @@ -77,7 +77,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) @@ -103,7 +103,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) prov.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, prov, metrics) @@ -122,7 +122,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // create a block with empty payload on genesis @@ -147,8 +147,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) - prov.On("SubmitCollectionGuarantee", mock.Anything) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the finalized block 
and mempool @@ -163,6 +162,15 @@ func TestFinalizer(t *testing.T) { block.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block) + // block should be passed to provider + prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block.Header.ChainID, + SignerIndices: block.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize the block err := finalizer.MakeFinal(block.ID()) assert.Nil(t, err) @@ -177,16 +185,6 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, final.ID()) - - // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1) - prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ - CollectionID: block.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block.Header.ChainID, - SignerIndices: block.Header.ParentVoterIndices, - Signature: nil, - }) }) // when finalizing a block with un-finalized ancestors, those ancestors should be finalized as well @@ -194,8 +192,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) - prov.On("SubmitCollectionGuarantee", mock.Anything) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the first finalized block and mempool @@ -215,6 +212,22 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) + // both blocks should be passed to provider + prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: 
block1.Header.ParentVoterIndices, + Signature: nil, + }).Once() + prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block2.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block2.Header.ChainID, + SignerIndices: block2.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize block2 (should indirectly finalize block1 as well) err := finalizer.MakeFinal(block2.ID()) assert.Nil(t, err) @@ -228,31 +241,13 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block2.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID(), block2.ID()) - - // both blocks should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 2) - prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }) - prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ - CollectionID: block2.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block2.Header.ChainID, - SignerIndices: block2.Header.ParentVoterIndices, - Signature: nil, - }) }) t.Run("finalize with un-finalized child", func(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) - prov.On("SubmitCollectionGuarantee", mock.Anything) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the finalized parent block and mempool @@ -272,6 +267,15 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) + // block should be passed to provider + prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), 
+ ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize block1 (should NOT finalize block2) err := finalizer.MakeFinal(block1.ID()) assert.Nil(t, err) @@ -286,16 +290,6 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block1.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID()) - - // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1) - prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }) }) // when finalizing a block with a conflicting fork, the fork should not be finalized. @@ -303,8 +297,7 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(collectionmock.GuaranteedCollectionPublisher) - prov.On("SubmitCollectionGuarantee", mock.Anything) + prov := collectionmock.NewGuaranteedCollectionPublisher(t) finalizer := collection.NewFinalizer(db, pool, prov, metrics) // tx1 is included in the finalized block and mempool @@ -324,6 +317,15 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) + // block should be passed to provider + prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize block1 err := finalizer.MakeFinal(block1.ID()) assert.Nil(t, err) @@ -338,16 +340,6 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block1.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, 
refBlock.Height, block1.ID()) - - // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1) - prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }) }) }) } From eacc99232398a224a88445f20c6a4f6945046718 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 5 Dec 2024 16:28:20 -0800 Subject: [PATCH 21/35] Rename prov to pusher in Finalizer --- module/finalizer/collection/finalizer.go | 8 +-- module/finalizer/collection/finalizer_test.go | 58 +++++++++---------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/module/finalizer/collection/finalizer.go b/module/finalizer/collection/finalizer.go index c6fd7cf8d77..ae6e83e5a96 100644 --- a/module/finalizer/collection/finalizer.go +++ b/module/finalizer/collection/finalizer.go @@ -21,7 +21,7 @@ import ( type Finalizer struct { db *badger.DB transactions mempool.Transactions - prov collection.GuaranteedCollectionPublisher + pusher collection.GuaranteedCollectionPublisher metrics module.CollectionMetrics } @@ -29,13 +29,13 @@ type Finalizer struct { func NewFinalizer( db *badger.DB, transactions mempool.Transactions, - prov collection.GuaranteedCollectionPublisher, + pusher collection.GuaranteedCollectionPublisher, metrics module.CollectionMetrics, ) *Finalizer { f := &Finalizer{ db: db, transactions: transactions, - prov: prov, + pusher: pusher, metrics: metrics, } return f @@ -159,7 +159,7 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error { // collection. 
// TODO add real signatures here (2711) - f.prov.SubmitCollectionGuarantee(&flow.CollectionGuarantee{ + f.pusher.SubmitCollectionGuarantee(&flow.CollectionGuarantee{ CollectionID: payload.Collection.ID(), ReferenceBlockID: payload.ReferenceBlockID, ChainID: header.ChainID, diff --git a/module/finalizer/collection/finalizer_test.go b/module/finalizer/collection/finalizer_test.go index 648b303521c..dc08af7d957 100644 --- a/module/finalizer/collection/finalizer_test.go +++ b/module/finalizer/collection/finalizer_test.go @@ -64,9 +64,9 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := collectionmock.NewGuaranteedCollectionPublisher(t) - prov.On("SubmitCollectionGuarantee", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + pusher.On("SubmitCollectionGuarantee", mock.Anything) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) fakeBlockID := unittest.IdentifierFixture() err := finalizer.MakeFinal(fakeBlockID) @@ -77,9 +77,9 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := collectionmock.NewGuaranteedCollectionPublisher(t) - prov.On("SubmitCollectionGuarantee", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + pusher.On("SubmitCollectionGuarantee", mock.Anything) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized block tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -103,9 +103,9 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := collectionmock.NewGuaranteedCollectionPublisher(t) - prov.On("SubmitCollectionGuarantee", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + 
pusher.On("SubmitCollectionGuarantee", mock.Anything) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // create a new block that isn't connected to a parent block := unittest.ClusterBlockWithParent(genesis) @@ -122,8 +122,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := collectionmock.NewGuaranteedCollectionPublisher(t) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // create a block with empty payload on genesis block := unittest.ClusterBlockWithParent(genesis) @@ -140,15 +140,15 @@ func TestFinalizer(t *testing.T) { assert.Equal(t, block.ID(), final.ID()) // collection should not have been propagated - prov.AssertNotCalled(t, "SubmitCollectionGuarantee", mock.Anything) + pusher.AssertNotCalled(t, "SubmitCollectionGuarantee", mock.Anything) }) t.Run("finalize single block", func(t *testing.T) { bootstrap() defer cleanup() - prov := collectionmock.NewGuaranteedCollectionPublisher(t) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -162,8 +162,8 @@ func TestFinalizer(t *testing.T) { block.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block) - // block should be passed to provider - prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + // block should be passed to pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ CollectionID: block.Payload.Collection.ID(), ReferenceBlockID: refBlock.ID(), ChainID: block.Header.ChainID, @@ -192,8 +192,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := 
collectionmock.NewGuaranteedCollectionPublisher(t) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the first finalized block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -212,15 +212,15 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) - // both blocks should be passed to provider - prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + // both blocks should be passed to pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ CollectionID: block1.Payload.Collection.ID(), ReferenceBlockID: refBlock.ID(), ChainID: block1.Header.ChainID, SignerIndices: block1.Header.ParentVoterIndices, Signature: nil, }).Once() - prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ CollectionID: block2.Payload.Collection.ID(), ReferenceBlockID: refBlock.ID(), ChainID: block2.Header.ChainID, @@ -247,8 +247,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := collectionmock.NewGuaranteedCollectionPublisher(t) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized parent block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -267,8 +267,8 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) - // block should be passed to provider - prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + // block should be passed to 
pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ CollectionID: block1.Payload.Collection.ID(), ReferenceBlockID: refBlock.ID(), ChainID: block1.Header.ChainID, @@ -297,8 +297,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := collectionmock.NewGuaranteedCollectionPublisher(t) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -317,8 +317,8 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) - // block should be passed to provider - prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + // block should be passed to pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ CollectionID: block1.Payload.Collection.ID(), ReferenceBlockID: refBlock.ID(), ChainID: block1.Header.ChainID, From f9e16bf6834a30a6e6c8b5af7d6042322cfc14bb Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 5 Dec 2024 17:02:38 -0800 Subject: [PATCH 22/35] Update finalizer tests to expect correct number of calls --- module/finalizer/collection/finalizer_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/module/finalizer/collection/finalizer_test.go b/module/finalizer/collection/finalizer_test.go index dc08af7d957..b9b1f38bc19 100644 --- a/module/finalizer/collection/finalizer_test.go +++ b/module/finalizer/collection/finalizer_test.go @@ -65,7 +65,6 @@ func TestFinalizer(t *testing.T) { defer cleanup() pusher := collectionmock.NewGuaranteedCollectionPublisher(t) - pusher.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, pusher, metrics) fakeBlockID := 
unittest.IdentifierFixture() @@ -78,7 +77,7 @@ func TestFinalizer(t *testing.T) { defer cleanup() pusher := collectionmock.NewGuaranteedCollectionPublisher(t) - pusher.On("SubmitCollectionGuarantee", mock.Anything) + pusher.On("SubmitCollectionGuarantee", mock.Anything).Once() finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized block @@ -104,7 +103,6 @@ func TestFinalizer(t *testing.T) { defer cleanup() pusher := collectionmock.NewGuaranteedCollectionPublisher(t) - pusher.On("SubmitCollectionGuarantee", mock.Anything) finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // create a new block that isn't connected to a parent From 42b23317e7fe3669913ffde9bff0dfd7a7a23bef Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:01:47 -0800 Subject: [PATCH 23/35] Update module/finalizer/collection/finalizer.go comment Co-authored-by: Jordan Schalm --- module/finalizer/collection/finalizer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/finalizer/collection/finalizer.go b/module/finalizer/collection/finalizer.go index ae6e83e5a96..e5b2e21904f 100644 --- a/module/finalizer/collection/finalizer.go +++ b/module/finalizer/collection/finalizer.go @@ -158,7 +158,7 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error { // For now, we just use the parent signers as the guarantors of this // collection. 
- // TODO add real signatures here (2711) + // TODO add real signatures here (https://github.com/onflow/flow-go-internal/issues/4569) f.pusher.SubmitCollectionGuarantee(&flow.CollectionGuarantee{ CollectionID: payload.Collection.ID(), ReferenceBlockID: payload.ReferenceBlockID, From f58ccaf8dea3ec7e4eccff8b4bfe809951078ac8 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 10 Dec 2024 12:10:58 -0800 Subject: [PATCH 24/35] Apply suggestions from code review Improve documentation for Process method of pusher engine, and log an error instead of returning an error. See: https://github.com/onflow/flow-go/pull/6780#discussion_r1876742974 Co-authored-by: Alexander Hentschel --- engine/collection/pusher/engine.go | 24 +++++++++++++++++++----- engine/collection/pusher/engine_test.go | 8 +++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index dada7adec72..d132024346b 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -141,12 +141,26 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error { } } -// Process processes the given event from the node with the given origin ID in -// a non-blocking manner. It returns the potential processing error when done. -// Because the pusher engine does not accept inputs from the network, -// always drop any messages and return an error. +// Process is called by the networking layer, when peers broadcast messages with this node +// as one of the recipients. The protocol specifies that Collector nodes broadcast Collection +// Guarantees to Consensus Nodes and _only_ those. When the pusher engine (running only on +// Collectors) receives a message, this message is evidence of byzantine behavior. +// Byzantine inputs are internally handled by the pusher.Engine and do *not* result in +// error returns. No errors expected during normal operation (including byzantine inputs). 
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { - return fmt.Errorf("pusher engine should only receive local messages on the same node: got message %T on channel %v from origin %v", message, channel, originID) + // Targeting a collector node's pusher.Engine with messages could be considered as a slashable offense. + // Though, for generating cryptographic evidence, we need Message Forensics - see reference [1]. + // Much further into the future, when we are implementing slashing challenges, we'll probably implement a + // dedicated consumer to post-process evidence of protocol violations into slashing challenges. For now, + // we just log this with the `KeySuspicious` to alert the node operator. + // [1] Message Forensics FLIP https://github.com/onflow/flips/pull/195) + errs := fmt.Errorf("collector node's pusher.Engine was targeted by message %T on channel %v", message, channel) + e.log.Warn(). + Err(errs). + Bool(logging.KeySuspicious, true). + Str("peer_id", originID.String()). + Msg("potentially byzantine networking traffic detected") + return nil } // SubmitCollectionGuarantee adds a collection guarantee to the engine's queue diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index e7cf8831dcc..746957071f8 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -108,11 +108,13 @@ func (suite *Suite) TestSubmitCollectionGuaranteeNonLocal() { guarantee := unittest.CollectionGuaranteeFixture() - // send from a non-allowed role + // verify that pusher.Engine handles any (potentially byzantine) input: + // A byzantine peer could target the collector node's pusher engine with messages + // The pusher should discard those and explicitly not get tricked into broadcasting + // collection guarantees which a byzantine peer might try to inject into the system. 
sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0] err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee) - suite.Require().Error(err) - + suite.Require().NoError(err) suite.conduit.AssertNumberOfCalls(suite.T(), "Multicast", 0) } From 4f1906e96f9b31a0e5c4ecef3ef839322950a2cf Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 9 Dec 2024 12:18:33 -0800 Subject: [PATCH 25/35] Remove engine.Unit from Assigner engine and Fetcher engine The engine.Unit was only used for its context. In similar situations, other code uses a context.Background() instead. --- engine/verification/assigner/engine.go | 17 +++++------------ engine/verification/fetcher/engine.go | 13 ++++--------- 2 files changed, 9 insertions(+), 21 deletions(-) diff --git a/engine/verification/assigner/engine.go b/engine/verification/assigner/engine.go index ba2e7d2d1f7..47b9eec819d 100644 --- a/engine/verification/assigner/engine.go +++ b/engine/verification/assigner/engine.go @@ -8,7 +8,6 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/model/chunks" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" @@ -24,7 +23,6 @@ import ( // to me to verify, and then save it to the chunks job queue for the // fetcher engine to process. type Engine struct { - unit *engine.Unit log zerolog.Logger metrics module.VerificationMetrics tracer module.Tracer @@ -36,8 +34,11 @@ type Engine struct { blockConsumerNotifier module.ProcessingNotifier // to report a block has been processed. 
stopAtHeight uint64 stopAtBlockID atomic.Value + *module.NoopReadyDoneAware } +var _ module.ReadyDoneAware = (*Engine)(nil) + func New( log zerolog.Logger, metrics module.VerificationMetrics, @@ -50,7 +51,6 @@ func New( stopAtHeight uint64, ) *Engine { e := &Engine{ - unit: engine.NewUnit(), log: log.With().Str("engine", "assigner").Logger(), metrics: metrics, tracer: tracer, @@ -69,14 +69,6 @@ func (e *Engine) WithBlockConsumerNotifier(notifier module.ProcessingNotifier) { e.blockConsumerNotifier = notifier } -func (e *Engine) Ready() <-chan struct{} { - return e.unit.Ready() -} - -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() -} - // resultChunkAssignment receives an execution result that appears in a finalized incorporating block. // In case this verification node is authorized at the reference block of this execution receipt's result, // chunk assignment is computed for the result, and the list of assigned chunks returned. @@ -164,7 +156,8 @@ func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier, block func (e *Engine) ProcessFinalizedBlock(block *flow.Block) { blockID := block.ID() - span, ctx := e.tracer.StartBlockSpan(e.unit.Ctx(), blockID, trace.VERProcessFinalizedBlock) + // We don't have any existing information and don't need cancellation, so use a background (empty) context + span, ctx := e.tracer.StartBlockSpan(context.Background(), blockID, trace.VERProcessFinalizedBlock) defer span.End() e.processFinalizedBlock(ctx, block) diff --git a/engine/verification/fetcher/engine.go b/engine/verification/fetcher/engine.go index 20afad04021..e384d834ac5 100644 --- a/engine/verification/fetcher/engine.go +++ b/engine/verification/fetcher/engine.go @@ -34,7 +34,6 @@ import ( // to the verifier engine. type Engine struct { // common - unit *engine.Unit state protocol.State // used to verify the origin ID of chunk data response, and sealing status. 
// monitoring @@ -72,7 +71,6 @@ func New( stopAtHeight uint64, ) *Engine { e := &Engine{ - unit: engine.NewUnit(), metrics: metrics, tracer: tracer, log: log.With().Str("engine", "fetcher").Logger(), @@ -104,16 +102,12 @@ func (e *Engine) Ready() <-chan struct{} { if e.chunkConsumerNotifier == nil { e.log.Fatal().Msg("missing chunk consumer notifier callback in verification fetcher engine") } - return e.unit.Ready(func() { - <-e.requester.Ready() - }) + return e.requester.Ready() } // Done terminates the engine and returns a channel that is closed when the termination is done func (e *Engine) Done() <-chan struct{} { - return e.unit.Done(func() { - <-e.requester.Done() - }) + return e.requester.Done() } // ProcessAssignedChunk is the entry point of fetcher engine. @@ -169,7 +163,8 @@ func (e *Engine) ProcessAssignedChunk(locator *chunks.Locator) { // processAssignedChunkWithTracing encapsulates the logic of processing assigned chunk with tracing enabled. func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, uint64, error) { - span, _ := e.tracer.StartBlockSpan(e.unit.Ctx(), result.BlockID, trace.VERProcessAssignedChunk) + // We don't have any existing information and don't need cancellation, so use a background (empty) context + span, _ := e.tracer.StartBlockSpan(context.Background(), result.BlockID, trace.VERProcessAssignedChunk) span.SetAttributes(attribute.Int("collection_index", int(chunk.CollectionIndex))) defer span.End() From e73b3f30e85bc537848668930aa4a681c6f0b6ba Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 10 Dec 2024 14:19:40 -0800 Subject: [PATCH 26/35] WIP - Initial refactor of AsyncUploader Replace engine.Unit with component.Component - Arbitrarily set number of workers to 3 Uses a fifoqueue to buffer tasks (planned to replace with channel, since we would rather wait to upload than let a block be not uploaded). 
Does not correctly propagate errors, or calculate any new metrics yet. --- .../execution/ingestion/uploader/uploader.go | 131 +++++++++++++----- 1 file changed, 98 insertions(+), 33 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index f9486ea99d8..094de50314e 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -2,13 +2,17 @@ package uploader import ( "context" + "fmt" "time" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/utils/logging" "github.com/sethvargo/go-retry" @@ -26,74 +30,135 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber uint64, log zerolog.Logger, metrics module.ExecutionMetrics) *AsyncUploader { - return &AsyncUploader{ - unit: engine.NewUnit(), + // TODO queue size, add length metrics and check error + queue, _ := fifoqueue.NewFifoQueue(1000) + a := &AsyncUploader{ uploader: uploader, log: log.With().Str("component", "block_data_uploader").Logger(), metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, + queue: queue, + notifier: engine.NewNotifier(), } + builder := component.NewComponentManagerBuilder() + for range 3 { + builder.AddWorker(a.UploadWorker) + } + a.cm = builder.Build() + a.Component = a.cm + return a } // AsyncUploader wraps up another Uploader instance and make its upload asynchronous type AsyncUploader struct { - module.ReadyDoneAware - unit *engine.Unit uploader Uploader log zerolog.Logger metrics module.ExecutionMetrics retryInitialTimeout time.Duration maxRetryNumber uint64 onComplete OnCompleteFunc // callback function called after Upload is completed + queue 
*fifoqueue.FifoQueue + notifier engine.Notifier + cm *component.ComponentManager + component.Component + // TODO Replace fifoqueue with channel, and make Upload() blocking } -func (a *AsyncUploader) Ready() <-chan struct{} { - return a.unit.Ready() +// UploadWorker implements a component worker which asynchronously uploads computation results +// from the execution node (after a block is executed) to storage such as a GCP bucket or S3 bucket. +func (a *AsyncUploader) UploadWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + + done := ctx.Done() + wake := a.notifier.Channel() + for { + select { + case <-done: + return + case <-wake: + err := a.processUploadTasks(ctx) + if err != nil { + ctx.Throw(err) + } + } + } } -func (a *AsyncUploader) Done() <-chan struct{} { - return a.unit.Done() +// processUploadTasks processes any available tasks from the queue. +// Only returns when the queue is empty (or the component is terminated). +// No errors expected during normal operation. +func (a *AsyncUploader) processUploadTasks(ctx context.Context) error { + for { + item, ok := a.queue.Pop() + if !ok { + return nil + } + + computationResult, ok := item.(*execution.ComputationResult) + if !ok { + return fmt.Errorf("invalid type in AsyncUploader queue") + } + + a.UploadTask(ctx, computationResult) + select { + case <-ctx.Done(): + return nil + default: + } + } } func (a *AsyncUploader) SetOnCompleteCallback(onComplete OnCompleteFunc) { a.onComplete = onComplete } +// Upload adds the computation result to a queue to be processed asynchronously by workers, +// ensuring that multiple uploads can be run in parallel. +// No errors expected during normal operation. func (a *AsyncUploader) Upload(computationResult *execution.ComputationResult) error { + if a.queue.Push(computationResult) { + a.notifier.Notify() + } else { + // TODO record in metrics + } + return nil +} +// UploadTask implements retrying for uploading computation results. 
+// When the upload is complete, the callback will be called with the result (for example, +// to record that the upload was successful) and any error. +// No errors expected during normal operation. +func (a *AsyncUploader) UploadTask(ctx context.Context, computationResult *execution.ComputationResult) { backoff := retry.NewFibonacci(a.retryInitialTimeout) backoff = retry.WithMaxRetries(a.maxRetryNumber, backoff) - a.unit.Launch(func() { - a.metrics.ExecutionBlockDataUploadStarted() - start := time.Now() + a.metrics.ExecutionBlockDataUploadStarted() + start := time.Now() - a.log.Debug().Msgf("computation result of block %s is being uploaded", - computationResult.ExecutableBlock.ID().String()) - - err := retry.Do(a.unit.Ctx(), backoff, func(ctx context.Context) error { - err := a.uploader.Upload(computationResult) - if err != nil { - a.log.Warn().Err(err).Msg("error while uploading block data, retrying") - } - return retry.RetryableError(err) - }) + a.log.Debug().Msgf("computation result of block %s is being uploaded", + computationResult.ExecutableBlock.ID().String()) + err := retry.Do(ctx, backoff, func(ctx context.Context) error { + err := a.uploader.Upload(computationResult) if err != nil { - a.log.Error().Err(err). - Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). - Msg("failed to upload block data") - } else { - a.log.Debug().Msgf("computation result of block %s was successfully uploaded", - computationResult.ExecutableBlock.ID().String()) + a.log.Warn().Err(err).Msg("error while uploading block data, retrying") } + return retry.RetryableError(err) + }) - a.metrics.ExecutionBlockDataUploadFinished(time.Since(start)) + if err != nil { + a.log.Error().Err(err). + Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). 
+ Msg("failed to upload block data") + } else { + a.log.Debug().Msgf("computation result of block %s was successfully uploaded", + computationResult.ExecutableBlock.ID().String()) + } - if a.onComplete != nil { - a.onComplete(computationResult, err) - } - }) - return nil + a.metrics.ExecutionBlockDataUploadFinished(time.Since(start)) + + if a.onComplete != nil { + a.onComplete(computationResult, err) + } } From 9c57c77c1772cb258e0be1dda91cb133e2b5e473 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 10:02:02 -0800 Subject: [PATCH 27/35] Convert BadgerRetryableUploaderWrapper to Component BadgerRetryableUploaderWrapper wraps an AsyncUploader. Now that the AsyncUploader is a Component, it needs to be `Start()`ed. The retryable wrapper did not itself do anything special on ready/done, so the Component functionality is directly delegated to the wrapped AsyncUploader. --- .../uploader/retryable_uploader_wrapper.go | 11 +++-------- .../uploader/retryable_uploader_wrapper_test.go | 14 +++++++++++--- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go b/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go index ecad4801741..418177b4dc8 100644 --- a/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go +++ b/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/storage" @@ -34,6 +35,7 @@ type BadgerRetryableUploaderWrapper struct { results storage.ExecutionResults transactionResults storage.TransactionResults uploadStatusStore storage.ComputationResultUploadStatus + component.Component } func 
NewBadgerRetryableUploaderWrapper( @@ -99,17 +101,10 @@ func NewBadgerRetryableUploaderWrapper( results: results, transactionResults: transactionResults, uploadStatusStore: uploadStatusStore, + Component: uploader, // delegate to the AsyncUploader } } -func (b *BadgerRetryableUploaderWrapper) Ready() <-chan struct{} { - return b.uploader.Ready() -} - -func (b *BadgerRetryableUploaderWrapper) Done() <-chan struct{} { - return b.uploader.Done() -} - func (b *BadgerRetryableUploaderWrapper) Upload(computationResult *execution.ComputationResult) error { if computationResult == nil || computationResult.ExecutableBlock == nil || computationResult.ExecutableBlock.Block == nil { diff --git a/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go b/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go index 491307705eb..adff343f61f 100644 --- a/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go +++ b/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go @@ -1,6 +1,7 @@ package uploader import ( + "context" "sync" "testing" "time" @@ -11,6 +12,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/executiondatasync/execution_data" executionDataMock "github.com/onflow/flow-go/module/executiondatasync/execution_data/mock" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/module/metrics" @@ -26,6 +28,8 @@ import ( ) func Test_Upload_invoke(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wg := sync.WaitGroup{} uploaderCalled := false @@ -40,7 +44,7 @@ func Test_Upload_invoke(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) // nil 
input - no call to Upload() err := testRetryableUploaderWrapper.Upload(nil) @@ -58,6 +62,8 @@ func Test_Upload_invoke(t *testing.T) { } func Test_RetryUpload(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wg := sync.WaitGroup{} wg.Add(1) uploaderCalled := false @@ -72,7 +78,7 @@ func Test_RetryUpload(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) err := testRetryableUploaderWrapper.RetryUpload() wg.Wait() @@ -82,6 +88,8 @@ func Test_RetryUpload(t *testing.T) { } func Test_AsyncUploaderCallback(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wgUploadCalleded := sync.WaitGroup{} wgUploadCalleded.Add(1) @@ -95,7 +103,7 @@ func Test_AsyncUploaderCallback(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) testComputationResult := createTestComputationResult() err := testRetryableUploaderWrapper.Upload(testComputationResult) From f7709c8df59534dadfd8c2b6880c8ae4cab474e5 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 10:41:08 -0800 Subject: [PATCH 28/35] Update AsyncUploader tests Since AsyncUploader now implements Component instead of ReadyDoneAware, update the methods used to start and end the AsyncUploader (using a context and its cancel() function). Test "uploads are run in parallel" currently failing (due to only one worker taking tasks from the queue?) 
--- .../ingestion/uploader/uploader_test.go | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader_test.go b/engine/execution/ingestion/uploader/uploader_test.go index c14fa683715..f4ee13883a4 100644 --- a/engine/execution/ingestion/uploader/uploader_test.go +++ b/engine/execution/ingestion/uploader/uploader_test.go @@ -2,6 +2,7 @@ package uploader import ( "bytes" + "context" "fmt" "runtime/debug" "sync" @@ -13,17 +14,17 @@ import ( "go.uber.org/atomic" "github.com/onflow/flow-go/engine/execution" - "github.com/onflow/flow-go/engine/execution/state/unittest" + exeunittest "github.com/onflow/flow-go/engine/execution/state/unittest" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" - testutils "github.com/onflow/flow-go/utils/unittest" - unittest2 "github.com/onflow/flow-go/utils/unittest" + "github.com/onflow/flow-go/utils/unittest" ) func Test_AsyncUploader(t *testing.T) { - computationResult := unittest.ComputationResultFixture( + computationResult := exeunittest.ComputationResultFixture( t, - testutils.IdentifierFixture(), + unittest.IdentifierFixture(), nil) t.Run("uploads are run in parallel and emit metrics", func(t *testing.T) { @@ -46,6 +47,8 @@ func Test_AsyncUploader(t *testing.T) { metrics := &DummyCollector{} async := NewAsyncUploader(uploader, 1*time.Nanosecond, 1, zerolog.Nop(), metrics) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) err := async.Upload(computationResult) require.NoError(t, err) @@ -63,6 +66,8 @@ func Test_AsyncUploader(t *testing.T) { wgContinueUpload.Done() //release all // shut down component + cancel() + unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader did not finish in time") <-async.Done() require.Equal(t, int64(0), metrics.Counter.Load()) @@ -89,6 +94,9 @@ func Test_AsyncUploader(t *testing.T) { } async := 
NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{}) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) + defer cancel() err := async.Upload(computationResult) require.NoError(t, err) @@ -107,7 +115,7 @@ func Test_AsyncUploader(t *testing.T) { // 2. shut down async uploader right after upload initiated (not completed) // 3. assert that upload called only once even when trying to use retry mechanism t.Run("stopping component stops retrying", func(t *testing.T) { - testutils.SkipUnless(t, testutils.TEST_FLAKY, "flaky") + unittest.SkipUnless(t, unittest.TEST_FLAKY, "flaky") callCount := 0 t.Log("test started grID:", string(bytes.Fields(debug.Stack())[1])) @@ -151,6 +159,8 @@ func Test_AsyncUploader(t *testing.T) { } t.Log("about to create NewAsyncUploader grID:", string(bytes.Fields(debug.Stack())[1])) async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{}) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) t.Log("about to call async.Upload() grID:", string(bytes.Fields(debug.Stack())[1])) err := async.Upload(computationResult) // doesn't matter what we upload require.NoError(t, err) @@ -163,11 +173,11 @@ func Test_AsyncUploader(t *testing.T) { // stop component and check that it's fully stopped t.Log("about to initiate shutdown grID: ", string(bytes.Fields(debug.Stack())[1])) - c := async.Done() + cancel() t.Log("about to notify upload() that shutdown started and can continue uploading grID:", string(bytes.Fields(debug.Stack())[1])) wgShutdownStarted.Done() t.Log("about to check async done channel is closed grID:", string(bytes.Fields(debug.Stack())[1])) - unittest2.RequireCloseBefore(t, c, 1*time.Second, "async uploader not closed in time") + unittest.RequireCloseBefore(t, async.Done(), 1*time.Second, "async uploader not closed in time") t.Log("about to check if callCount is 
1 grID:", string(bytes.Fields(debug.Stack())[1])) require.Equal(t, 1, callCount) @@ -190,12 +200,15 @@ func Test_AsyncUploader(t *testing.T) { async.SetOnCompleteCallback(func(computationResult *execution.ComputationResult, err error) { onCompleteCallbackCalled = true }) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) err := async.Upload(computationResult) require.NoError(t, err) wgUploadCalleded.Wait() - <-async.Done() + cancel() + unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time") require.True(t, onCompleteCallbackCalled) }) From 69c99b059d9be9969b8df8cc39346ee10867ca6c Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 10:49:39 -0800 Subject: [PATCH 29/35] Replace AsyncUploader fifoqueue with channel Instead of using a notifier and fifoqueue with metrics, use a buffered channel. Block when the channel is full. (Reasoning: we want to ensure no uploads get dropped.) Buffer size of 100 was chosen arbitrarily. All AsyncUploader tests now pass. 
--- .../execution/ingestion/uploader/uploader.go | 49 ++----------------- 1 file changed, 5 insertions(+), 44 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index 094de50314e..d00eb7a2c9e 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -2,13 +2,10 @@ package uploader import ( "context" - "fmt" "time" "github.com/rs/zerolog" - "github.com/onflow/flow-go/engine" - "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" @@ -30,16 +27,13 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber uint64, log zerolog.Logger, metrics module.ExecutionMetrics) *AsyncUploader { - // TODO queue size, add length metrics and check error - queue, _ := fifoqueue.NewFifoQueue(1000) a := &AsyncUploader{ uploader: uploader, log: log.With().Str("component", "block_data_uploader").Logger(), metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, - queue: queue, - notifier: engine.NewNotifier(), + queue: make(chan *execution.ComputationResult, 100), } builder := component.NewComponentManagerBuilder() for range 3 { @@ -58,8 +52,7 @@ type AsyncUploader struct { retryInitialTimeout time.Duration maxRetryNumber uint64 onComplete OnCompleteFunc // callback function called after Upload is completed - queue *fifoqueue.FifoQueue - notifier engine.Notifier + queue chan *execution.ComputationResult cm *component.ComponentManager component.Component // TODO Replace fifoqueue with channel, and make Upload() blocking @@ -71,40 +64,12 @@ func (a *AsyncUploader) UploadWorker(ctx irrecoverable.SignalerContext, ready co ready() done := ctx.Done() - wake := a.notifier.Channel() for { select { case <-done: return - case <-wake: - err := a.processUploadTasks(ctx) - if err != nil { - ctx.Throw(err) - } - } - 
} -} - -// processUploadTasks processes any available tasks from the queue. -// Only returns when the queue is empty (or the component is terminated). -// No errors expected during normal operation. -func (a *AsyncUploader) processUploadTasks(ctx context.Context) error { - for { - item, ok := a.queue.Pop() - if !ok { - return nil - } - - computationResult, ok := item.(*execution.ComputationResult) - if !ok { - return fmt.Errorf("invalid type in AsyncUploader queue") - } - - a.UploadTask(ctx, computationResult) - select { - case <-ctx.Done(): - return nil - default: + case computationResult := <-a.queue: + a.UploadTask(ctx, computationResult) } } } @@ -117,11 +82,7 @@ func (a *AsyncUploader) SetOnCompleteCallback(onComplete OnCompleteFunc) { // ensuring that multiple uploads can be run in parallel. // No errors expected during normal operation. func (a *AsyncUploader) Upload(computationResult *execution.ComputationResult) error { - if a.queue.Push(computationResult) { - a.notifier.Notify() - } else { - // TODO record in metrics - } + a.queue <- computationResult return nil } From 72e87038f5b2ced753833e91d7449ad4d6c3439a Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 11:28:37 -0800 Subject: [PATCH 30/35] Update Component interface documentation Because Component's `Ready()` and `Done()` methods work differently from ReadyDoneAware's, include them in Component with Component-specific comments. Namely, specify that Components should be started with Start() and shutdown by canceling the context they were started with. 
--- module/component/component.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/module/component/component.go b/module/component/component.go index 34f8f61cf14..3f813ad0612 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -19,9 +19,25 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down") // channels that close when startup and shutdown have completed. // Once Start has been called, the channel returned by Done must close eventually, // whether that be because of a graceful shutdown or an irrecoverable error. +// See also ComponentManager below. type Component interface { module.Startable - module.ReadyDoneAware + // Ready returns a ready channel that is closed once startup has completed. + // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // and only exposes information about the component's state. + // To start the component, instead use the Start() method. + // Note that the ready channel may never close if errors are encountered during startup, + // or if shutdown has already commenced before startup is complete. + // This should be an idempotent method. + Ready() <-chan struct{} + + // Done returns a done channel that is closed once shutdown has completed. + // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // and only exposes information about the component's state. + // To shutdown the component, instead cancel the context that was passed to Start(). + // Note that the done channel should be closed even if errors are encountered during shutdown. + // This should be an idempotent method. 
+ Done() <-chan struct{} } type ComponentFactory func() (Component, error) From 75a6b280c12a33c4dc78e4f026f2f577c642cb5e Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 12:36:05 -0800 Subject: [PATCH 31/35] fix Lint for golang 1.22 --- engine/execution/ingestion/uploader/uploader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index d00eb7a2c9e..e888d9589aa 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -36,7 +36,7 @@ func NewAsyncUploader(uploader Uploader, queue: make(chan *execution.ComputationResult, 100), } builder := component.NewComponentManagerBuilder() - for range 3 { + for i := 0; i < 3; i++ { builder.AddWorker(a.UploadWorker) } a.cm = builder.Build() From c7a85f73dadd600e6ab8166efbdd224889457124 Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Mon, 16 Dec 2024 08:55:51 -0800 Subject: [PATCH 32/35] Apply suggestions from code review Update/clarify doc comments Co-authored-by: Jordan Schalm --- engine/execution/ingestion/uploader/uploader.go | 3 ++- module/component/component.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index e888d9589aa..db8bbe93189 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -55,7 +55,6 @@ type AsyncUploader struct { queue chan *execution.ComputationResult cm *component.ComponentManager component.Component - // TODO Replace fifoqueue with channel, and make Upload() blocking } // UploadWorker implements a component worker which asynchronously uploads computation results @@ -108,6 +107,8 @@ func (a *AsyncUploader) UploadTask(ctx context.Context, computationResult *execu return 
retry.RetryableError(err) }) + // We only log upload errors here because the errors originate from an external cloud provider + // and the upload success is not critical to correct continued operation of the node if err != nil { a.log.Error().Err(err). Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). diff --git a/module/component/component.go b/module/component/component.go index 3f813ad0612..e78b24fffec 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -23,7 +23,7 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down") type Component interface { module.Startable // Ready returns a ready channel that is closed once startup has completed. - // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, // and only exposes information about the component's state. // To start the component, instead use the Start() method. // Note that the ready channel may never close if errors are encountered during startup, @@ -32,10 +32,10 @@ type Component interface { Ready() <-chan struct{} // Done returns a done channel that is closed once shutdown has completed. - // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, // and only exposes information about the component's state. // To shutdown the component, instead cancel the context that was passed to Start(). - // Note that the done channel should be closed even if errors are encountered during shutdown. + // Implementations must close the done channel even if errors are encountered during shutdown. // This should be an idempotent method. 
Done() <-chan struct{} } From d3a1438462f96fc126cd50ba739c37ebb14d875b Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:03:45 -0800 Subject: [PATCH 33/35] Apply suggestions from code review Co-authored-by: Jordan Schalm --- engine/execution/ingestion/uploader/uploader.go | 2 ++ engine/execution/ingestion/uploader/uploader_test.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index db8bbe93189..d1dcabb0cbc 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -33,6 +33,8 @@ func NewAsyncUploader(uploader Uploader, metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, + // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, + // but it is not acceptable to skip uploading an execution result queue: make(chan *execution.ComputationResult, 100), } builder := component.NewComponentManagerBuilder() diff --git a/engine/execution/ingestion/uploader/uploader_test.go b/engine/execution/ingestion/uploader/uploader_test.go index f4ee13883a4..73a72b91507 100644 --- a/engine/execution/ingestion/uploader/uploader_test.go +++ b/engine/execution/ingestion/uploader/uploader_test.go @@ -206,7 +206,7 @@ func Test_AsyncUploader(t *testing.T) { err := async.Upload(computationResult) require.NoError(t, err) - wgUploadCalleded.Wait() + unittest.AssertReturnsBefore(t, wgUploadCalleded.Wait, time.Second) cancel() unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time") From 602a4cc71fa2b3db17c209950d5f7f38044c0ec4 Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:04:49 -0800 Subject: [PATCH 34/35] Apply suggestions from code review Channel size of 20000 and worker count 
of 100 suggested by Leo. 20000 is approximately equal to 4 hours of execution results. Co-authored-by: Leo Zhang --- engine/execution/ingestion/uploader/uploader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index d1dcabb0cbc..d29a0d3a69c 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -35,10 +35,10 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber: maxRetryNumber, // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, // but it is not acceptable to skip uploading an execution result - queue: make(chan *execution.ComputationResult, 100), + queue: make(chan *execution.ComputationResult, 20000), } builder := component.NewComponentManagerBuilder() - for i := 0; i < 3; i++ { + for i := 0; i < 10; i++ { builder.AddWorker(a.UploadWorker) } a.cm = builder.Build() From cc03317752ce0993d856fae4caa78f7a1d4ca14f Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 20 Dec 2024 10:40:15 -0800 Subject: [PATCH 35/35] Update Component comments and fix lint --- engine/execution/ingestion/uploader/uploader.go | 2 +- module/component/component.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index d29a0d3a69c..f5abca7e490 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -35,7 +35,7 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber: maxRetryNumber, // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, // but it is not acceptable to skip uploading an execution result - queue: make(chan *execution.ComputationResult, 20000), + queue: make(chan *execution.ComputationResult, 20000), } builder := 
component.NewComponentManagerBuilder() for i := 0; i < 10; i++ { diff --git a/module/component/component.go b/module/component/component.go index e78b24fffec..11b543f239f 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -23,8 +23,8 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down") type Component interface { module.Startable // Ready returns a ready channel that is closed once startup has completed. - // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, - // and only exposes information about the component's state. + // Unlike the previous [module.ReadyDoneAware] interface, Ready does not start the component, + // but only exposes information about whether the component has completed startup. // To start the component, instead use the Start() method. // Note that the ready channel may never close if errors are encountered during startup, // or if shutdown has already commenced before startup is complete. @@ -32,8 +32,8 @@ type Component interface { Ready() <-chan struct{} // Done returns a done channel that is closed once shutdown has completed. - // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, - // and only exposes information about the component's state. + // Unlike the previous [module.ReadyDoneAware] interface, Done does not shut down the component, + // but only exposes information about whether the component has shut down yet. // To shutdown the component, instead cancel the context that was passed to Start(). // Implementations must close the done channel even if errors are encountered during shutdown. // This should be an idempotent method.