Skip to content

Commit

Permalink
Merge branch 'master' into gregor/script-execution/get-block-2
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue authored Oct 30, 2023
2 parents 66a0d02 + 735ee16 commit dc17022
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 23 deletions.
8 changes: 3 additions & 5 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"os"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -65,14 +64,13 @@ type ExecutionConfig struct {
}

func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
homedir, _ := os.UserHomeDir()
datadir := filepath.Join(homedir, ".flow", "execution")
datadir := "/data"

flags.StringVarP(&exeConf.rpcConf.ListenAddr, "rpc-addr", "i", "localhost:9000", "the address the gRPC server listens on")
flags.UintVar(&exeConf.rpcConf.MaxMsgSize, "rpc-max-message-size", grpcutils.DefaultMaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
flags.BoolVar(&exeConf.rpcConf.RpcMetricsEnabled, "rpc-metrics-enabled", false, "whether to enable the rpc metrics")
flags.StringVar(&exeConf.triedir, "triedir", datadir, "directory to store the execution State")
flags.StringVar(&exeConf.executionDataDir, "execution-data-dir", filepath.Join(homedir, ".flow", "execution_data"), "directory to use for storing Execution Data")
flags.StringVar(&exeConf.executionDataDir, "execution-data-dir", filepath.Join(datadir, "execution_data"), "directory to use for storing Execution Data")
flags.Uint32Var(&exeConf.mTrieCacheSize, "mtrie-cache-size", 500, "cache size for MTrie")
flags.UintVar(&exeConf.checkpointDistance, "checkpoint-distance", 20, "number of WAL segments between checkpoints")
flags.UintVar(&exeConf.checkpointsToKeep, "checkpoints-to-keep", 5, "number of recent checkpoints to keep (0 to keep all)")
Expand All @@ -81,7 +79,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.BoolVar(&exeConf.computationConfig.ExtensiveTracing, "extensive-tracing", false, "adds high-overhead tracing to execution")
flags.BoolVar(&exeConf.computationConfig.CadenceTracing, "cadence-tracing", false, "enables cadence runtime level tracing")
flags.IntVar(&exeConf.computationConfig.MaxConcurrency, "computer-max-concurrency", 1, "set to greater than 1 to enable concurrent transaction execution")
flags.StringVar(&exeConf.chunkDataPackDir, "chunk-data-pack-dir", filepath.Join(homedir, ".flow", "chunk_data_packs"), "directory to use for storing chunk data packs")
flags.StringVar(&exeConf.chunkDataPackDir, "chunk-data-pack-dir", filepath.Join(datadir, "chunk_data_packs"), "directory to use for storing chunk data packs")
flags.UintVar(&exeConf.chunkDataPackCacheSize, "chdp-cache", storage.DefaultCacheSize, "cache size for chunk data packs")
flags.Uint32Var(&exeConf.chunkDataPackRequestsCacheSize, "chdp-request-queue", mempool.DefaultChunkDataPackRequestQueueSize, "queue size for chunk data pack requests")
flags.DurationVar(&exeConf.requestInterval, "request-interval", 60*time.Second, "the interval between requests for the requester engine")
Expand Down
5 changes: 1 addition & 4 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package cmd

import (
"context"
"os"
"path/filepath"
"time"

"github.com/dgraph-io/badger/v2"
Expand Down Expand Up @@ -243,8 +241,7 @@ type StateExcerptAtBoot struct {
}

func DefaultBaseConfig() *BaseConfig {
homedir, _ := os.UserHomeDir()
datadir := filepath.Join(homedir, ".flow", "database")
datadir := "/data/protocol"

// NOTE: if the codec used in the network component is ever changed any code relying on
// the message format specific to the codec must be updated. i.e: the AuthorizedSenderValidator.
Expand Down
9 changes: 5 additions & 4 deletions cmd/util/cmd/truncate-database/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func init() {

Cmd.Flags().StringVar(&flagChunkDataPackDir, "chunk-data-pack-dir", "",
"directory that stores the chunk data pack")
_ = Cmd.MarkFlagRequired("chunk-data-pack-dir")
}

func run(*cobra.Command, []string) {
Expand All @@ -39,8 +38,10 @@ func run(*cobra.Command, []string) {

log.Info().Msg("ProtocolDB Truncated")

chunkdb := common.InitStorageWithTruncate(flagChunkDataPackDir, true)
defer chunkdb.Close()
if flagChunkDataPackDir != "" {
chunkdb := common.InitStorageWithTruncate(flagChunkDataPackDir, true)
defer chunkdb.Close()

log.Info().Msg("Truncated")
log.Info().Msg("Chunk Data Pack database Truncated")
}
}
79 changes: 69 additions & 10 deletions integration/tests/access/access_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"testing"
"time"

"google.golang.org/grpc/codes"

"google.golang.org/grpc/status"

"github.com/rs/zerolog"
"github.com/stretchr/testify/suite"

Expand Down Expand Up @@ -174,21 +178,27 @@ func (s *AccessAPISuite) testGetAccount(client *client.Client) {
serviceAddress := s.serviceClient.SDKServiceAddress()

s.Run("get account at latest block", func() {
account, err := client.GetAccount(s.ctx, serviceAddress)
account, err := s.waitAccountsUntilIndexed(func() (*sdk.Account, error) {
return client.GetAccount(s.ctx, serviceAddress)
})
s.Require().NoError(err)
s.Assert().Equal(serviceAddress, account.Address)
s.Assert().NotZero(serviceAddress, account.Balance)
})

s.Run("get account block ID", func() {
account, err := client.GetAccountAtLatestBlock(s.ctx, serviceAddress)
account, err := s.waitAccountsUntilIndexed(func() (*sdk.Account, error) {
return client.GetAccountAtLatestBlock(s.ctx, serviceAddress)
})
s.Require().NoError(err)
s.Assert().Equal(serviceAddress, account.Address)
s.Assert().NotZero(serviceAddress, account.Balance)
})

s.Run("get account block height", func() {
account, err := client.GetAccountAtBlockHeight(s.ctx, serviceAddress, header.Height)
account, err := s.waitAccountsUntilIndexed(func() (*sdk.Account, error) {
return client.GetAccountAtBlockHeight(s.ctx, serviceAddress, header.Height)
})
s.Require().NoError(err)
s.Assert().Equal(serviceAddress, account.Address)
s.Assert().NotZero(serviceAddress, account.Balance)
Expand All @@ -200,19 +210,25 @@ func (s *AccessAPISuite) testExecuteScriptWithSimpleScript(client *client.Client
s.Require().NoError(err)

s.Run("execute at latest block", func() {
result, err := client.ExecuteScriptAtLatestBlock(s.ctx, []byte(simpleScript), nil)
result, err := s.waitScriptExecutionUntilIndexed(func() (cadence.Value, error) {
return client.ExecuteScriptAtLatestBlock(s.ctx, []byte(simpleScript), nil)
})
s.Require().NoError(err)
s.Assert().Equal(simpleScriptResult, result)
})

s.Run("execute at block height", func() {
result, err := client.ExecuteScriptAtBlockHeight(s.ctx, header.Height, []byte(simpleScript), nil)
result, err := s.waitScriptExecutionUntilIndexed(func() (cadence.Value, error) {
return client.ExecuteScriptAtBlockHeight(s.ctx, header.Height, []byte(simpleScript), nil)
})
s.Require().NoError(err)
s.Assert().Equal(simpleScriptResult, result)
})

s.Run("execute at block ID", func() {
result, err := client.ExecuteScriptAtBlockID(s.ctx, header.ID, []byte(simpleScript), nil)
result, err := s.waitScriptExecutionUntilIndexed(func() (cadence.Value, error) {
return client.ExecuteScriptAtBlockID(s.ctx, header.ID, []byte(simpleScript), nil)
})
s.Require().NoError(err)
s.Assert().Equal(simpleScriptResult, result)
})
Expand All @@ -227,20 +243,25 @@ func (s *AccessAPISuite) testExecuteScriptWithSimpleContract(client *client.Clie
script := lib.ReadCounterScript(serviceAccount.Address, serviceAccount.Address).ToCadence()

s.Run("execute at latest block", func() {
result, err := client.ExecuteScriptAtLatestBlock(s.ctx, []byte(script), nil)
result, err := s.waitScriptExecutionUntilIndexed(func() (cadence.Value, error) {
return client.ExecuteScriptAtLatestBlock(s.ctx, []byte(script), nil)
})
s.Require().NoError(err)
s.Assert().Equal(lib.CounterInitializedValue, result.(cadence.Int).Int())
})

s.Run("execute at block height", func() {
result, err := client.ExecuteScriptAtBlockHeight(s.ctx, header.Height, []byte(script), nil)
result, err := s.waitScriptExecutionUntilIndexed(func() (cadence.Value, error) {
return client.ExecuteScriptAtBlockHeight(s.ctx, header.Height, []byte(script), nil)
})
s.Require().NoError(err)

s.Assert().Equal(lib.CounterInitializedValue, result.(cadence.Int).Int())
})

s.Run("execute at block ID", func() {
result, err := client.ExecuteScriptAtBlockID(s.ctx, header.ID, []byte(script), nil)
result, err := s.waitScriptExecutionUntilIndexed(func() (cadence.Value, error) {
return client.ExecuteScriptAtBlockID(s.ctx, header.ID, []byte(script), nil)
})
s.Require().NoError(err)
s.Assert().Equal(lib.CounterInitializedValue, result.(cadence.Int).Int())
})
Expand Down Expand Up @@ -288,6 +309,35 @@ func (s *AccessAPISuite) deployContract() *sdk.TransactionResult {
return result
}

type getAccount func() (*sdk.Account, error)
type executeScript func() (cadence.Value, error)

var indexDelay = 10 * time.Second
var indexRetry = 100 * time.Millisecond

// wait for sealed block to get indexed, as there is a delay in syncing blocks between nodes
func (s *AccessAPISuite) waitAccountsUntilIndexed(get getAccount) (*sdk.Account, error) {
var account *sdk.Account
var err error
s.Require().Eventually(func() bool {
account, err = get()
return notOutOfRangeError(err)
}, indexDelay, indexRetry)

return account, err
}

func (s *AccessAPISuite) waitScriptExecutionUntilIndexed(execute executeScript) (cadence.Value, error) {
var val cadence.Value
var err error
s.Require().Eventually(func() bool {
val, err = execute()
return notOutOfRangeError(err)
}, indexDelay, indexRetry)

return val, err
}

func (s *AccessAPISuite) waitUntilIndexed(height uint64) {
// wait until the block is indexed
// This relying on the fact that the API is configured to only use the local db, and will return
Expand All @@ -300,3 +350,12 @@ func (s *AccessAPISuite) waitUntilIndexed(height uint64) {
return err == nil
}, 30*time.Second, 1*time.Second)
}

// make sure we either don't have an error or the error is not out of range error, since in that case we have to wait a bit longer for index to get synced
func notOutOfRangeError(err error) bool {
statusErr, ok := status.FromError(err)
if !ok || err == nil {
return true
}
return statusErr.Code() != codes.OutOfRange
}

0 comments on commit dc17022

Please sign in to comment.