Skip to content

Commit

Permalink
Merge pull request #4957 from AndriiDiachuk/AndriiDiachuk/4869-get-pr…
Browse files Browse the repository at this point in the history
…otocol-snapshot-from-any-block-endpoint

[Access] Get protocol snapshot by block id and block height
  • Loading branch information
peterargue authored Dec 11, 2023
2 parents 6a6cd7c + c9b87ba commit aaa8e4e
Show file tree
Hide file tree
Showing 13 changed files with 727 additions and 42 deletions.
2 changes: 2 additions & 0 deletions access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type API interface {
GetEventsForBlockIDs(ctx context.Context, eventType string, blockIDs []flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) ([]flow.BlockEvents, error)

GetLatestProtocolStateSnapshot(ctx context.Context) ([]byte, error)
GetProtocolStateSnapshotByBlockID(ctx context.Context, blockID flow.Identifier) ([]byte, error)
GetProtocolStateSnapshotByHeight(ctx context.Context, blockHeight uint64) ([]byte, error)

GetExecutionResultForBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionResult, error)
GetExecutionResultByID(ctx context.Context, id flow.Identifier) (*flow.ExecutionResult, error)
Expand Down
42 changes: 32 additions & 10 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,6 @@ type Handler struct {
me module.Local
}

// TODO: this is implemented in https://github.com/onflow/flow-go/pull/4957, remove when merged
func (h *Handler) GetProtocolStateSnapshotByBlockID(ctx context.Context, request *access.GetProtocolStateSnapshotByBlockIDRequest) (*access.ProtocolStateSnapshotResponse, error) {
panic("implement me")
}

// TODO: this is implemented in https://github.com/onflow/flow-go/pull/4957, remove when merged
func (h *Handler) GetProtocolStateSnapshotByHeight(ctx context.Context, request *access.GetProtocolStateSnapshotByHeightRequest) (*access.ProtocolStateSnapshotResponse, error) {
panic("implement me")
}

// HandlerOption is used to hand over optional constructor parameters
type HandlerOption func(*Handler)

Expand Down Expand Up @@ -642,6 +632,38 @@ func (h *Handler) GetLatestProtocolStateSnapshot(ctx context.Context, req *acces
}, nil
}

// GetProtocolStateSnapshotByBlockID returns serializable Snapshot by blockID
func (h *Handler) GetProtocolStateSnapshotByBlockID(ctx context.Context, req *access.GetProtocolStateSnapshotByBlockIDRequest) (*access.ProtocolStateSnapshotResponse, error) {
metadata := h.buildMetadataResponse()

blockID := convert.MessageToIdentifier(req.GetBlockId())

snapshot, err := h.api.GetProtocolStateSnapshotByBlockID(ctx, blockID)
if err != nil {
return nil, err
}

return &access.ProtocolStateSnapshotResponse{
SerializedSnapshot: snapshot,
Metadata: metadata,
}, nil
}

// GetProtocolStateSnapshotByHeight returns serializable Snapshot by block height
func (h *Handler) GetProtocolStateSnapshotByHeight(ctx context.Context, req *access.GetProtocolStateSnapshotByHeightRequest) (*access.ProtocolStateSnapshotResponse, error) {
metadata := h.buildMetadataResponse()

snapshot, err := h.api.GetProtocolStateSnapshotByHeight(ctx, req.GetBlockHeight())
if err != nil {
return nil, err
}

return &access.ProtocolStateSnapshotResponse{
SerializedSnapshot: snapshot,
Metadata: metadata,
}, nil
}

// GetExecutionResultForBlockID returns the latest received execution result for the given block ID.
// AN might receive multiple receipts with conflicting results for unsealed blocks.
// If this case happens, since AN is not able to determine which result is the correct one until the block is sealed, it has to pick one result to respond to this query. For now, we return the result from the latest received receipt.
Expand Down
52 changes: 52 additions & 0 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,12 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
node.State,
node.Storage.Blocks,
node.Storage.Headers,
backend.NewNetworkAPI(node.State, node.RootChainID, backend.DefaultSnapshotHistoryLimit),
backend.NewNetworkAPI(
node.State,
node.RootChainID,
node.Storage.Headers,
backend.DefaultSnapshotHistoryLimit,
),
)),
}

Expand Down
22 changes: 12 additions & 10 deletions engine/access/apiproxy/access_api_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,6 @@ func (h *FlowAccessAPIRouter) log(handler, rpc string, err error) {
logger.Info().Msg("request succeeded")
}

// TODO: this is implemented in https://github.com/onflow/flow-go/pull/4957, remove when merged
func (h *FlowAccessAPIRouter) GetProtocolStateSnapshotByBlockID(ctx context.Context, request *access.GetProtocolStateSnapshotByBlockIDRequest) (*access.ProtocolStateSnapshotResponse, error) {
panic("implement me")
}

// TODO: this is implemented in https://github.com/onflow/flow-go/pull/4957, remove when merged
func (h *FlowAccessAPIRouter) GetProtocolStateSnapshotByHeight(ctx context.Context, request *access.GetProtocolStateSnapshotByHeightRequest) (*access.ProtocolStateSnapshotResponse, error) {
panic("implement me")
}

// Ping pings the service. It is special in the sense that it responds successful,
// only if all underlying services are ready.
func (h *FlowAccessAPIRouter) Ping(context context.Context, req *access.PingRequest) (*access.PingResponse, error) {
Expand Down Expand Up @@ -216,6 +206,18 @@ func (h *FlowAccessAPIRouter) GetLatestProtocolStateSnapshot(context context.Con
return res, err
}

func (h *FlowAccessAPIRouter) GetProtocolStateSnapshotByBlockID(context context.Context, req *access.GetProtocolStateSnapshotByBlockIDRequest) (*access.ProtocolStateSnapshotResponse, error) {
res, err := h.Observer.GetProtocolStateSnapshotByBlockID(context, req)
h.log("observer", "GetProtocolStateSnapshotByBlockID", err)
return res, err
}

func (h *FlowAccessAPIRouter) GetProtocolStateSnapshotByHeight(context context.Context, req *access.GetProtocolStateSnapshotByHeightRequest) (*access.ProtocolStateSnapshotResponse, error) {
res, err := h.Observer.GetProtocolStateSnapshotByHeight(context, req)
h.log("observer", "GetProtocolStateSnapshotByHeight", err)
return res, err
}

func (h *FlowAccessAPIRouter) GetExecutionResultForBlockID(context context.Context, req *access.GetExecutionResultForBlockIDRequest) (*access.ExecutionResultForBlockIDResponse, error) {
res, err := h.Upstream.GetExecutionResultForBlockID(context, req)
h.log("upstream", "GetExecutionResultForBlockID", err)
Expand Down
14 changes: 1 addition & 13 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/onflow/flow-go/cmd/build"
"github.com/onflow/flow-go/engine/access/rpc/connection"
"github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
Expand Down Expand Up @@ -210,6 +209,7 @@ func New(params Params) (*Backend, error) {
backendNetwork: backendNetwork{
state: params.State,
chainID: params.ChainID,
headers: params.Headers,
snapshotHistoryLimit: params.SnapshotHistoryLimit,
},
collections: params.Collections,
Expand Down Expand Up @@ -359,18 +359,6 @@ func (b *Backend) GetNetworkParameters(_ context.Context) access.NetworkParamete
}
}

// GetLatestProtocolStateSnapshot returns the latest finalized snapshot
func (b *Backend) GetLatestProtocolStateSnapshot(_ context.Context) ([]byte, error) {
snapshot := b.state.Final()

validSnapshot, err := b.getValidSnapshot(snapshot, 0)
if err != nil {
return nil, err
}

return convert.SnapshotToBytes(validSnapshot)
}

// executionNodesForBlockID returns upto maxNodesCnt number of randomly chosen execution node identities
// which have executed the given block ID.
// If no such execution node is found, an InsufficientExecutionReceipts error is returned.
Expand Down
122 changes: 118 additions & 4 deletions engine/access/rpc/backend/backend_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package backend

import (
"context"
"errors"
"fmt"

"github.com/onflow/flow-go/state"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -12,13 +15,15 @@ import (
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)

var SnapshotHistoryLimitErr = fmt.Errorf("reached the snapshot history limit")

type backendNetwork struct {
state protocol.State
chainID flow.ChainID
headers storage.Headers
snapshotHistoryLimit int
}

Expand All @@ -29,10 +34,16 @@ The observer and access nodes need to be able to handle GetNetworkParameters
and GetLatestProtocolStateSnapshot RPCs so this logic was split into
the backendNetwork so that we can ignore the rest of the backend logic
*/
func NewNetworkAPI(state protocol.State, chainID flow.ChainID, snapshotHistoryLimit int) *backendNetwork {
func NewNetworkAPI(
state protocol.State,
chainID flow.ChainID,
headers storage.Headers,
snapshotHistoryLimit int,
) *backendNetwork {
return &backendNetwork{
state: state,
chainID: chainID,
headers: headers,
snapshotHistoryLimit: snapshotHistoryLimit,
}
}
Expand Down Expand Up @@ -67,7 +78,7 @@ func (b *backendNetwork) GetNodeVersionInfo(ctx context.Context) (*access.NodeVe
func (b *backendNetwork) GetLatestProtocolStateSnapshot(_ context.Context) ([]byte, error) {
snapshot := b.state.Final()

validSnapshot, err := b.getValidSnapshot(snapshot, 0)
validSnapshot, err := b.getValidSnapshot(snapshot, 0, true)
if err != nil {
return nil, err
}
Expand All @@ -80,6 +91,102 @@ func (b *backendNetwork) GetLatestProtocolStateSnapshot(_ context.Context) ([]by
return data, nil
}

// GetProtocolStateSnapshotByBlockID returns serializable Snapshot for a block, by blockID.
// The requested block must be finalized, otherwise an error is returned.
// Expected errors during normal operation:
// - status.Error[codes.NotFound] - No block with the given ID was found
// - status.Error[codes.InvalidArgument] - Block ID will never have a valid snapshot:
// 1. A block was found, but it is not finalized and is below the finalized height, so it will never be finalized.
// 2. A block was found, however its sealing segment spans an epoch phase transition, yielding an invalid snapshot.
// - status.Error[codes.FailedPrecondition] - A block was found, but it is not finalized and is above the finalized height.
// The block may or may not be finalized in the future; the client can retry later.
func (b *backendNetwork) GetProtocolStateSnapshotByBlockID(_ context.Context, blockID flow.Identifier) ([]byte, error) {
snapshot := b.state.AtBlockID(blockID)
snapshotHeadByBlockId, err := snapshot.Head()
if err != nil {
if errors.Is(err, state.ErrUnknownSnapshotReference) {
return nil, status.Errorf(codes.NotFound, "failed to get a valid snapshot: block not found")
}

return nil, status.Errorf(codes.Internal, "could not get header by blockID: %v", err)
}

// Because there is no index from block ID to finalized height, we separately look up the finalized
// block ID by the height of the queried block, then compare the queried ID to the finalized ID.
// If they match, then the queried block must be finalized.
blockIDFinalizedAtHeight, err := b.headers.BlockIDByHeight(snapshotHeadByBlockId.Height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
// The block exists, but no block has been finalized at its height. Therefore, this block
// may be finalized in the future, and the client can retry.
return nil, status.Errorf(codes.FailedPrecondition,
"failed to retrieve snapshot for block with height %d: block not finalized and is above finalized height",
snapshotHeadByBlockId.Height)
}
return nil, status.Errorf(codes.Internal, "failed to lookup block id by height %d", snapshotHeadByBlockId.Height)
}

if blockIDFinalizedAtHeight != blockID {
// A different block than what was queried has been finalized at this height.
// Therefore, the queried block will never be finalized.
return nil, status.Errorf(codes.InvalidArgument,
"failed to retrieve snapshot for block: block not finalized and is below finalized height")
}

validSnapshot, err := b.getValidSnapshot(snapshot, 0, false)
if err != nil {
if errors.Is(err, ErrSnapshotPhaseMismatch) {
return nil, status.Errorf(codes.InvalidArgument,
"failed to retrieve snapshot for block, try again with different block: "+
"%v", err)
}
return nil, status.Errorf(codes.Internal, "failed to get a valid snapshot: %v", err)
}

data, err := convert.SnapshotToBytes(validSnapshot)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to convert snapshot to bytes: %v", err)
}

return data, nil
}

// GetProtocolStateSnapshotByHeight returns serializable Snapshot by block height.
// The block must be finalized (otherwise the by-height query is ambiguous).
// Expected errors during normal operation:
// - status.Error[codes.NotFound] - No block with the given height was found.
// The block height may or may not be finalized in the future; the client can retry later.
// - status.Error[codes.InvalidArgument] - A block was found, however its sealing segment spans an epoch phase transition,
// yielding an invalid snapshot. Therefore we will never return a snapshot for this block height.
func (b *backendNetwork) GetProtocolStateSnapshotByHeight(_ context.Context, blockHeight uint64) ([]byte, error) {
snapshot := b.state.AtHeight(blockHeight)
_, err := snapshot.Head()
if err != nil {
if errors.Is(err, state.ErrUnknownSnapshotReference) {
return nil, status.Errorf(codes.NotFound, "failed to find snapshot: %v", err)
}

return nil, status.Errorf(codes.Internal, "failed to get a valid snapshot: %v", err)
}

validSnapshot, err := b.getValidSnapshot(snapshot, 0, false)
if err != nil {
if errors.Is(err, ErrSnapshotPhaseMismatch) {
return nil, status.Errorf(codes.InvalidArgument,
"failed to retrieve snapshot for block, try again with different block: "+
"%v", err)
}
return nil, status.Errorf(codes.Internal, "failed to get a valid snapshot: %v", err)
}

data, err := convert.SnapshotToBytes(validSnapshot)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to convert snapshot to bytes: %v", err)
}

return data, nil
}

func (b *backendNetwork) isEpochOrPhaseDifferent(counter1, counter2 uint64, phase1, phase2 flow.EpochPhase) bool {
return counter1 != counter2 || phase1 != phase2
}
Expand All @@ -90,7 +197,10 @@ func (b *backendNetwork) isEpochOrPhaseDifferent(counter1, counter2 uint64, phas
// If a snapshot does contain an invalid sealing segment query the state
// by height of each block in the segment and return a snapshot at the point
// where the transition happens.
func (b *backendNetwork) getValidSnapshot(snapshot protocol.Snapshot, blocksVisited int) (protocol.Snapshot, error) {
// Expected error returns during normal operations:
// * ErrSnapshotPhaseMismatch - snapshot does not contain a valid sealing segment
// All other errors should be treated as exceptions.
func (b *backendNetwork) getValidSnapshot(snapshot protocol.Snapshot, blocksVisited int, findNextValidSnapshot bool) (protocol.Snapshot, error) {
segment, err := snapshot.SealingSegment()
if err != nil {
return nil, fmt.Errorf("failed to get sealing segment: %w", err)
Expand All @@ -109,6 +219,10 @@ func (b *backendNetwork) getValidSnapshot(snapshot protocol.Snapshot, blocksVisi
// Check if the counters and phase are different this indicates that the sealing segment
// of the snapshot requested spans either an epoch transition or phase transition.
if b.isEpochOrPhaseDifferent(counterAtHighest, counterAtLowest, phaseAtHighest, phaseAtLowest) {
if !findNextValidSnapshot {
return nil, ErrSnapshotPhaseMismatch
}

// Visit each node in strict order of decreasing height starting at head
// to find the block that straddles the transition boundary.
for i := len(segment.Blocks) - 1; i >= 0; i-- {
Expand All @@ -129,7 +243,7 @@ func (b *backendNetwork) getValidSnapshot(snapshot protocol.Snapshot, blocksVisi
// Check if this block straddles the transition boundary, if it does return the snapshot
// at that block height.
if b.isEpochOrPhaseDifferent(counterAtHighest, counterAtBlock, phaseAtHighest, phaseAtBlock) {
return b.getValidSnapshot(b.state.AtHeight(segment.Blocks[i].Header.Height), blocksVisited)
return b.getValidSnapshot(b.state.AtHeight(segment.Blocks[i].Header.Height), blocksVisited, true)
}
}
}
Expand Down
Loading

0 comments on commit aaa8e4e

Please sign in to comment.