Merge pull request #4843 from onflow/leo/jobqueue-processed-index
[JobQueue] Move the DefaultIndex from Start method to constructor
zhangchiqing authored Oct 23, 2023
2 parents e53460b + a96dec6 commit 695cb60
Showing 17 changed files with 169 additions and 112 deletions.
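The core API change in this commit: the jobqueue Consumer now receives its default processed index through the constructor, which syncs with storage and can therefore fail, while Start() no longer takes an argument. Below is a minimal caller-side sketch of the new wiring, assuming the flow-go types referenced in this diff (module.Jobs, storage.ConsumerProgress, jobqueue.Worker); the helper function and the concrete maxProcessing/maxSearchAhead values are illustrative, not part of the commit.

package example

import (
	"fmt"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/jobqueue"
	"github.com/onflow/flow-go/storage"
)

// startConsumer is a hypothetical helper demonstrating the post-change call pattern.
func startConsumer(
	log zerolog.Logger,
	jobs module.Jobs,
	progress storage.ConsumerProgress,
	worker jobqueue.Worker,
	defaultIndex uint64,
) (module.JobConsumer, error) {
	const maxProcessing uint64 = 3
	const maxSearchAhead uint64 = 0

	// The default processed index is now a constructor argument; the constructor
	// reads (or initializes) the stored processed index, so it can return an error.
	consumer, err := jobqueue.NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead, defaultIndex)
	if err != nil {
		return nil, fmt.Errorf("could not create consumer: %w", err)
	}

	// Start no longer takes the default index.
	if err := consumer.Start(); err != nil {
		return nil, fmt.Errorf("could not start consumer: %w", err)
	}
	return consumer, nil
}

Moving the storage read into the constructor surfaces initialization failures at construction time, where callers already handle errors, instead of at component startup; this is why every call site in the diff below gains an error check.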
11 changes: 9 additions & 2 deletions cmd/access/node_builder/access_node_builder.go
@@ -626,7 +626,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
execDataCacheBackend,
)

builder.ExecutionDataRequester = edrequester.New(
r, err := edrequester.New(
builder.Logger,
metrics.NewExecutionDataRequesterCollector(),
builder.ExecutionDataDownloader,
@@ -638,6 +638,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.executionDataConfig,
execDataDistributor,
)
if err != nil {
return nil, fmt.Errorf("failed to create execution data requester: %w", err)
}
builder.ExecutionDataRequester = r

builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.ExecutionDataRequester.OnBlockFinalized)

@@ -741,7 +745,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.ExecutionIndexerCore = indexerCore

// execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
builder.ExecutionIndexer = indexer.NewIndexer(
builder.ExecutionIndexer, err = indexer.NewIndexer(
builder.Logger,
registers.FirstHeight(),
registers,
@@ -750,6 +754,9 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.ExecutionDataRequester.HighestConsecutiveHeight,
indexedBlockHeight,
)
if err != nil {
return nil, err
}

// setup requester to notify indexer when new execution data is received
execDataDistributor.AddOnExecutionDataReceivedConsumer(builder.ExecutionIndexer.OnExecutionData)
6 changes: 5 additions & 1 deletion cmd/verification_builder.go
@@ -245,14 +245,18 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
v.verConf.stopAtHeight)

// requester and fetcher engines are started by chunk consumer
chunkConsumer = chunkconsumer.NewChunkConsumer(
chunkConsumer, err = chunkconsumer.NewChunkConsumer(
node.Logger,
collector,
processedChunkIndex,
chunkQueue,
fetcherEngine,
v.verConf.chunkWorkers)

if err != nil {
return nil, fmt.Errorf("could not create chunk consumer: %w", err)
}

err = node.Metrics.Mempool.Register(metrics.ResourceChunkConsumer, chunkConsumer.Size)
if err != nil {
return nil, fmt.Errorf("could not register backend metric: %w", err)
3 changes: 2 additions & 1 deletion engine/testutil/nodes.go
@@ -1057,12 +1057,13 @@ func VerificationNode(t testing.TB,
}

if node.ChunkConsumer == nil {
node.ChunkConsumer = chunkconsumer.NewChunkConsumer(node.Log,
node.ChunkConsumer, err = chunkconsumer.NewChunkConsumer(node.Log,
collector,
node.ProcessedChunkIndex,
node.ChunksQueue,
node.FetcherEngine,
chunkconsumer.DefaultChunkWorkers) // defaults number of workers to 3.
require.NoError(t, err)
err = mempoolCollector.Register(metrics.ResourceChunkConsumer, node.ChunkConsumer.Size)
require.NoError(t, err)
}
22 changes: 12 additions & 10 deletions engine/verification/assigner/blockconsumer/consumer.go
@@ -21,10 +21,9 @@ const DefaultBlockWorkers = uint64(2)
// and notifies the consumer to check in the job queue
// (i.e., its block reader) for new block jobs.
type BlockConsumer struct {
consumer module.JobConsumer
defaultIndex uint64
unit *engine.Unit
metrics module.VerificationMetrics
consumer module.JobConsumer
unit *engine.Unit
metrics module.VerificationMetrics
}

// defaultProcessedIndex returns the last sealed block height from the protocol state.
@@ -59,17 +58,20 @@ func NewBlockConsumer(log zerolog.Logger,
// the block reader is where the consumer reads new finalized blocks from (i.e., jobs).
jobs := jobqueue.NewFinalizedBlockReader(state, blocks)

consumer := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0)
defaultIndex, err := defaultProcessedIndex(state)
if err != nil {
return nil, 0, fmt.Errorf("could not read default processed index: %w", err)
}

consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0, defaultIndex)
if err != nil {
return nil, 0, fmt.Errorf("could not create block consumer: %w", err)
}

blockConsumer := &BlockConsumer{
consumer: consumer,
defaultIndex: defaultIndex,
unit: engine.NewUnit(),
metrics: metrics,
consumer: consumer,
unit: engine.NewUnit(),
metrics: metrics,
}
worker.withBlockConsumer(blockConsumer)

@@ -99,7 +101,7 @@ func (c *BlockConsumer) OnFinalizedBlock(*model.Block) {
}

func (c *BlockConsumer) Ready() <-chan struct{} {
err := c.consumer.Start(c.defaultIndex)
err := c.consumer.Start()
if err != nil {
panic(fmt.Errorf("could not start block consumer for finder engine: %w", err))
}
11 changes: 7 additions & 4 deletions engine/verification/fetcher/chunkconsumer/consumer.go
@@ -33,14 +33,17 @@ func NewChunkConsumer(
chunksQueue storage.ChunksQueue, // to read jobs (chunks) from
chunkProcessor fetcher.AssignedChunkProcessor, // to process jobs (chunks)
maxProcessing uint64, // max number of jobs to be processed in parallel
) *ChunkConsumer {
) (*ChunkConsumer, error) {
worker := NewWorker(chunkProcessor)
chunkProcessor.WithChunkConsumerNotifier(worker)

jobs := &ChunkJobs{locators: chunksQueue}

lg := log.With().Str("module", "chunk_consumer").Logger()
consumer := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0)
consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0, DefaultJobIndex)
if err != nil {
return nil, err
}

chunkConsumer := &ChunkConsumer{
consumer: consumer,
@@ -50,7 +53,7 @@ func NewChunkConsumer(

worker.consumer = chunkConsumer

return chunkConsumer
return chunkConsumer, nil
}

func (c *ChunkConsumer) NotifyJobIsDone(jobID module.JobID) {
@@ -68,7 +71,7 @@ func (c ChunkConsumer) Check() {
}

func (c *ChunkConsumer) Ready() <-chan struct{} {
err := c.consumer.Start(DefaultJobIndex)
err := c.consumer.Start()
if err != nil {
panic(fmt.Errorf("could not start the chunk consumer for match engine: %w", err))
}
3 changes: 2 additions & 1 deletion engine/verification/fetcher/chunkconsumer/consumer_test.go
@@ -155,14 +155,15 @@ func WithConsumer(
}

collector := &metrics.NoopCollector{}
consumer := chunkconsumer.NewChunkConsumer(
consumer, err := chunkconsumer.NewChunkConsumer(
unittest.Logger(),
collector,
processedIndex,
chunksQueue,
engine,
maxProcessing,
)
require.NoError(t, err)

withConsumer(consumer, chunksQueue)
})
5 changes: 2 additions & 3 deletions module/jobqueue.go
@@ -28,9 +28,8 @@ type NewJobListener interface {
type JobConsumer interface {
NewJobListener

// Start starts processing jobs from a job queue. If this is the first time, a processed index
// will be initialized in the storage. If it fails to initialize, an error will be returned
Start(defaultIndex uint64) error
// Start starts processing jobs from a job queue.
Start() error

// Stop gracefully stops the consumer from reading new jobs from the job queue. It does not stop
// the existing worker finishing their jobs
13 changes: 9 additions & 4 deletions module/jobqueue/component_consumer.go
@@ -33,7 +33,7 @@ func NewComponentConsumer(
processor JobProcessor, // method used to process jobs
maxProcessing uint64,
maxSearchAhead uint64,
) *ComponentConsumer {
) (*ComponentConsumer, error) {

c := &ComponentConsumer{
workSignal: workSignal,
@@ -47,12 +47,17 @@ func NewComponentConsumer(
func(id module.JobID) { c.NotifyJobIsDone(id) },
maxProcessing,
)
c.consumer = NewConsumer(c.log, c.jobs, progress, worker, maxProcessing, maxSearchAhead)

consumer, err := NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead, defaultIndex)
if err != nil {
return nil, err
}
c.consumer = consumer

builder := component.NewComponentManagerBuilder().
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
c.log.Info().Msg("job consumer starting")
err := c.consumer.Start(defaultIndex)
err := c.consumer.Start()
if err != nil {
ctx.Throw(fmt.Errorf("could not start consumer: %w", err))
}
@@ -95,7 +100,7 @@ func NewComponentConsumer(
c.cm = cm
c.Component = cm

return c
return c, nil
}

// SetPreNotifier sets a notification function that is invoked before marking a job as done in the
6 changes: 4 additions & 2 deletions module/jobqueue/component_consumer_test.go
@@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

@@ -88,7 +89,7 @@ func (suite *ComponentConsumerSuite) prepareTest(
progress.On("ProcessedIndex").Return(suite.defaultIndex, nil)
progress.On("SetProcessedIndex", mock.AnythingOfType("uint64")).Return(nil)

consumer := NewComponentConsumer(
consumer, err := NewComponentConsumer(
zerolog.New(os.Stdout).With().Timestamp().Logger(),
workSignal,
progress,
@@ -98,6 +99,7 @@ func (suite *ComponentConsumerSuite) prepareTest(
suite.maxProcessing,
suite.maxSearchAhead,
)
require.NoError(suite.T(), err)
consumer.SetPreNotifier(preNotifier)
consumer.SetPostNotifier(postNotifier)

@@ -230,7 +232,7 @@ func (suite *ComponentConsumerSuite) TestSignalsBeforeReadyDoNotCheck() {
started := atomic.NewBool(false)

jobConsumer := modulemock.NewJobConsumer(suite.T())
jobConsumer.On("Start", suite.defaultIndex).Return(func(_ uint64) error {
jobConsumer.On("Start").Return(func() error {
// force Start to take a while so the processingLoop is ready first
// the processingLoop should wait to start, otherwise Check would be called
time.Sleep(500 * time.Millisecond)
58 changes: 34 additions & 24 deletions module/jobqueue/consumer.go
@@ -54,7 +54,14 @@ func NewConsumer(
worker Worker,
maxProcessing uint64,
maxSearchAhead uint64,
) *Consumer {
defaultIndex uint64,
) (*Consumer, error) {

processedIndex, err := readProcessedIndex(log, progress, defaultIndex)
if err != nil {
return nil, fmt.Errorf("could not read processed index: %w", err)
}

return &Consumer{
log: log.With().Str("sub_module", "job_queue").Logger(),

@@ -71,48 +78,51 @@ func NewConsumer(
running: false,
isChecking: atomic.NewBool(false),
started: atomic.NewBool(false),
processedIndex: 0,
processedIndex: processedIndex,
processings: make(map[uint64]*jobStatus),
processingsIndex: make(map[module.JobID]uint64),
}
}, nil
}

// Start starts consuming the jobs from the job queue.
func (c *Consumer) Start(defaultIndex uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

if !c.started.CompareAndSwap(false, true) {
return fmt.Errorf("consumer has already been started")
}
c.running = true

func readProcessedIndex(log zerolog.Logger, progress storage.ConsumerProgress, defaultIndex uint64) (uint64, error) {
// on startup, sync with storage for the processed index
// to ensure the consistency
processedIndex, err := c.progress.ProcessedIndex()
processedIndex, err := progress.ProcessedIndex()
if errors.Is(err, storage.ErrNotFound) {
err := c.progress.InitProcessedIndex(defaultIndex)
err := progress.InitProcessedIndex(defaultIndex)
if errors.Is(err, storage.ErrAlreadyExists) {
return fmt.Errorf("processed index has already been inited, no effect for the second time. default index: %v",
return 0, fmt.Errorf("processed index has already been inited, no effect for the second time. default index: %v",
defaultIndex)
}

if err != nil {
return fmt.Errorf("could not init processed index: %w", err)
return 0, fmt.Errorf("could not init processed index: %w", err)
}

processedIndex = defaultIndex

c.log.Warn().Uint64("processed index", processedIndex).
log.Warn().Uint64("processed index", processedIndex).
Msg("processed index not found, initialized.")
} else if err != nil {
return fmt.Errorf("could not read processed index: %w", err)
return defaultIndex, nil
}

if err != nil {
return 0, fmt.Errorf("could not read processed index: %w", err)
}

c.processedIndex = processedIndex
return processedIndex, nil
}

// Start starts consuming the jobs from the job queue.
func (c *Consumer) Start() error {
c.mu.Lock()
defer c.mu.Unlock()

if !c.started.CompareAndSwap(false, true) {
return fmt.Errorf("consumer has already been started")
}
c.running = true

c.log.Info().
Uint64("processed", processedIndex).
Uint64("processed", c.processedIndex).
Msg("consumer started")

c.checkProcessable()
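For reference, the new readProcessedIndex step relies on the storage.ConsumerProgress contract: ProcessedIndex returns storage.ErrNotFound until InitProcessedIndex has been called, and a second InitProcessedIndex returns storage.ErrAlreadyExists. The sketch below is a minimal in-memory version of that contract, useful for tests; it is illustrative only, assumes the three-method interface used elsewhere in flow-go, and is not part of the commit.

package example

import (
	"fmt"

	"github.com/onflow/flow-go/storage"
)

// memProgress is a hypothetical in-memory storage.ConsumerProgress illustrating
// the behaviour the new constructor depends on.
type memProgress struct {
	initialized bool
	index       uint64
}

func (m *memProgress) ProcessedIndex() (uint64, error) {
	if !m.initialized {
		// before initialization the consumer falls back to its defaultIndex
		return 0, storage.ErrNotFound
	}
	return m.index, nil
}

func (m *memProgress) InitProcessedIndex(defaultIndex uint64) error {
	if m.initialized {
		return storage.ErrAlreadyExists
	}
	m.initialized = true
	m.index = defaultIndex
	return nil
}

func (m *memProgress) SetProcessedIndex(processed uint64) error {
	if !m.initialized {
		return fmt.Errorf("processed index not initialized")
	}
	m.index = processed
	return nil
}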
(remaining 7 of the 17 changed files not shown)
