Skip to content

Commit

Permalink
add verify evm offchain replay util cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Nov 21, 2024
1 parent dffadd8 commit 81578f4
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 59 deletions.
3 changes: 3 additions & 0 deletions cmd/util/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/onflow/flow-go/cmd/util/cmd/snapshot"
system_addresses "github.com/onflow/flow-go/cmd/util/cmd/system-addresses"
truncate_database "github.com/onflow/flow-go/cmd/util/cmd/truncate-database"
verify_evm_offchain_replay "github.com/onflow/flow-go/cmd/util/cmd/verify-evm-offchain-replay"
"github.com/onflow/flow-go/cmd/util/cmd/version"
"github.com/onflow/flow-go/module/profiler"
)
Expand Down Expand Up @@ -120,6 +121,8 @@ func addCommands() {
rootCmd.AddCommand(system_addresses.Cmd)
rootCmd.AddCommand(check_storage.Cmd)
rootCmd.AddCommand(generate_authorization_fixes.Cmd)
rootCmd.AddCommand(evm_state_exporter.Cmd)
rootCmd.AddCommand(verify_evm_offchain_replay.Cmd)
}

func initConfig() {
Expand Down
79 changes: 79 additions & 0 deletions cmd/util/cmd/verify-evm-offchain-replay/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package verify

import (
"fmt"
"strconv"
"strings"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/onflow/flow-go/model/flow"
)

var (
flagLastK uint64
flagDatadir string
flagExecutionDataDir string
flagChain string
flagFromTo string
)

// # verify the blocks from height 2000 to 3000
// ./util verify_execution_result --chain flow-testnet --datadir /var/flow/data/protocol --chunk_data_pack_dir /var/flow/data/chunk_data_pack --from_to 2000-3000
var Cmd = &cobra.Command{
Use: "verify_evm_offchain_replay",
Short: "verify evm offchain replay with execution data",
Run: run,
}

func init() {
Cmd.Flags().StringVar(&flagChain, "chain", "", "Chain name")
_ = Cmd.MarkFlagRequired("chain")

Cmd.Flags().StringVar(&flagDatadir, "datadir", "/var/flow/data/protocol",
"directory that stores the protocol state")

Cmd.Flags().StringVar(&flagExecutionDataDir, "execution_data_dir", "/var/flow/data/execution_data",
"directory that stores the execution state")
}

func run(*cobra.Command, []string) {
_ = flow.ChainID(flagChain).Chain()

from, to, err := parseFromTo(flagFromTo)
if err != nil {
log.Fatal().Err(err).Msg("could not parse from_to")
}

log.Info().Msgf("verifying range from %d to %d", from, to)
err = Verify(from, to, flow.Testnet, flagDatadir, flagExecutionDataDir)
if err != nil {
log.Fatal().Err(err).Msg("could not verify last k height")
}
log.Info().Msgf("successfully verified range from %d to %d", from, to)

}

func parseFromTo(fromTo string) (from, to uint64, err error) {
parts := strings.Split(fromTo, "-")
if len(parts) != 2 {
return 0, 0, fmt.Errorf("invalid format: expected 'from-to', got '%s'", fromTo)
}

from, err = strconv.ParseUint(strings.TrimSpace(parts[0]), 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid 'from' value: %w", err)
}

to, err = strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid 'to' value: %w", err)
}

if from > to {
return 0, 0, fmt.Errorf("'from' value (%d) must be less than or equal to 'to' value (%d)", from, to)
}

return from, to, nil
}
69 changes: 69 additions & 0 deletions cmd/util/cmd/verify-evm-offchain-replay/verify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package verify

import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/dgraph-io/badger/v2"
badgerds "github.com/ipfs/go-ds-badger2"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/fvm/evm/offchain/utils"
"github.com/onflow/flow-go/fvm/evm/testutils"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/storage"
)

func Verify(from uint64, to uint64, chainID flow.ChainID, dataDir string, executionDataDir string) error {
db, storages, executionDataStore, dsStore, err := initStorages(chainID, dataDir, executionDataDir)
if err != nil {
return fmt.Errorf("could not initialize storages: %w", err)
}

defer db.Close()
defer dsStore.Close()

store := testutils.GetSimpleValueStore()

return utils.OffchainReplayBackwardCompatibilityTest(
chainID,
from,
to,
storages.Headers,
storages.Results,
executionDataStore,
store,
)
}

func initStorages(chainID flow.ChainID, dataDir string, executionDataDir string) (
*badger.DB,
*storage.All,
execution_data.ExecutionDataGetter,
io.Closer,
error,
) {
db := common.InitStorage(dataDir)

storages := common.InitStorages(db)

datastoreDir := filepath.Join(executionDataDir, "blobstore")
err := os.MkdirAll(datastoreDir, 0700)
if err != nil {
return nil, nil, nil, nil, err
}
dsOpts := &badgerds.DefaultOptions
ds, err := badgerds.NewDatastore(datastoreDir, dsOpts)
if err != nil {
return nil, nil, nil, nil, err
}

executionDataBlobstore := blobs.NewBlobstore(ds)
executionDataStore := execution_data.NewExecutionDataStore(executionDataBlobstore, execution_data.DefaultSerializer)

return db, storages, executionDataStore, ds, nil
}
84 changes: 25 additions & 59 deletions fvm/evm/offchain/utils/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestTestnetBackwardCompatibility(t *testing.T) {
// > ~/Downloads/events_devnet51_1.jsonl
// ...
//
// 2) comment the above t.Skip, and update the events file paths and checkpoint dir
// 2) comment the above t.Skip, and update the events file paths and evmStateGob dir
// to run the tests
BackwardCompatibleSinceEVMGenesisBlock(
t, flow.Testnet, []string{
Expand Down Expand Up @@ -78,67 +78,67 @@ func TestTestnetBackwardCompatibility(t *testing.T) {
// --start 211176670 --end 211176770 --network testnet --host access-001.devnet51.nodes.onflow.org:9000
//
// During the replay process, it will generate `values_<height>.gob` and
// `allocators_<height>.gob` checkpoint files for each height. If these checkpoint files exist,
// `allocators_<height>.gob` checkpoint files for each height. If these checkpoint gob files exist,
// the corresponding event JSON files will be skipped to optimize replay.
func BackwardCompatibleSinceEVMGenesisBlock(
t *testing.T,
chainID flow.ChainID,
eventsFilePaths []string, // ordered EVM events in JSONL format
checkpointDir string,
checkpointEndHeight uint64, // EVM height of an EVM state that a checkpoint was created for
evmStateGob string,
evmStateEndHeight uint64, // EVM height of an EVM state that a evmStateGob file was created for
) {
// ensure that checkpoints are not more than the event files
// ensure that event files is not an empty array
require.True(t, len(eventsFilePaths) > 0)

log.Info().Msgf("replaying EVM events from %v to %v, with checkpoints in %s, and checkpointEndHeight: %v",
log.Info().Msgf("replaying EVM events from %v to %v, with evmStateGob file in %s, and evmStateEndHeight: %v",
eventsFilePaths[0], eventsFilePaths[len(eventsFilePaths)-1],
checkpointDir, checkpointEndHeight)
evmStateGob, evmStateEndHeight)

store, checkpointEndHeightOrZero := initStorageWithCheckpoints(t, chainID, checkpointDir, checkpointEndHeight)
store, evmStateEndHeightOrZero := initStorageWithEVMStateGob(t, chainID, evmStateGob, evmStateEndHeight)

// the events to replay
nextHeight := checkpointEndHeightOrZero + 1
nextHeight := evmStateEndHeightOrZero + 1

// replay each event files
for _, eventsFilePath := range eventsFilePaths {
log.Info().Msgf("replaying events from %v, nextHeight: %v", eventsFilePath, nextHeight)

checkpointEndHeight := replayEvents(t, chainID, store, eventsFilePath, checkpointDir, nextHeight)
nextHeight = checkpointEndHeight + 1
evmStateEndHeight := replayEvents(t, chainID, store, eventsFilePath, evmStateGob, nextHeight)
nextHeight = evmStateEndHeight + 1
}

log.Info().
Msgf("succhessfully replayed all events and state changes are consistent with onchain state change. nextHeight: %v", nextHeight)
}

func initStorageWithCheckpoints(t *testing.T, chainID flow.ChainID, checkpointDir string, checkpointEndHeight uint64) (
func initStorageWithEVMStateGob(t *testing.T, chainID flow.ChainID, evmStateGob string, evmStateEndHeight uint64) (
*TestValueStore, uint64,
) {
rootAddr := evm.StorageAccountAddress(chainID)

// if there is no checkpoint, create a empty store and initialize the account status,
// if there is no evmStateGob file, create a empty store and initialize the account status,
// return 0 as the genesis height
if checkpointEndHeight == 0 {
if evmStateEndHeight == 0 {
store := GetSimpleValueStore()
as := environment.NewAccountStatus()
require.NoError(t, store.SetValue(rootAddr[:], []byte(flow.AccountStatusKey), as.ToBytes()))

return store, 0
}

valueFileName, allocatorFileName := checkpointFileNamesByEndHeight(checkpointDir, checkpointEndHeight)
valueFileName, allocatorFileName := evmStateGobFileNamesByEndHeight(evmStateGob, evmStateEndHeight)
values, err := deserialize(valueFileName)
require.NoError(t, err)
allocators, err := deserializeAllocator(allocatorFileName)
require.NoError(t, err)
store := GetSimpleValueStorePopulated(values, allocators)
return store, checkpointEndHeight
return store, evmStateEndHeight
}

func replayEvents(
t *testing.T,
chainID flow.ChainID,
store *TestValueStore, eventsFilePath string, checkpointDir string, initialNextHeight uint64) uint64 {
store *TestValueStore, eventsFilePath string, evmStateGob string, initialNextHeight uint64) uint64 {

rootAddr := evm.StorageAccountAddress(chainID)

Expand Down Expand Up @@ -185,63 +185,29 @@ func replayEvents(
return nil
})

checkpointEndHeight := nextHeight - 1
evmStateEndHeight := nextHeight - 1

log.Info().Msgf("finished replaying events from %v to %v, creating checkpoint", initialNextHeight, checkpointEndHeight)
valuesFile, allocatorsFile := dumpCheckpoint(t, store, checkpointDir, checkpointEndHeight)
log.Info().Msgf("checkpoint created: %v, %v", valuesFile, allocatorsFile)
log.Info().Msgf("finished replaying events from %v to %v, creating evm state gobs", initialNextHeight, evmStateEndHeight)
valuesFile, allocatorsFile := dumpEVMStateToGobFiles(t, store, evmStateGob, evmStateEndHeight)
log.Info().Msgf("evm state gobs created: %v, %v", valuesFile, allocatorsFile)

return checkpointEndHeight
return evmStateEndHeight
}

func checkpointFileNamesByEndHeight(dir string, endHeight uint64) (string, string) {
func evmStateGobFileNamesByEndHeight(dir string, endHeight uint64) (string, string) {
return filepath.Join(dir, fmt.Sprintf("values_%d.gob", endHeight)),
filepath.Join(dir, fmt.Sprintf("allocators_%d.gob", endHeight))
}

func dumpCheckpoint(t *testing.T, store *TestValueStore, dir string, checkpointEndHeight uint64) (string, string) {
valuesFileName, allocatorsFileName := checkpointFileNamesByEndHeight(dir, checkpointEndHeight)
func dumpEVMStateToGobFiles(t *testing.T, store *TestValueStore, dir string, evmStateEndHeight uint64) (string, string) {
valuesFileName, allocatorsFileName := evmStateGobFileNamesByEndHeight(dir, evmStateEndHeight)
values, allocators := store.Dump()

require.NoError(t, serialize(valuesFileName, values))
require.NoError(t, serializeAllocator(allocatorsFileName, allocators))
return valuesFileName, allocatorsFileName
}

const resume_height = 6559268

func decodeFullKey(encoded string) ([]byte, []byte, error) {
// Split the encoded string at the first occurrence of "~"
parts := strings.SplitN(encoded, "~", 2)
if len(parts) != 2 {
return nil, nil, fmt.Errorf("invalid encoded key: no delimiter found")
}

// Convert the split parts back to byte slices
owner := []byte(parts[0])
key := []byte(parts[1])
return owner, key, nil
}

type Subscription[T any] struct {
ch chan T
err error
}

func NewSubscription[T any]() *Subscription[T] {
return &Subscription[T]{
ch: make(chan T),
}
}

func (s *Subscription[T]) Channel() <-chan T {
return s.ch
}

func (s *Subscription[T]) Err() error {
return s.err
}

// scanEventFilesAndRun
func scanEventFilesAndRun(
t *testing.T,
Expand Down
Loading

0 comments on commit 81578f4

Please sign in to comment.