diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 1a241ba703b..2319c5141b2 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -480,7 +480,7 @@ func main() { node.EngineRegistry, node.State, node.Metrics.Engine, - colMetrics, + node.Metrics.Mempool, node.Me, node.Storage.Collections, node.Storage.Transactions, diff --git a/engine/collection/epochmgr/factories/builder.go b/engine/collection/epochmgr/factories/builder.go index a00a73ac97e..a588e52a080 100644 --- a/engine/collection/epochmgr/factories/builder.go +++ b/engine/collection/epochmgr/factories/builder.go @@ -6,11 +6,11 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" + "github.com/onflow/flow-go/engine/collection" "github.com/onflow/flow-go/module" builder "github.com/onflow/flow-go/module/builder/collection" finalizer "github.com/onflow/flow-go/module/finalizer/collection" "github.com/onflow/flow-go/module/mempool" - "github.com/onflow/flow-go/network" clusterstate "github.com/onflow/flow-go/state/cluster" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" @@ -23,7 +23,7 @@ type BuilderFactory struct { trace module.Tracer opts []builder.Opt metrics module.CollectionMetrics - pusher network.Engine // engine for pushing finalized collection to consensus committee + pusher collection.GuaranteedCollectionPublisher // engine for pushing finalized collection to consensus committee log zerolog.Logger } @@ -33,7 +33,7 @@ func NewBuilderFactory( mainChainHeaders storage.Headers, trace module.Tracer, metrics module.CollectionMetrics, - pusher network.Engine, + pusher collection.GuaranteedCollectionPublisher, log zerolog.Logger, opts ...builder.Opt, ) (*BuilderFactory, error) { diff --git a/engine/collection/guaranteed_collection_publisher.go b/engine/collection/guaranteed_collection_publisher.go new file mode 100644 index 00000000000..cb7cdd4c746 --- /dev/null +++ b/engine/collection/guaranteed_collection_publisher.go @@ -0,0 +1,15 @@ +package collection + +import ( + "github.com/onflow/flow-go/model/flow" +) + +// GuaranteedCollectionPublisher defines the interface to send collection guarantees +// from a collection node to consensus nodes. Collection guarantees are broadcast on a best-effort basis, +// and it is acceptable to discard some guarantees (especially those that are out of date). +// Implementation is non-blocking and concurrency safe. +type GuaranteedCollectionPublisher interface { + // SubmitCollectionGuarantee adds a guarantee to an internal queue + // to be published to consensus nodes. + SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) +} diff --git a/engine/collection/mock/guaranteed_collection_publisher.go b/engine/collection/mock/guaranteed_collection_publisher.go new file mode 100644 index 00000000000..9d58eb85560 --- /dev/null +++ b/engine/collection/mock/guaranteed_collection_publisher.go @@ -0,0 +1,32 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import ( + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" +) + +// GuaranteedCollectionPublisher is an autogenerated mock type for the GuaranteedCollectionPublisher type +type GuaranteedCollectionPublisher struct { + mock.Mock +} + +// SubmitCollectionGuarantee provides a mock function with given fields: guarantee +func (_m *GuaranteedCollectionPublisher) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) { + _m.Called(guarantee) +} + +// NewGuaranteedCollectionPublisher creates a new instance of GuaranteedCollectionPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewGuaranteedCollectionPublisher(t interface { + mock.TestingT + Cleanup(func()) +}) *GuaranteedCollectionPublisher { + mock := &GuaranteedCollectionPublisher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/engine/collection/pusher/engine.go b/engine/collection/pusher/engine.go index 317729108dd..d132024346b 100644 --- a/engine/collection/pusher/engine.go +++ b/engine/collection/pusher/engine.go @@ -4,15 +4,18 @@ package pusher import ( + "context" "fmt" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" - "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" @@ -21,30 +24,59 @@ import ( "github.com/onflow/flow-go/utils/logging" ) -// Engine is the collection pusher engine, which provides access to resources -// held by the collection node. +// Engine is part of the Collection Node. It broadcasts finalized collections +// ("collection guarantees") that the cluster generates to Consensus Nodes +// for inclusion in blocks. type Engine struct { - unit *engine.Unit log zerolog.Logger engMetrics module.EngineMetrics - colMetrics module.CollectionMetrics conduit network.Conduit me module.Local state protocol.State collections storage.Collections transactions storage.Transactions + + notifier engine.Notifier + queue *fifoqueue.FifoQueue + + component.Component + cm *component.ComponentManager } -func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) { +var _ network.MessageProcessor = (*Engine)(nil) +var _ component.Component = (*Engine)(nil) + +// New creates a new pusher engine. 
+func New( + log zerolog.Logger, + net network.EngineRegistry, + state protocol.State, + engMetrics module.EngineMetrics, + mempoolMetrics module.MempoolMetrics, + me module.Local, + collections storage.Collections, + transactions storage.Transactions, +) (*Engine, error) { + queue, err := fifoqueue.NewFifoQueue( + 200, // roughly 1 minute of collections, at 3BPS + fifoqueue.WithLengthObserver(func(len int) { + mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len)) + }), + ) + if err != nil { + return nil, fmt.Errorf("could not create fifoqueue: %w", err) + } + e := &Engine{ - unit: engine.NewUnit(), log: log.With().Str("engine", "pusher").Logger(), engMetrics: engMetrics, - colMetrics: colMetrics, me: me, state: state, collections: collections, transactions: transactions, + + notifier: engine.NewNotifier(), + queue: queue, } conduit, err := net.Register(channels.PushGuarantees, e) @@ -53,88 +85,105 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e } e.conduit = conduit + e.cm = component.NewComponentManagerBuilder(). + AddWorker(e.outboundQueueWorker). + Build() + e.Component = e.cm + return e, nil } -// Ready returns a ready channel that is closed once the engine has fully -// started. -func (e *Engine) Ready() <-chan struct{} { - return e.unit.Ready() +// outboundQueueWorker implements a component worker which broadcasts collection guarantees, +// enqueued by the Finalizer upon finalization, to Consensus Nodes. +func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + + done := ctx.Done() + wake := e.notifier.Channel() + for { + select { + case <-done: + return + case <-wake: + err := e.processOutboundMessages(ctx) + if err != nil { + ctx.Throw(err) + } + } + } } -// Done returns a done channel that is closed once the engine has fully stopped. -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() -} +// processOutboundMessages processes any available messages from the queue. +// Only returns when the queue is empty (or the engine is terminated). +// No errors expected during normal operations. +func (e *Engine) processOutboundMessages(ctx context.Context) error { + for { + item, ok := e.queue.Pop() + if !ok { + return nil + } -// SubmitLocal submits an event originating on the local node. -func (e *Engine) SubmitLocal(event interface{}) { - e.unit.Launch(func() { - err := e.process(e.me.NodeID(), event) - if err != nil { - engine.LogError(e.log, err) + guarantee, ok := item.(*flow.CollectionGuarantee) + if !ok { + return fmt.Errorf("invalid type in pusher engine queue") } - }) -} -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - e.unit.Launch(func() { - err := e.process(originID, event) + err := e.publishCollectionGuarantee(guarantee) if err != nil { - engine.LogError(e.log, err) + return err } - }) -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - return e.unit.Do(func() error { - return e.process(e.me.NodeID(), event) - }) -} -// Process processes the given event from the node with the given origin ID in -// a blocking manner. It returns the potential processing error when done. 
-func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { - return e.unit.Do(func() error { - return e.process(originID, event) - }) + select { + case <-ctx.Done(): + return nil + default: + } + } } -// process processes events for the pusher engine on the collection node. -func (e *Engine) process(originID flow.Identifier, event interface{}) error { - switch ev := event.(type) { - case *messages.SubmitCollectionGuarantee: - e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) - defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee) - return e.onSubmitCollectionGuarantee(originID, ev) - default: - return fmt.Errorf("invalid event type (%T)", event) - } +// Process is called by the networking layer when peers broadcast messages with this node +// as one of the recipients. The protocol specifies that Collector nodes broadcast Collection +// Guarantees to Consensus Nodes and _only_ those. When the pusher engine (running only on +// Collectors) receives a message, this message is evidence of byzantine behavior. +// Byzantine inputs are internally handled by the pusher.Engine and do *not* result in +// error returns. No errors expected during normal operation (including byzantine inputs). +func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error { + // Targeting a collector node's pusher.Engine with messages could be considered a slashable offense. + // However, generating cryptographic evidence requires Message Forensics - see reference [1]. + // Much further into the future, when we are implementing slashing challenges, we'll probably implement a + // dedicated consumer to post-process evidence of protocol violations into slashing challenges. For now, + // we just log this with the `KeySuspicious` label to alert the node operator. + // [1] Message Forensics FLIP https://github.com/onflow/flips/pull/195 + errs := fmt.Errorf("collector node's pusher.Engine was targeted by message %T on channel %v", message, channel) + e.log.Warn(). + Err(errs). + Bool(logging.KeySuspicious, true). + Str("peer_id", originID.String()). + Msg("potentially byzantine networking traffic detected") + return nil } -// onSubmitCollectionGuarantee handles submitting the given collection guarantee -// to consensus nodes. -func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error { - if originID != e.me.NodeID() { - return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID) +// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue +// to later be published to consensus nodes. +func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) { + if e.queue.Push(guarantee) { + e.notifier.Notify() + } else { + e.engMetrics.OutboundMessageDropped(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee) } - - return e.SubmitCollectionGuarantee(&req.Guarantee) } -// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes. -func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { +// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes. +// No errors expected during normal operation.
+func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error { consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus)) if err != nil { - return fmt.Errorf("could not get consensus nodes: %w", err) + return fmt.Errorf("could not get consensus nodes' identities: %w", err) } - // NOTE: Consensus nodes do not broadcast guarantees among themselves, so it needs that - at least one collection node make a publish to all of them. + // NOTE: Consensus nodes do not broadcast guarantees among themselves. So for the collection to be included, + // at least one collector has to successfully broadcast the collection to consensus nodes. Otherwise, the + // collection is lost, which is acceptable as long as we only lose a small fraction of collections. err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...) if err != nil { return fmt.Errorf("could not submit collection guarantee: %w", err) diff --git a/engine/collection/pusher/engine_test.go b/engine/collection/pusher/engine_test.go index fde6d9696dc..746957071f8 100644 --- a/engine/collection/pusher/engine_test.go +++ b/engine/collection/pusher/engine_test.go @@ -1,8 +1,10 @@ package pusher_test import ( + "context" "io" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -11,7 +13,7 @@ import ( "github.com/onflow/flow-go/engine/collection/pusher" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" - "github.com/onflow/flow-go/model/messages" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" @@ -82,18 +84,21 @@ func TestPusherEngine(t *testing.T) { // should be able to submit collection guarantees to consensus nodes func (suite *Suite) TestSubmitCollectionGuarantee() { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background()) + suite.engine.Start(ctx) + defer cancel() + done := make(chan struct{}) guarantee := unittest.CollectionGuaranteeFixture() // should submit the collection to consensus nodes consensus := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleConsensus)) - suite.conduit.On("Publish", guarantee, consensus[0].NodeID).Return(nil) + suite.conduit.On("Publish", guarantee, consensus[0].NodeID). + Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once() - msg := &messages.SubmitCollectionGuarantee{ - Guarantee: *guarantee, - } - err := suite.engine.ProcessLocal(msg) - suite.Require().Nil(err) + suite.engine.SubmitCollectionGuarantee(guarantee) + + unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent") suite.conduit.AssertExpectations(suite.T()) } @@ -103,14 +108,13 @@ func (suite *Suite) TestSubmitCollectionGuaranteeNonLocal() { guarantee := unittest.CollectionGuaranteeFixture() - // send from a non-allowed role + // verify that pusher.Engine handles any (potentially byzantine) input: + // A byzantine peer could target the collector node's pusher engine with messages. + // The pusher should discard those and explicitly not get tricked into broadcasting + // collection guarantees which a byzantine peer might try to inject into the system.
sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0] - msg := &messages.SubmitCollectionGuarantee{ - Guarantee: *guarantee, - } - err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg) - suite.Require().Error(err) - + err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee) + suite.Require().NoError(err) suite.conduit.AssertNumberOfCalls(suite.T(), "Multicast", 0) } diff --git a/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go b/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go index ecad4801741..418177b4dc8 100644 --- a/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go +++ b/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/storage" @@ -34,6 +35,7 @@ type BadgerRetryableUploaderWrapper struct { results storage.ExecutionResults transactionResults storage.TransactionResults uploadStatusStore storage.ComputationResultUploadStatus + component.Component } func NewBadgerRetryableUploaderWrapper( @@ -99,17 +101,10 @@ func NewBadgerRetryableUploaderWrapper( results: results, transactionResults: transactionResults, uploadStatusStore: uploadStatusStore, + Component: uploader, // delegate to the AsyncUploader } } -func (b *BadgerRetryableUploaderWrapper) Ready() <-chan struct{} { - return b.uploader.Ready() -} - -func (b *BadgerRetryableUploaderWrapper) Done() <-chan struct{} { - return b.uploader.Done() -} - func (b *BadgerRetryableUploaderWrapper) Upload(computationResult *execution.ComputationResult) error { if computationResult == nil || computationResult.ExecutableBlock == nil || computationResult.ExecutableBlock.Block == nil { diff --git a/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go b/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go index 491307705eb..adff343f61f 100644 --- a/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go +++ b/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go @@ -1,6 +1,7 @@ package uploader import ( + "context" "sync" "testing" "time" @@ -11,6 +12,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/executiondatasync/execution_data" executionDataMock "github.com/onflow/flow-go/module/executiondatasync/execution_data/mock" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/module/metrics" @@ -26,6 +28,8 @@ import ( ) func Test_Upload_invoke(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wg := sync.WaitGroup{} uploaderCalled := false @@ -40,7 +44,7 @@ func Test_Upload_invoke(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) // nil input - no call to Upload() err := testRetryableUploaderWrapper.Upload(nil) @@ -58,6 +62,8 @@ func Test_Upload_invoke(t *testing.T) { } func Test_RetryUpload(t *testing.T) { + ctx, cancel := 
irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wg := sync.WaitGroup{} wg.Add(1) uploaderCalled := false @@ -72,7 +78,7 @@ func Test_RetryUpload(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) err := testRetryableUploaderWrapper.RetryUpload() wg.Wait() @@ -82,6 +88,8 @@ func Test_RetryUpload(t *testing.T) { } func Test_AsyncUploaderCallback(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wgUploadCalleded := sync.WaitGroup{} wgUploadCalleded.Add(1) @@ -95,7 +103,7 @@ func Test_AsyncUploaderCallback(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) testComputationResult := createTestComputationResult() err := testRetryableUploaderWrapper.Upload(testComputationResult) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index f9486ea99d8..f5abca7e490 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -6,9 +6,10 @@ import ( "github.com/rs/zerolog" - "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/utils/logging" "github.com/sethvargo/go-retry" @@ -26,74 +27,102 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber uint64, log zerolog.Logger, metrics module.ExecutionMetrics) *AsyncUploader { - return &AsyncUploader{ - unit: engine.NewUnit(), + a := &AsyncUploader{ uploader: uploader, log: log.With().Str("component", "block_data_uploader").Logger(), metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, + // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, + // but it is not acceptable to skip uploading an execution result + queue: make(chan *execution.ComputationResult, 20000), } + builder := component.NewComponentManagerBuilder() + for i := 0; i < 10; i++ { + builder.AddWorker(a.UploadWorker) + } + a.cm = builder.Build() + a.Component = a.cm + return a } // AsyncUploader wraps up another Uploader instance and make its upload asynchronous type AsyncUploader struct { - module.ReadyDoneAware - unit *engine.Unit uploader Uploader log zerolog.Logger metrics module.ExecutionMetrics retryInitialTimeout time.Duration maxRetryNumber uint64 onComplete OnCompleteFunc // callback function called after Upload is completed + queue chan *execution.ComputationResult + cm *component.ComponentManager + component.Component } -func (a *AsyncUploader) Ready() <-chan struct{} { - return a.unit.Ready() -} - -func (a *AsyncUploader) Done() <-chan struct{} { - return a.unit.Done() +// UploadWorker implements a component worker which asynchronously uploads computation results +// from the execution node (after a block is executed) to storage such as a GCP bucket or S3 bucket. 
+func (a *AsyncUploader) UploadWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + + done := ctx.Done() + for { + select { + case <-done: + return + case computationResult := <-a.queue: + a.UploadTask(ctx, computationResult) + } + } } func (a *AsyncUploader) SetOnCompleteCallback(onComplete OnCompleteFunc) { a.onComplete = onComplete } +// Upload adds the computation result to a queue to be processed asynchronously by workers, +// ensuring that multiple uploads can be run in parallel. +// No errors expected during normal operation. func (a *AsyncUploader) Upload(computationResult *execution.ComputationResult) error { + a.queue <- computationResult + return nil +} +// UploadTask implements retrying for uploading computation results. +// When the upload is complete, the callback will be called with the result (for example, +// to record that the upload was successful) and any error. +// No errors expected during normal operation. +func (a *AsyncUploader) UploadTask(ctx context.Context, computationResult *execution.ComputationResult) { backoff := retry.NewFibonacci(a.retryInitialTimeout) backoff = retry.WithMaxRetries(a.maxRetryNumber, backoff) - a.unit.Launch(func() { - a.metrics.ExecutionBlockDataUploadStarted() - start := time.Now() - - a.log.Debug().Msgf("computation result of block %s is being uploaded", - computationResult.ExecutableBlock.ID().String()) + a.metrics.ExecutionBlockDataUploadStarted() + start := time.Now() - err := retry.Do(a.unit.Ctx(), backoff, func(ctx context.Context) error { - err := a.uploader.Upload(computationResult) - if err != nil { - a.log.Warn().Err(err).Msg("error while uploading block data, retrying") - } - return retry.RetryableError(err) - }) + a.log.Debug().Msgf("computation result of block %s is being uploaded", + computationResult.ExecutableBlock.ID().String()) + err := retry.Do(ctx, backoff, func(ctx context.Context) error { + err := a.uploader.Upload(computationResult) if err != nil { - a.log.Error().Err(err). - Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). - Msg("failed to upload block data") - } else { - a.log.Debug().Msgf("computation result of block %s was successfully uploaded", - computationResult.ExecutableBlock.ID().String()) + a.log.Warn().Err(err).Msg("error while uploading block data, retrying") } + return retry.RetryableError(err) + }) - a.metrics.ExecutionBlockDataUploadFinished(time.Since(start)) + // We only log upload errors here because the errors originate from an external cloud provider + // and the upload success is not critical to correct continued operation of the node + if err != nil { + a.log.Error().Err(err). + Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). 
+ Msg("failed to upload block data") + } else { + a.log.Debug().Msgf("computation result of block %s was successfully uploaded", + computationResult.ExecutableBlock.ID().String()) + } - if a.onComplete != nil { - a.onComplete(computationResult, err) - } - }) - return nil + a.metrics.ExecutionBlockDataUploadFinished(time.Since(start)) + + if a.onComplete != nil { + a.onComplete(computationResult, err) + } } diff --git a/engine/execution/ingestion/uploader/uploader_test.go b/engine/execution/ingestion/uploader/uploader_test.go index c14fa683715..73a72b91507 100644 --- a/engine/execution/ingestion/uploader/uploader_test.go +++ b/engine/execution/ingestion/uploader/uploader_test.go @@ -2,6 +2,7 @@ package uploader import ( "bytes" + "context" "fmt" "runtime/debug" "sync" @@ -13,17 +14,17 @@ import ( "go.uber.org/atomic" "github.com/onflow/flow-go/engine/execution" - "github.com/onflow/flow-go/engine/execution/state/unittest" + exeunittest "github.com/onflow/flow-go/engine/execution/state/unittest" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" - testutils "github.com/onflow/flow-go/utils/unittest" - unittest2 "github.com/onflow/flow-go/utils/unittest" + "github.com/onflow/flow-go/utils/unittest" ) func Test_AsyncUploader(t *testing.T) { - computationResult := unittest.ComputationResultFixture( + computationResult := exeunittest.ComputationResultFixture( t, - testutils.IdentifierFixture(), + unittest.IdentifierFixture(), nil) t.Run("uploads are run in parallel and emit metrics", func(t *testing.T) { @@ -46,6 +47,8 @@ func Test_AsyncUploader(t *testing.T) { metrics := &DummyCollector{} async := NewAsyncUploader(uploader, 1*time.Nanosecond, 1, zerolog.Nop(), metrics) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) err := async.Upload(computationResult) require.NoError(t, err) @@ -63,6 +66,8 @@ func Test_AsyncUploader(t *testing.T) { wgContinueUpload.Done() //release all // shut down component + cancel() + unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader did not finish in time") <-async.Done() require.Equal(t, int64(0), metrics.Counter.Load()) @@ -89,6 +94,9 @@ func Test_AsyncUploader(t *testing.T) { } async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{}) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) + defer cancel() err := async.Upload(computationResult) require.NoError(t, err) @@ -107,7 +115,7 @@ func Test_AsyncUploader(t *testing.T) { // 2. shut down async uploader right after upload initiated (not completed) // 3. 
assert that upload called only once even when trying to use retry mechanism t.Run("stopping component stops retrying", func(t *testing.T) { - testutils.SkipUnless(t, testutils.TEST_FLAKY, "flaky") + unittest.SkipUnless(t, unittest.TEST_FLAKY, "flaky") callCount := 0 t.Log("test started grID:", string(bytes.Fields(debug.Stack())[1])) @@ -151,6 +159,8 @@ func Test_AsyncUploader(t *testing.T) { } t.Log("about to create NewAsyncUploader grID:", string(bytes.Fields(debug.Stack())[1])) async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{}) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) t.Log("about to call async.Upload() grID:", string(bytes.Fields(debug.Stack())[1])) err := async.Upload(computationResult) // doesn't matter what we upload require.NoError(t, err) @@ -163,11 +173,11 @@ func Test_AsyncUploader(t *testing.T) { // stop component and check that it's fully stopped t.Log("about to initiate shutdown grID: ", string(bytes.Fields(debug.Stack())[1])) - c := async.Done() + cancel() t.Log("about to notify upload() that shutdown started and can continue uploading grID:", string(bytes.Fields(debug.Stack())[1])) wgShutdownStarted.Done() t.Log("about to check async done channel is closed grID:", string(bytes.Fields(debug.Stack())[1])) - unittest2.RequireCloseBefore(t, c, 1*time.Second, "async uploader not closed in time") + unittest.RequireCloseBefore(t, async.Done(), 1*time.Second, "async uploader not closed in time") t.Log("about to check if callCount is 1 grID:", string(bytes.Fields(debug.Stack())[1])) require.Equal(t, 1, callCount) @@ -190,12 +200,15 @@ func Test_AsyncUploader(t *testing.T) { async.SetOnCompleteCallback(func(computationResult *execution.ComputationResult, err error) { onCompleteCallbackCalled = true }) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) err := async.Upload(computationResult) require.NoError(t, err) - wgUploadCalleded.Wait() - <-async.Done() + unittest.AssertReturnsBefore(t, wgUploadCalleded.Wait, time.Second) + cancel() + unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time") require.True(t, onCompleteCallbackCalled) }) diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go index 8a2ea4fed1a..aae7383ada0 100644 --- a/engine/testutil/mock/nodes.go +++ b/engine/testutil/mock/nodes.go @@ -140,6 +140,7 @@ func (n CollectionNode) Start(t *testing.T) { n.IngestionEngine.Start(n.Ctx) n.EpochManagerEngine.Start(n.Ctx) n.ProviderEngine.Start(n.Ctx) + n.PusherEngine.Start(n.Ctx) } func (n CollectionNode) Ready() <-chan struct{} { diff --git a/engine/verification/assigner/engine.go b/engine/verification/assigner/engine.go index ba2e7d2d1f7..47b9eec819d 100644 --- a/engine/verification/assigner/engine.go +++ b/engine/verification/assigner/engine.go @@ -8,7 +8,6 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/model/chunks" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" @@ -24,7 +23,6 @@ import ( // to me to verify, and then save it to the chunks job queue for the // fetcher engine to process. type Engine struct { - unit *engine.Unit log zerolog.Logger metrics module.VerificationMetrics tracer module.Tracer @@ -36,8 +34,11 @@ type Engine struct { blockConsumerNotifier module.ProcessingNotifier // to report a block has been processed. 
stopAtHeight uint64 stopAtBlockID atomic.Value + *module.NoopReadyDoneAware } +var _ module.ReadyDoneAware = (*Engine)(nil) + func New( log zerolog.Logger, metrics module.VerificationMetrics, @@ -50,7 +51,6 @@ func New( stopAtHeight uint64, ) *Engine { e := &Engine{ - unit: engine.NewUnit(), log: log.With().Str("engine", "assigner").Logger(), metrics: metrics, tracer: tracer, @@ -69,14 +69,6 @@ func (e *Engine) WithBlockConsumerNotifier(notifier module.ProcessingNotifier) { e.blockConsumerNotifier = notifier } -func (e *Engine) Ready() <-chan struct{} { - return e.unit.Ready() -} - -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() -} - // resultChunkAssignment receives an execution result that appears in a finalized incorporating block. // In case this verification node is authorized at the reference block of this execution receipt's result, // chunk assignment is computed for the result, and the list of assigned chunks returned. @@ -164,7 +156,8 @@ func (e *Engine) processChunk(chunk *flow.Chunk, resultID flow.Identifier, block func (e *Engine) ProcessFinalizedBlock(block *flow.Block) { blockID := block.ID() - span, ctx := e.tracer.StartBlockSpan(e.unit.Ctx(), blockID, trace.VERProcessFinalizedBlock) + // We don't have any existing information and don't need cancellation, so use a background (empty) context + span, ctx := e.tracer.StartBlockSpan(context.Background(), blockID, trace.VERProcessFinalizedBlock) defer span.End() e.processFinalizedBlock(ctx, block) diff --git a/engine/verification/fetcher/engine.go b/engine/verification/fetcher/engine.go index 551b0571526..b12ef5637a9 100644 --- a/engine/verification/fetcher/engine.go +++ b/engine/verification/fetcher/engine.go @@ -35,7 +35,6 @@ import ( // to the verifier engine. type Engine struct { // common - unit *engine.Unit state protocol.State // used to verify the origin ID of chunk data response, and sealing status. // monitoring @@ -73,7 +72,6 @@ func New( stopAtHeight uint64, ) *Engine { e := &Engine{ - unit: engine.NewUnit(), metrics: metrics, tracer: tracer, log: log.With().Str("engine", "fetcher").Logger(), @@ -105,16 +103,12 @@ func (e *Engine) Ready() <-chan struct{} { if e.chunkConsumerNotifier == nil { e.log.Fatal().Msg("missing chunk consumer notifier callback in verification fetcher engine") } - return e.unit.Ready(func() { - <-e.requester.Ready() - }) + return e.requester.Ready() } // Done terminates the engine and returns a channel that is closed when the termination is done func (e *Engine) Done() <-chan struct{} { - return e.unit.Done(func() { - <-e.requester.Done() - }) + return e.requester.Done() } // ProcessAssignedChunk is the entry point of fetcher engine. @@ -170,7 +164,8 @@ func (e *Engine) ProcessAssignedChunk(locator *chunks.Locator) { // processAssignedChunkWithTracing encapsulates the logic of processing assigned chunk with tracing enabled. 
func (e *Engine) processAssignedChunkWithTracing(chunk *flow.Chunk, result *flow.ExecutionResult, chunkLocatorID flow.Identifier) (bool, uint64, error) { - span, _ := e.tracer.StartBlockSpan(e.unit.Ctx(), result.BlockID, trace.VERProcessAssignedChunk) + // We don't have any existing information and don't need cancellation, so use a background (empty) context + span, _ := e.tracer.StartBlockSpan(context.Background(), result.BlockID, trace.VERProcessAssignedChunk) span.SetAttributes(attribute.Int("collection_index", int(chunk.CollectionIndex))) defer span.End() diff --git a/model/messages/collection.go b/model/messages/collection.go index 3ef3251698b..2b4273c900d 100644 --- a/model/messages/collection.go +++ b/model/messages/collection.go @@ -5,12 +5,6 @@ import ( "github.com/onflow/flow-go/model/flow" ) -// SubmitCollectionGuarantee is a request to submit the given collection -// guarantee to consensus nodes. Only valid as a node-local message. -type SubmitCollectionGuarantee struct { - Guarantee flow.CollectionGuarantee -} - // CollectionRequest request all transactions from a collection with the given // fingerprint. type CollectionRequest struct { diff --git a/module/component/component.go b/module/component/component.go index 34f8f61cf14..11b543f239f 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -19,9 +19,25 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down") // channels that close when startup and shutdown have completed. // Once Start has been called, the channel returned by Done must close eventually, // whether that be because of a graceful shutdown or an irrecoverable error. +// See also ComponentManager below. type Component interface { module.Startable - module.ReadyDoneAware + // Ready returns a ready channel that is closed once startup has completed. + // Unlike the previous [module.ReadyDoneAware] interface, Ready does not start the component, + // but only exposes information about whether the component has completed startup. + // To start the component, instead use the Start() method. + // Note that the ready channel may never close if errors are encountered during startup, + // or if shutdown has already commenced before startup is complete. + // This should be an idempotent method. + Ready() <-chan struct{} + + // Done returns a done channel that is closed once shutdown has completed. + // Unlike the previous [module.ReadyDoneAware] interface, Done does not shut down the component, + // but only exposes information about whether the component has shut down yet. + // To shut down the component, instead cancel the context that was passed to Start(). + // Implementations must close the done channel even if errors are encountered during shutdown. + // This should be an idempotent method.
+ Done() <-chan struct{} } type ComponentFactory func() (Component, error) diff --git a/module/finalizer/collection/finalizer.go b/module/finalizer/collection/finalizer.go index bfe1d76ae4f..e5b2e21904f 100644 --- a/module/finalizer/collection/finalizer.go +++ b/module/finalizer/collection/finalizer.go @@ -5,12 +5,11 @@ import ( "github.com/dgraph-io/badger/v2" + "github.com/onflow/flow-go/engine/collection" "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/mempool" - "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" ) @@ -22,7 +21,7 @@ import ( type Finalizer struct { db *badger.DB transactions mempool.Transactions - prov network.Engine + pusher collection.GuaranteedCollectionPublisher metrics module.CollectionMetrics } @@ -30,13 +29,13 @@ type Finalizer struct { func NewFinalizer( db *badger.DB, transactions mempool.Transactions, - prov network.Engine, + pusher collection.GuaranteedCollectionPublisher, metrics module.CollectionMetrics, ) *Finalizer { f := &Finalizer{ db: db, transactions: transactions, - prov: prov, + pusher: pusher, metrics: metrics, } return f @@ -159,15 +158,13 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error { // For now, we just use the parent signers as the guarantors of this // collection. - // TODO add real signatures here (2711) - f.prov.SubmitLocal(&messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: payload.Collection.ID(), - ReferenceBlockID: payload.ReferenceBlockID, - ChainID: header.ChainID, - SignerIndices: step.ParentVoterIndices, - Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes - }, + // TODO add real signatures here (https://github.com/onflow/flow-go-internal/issues/4569) + f.pusher.SubmitCollectionGuarantee(&flow.CollectionGuarantee{ + CollectionID: payload.Collection.ID(), + ReferenceBlockID: payload.ReferenceBlockID, + ChainID: header.ChainID, + SignerIndices: step.ParentVoterIndices, + Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes }) } diff --git a/module/finalizer/collection/finalizer_test.go b/module/finalizer/collection/finalizer_test.go index fa92d3eeafe..b9b1f38bc19 100644 --- a/module/finalizer/collection/finalizer_test.go +++ b/module/finalizer/collection/finalizer_test.go @@ -8,13 +8,12 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + collectionmock "github.com/onflow/flow-go/engine/collection/mock" model "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module/finalizer/collection" "github.com/onflow/flow-go/module/mempool/herocache" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/network/mocknetwork" cluster "github.com/onflow/flow-go/state/cluster/badger" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" @@ -65,9 +64,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) 
fakeBlockID := unittest.IdentifierFixture() err := finalizer.MakeFinal(fakeBlockID) @@ -78,9 +76,9 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + pusher.On("SubmitCollectionGuarantee", mock.Anything).Once() + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized block tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -104,9 +102,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // create a new block that isn't connected to a parent block := unittest.ClusterBlockWithParent(genesis) @@ -123,8 +120,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // create a block with empty payload on genesis block := unittest.ClusterBlockWithParent(genesis) @@ -141,16 +138,15 @@ func TestFinalizer(t *testing.T) { assert.Equal(t, block.ID(), final.ID()) // collection should not have been propagated - prov.AssertNotCalled(t, "SubmitLocal", mock.Anything) + pusher.AssertNotCalled(t, "SubmitCollectionGuarantee", mock.Anything) }) t.Run("finalize single block", func(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -164,6 +160,15 @@ func TestFinalizer(t *testing.T) { block.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block) + // block should be passed to pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block.Header.ChainID, + SignerIndices: block.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize the block err := finalizer.MakeFinal(block.ID()) assert.Nil(t, err) @@ -178,18 +183,6 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, final.ID()) - - // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 1) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block.Header.ChainID, - SignerIndices: block.Header.ParentVoterIndices, - Signature: nil, - }, - }) }) // when finalizing a block with un-finalized ancestors, those ancestors should be finalized as well @@ -197,9 +190,8 @@ func TestFinalizer(t 
*testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the first finalized block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -218,6 +210,22 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) + // both blocks should be passed to pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, + }).Once() + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block2.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block2.Header.ChainID, + SignerIndices: block2.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize block2 (should indirectly finalize block1 as well) err := finalizer.MakeFinal(block2.ID()) assert.Nil(t, err) @@ -231,36 +239,14 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block2.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID(), block2.ID()) - - // both blocks should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 2) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }, - }) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block2.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block2.Header.ChainID, - SignerIndices: block2.Header.ParentVoterIndices, - Signature: nil, - }, - }) }) t.Run("finalize with un-finalized child", func(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized parent block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -279,6 +265,15 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) + // block should be passed to pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize block1 (should NOT finalize block2) err := finalizer.MakeFinal(block1.ID()) assert.Nil(t, err) @@ -293,18 +288,6 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block1.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID()) - - // block should be 
passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 1) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }, - }) }) // when finalizing a block with a conflicting fork, the fork should not be finalized. @@ -312,9 +295,8 @@ func TestFinalizer(t *testing.T) { bootstrap() defer cleanup() - prov := new(mocknetwork.Engine) - prov.On("SubmitLocal", mock.Anything) - finalizer := collection.NewFinalizer(db, pool, prov, metrics) + pusher := collectionmock.NewGuaranteedCollectionPublisher(t) + finalizer := collection.NewFinalizer(db, pool, pusher, metrics) // tx1 is included in the finalized block and mempool tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 }) @@ -333,6 +315,15 @@ func TestFinalizer(t *testing.T) { block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) + // block should be passed to pusher + pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{ + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + ChainID: block1.Header.ChainID, + SignerIndices: block1.Header.ParentVoterIndices, + Signature: nil, + }).Once() + // finalize block1 err := finalizer.MakeFinal(block1.ID()) assert.Nil(t, err) @@ -347,18 +338,6 @@ func TestFinalizer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, block1.ID(), final.ID()) assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID()) - - // block should be passed to provider - prov.AssertNumberOfCalls(t, "SubmitLocal", 1) - prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ - Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - ReferenceBlockID: refBlock.ID(), - ChainID: block1.Header.ChainID, - SignerIndices: block1.Header.ParentVoterIndices, - Signature: nil, - }, - }) }) }) } diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 20b66ad7d68..1e5b6601a2e 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -113,6 +113,7 @@ const ( ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine + ResourceSubmitCollectionGuaranteesQueue = "pusher_col_guarantee_queue" // collection node, pusher engine ResourceBeaconKey = "beacon-key" // consensus node, DKG engine ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine