Refactor: Replace some uses of engine.Unit with ComponentManager #6833

Merged: 42 commits, Dec 21, 2024

Commits (42)
3552f06
replace engine.Unit with ComponentManager in Pusher Engine
tim-barry Nov 21, 2024
6d3f462
pusher engine test: update positive test
tim-barry Nov 21, 2024
0aadc13
Pusher engine test: update negative test
tim-barry Nov 21, 2024
dec0d58
Start pusher engine in mocks
tim-barry Nov 21, 2024
b5418dd
Merge branch 'master' into tim/7018-pusher-engine-use-componentmanager
jordanschalm Nov 22, 2024
b7166f3
Refactor pusher engine: merge function with no callers
tim-barry Nov 27, 2024
e25cba7
Refactor pusher engine: error on non-local messages
tim-barry Nov 27, 2024
f2f53a8
Refactor pusher engine: rename and propagate error
tim-barry Nov 27, 2024
f66ba69
Refactor pusher engine
tim-barry Nov 27, 2024
1c80949
Revert "Pusher engine test: update negative test"
tim-barry Nov 27, 2024
fbed8e7
Refactor pusher engine: (lint) remove unused code
tim-barry Nov 27, 2024
051e6d4
Merge branch 'feature/pusher-engine-refactor' into tim/7018-pusher-en…
tim-barry Nov 27, 2024
ddb0cb7
Refactor pusher engine: queue length metrics
tim-barry Nov 29, 2024
17a9a2b
Update pusher engine doc comment
tim-barry Nov 29, 2024
aad332c
Apply suggestions from code review
tim-barry Nov 30, 2024
ad04f27
Pusher engine refactor: remove unused collection metrics
tim-barry Nov 30, 2024
88a7d45
Refactor pusher engine: add error return doc comments
tim-barry Dec 2, 2024
cebf100
Refactor pusher engine: add metrics for dropped messages
tim-barry Dec 3, 2024
2048b4a
Apply suggestions from code review
tim-barry Dec 3, 2024
7e4258a
Merge pull request #6747 from onflow/tim/7018-pusher-engine-use-compo…
tim-barry Dec 4, 2024
1fc3a19
Refactor pusher engine: change interface
tim-barry Dec 2, 2024
9e37f14
Apply suggestions from code review
tim-barry Dec 6, 2024
d157f48
Update mock checks
tim-barry Dec 6, 2024
eacc992
Rename prov to pusher in Finalizer
tim-barry Dec 6, 2024
f9e16bf
Update finalizer tests to expect correct number of calls
tim-barry Dec 6, 2024
42b2331
Update module/finalizer/collection/finalizer.go comment
tim-barry Dec 6, 2024
f58ccaf
Apply suggestions from code review
tim-barry Dec 10, 2024
656cbd5
Merge pull request #6780 from onflow/tim/6765-pusher-engine-update-in…
tim-barry Dec 10, 2024
4f1906e
Remove engine.Unit from Assigner engine and Fetcher engine
tim-barry Dec 9, 2024
e73b3f3
WIP - Initial refactor of AsyncUploader
tim-barry Dec 10, 2024
9c57c77
Convert BadgerRetryableUploadWrapper to Component
tim-barry Dec 12, 2024
f7709c8
Update AsyncUploader tests
tim-barry Dec 12, 2024
69c99b0
Replace AsyncUploader fifoqueue with channel
tim-barry Dec 12, 2024
72e8703
Update Component interface documentation
tim-barry Dec 12, 2024
75a6b28
fix Lint for golang 1.22
tim-barry Dec 12, 2024
c7a85f7
Apply suggestions from code review
tim-barry Dec 16, 2024
2e6fc67
Merge pull request #6808 from onflow/tim/6807-verification-engines-re…
tim-barry Dec 16, 2024
d3a1438
Apply suggestions from code review
tim-barry Dec 20, 2024
602a4cc
Apply suggestions from code review
tim-barry Dec 20, 2024
cc03317
Update Component comments and fix lint
tim-barry Dec 20, 2024
955373a
Merge branch 'feature/pusher-engine-refactor' into tim/6807-AsyncUplo…
tim-barry Dec 20, 2024
266ad1f
Merge pull request #6809 from onflow/tim/6807-AsyncUploader-engine.Un…
tim-barry Dec 20, 2024
2 changes: 1 addition & 1 deletion cmd/collection/main.go
@@ -480,7 +480,7 @@ func main() {
node.EngineRegistry,
node.State,
node.Metrics.Engine,
colMetrics,
node.Metrics.Mempool,
node.Me,
node.Storage.Collections,
node.Storage.Transactions,
6 changes: 3 additions & 3 deletions engine/collection/epochmgr/factories/builder.go
@@ -6,11 +6,11 @@ import (
"github.com/dgraph-io/badger/v2"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/collection"
"github.com/onflow/flow-go/module"
builder "github.com/onflow/flow-go/module/builder/collection"
finalizer "github.com/onflow/flow-go/module/finalizer/collection"
"github.com/onflow/flow-go/module/mempool"
"github.com/onflow/flow-go/network"
clusterstate "github.com/onflow/flow-go/state/cluster"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
@@ -23,7 +23,7 @@ type BuilderFactory struct {
trace module.Tracer
opts []builder.Opt
metrics module.CollectionMetrics
pusher network.Engine // engine for pushing finalized collection to consensus committee
pusher collection.GuaranteedCollectionPublisher // engine for pushing finalized collection to consensus committee
log zerolog.Logger
}

@@ -33,7 +33,7 @@ func NewBuilderFactory(
mainChainHeaders storage.Headers,
trace module.Tracer,
metrics module.CollectionMetrics,
pusher network.Engine,
pusher collection.GuaranteedCollectionPublisher,
log zerolog.Logger,
opts ...builder.Opt,
) (*BuilderFactory, error) {
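Narrowing the dependency from network.Engine to collection.GuaranteedCollectionPublisher also makes the factory easier to test: the generated mock can be injected directly. A compile-time sketch (package aliases assumed, not part of the PR):

package collection_test // illustrative placement only

import (
    "github.com/onflow/flow-go/engine/collection"
    mockcollection "github.com/onflow/flow-go/engine/collection/mock"
    "github.com/onflow/flow-go/engine/collection/pusher"
)

// Both the real pusher engine and its generated mock satisfy the narrowed
// interface, so either can be passed to NewBuilderFactory.
var (
    _ collection.GuaranteedCollectionPublisher = (*pusher.Engine)(nil)
    _ collection.GuaranteedCollectionPublisher = (*mockcollection.GuaranteedCollectionPublisher)(nil)
)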
15 changes: 15 additions & 0 deletions engine/collection/guaranteed_collection_publisher.go
@@ -0,0 +1,15 @@
package collection

import (
"github.com/onflow/flow-go/model/flow"
)

// GuaranteedCollectionPublisher defines the interface to send collection guarantees
// from a collection node to consensus nodes. Collection guarantees are broadcast on a best-effort basis,
// and it is acceptable to discard some guarantees (especially those that are out of date).
// Implementation is non-blocking and concurrency safe.
type GuaranteedCollectionPublisher interface {
// SubmitCollectionGuarantee adds a guarantee to an internal queue
// to be published to consensus nodes.
SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee)
}
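For context, a minimal sketch of the intended call pattern from the producer side (the publishFinalized helper below is hypothetical, not part of the PR):

package collection_example

import (
    "github.com/onflow/flow-go/engine/collection"
    "github.com/onflow/flow-go/model/flow"
)

// publishFinalized hands finalized guarantees to the publisher. The call is
// fire-and-forget: SubmitCollectionGuarantee only enqueues the guarantee and
// never blocks, so it is safe to call from the finalization path.
func publishFinalized(pub collection.GuaranteedCollectionPublisher, guarantees ...*flow.CollectionGuarantee) {
    for _, g := range guarantees {
        pub.SubmitCollectionGuarantee(g)
    }
}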
32 changes: 32 additions & 0 deletions engine/collection/mock/guaranteed_collection_publisher.go

Generated file; diff not rendered by default.
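As a rough illustration of what the generated mockery mock amounts to (structure assumed, since the file itself is not shown):

package mock

import (
    mock "github.com/stretchr/testify/mock"

    flow "github.com/onflow/flow-go/model/flow"
)

// GuaranteedCollectionPublisher is a mock of the interface for use in tests (sketch).
type GuaranteedCollectionPublisher struct {
    mock.Mock
}

// SubmitCollectionGuarantee records the call so tests can assert on it.
func (_m *GuaranteedCollectionPublisher) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) {
    _m.Called(guarantee)
}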

185 changes: 117 additions & 68 deletions engine/collection/pusher/engine.go
@@ -4,15 +4,18 @@
package pusher

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/fifoqueue"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
@@ -21,30 +21,59 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

// Engine is the collection pusher engine, which provides access to resources
// held by the collection node.
// Engine is part of the Collection Node. It broadcasts finalized collections
// ("collection guarantees") that the cluster generates to Consensus Nodes
// for inclusion in blocks.
type Engine struct {
unit *engine.Unit
log zerolog.Logger
engMetrics module.EngineMetrics
colMetrics module.CollectionMetrics
conduit network.Conduit
me module.Local
state protocol.State
collections storage.Collections
transactions storage.Transactions

notifier engine.Notifier
queue *fifoqueue.FifoQueue

component.Component
cm *component.ComponentManager
}

func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) {
var _ network.MessageProcessor = (*Engine)(nil)
var _ component.Component = (*Engine)(nil)

// New creates a new pusher engine.
func New(
log zerolog.Logger,
net network.EngineRegistry,
state protocol.State,
engMetrics module.EngineMetrics,
mempoolMetrics module.MempoolMetrics,
me module.Local,
collections storage.Collections,
transactions storage.Transactions,
) (*Engine, error) {
queue, err := fifoqueue.NewFifoQueue(
200, // roughly 1 minute of collections, at 3BPS
fifoqueue.WithLengthObserver(func(len int) {
mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len))
}),
)
if err != nil {
return nil, fmt.Errorf("could not create fifoqueue: %w", err)
}

e := &Engine{
unit: engine.NewUnit(),
log: log.With().Str("engine", "pusher").Logger(),
engMetrics: engMetrics,
colMetrics: colMetrics,
me: me,
state: state,
collections: collections,
transactions: transactions,

notifier: engine.NewNotifier(),
queue: queue,
}

conduit, err := net.Register(channels.PushGuarantees, e)
@@ -53,88 +53,105 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e
}
e.conduit = conduit

e.cm = component.NewComponentManagerBuilder().
AddWorker(e.outboundQueueWorker).
Build()
e.Component = e.cm

return e, nil
}
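With the ComponentManager wired up, callers drive the engine through the component lifecycle (Start/Ready/Done) instead of engine.Unit. A minimal sketch, assuming irrecoverable.WithSignaler has roughly this shape; in practice the node builder's scaffolding handles this wiring:

package main // illustrative only, not part of the PR

import (
    "context"

    "github.com/onflow/flow-go/engine/collection/pusher"
    "github.com/onflow/flow-go/module/irrecoverable"
)

// runPusherExample shows the component lifecycle of the refactored engine.
func runPusherExample(eng *pusher.Engine) error {
    ctx, cancel := context.WithCancel(context.Background())
    signalerCtx, errCh := irrecoverable.WithSignaler(ctx) // fatal worker errors arrive here

    eng.Start(signalerCtx) // launches outboundQueueWorker
    <-eng.Ready()          // closed once the worker has signaled ready()

    // ... the Finalizer submits guarantees via eng.SubmitCollectionGuarantee(...) ...

    cancel()     // request shutdown
    <-eng.Done() // closed once the worker loop has exited

    select {
    case err := <-errCh: // a worker called ctx.Throw(err)
        return err
    default:
        return nil
    }
}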

// Ready returns a ready channel that is closed once the engine has fully
// started.
func (e *Engine) Ready() <-chan struct{} {
return e.unit.Ready()
// outboundQueueWorker implements a component worker which broadcasts collection guarantees,
// enqueued by the Finalizer upon finalization, to Consensus Nodes.
func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()

done := ctx.Done()
wake := e.notifier.Channel()
for {
select {
case <-done:
return
case <-wake:
err := e.processOutboundMessages(ctx)
if err != nil {
ctx.Throw(err)
}
}
}
}
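The Notifier/queue split is what keeps SubmitCollectionGuarantee non-blocking: notifications coalesce, so the worker wakes at most once per batch and then drains the whole queue. A tiny sketch of that contract (illustrative, and assuming Notify performs a non-blocking send on a channel with buffer size 1):

// notifierSketch is not part of the PR; it only illustrates the contract
// that outboundQueueWorker relies on.
func notifierSketch() {
    n := engine.NewNotifier()
    n.Notify()
    n.Notify() // coalesces with the first call: at most one pending wake-up

    <-n.Channel() // a worker listening here wakes once ...
    // ... and must therefore drain its entire queue before blocking again,
    // which is exactly what processOutboundMessages does below.
}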

// Done returns a done channel that is closed once the engine has fully stopped.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
}
// processOutboundMessages processes any available messages from the queue.
// Only returns when the queue is empty (or the engine is terminated).
// No errors expected during normal operations.
func (e *Engine) processOutboundMessages(ctx context.Context) error {
for {
item, ok := e.queue.Pop()
if !ok {
return nil
}

// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
e.unit.Launch(func() {
err := e.process(e.me.NodeID(), event)
if err != nil {
engine.LogError(e.log, err)
guarantee, ok := item.(*flow.CollectionGuarantee)
if !ok {
return fmt.Errorf("invalid type in pusher engine queue")
}
})
}

// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
e.unit.Launch(func() {
err := e.process(originID, event)
err := e.publishCollectionGuarantee(guarantee)
if err != nil {
engine.LogError(e.log, err)
return err
}
})
}

// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.unit.Do(func() error {
return e.process(e.me.NodeID(), event)
})
}

// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
return e.unit.Do(func() error {
return e.process(originID, event)
})
select {
case <-ctx.Done():
return nil
default:
}
}
}

// process processes events for the pusher engine on the collection node.
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
switch ev := event.(type) {
case *messages.SubmitCollectionGuarantee:
e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
return e.onSubmitCollectionGuarantee(originID, ev)
default:
return fmt.Errorf("invalid event type (%T)", event)
}
// Process is called by the networking layer, when peers broadcast messages with this node
// as one of the recipients. The protocol specifies that Collector nodes broadcast Collection
// Guarantees to Consensus Nodes and _only_ those. When the pusher engine (running only on
// Collectors) receives a message, this message is evidence of byzantine behavior.
// Byzantine inputs are internally handled by the pusher.Engine and do *not* result in
// error returns. No errors expected during normal operation (including byzantine inputs).
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
// Targeting a collector node's pusher.Engine with messages could be considered as a slashable offense.
// Though, for generating cryptographic evidence, we need Message Forensics - see reference [1].
// Much further into the future, when we are implementing slashing challenges, we'll probably implement a
// dedicated consumer to post-process evidence of protocol violations into slashing challenges. For now,
// we just log this with the `KeySuspicious` to alert the node operator.
// [1] Message Forensics FLIP: https://github.com/onflow/flips/pull/195
errs := fmt.Errorf("collector node's pusher.Engine was targeted by message %T on channel %v", message, channel)
e.log.Warn().
Err(errs).
Bool(logging.KeySuspicious, true).
Str("peer_id", originID.String()).
Msg("potentially byzantine networking traffic detected")
return nil
}

// onSubmitCollectionGuarantee handles submitting the given collection guarantee
// to consensus nodes.
func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error {
if originID != e.me.NodeID() {
return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID)
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
// to later be published to consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) {
if e.queue.Push(guarantee) {
e.notifier.Notify()
} else {
e.engMetrics.OutboundMessageDropped(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee)
}

return e.SubmitCollectionGuarantee(&req.Guarantee)
}

// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes.
// No errors expected during normal operation.
func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus))
if err != nil {
return fmt.Errorf("could not get consensus nodes: %w", err)
return fmt.Errorf("could not get consensus nodes' identities: %w", err)
}

// NOTE: Consensus nodes do not broadcast guarantees among themselves, so it needs that
// at least one collection node make a publish to all of them.
// NOTE: Consensus nodes do not broadcast guarantees among themselves. So for the collection to be included,
// at least one collector has to successfully broadcast the collection to consensus nodes. Otherwise, the
// collection is lost, which is acceptable as long as we only lose a small fraction of collections.
err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
if err != nil {
return fmt.Errorf("could not submit collection guarantee: %w", err)