diff --git a/cmd/util/cmd/root.go b/cmd/util/cmd/root.go index 22fd4b86ae4..30d8d70e9ad 100644 --- a/cmd/util/cmd/root.go +++ b/cmd/util/cmd/root.go @@ -38,6 +38,7 @@ import ( "github.com/onflow/flow-go/cmd/util/cmd/snapshot" system_addresses "github.com/onflow/flow-go/cmd/util/cmd/system-addresses" truncate_database "github.com/onflow/flow-go/cmd/util/cmd/truncate-database" + verify_evm_offchain_replay "github.com/onflow/flow-go/cmd/util/cmd/verify-evm-offchain-replay" "github.com/onflow/flow-go/cmd/util/cmd/version" "github.com/onflow/flow-go/module/profiler" ) @@ -120,6 +121,8 @@ func addCommands() { rootCmd.AddCommand(system_addresses.Cmd) rootCmd.AddCommand(check_storage.Cmd) rootCmd.AddCommand(generate_authorization_fixes.Cmd) + rootCmd.AddCommand(evm_state_exporter.Cmd) + rootCmd.AddCommand(verify_evm_offchain_replay.Cmd) } func initConfig() { diff --git a/cmd/util/cmd/verify-evm-offchain-replay/main.go b/cmd/util/cmd/verify-evm-offchain-replay/main.go new file mode 100644 index 00000000000..cc1c45c7500 --- /dev/null +++ b/cmd/util/cmd/verify-evm-offchain-replay/main.go @@ -0,0 +1,79 @@ +package verify + +import ( + "fmt" + "strconv" + "strings" + + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/onflow/flow-go/model/flow" +) + +var ( + flagLastK uint64 + flagDatadir string + flagExecutionDataDir string + flagChain string + flagFromTo string +) + +// # verify the blocks from height 2000 to 3000 +// ./util verify_execution_result --chain flow-testnet --datadir /var/flow/data/protocol --chunk_data_pack_dir /var/flow/data/chunk_data_pack --from_to 2000-3000 +var Cmd = &cobra.Command{ + Use: "verify_evm_offchain_replay", + Short: "verify evm offchain replay with execution data", + Run: run, +} + +func init() { + Cmd.Flags().StringVar(&flagChain, "chain", "", "Chain name") + _ = Cmd.MarkFlagRequired("chain") + + Cmd.Flags().StringVar(&flagDatadir, "datadir", "/var/flow/data/protocol", + "directory that stores the protocol state") + + Cmd.Flags().StringVar(&flagExecutionDataDir, "execution_data_dir", "/var/flow/data/execution_data", + "directory that stores the execution state") +} + +func run(*cobra.Command, []string) { + _ = flow.ChainID(flagChain).Chain() + + from, to, err := parseFromTo(flagFromTo) + if err != nil { + log.Fatal().Err(err).Msg("could not parse from_to") + } + + log.Info().Msgf("verifying range from %d to %d", from, to) + err = Verify(from, to, flow.Testnet, flagDatadir, flagExecutionDataDir) + if err != nil { + log.Fatal().Err(err).Msg("could not verify last k height") + } + log.Info().Msgf("successfully verified range from %d to %d", from, to) + +} + +func parseFromTo(fromTo string) (from, to uint64, err error) { + parts := strings.Split(fromTo, "-") + if len(parts) != 2 { + return 0, 0, fmt.Errorf("invalid format: expected 'from-to', got '%s'", fromTo) + } + + from, err = strconv.ParseUint(strings.TrimSpace(parts[0]), 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid 'from' value: %w", err) + } + + to, err = strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid 'to' value: %w", err) + } + + if from > to { + return 0, 0, fmt.Errorf("'from' value (%d) must be less than or equal to 'to' value (%d)", from, to) + } + + return from, to, nil +} diff --git a/cmd/util/cmd/verify-evm-offchain-replay/verify.go b/cmd/util/cmd/verify-evm-offchain-replay/verify.go new file mode 100644 index 00000000000..4390c347118 --- /dev/null +++ b/cmd/util/cmd/verify-evm-offchain-replay/verify.go @@ -0,0 +1,69 @@ +package verify + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "github.com/dgraph-io/badger/v2" + badgerds "github.com/ipfs/go-ds-badger2" + + "github.com/onflow/flow-go/cmd/util/cmd/common" + "github.com/onflow/flow-go/fvm/evm/offchain/utils" + "github.com/onflow/flow-go/fvm/evm/testutils" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/blobs" + "github.com/onflow/flow-go/module/executiondatasync/execution_data" + "github.com/onflow/flow-go/storage" +) + +func Verify(from uint64, to uint64, chainID flow.ChainID, dataDir string, executionDataDir string) error { + db, storages, executionDataStore, dsStore, err := initStorages(chainID, dataDir, executionDataDir) + if err != nil { + return fmt.Errorf("could not initialize storages: %w", err) + } + + defer db.Close() + defer dsStore.Close() + + store := testutils.GetSimpleValueStore() + + return utils.OffchainReplayBackwardCompatibilityTest( + chainID, + from, + to, + storages.Headers, + storages.Results, + executionDataStore, + store, + ) +} + +func initStorages(chainID flow.ChainID, dataDir string, executionDataDir string) ( + *badger.DB, + *storage.All, + execution_data.ExecutionDataGetter, + io.Closer, + error, +) { + db := common.InitStorage(dataDir) + + storages := common.InitStorages(db) + + datastoreDir := filepath.Join(executionDataDir, "blobstore") + err := os.MkdirAll(datastoreDir, 0700) + if err != nil { + return nil, nil, nil, nil, err + } + dsOpts := &badgerds.DefaultOptions + ds, err := badgerds.NewDatastore(datastoreDir, dsOpts) + if err != nil { + return nil, nil, nil, nil, err + } + + executionDataBlobstore := blobs.NewBlobstore(ds) + executionDataStore := execution_data.NewExecutionDataStore(executionDataBlobstore, execution_data.DefaultSerializer) + + return db, storages, executionDataStore, ds, nil +} diff --git a/fvm/evm/offchain/utils/collection_test.go b/fvm/evm/offchain/utils/collection_test.go index d4fe05dcf8f..9e9efcc2ea1 100644 --- a/fvm/evm/offchain/utils/collection_test.go +++ b/fvm/evm/offchain/utils/collection_test.go @@ -41,7 +41,7 @@ func TestTestnetBackwardCompatibility(t *testing.T) { // > ~/Downloads/events_devnet51_1.jsonl // ... // - // 2) comment the above t.Skip, and update the events file paths and checkpoint dir + // 2) comment the above t.Skip, and update the events file paths and evmStateGob dir // to run the tests BackwardCompatibleSinceEVMGenesisBlock( t, flow.Testnet, []string{ @@ -78,47 +78,47 @@ func TestTestnetBackwardCompatibility(t *testing.T) { // --start 211176670 --end 211176770 --network testnet --host access-001.devnet51.nodes.onflow.org:9000 // // During the replay process, it will generate `values_.gob` and -// `allocators_.gob` checkpoint files for each height. If these checkpoint files exist, +// `allocators_.gob` checkpoint files for each height. If these checkpoint gob files exist, // the corresponding event JSON files will be skipped to optimize replay. func BackwardCompatibleSinceEVMGenesisBlock( t *testing.T, chainID flow.ChainID, eventsFilePaths []string, // ordered EVM events in JSONL format - checkpointDir string, - checkpointEndHeight uint64, // EVM height of an EVM state that a checkpoint was created for + evmStateGob string, + evmStateEndHeight uint64, // EVM height of an EVM state that a evmStateGob file was created for ) { - // ensure that checkpoints are not more than the event files + // ensure that event files is not an empty array require.True(t, len(eventsFilePaths) > 0) - log.Info().Msgf("replaying EVM events from %v to %v, with checkpoints in %s, and checkpointEndHeight: %v", + log.Info().Msgf("replaying EVM events from %v to %v, with evmStateGob file in %s, and evmStateEndHeight: %v", eventsFilePaths[0], eventsFilePaths[len(eventsFilePaths)-1], - checkpointDir, checkpointEndHeight) + evmStateGob, evmStateEndHeight) - store, checkpointEndHeightOrZero := initStorageWithCheckpoints(t, chainID, checkpointDir, checkpointEndHeight) + store, evmStateEndHeightOrZero := initStorageWithEVMStateGob(t, chainID, evmStateGob, evmStateEndHeight) // the events to replay - nextHeight := checkpointEndHeightOrZero + 1 + nextHeight := evmStateEndHeightOrZero + 1 // replay each event files for _, eventsFilePath := range eventsFilePaths { log.Info().Msgf("replaying events from %v, nextHeight: %v", eventsFilePath, nextHeight) - checkpointEndHeight := replayEvents(t, chainID, store, eventsFilePath, checkpointDir, nextHeight) - nextHeight = checkpointEndHeight + 1 + evmStateEndHeight := replayEvents(t, chainID, store, eventsFilePath, evmStateGob, nextHeight) + nextHeight = evmStateEndHeight + 1 } log.Info(). Msgf("succhessfully replayed all events and state changes are consistent with onchain state change. nextHeight: %v", nextHeight) } -func initStorageWithCheckpoints(t *testing.T, chainID flow.ChainID, checkpointDir string, checkpointEndHeight uint64) ( +func initStorageWithEVMStateGob(t *testing.T, chainID flow.ChainID, evmStateGob string, evmStateEndHeight uint64) ( *TestValueStore, uint64, ) { rootAddr := evm.StorageAccountAddress(chainID) - // if there is no checkpoint, create a empty store and initialize the account status, + // if there is no evmStateGob file, create a empty store and initialize the account status, // return 0 as the genesis height - if checkpointEndHeight == 0 { + if evmStateEndHeight == 0 { store := GetSimpleValueStore() as := environment.NewAccountStatus() require.NoError(t, store.SetValue(rootAddr[:], []byte(flow.AccountStatusKey), as.ToBytes())) @@ -126,19 +126,19 @@ func initStorageWithCheckpoints(t *testing.T, chainID flow.ChainID, checkpointDi return store, 0 } - valueFileName, allocatorFileName := checkpointFileNamesByEndHeight(checkpointDir, checkpointEndHeight) + valueFileName, allocatorFileName := evmStateGobFileNamesByEndHeight(evmStateGob, evmStateEndHeight) values, err := deserialize(valueFileName) require.NoError(t, err) allocators, err := deserializeAllocator(allocatorFileName) require.NoError(t, err) store := GetSimpleValueStorePopulated(values, allocators) - return store, checkpointEndHeight + return store, evmStateEndHeight } func replayEvents( t *testing.T, chainID flow.ChainID, - store *TestValueStore, eventsFilePath string, checkpointDir string, initialNextHeight uint64) uint64 { + store *TestValueStore, eventsFilePath string, evmStateGob string, initialNextHeight uint64) uint64 { rootAddr := evm.StorageAccountAddress(chainID) @@ -185,22 +185,22 @@ func replayEvents( return nil }) - checkpointEndHeight := nextHeight - 1 + evmStateEndHeight := nextHeight - 1 - log.Info().Msgf("finished replaying events from %v to %v, creating checkpoint", initialNextHeight, checkpointEndHeight) - valuesFile, allocatorsFile := dumpCheckpoint(t, store, checkpointDir, checkpointEndHeight) - log.Info().Msgf("checkpoint created: %v, %v", valuesFile, allocatorsFile) + log.Info().Msgf("finished replaying events from %v to %v, creating evm state gobs", initialNextHeight, evmStateEndHeight) + valuesFile, allocatorsFile := dumpEVMStateToGobFiles(t, store, evmStateGob, evmStateEndHeight) + log.Info().Msgf("evm state gobs created: %v, %v", valuesFile, allocatorsFile) - return checkpointEndHeight + return evmStateEndHeight } -func checkpointFileNamesByEndHeight(dir string, endHeight uint64) (string, string) { +func evmStateGobFileNamesByEndHeight(dir string, endHeight uint64) (string, string) { return filepath.Join(dir, fmt.Sprintf("values_%d.gob", endHeight)), filepath.Join(dir, fmt.Sprintf("allocators_%d.gob", endHeight)) } -func dumpCheckpoint(t *testing.T, store *TestValueStore, dir string, checkpointEndHeight uint64) (string, string) { - valuesFileName, allocatorsFileName := checkpointFileNamesByEndHeight(dir, checkpointEndHeight) +func dumpEVMStateToGobFiles(t *testing.T, store *TestValueStore, dir string, evmStateEndHeight uint64) (string, string) { + valuesFileName, allocatorsFileName := evmStateGobFileNamesByEndHeight(dir, evmStateEndHeight) values, allocators := store.Dump() require.NoError(t, serialize(valuesFileName, values)) @@ -208,40 +208,6 @@ func dumpCheckpoint(t *testing.T, store *TestValueStore, dir string, checkpointE return valuesFileName, allocatorsFileName } -const resume_height = 6559268 - -func decodeFullKey(encoded string) ([]byte, []byte, error) { - // Split the encoded string at the first occurrence of "~" - parts := strings.SplitN(encoded, "~", 2) - if len(parts) != 2 { - return nil, nil, fmt.Errorf("invalid encoded key: no delimiter found") - } - - // Convert the split parts back to byte slices - owner := []byte(parts[0]) - key := []byte(parts[1]) - return owner, key, nil -} - -type Subscription[T any] struct { - ch chan T - err error -} - -func NewSubscription[T any]() *Subscription[T] { - return &Subscription[T]{ - ch: make(chan T), - } -} - -func (s *Subscription[T]) Channel() <-chan T { - return s.ch -} - -func (s *Subscription[T]) Err() error { - return s.err -} - // scanEventFilesAndRun func scanEventFilesAndRun( t *testing.T, diff --git a/fvm/evm/offchain/utils/verify.go b/fvm/evm/offchain/utils/verify.go new file mode 100644 index 00000000000..bf3ed506adf --- /dev/null +++ b/fvm/evm/offchain/utils/verify.go @@ -0,0 +1,168 @@ +package utils + +import ( + "context" + "errors" + "strings" + + "github.com/rs/zerolog/log" + + "github.com/onflow/cadence" + "github.com/onflow/cadence/encoding/ccf" + + "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/fvm/evm" + "github.com/onflow/flow-go/fvm/evm/events" + "github.com/onflow/flow-go/fvm/evm/offchain/blocks" + evmStorage "github.com/onflow/flow-go/fvm/evm/offchain/storage" + "github.com/onflow/flow-go/fvm/evm/offchain/sync" + "github.com/onflow/flow-go/fvm/evm/testutils" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/executiondatasync/execution_data" + "github.com/onflow/flow-go/storage" +) + +func OffchainReplayBackwardCompatibilityTest( + chainID flow.ChainID, + flowStartHeight uint64, + flowEndHeight uint64, + headers storage.Headers, + results storage.ExecutionResults, + executionDataStore execution_data.ExecutionDataGetter, + store environment.ValueStore, +) error { + rootAddr := evm.StorageAccountAddress(chainID) + rootAddrStr := string(rootAddr.Bytes()) + + bpStorage := evmStorage.NewEphemeralStorage(store) + bp, err := blocks.NewBasicProvider(chainID, bpStorage, rootAddr) + if err != nil { + return err + } + + for height := flowStartHeight; height <= flowEndHeight; height++ { + blockID, err := headers.BlockIDByHeight(height) + if err != nil { + return err + } + + result, err := results.ByBlockID(blockID) + if err != nil { + return err + } + + executionData, err := executionDataStore.Get(context.Background(), result.ExecutionDataID) + if err != nil { + return err + } + + events := flow.EventsList{} + payloads := []*ledger.Payload{} + + for _, chunkData := range executionData.ChunkExecutionDatas { + events = append(events, chunkData.Events...) + payloads = append(payloads, chunkData.TrieUpdate.Payloads...) + } + + updates := make(map[flow.RegisterID]flow.RegisterValue, len(payloads)) + for i := len(payloads) - 1; i >= 0; i-- { + regID, regVal, err := convert.PayloadToRegister(payloads[i]) + if err != nil { + return err + } + + // skip non-evm-account registers + if regID.Owner != rootAddrStr { + continue + } + + // when iterating backwards, duplicated register updates are stale updates, + // so skipping them + if _, ok := updates[regID]; !ok { + updates[regID] = regVal + } + } + + // parse events + evmBlockEvent, evmTxEvents, err := parseEVMEvents(events) + if err != nil { + return err + } + + err = bp.OnBlockReceived(evmBlockEvent) + if err != nil { + return err + } + + sp := testutils.NewTestStorageProvider(store, evmBlockEvent.Height) + cr := sync.NewReplayer(chainID, rootAddr, sp, bp, log.Logger, nil, true) + res, err := cr.ReplayBlock(evmTxEvents, evmBlockEvent) + if err != nil { + return err + } + + // commit all changes + for k, v := range res.StorageRegisterUpdates() { + err = store.SetValue([]byte(k.Owner), []byte(k.Key), v) + if err != nil { + return err + } + } + + err = bp.OnBlockExecuted(evmBlockEvent.Height, res) + if err != nil { + return err + } + + // verify and commit all block hash list changes + for k, v := range bpStorage.StorageRegisterUpdates() { + // verify the block hash list changes are included in the trie update + + err = store.SetValue([]byte(k.Owner), []byte(k.Key), v) + if err != nil { + return err + } + } + } + + return nil +} + +func parseEVMEvents(evts flow.EventsList) (*events.BlockEventPayload, []events.TransactionEventPayload, error) { + var blockEvent *events.BlockEventPayload + txEvents := make([]events.TransactionEventPayload, 0) + + for _, e := range evts { + evtType := string(e.Type) + if strings.Contains(evtType, "BlockExecuted") { + if blockEvent != nil { + return nil, nil, errors.New("multiple block events in a single block") + } + + ev, err := ccf.Decode(nil, e.Payload) + if err != nil { + return nil, nil, err + } + + blockEventPayload, err := events.DecodeBlockEventPayload(ev.(cadence.Event)) + if err != nil { + return nil, nil, err + } + blockEvent = blockEventPayload + } else if strings.Contains(evtType, "TransactionExecuted") { + ev, err := ccf.Decode(nil, e.Payload) + if err != nil { + return nil, nil, err + } + txEv, err := events.DecodeTransactionEventPayload(ev.(cadence.Event)) + if err != nil { + return nil, nil, err + } + txEvents = append(txEvents, *txEv) + } + } + + return blockEvent, txEvents, nil +}