Skip to content

Commit

Permalink
Merge pull request #5028 from onflow/leo/storehouse-loader
Browse files Browse the repository at this point in the history
[Storehouse] Unfinalized blocks loader
  • Loading branch information
zhangchiqing authored Nov 17, 2023
2 parents 9b0633e + 4685e19 commit fec85ee
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
}

fetcher := fetcher.NewCollectionFetcher(node.Logger, exeNode.collectionRequester, node.State, exeNode.exeConf.onflowOnlyLNs)
loader := loader.NewLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)
loader := loader.NewUnexecutedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState)

exeNode.ingestionEng, err = ingestion.New(
exeNode.ingestionUnit,
Expand Down
2 changes: 1 addition & 1 deletion engine/execution/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func runWithEngine(t *testing.T, f func(testingContext)) {
uploadMgr := uploader.NewManager(trace.NewNoopTracer())

fetcher := mocks.NewMockFetcher()
loader := loader.NewLoader(log, protocolState, headers, executionState)
loader := loader.NewUnexecutedLoader(log, protocolState, headers, executionState)

engine, err = New(
unit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,31 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

type Loader struct {
// deprecated. Storehouse is going to use unfinalized loader instead
type UnexecutedLoader struct {
log zerolog.Logger
state protocol.State
headers storage.Headers // see comments on getHeaderByHeight for why we need it
execState state.ExecutionState
}

func NewLoader(
func NewUnexecutedLoader(
log zerolog.Logger,
state protocol.State,
headers storage.Headers,
execState state.ExecutionState,
) *Loader {
return &Loader{
log: log.With().Str("component", "ingestion_engine_block_loader").Logger(),
) *UnexecutedLoader {
return &UnexecutedLoader{
log: log.With().Str("component", "ingestion_engine_unexecuted_loader").Logger(),
state: state,
headers: headers,
execState: execState,
}
}

func (e *Loader) LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error) {
// LoadUnexecuted loads all unexecuted and validated blocks
// any error returned are exceptions
func (e *UnexecutedLoader) LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error) {
// saving an executed block is currently not transactional, so it's possible
// the block is marked as executed but the receipt might not be saved during a crash.
// in order to mitigate this problem, we always re-execute the last executed and finalized
Expand Down Expand Up @@ -104,7 +107,7 @@ func (e *Loader) LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error)
return blockIDs, nil
}

func (e *Loader) unexecutedBlocks(ctx context.Context) (
func (e *UnexecutedLoader) unexecutedBlocks(ctx context.Context) (
finalized []flow.Identifier,
pending []flow.Identifier,
err error,
Expand All @@ -126,7 +129,7 @@ func (e *Loader) unexecutedBlocks(ctx context.Context) (
return finalized, pending, nil
}

func (e *Loader) finalizedUnexecutedBlocks(ctx context.Context, finalized protocol.Snapshot) (
func (e *UnexecutedLoader) finalizedUnexecutedBlocks(ctx context.Context, finalized protocol.Snapshot) (
[]flow.Identifier,
error,
) {
Expand Down Expand Up @@ -196,7 +199,7 @@ func (e *Loader) finalizedUnexecutedBlocks(ctx context.Context, finalized protoc
return unexecuted, nil
}

func (e *Loader) pendingUnexecutedBlocks(ctx context.Context, finalized protocol.Snapshot) (
func (e *UnexecutedLoader) pendingUnexecutedBlocks(ctx context.Context, finalized protocol.Snapshot) (
[]flow.Identifier,
error,
) {
Expand Down Expand Up @@ -224,7 +227,7 @@ func (e *Loader) pendingUnexecutedBlocks(ctx context.Context, finalized protocol
// if the EN is dynamically bootstrapped, the finalized blocks at height range:
// [ sealedRoot.Height, finalizedRoot.Height - 1] can not be retrieved from
// protocol state, but only from headers
func (e *Loader) getHeaderByHeight(height uint64) (*flow.Header, error) {
func (e *UnexecutedLoader) getHeaderByHeight(height uint64) (*flow.Header, error) {
// we don't use protocol state because for dynamic boostrapped execution node
// the last executed and sealed block is below the finalized root block
return e.headers.ByHeight(height)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/onflow/flow-go/utils/unittest/mocks"
)

var _ ingestion.BlockLoader = (*loader.Loader)(nil)
var _ ingestion.BlockLoader = (*loader.UnexecutedLoader)(nil)

// ExecutionState is a mocked version of execution state that
// simulates some of its behavior for testing purpose
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
headers := storage.NewMockHeaders(ctrl)
headers.EXPECT().ByBlockID(genesis.ID()).Return(genesis.Header, nil)
log := unittest.Logger()
loader := loader.NewLoader(log, ps, headers, es)
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

unexecuted, err := loader.LoadUnexecuted(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
headers := storage.NewMockHeaders(ctrl)
headers.EXPECT().ByBlockID(genesis.ID()).Return(genesis.Header, nil)
log := unittest.Logger()
loader := loader.NewLoader(log, ps, headers, es)
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

unexecuted, err := loader.LoadUnexecuted(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
headers := storage.NewMockHeaders(ctrl)
headers.EXPECT().ByBlockID(genesis.ID()).Return(genesis.Header, nil)
log := unittest.Logger()
loader := loader.NewLoader(log, ps, headers, es)
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

es.ExecuteBlock(t, blockA)
es.ExecuteBlock(t, blockB)
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
headers := storage.NewMockHeaders(ctrl)
headers.EXPECT().ByBlockID(genesis.ID()).Return(genesis.Header, nil)
log := unittest.Logger()
loader := loader.NewLoader(log, ps, headers, es)
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block C is the only finalized block, index its header by its height
headers.EXPECT().ByHeight(blockC.Header.Height).Return(blockC.Header, nil)
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
headers := storage.NewMockHeaders(ctrl)
headers.EXPECT().ByBlockID(genesis.ID()).Return(genesis.Header, nil)
log := unittest.Logger()
loader := loader.NewLoader(log, ps, headers, es)
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block C is finalized, index its header by its height
headers.EXPECT().ByHeight(blockC.Header.Height).Return(blockC.Header, nil)
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
headers := storage.NewMockHeaders(ctrl)
headers.EXPECT().ByBlockID(genesis.ID()).Return(genesis.Header, nil)
log := unittest.Logger()
loader := loader.NewLoader(log, ps, headers, es)
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block A is finalized, index its header by its height
headers.EXPECT().ByHeight(blockA.Header.Height).Return(blockA.Header, nil)
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
headers := storage.NewMockHeaders(ctrl)
headers.EXPECT().ByBlockID(genesis.ID()).Return(genesis.Header, nil)
log := unittest.Logger()
loader := loader.NewLoader(log, ps, headers, es)
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block C is finalized, index its header by its height
headers.EXPECT().ByHeight(blockC.Header.Height).Return(blockC.Header, nil)
Expand Down
91 changes: 91 additions & 0 deletions engine/execution/ingestion/loader/unfinalized_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package loader

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/execution/state"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)

type UnfinalizedLoader struct {
log zerolog.Logger
state protocol.State
headers storage.Headers // see comments on getHeaderByHeight for why we need it
execState state.FinalizedExecutionState
}

// NewUnfinalizedLoader creates a new loader that loads all unfinalized and validated blocks
func NewUnfinalizedLoader(
log zerolog.Logger,
state protocol.State,
headers storage.Headers,
execState state.FinalizedExecutionState,
) *UnfinalizedLoader {
return &UnfinalizedLoader{
log: log.With().Str("component", "ingestion_engine_unfinalized_loader").Logger(),
state: state,
headers: headers,
execState: execState,
}
}

// LoadUnexecuted loads all unfinalized and validated blocks
// any error returned are exceptions
func (e *UnfinalizedLoader) LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error) {
lastExecuted := e.execState.GetHighestFinalizedExecuted()

// get finalized height
finalized := e.state.Final()
final, err := finalized.Head()
if err != nil {
return nil, fmt.Errorf("could not get finalized block: %w", err)
}

// TODO: dynamically bootstrapped execution node will reload blocks from
unexecutedFinalized := make([]flow.Identifier, 0)

// starting from the first unexecuted block, go through each unexecuted and finalized block
// reload its block to execution queues
// loading finalized blocks
for height := lastExecuted + 1; height <= final.Height; height++ {
header, err := e.getHeaderByHeight(height)
if err != nil {
return nil, fmt.Errorf("could not get header at height: %v, %w", height, err)
}

unexecutedFinalized = append(unexecutedFinalized, header.ID())
}

// loaded all pending blocks
pendings, err := finalized.Descendants()
if err != nil {
return nil, fmt.Errorf("could not get descendants of finalized block: %w", err)
}

unexecuted := append(unexecutedFinalized, pendings...)

e.log.Info().
Uint64("last_finalized", final.Height).
Uint64("last_finalized_executed", lastExecuted).
// Uint64("sealed_root_height", rootBlock.Height).
// Hex("sealed_root_id", logging.Entity(rootBlock)).
Int("total_finalized_unexecuted", len(unexecutedFinalized)).
Int("total_unexecuted", len(unexecuted)).
Msgf("finalized unexecuted blocks")

return unexecuted, nil
}

// if the EN is dynamically bootstrapped, the finalized blocks at height range:
// [ sealedRoot.Height, finalizedRoot.Height - 1] can not be retrieved from
// protocol state, but only from headers
func (e *UnfinalizedLoader) getHeaderByHeight(height uint64) (*flow.Header, error) {
// we don't use protocol state because for dynamic boostrapped execution node
// the last executed and sealed block is below the finalized root block
return e.headers.ByHeight(height)
}
55 changes: 55 additions & 0 deletions engine/execution/ingestion/loader/unfinalized_loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package loader_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/engine/execution/ingestion"
"github.com/onflow/flow-go/engine/execution/ingestion/loader"
stateMock "github.com/onflow/flow-go/engine/execution/state/mock"
"github.com/onflow/flow-go/model/flow"
storage "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
)

var _ ingestion.BlockLoader = (*loader.UnfinalizedLoader)(nil)

func TestLoadingUnfinalizedBlocks(t *testing.T) {
ps := mocks.NewProtocolState()

// Genesis <- A <- B <- C (finalized) <- D
chain, result, seal := unittest.ChainFixture(5)
genesis, blockA, blockB, blockC, blockD :=
chain[0], chain[1], chain[2], chain[3], chain[4]

logChain(chain)

require.NoError(t, ps.Bootstrap(genesis, result, seal))
require.NoError(t, ps.Extend(blockA))
require.NoError(t, ps.Extend(blockB))
require.NoError(t, ps.Extend(blockC))
require.NoError(t, ps.Extend(blockD))
require.NoError(t, ps.Finalize(blockC.ID()))

es := new(stateMock.FinalizedExecutionState)
es.On("GetHighestFinalizedExecuted").Return(genesis.Header.Height)
headers := new(storage.Headers)
headers.On("ByHeight", blockA.Header.Height).Return(blockA.Header, nil)
headers.On("ByHeight", blockB.Header.Height).Return(blockB.Header, nil)
headers.On("ByHeight", blockC.Header.Height).Return(blockC.Header, nil)

loader := loader.NewUnfinalizedLoader(unittest.Logger(), ps, headers, es)

unexecuted, err := loader.LoadUnexecuted(context.Background())
require.NoError(t, err)

unittest.IDsEqual(t, []flow.Identifier{
blockA.ID(),
blockB.ID(),
blockC.ID(),
blockD.ID(),
}, unexecuted)
}
39 changes: 39 additions & 0 deletions engine/execution/state/mock/finalized_execution_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type ScriptExecutionState interface {
HasState(flow.StateCommitment) bool
}

// FinalizedExecutionState is an interface used to access the finalized execution state
type FinalizedExecutionState interface {
GetHighestFinalizedExecuted() uint64
}

// TODO Many operations here are should be transactional, so we need to refactor this
// to store a reference to DB and compose operations and procedures rather then
// just being amalgamate of proxies for single transactions operation
Expand Down
2 changes: 1 addition & 1 deletion engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit
)

fetcher := exeFetcher.NewCollectionFetcher(node.Log, requestEngine, node.State, false)
loader := loader.NewLoader(node.Log, node.State, node.Headers, execState)
loader := loader.NewUnexecutedLoader(node.Log, node.State, node.Headers, execState)
rootHead, rootQC := getRoot(t, &node)
ingestionEngine, err := ingestion.New(
unit,
Expand Down

0 comments on commit fec85ee

Please sign in to comment.