From 7d7c7b132e061164337298784177809df29fdd62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Mon, 26 Aug 2024 12:01:28 -0400 Subject: [PATCH] Add wazero tempdir to cache precompiled modules --- app/tier1.go | 6 ++++++ app/tier2.go | 5 +++++ docs/release-notes/change-log.md | 8 ++++++-- go.mod | 2 +- go.sum | 4 ++-- wasm/wazero/module.go | 24 +++++++++++++++++------- 6 files changed, 37 insertions(+), 12 deletions(-) diff --git a/app/tier1.go b/app/tier1.go index 4b82a728..bcee0e01 100644 --- a/app/tier1.go +++ b/app/tier1.go @@ -10,6 +10,7 @@ import ( "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcconnect" ssconnect "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcconnect" "github.com/streamingfast/substreams/reqctx" + "github.com/streamingfast/substreams/wasm/wazero" "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/blockstream" @@ -53,6 +54,7 @@ type Tier1Config struct { GRPCShutdownGracePeriod time.Duration // The duration we allow for gRPC connections to terminate gracefully prior forcing shutdown ServiceDiscoveryURL *url.URL BlockExecutionTimeout time.Duration + TmpDir string StateStoreURL string StateStoreDefaultTag string @@ -166,6 +168,10 @@ func (a *Tier1App) Run() error { opts = append(opts, service.WithBlockExecutionTimeout(a.config.BlockExecutionTimeout)) } + if a.config.TmpDir != "" { + wazero.SetTempDir(a.config.TmpDir) + } + var wasmModules map[string]string if a.config.WASMExtensions != nil { wasmModules = a.config.WASMExtensions.Params() diff --git a/app/tier2.go b/app/tier2.go index 5bdb81ac..7d52bd75 100644 --- a/app/tier2.go +++ b/app/tier2.go @@ -13,6 +13,7 @@ import ( "github.com/streamingfast/substreams/pipeline" "github.com/streamingfast/substreams/service" "github.com/streamingfast/substreams/wasm" + "github.com/streamingfast/substreams/wasm/wazero" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -26,6 +27,7 @@ type Tier2Config struct { 
MaximumConcurrentRequests uint64 WASMExtensions wasm.WASMExtensioner BlockExecutionTimeout time.Duration + TmpDir string Tracing bool } @@ -80,6 +82,9 @@ func (a *Tier2App) Run() error { opts = append(opts, service.WithReadinessFunc(a.setReadiness)) + if a.config.TmpDir != "" { + wazero.SetTempDir(a.config.TmpDir) + } if a.config.WASMExtensions != nil { opts = append(opts, service.WithWASMExtensioner(a.config.WASMExtensions)) } diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 6cb483f3..a1378c43 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -11,15 +11,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +### Server * Add `sf.substreams.rpc.v2.EndpointInfo/Info` endpoint (if the infoserver is given as a module, i.e. from firehose-core) +* Add an execution timeout of 3 minutes per block by default (can be overridden in tier1/tier2 Configs) -- this is useful when an external (eth_call) is stuck on a forked block hash. +* Revert 'initialBlocks' changes from v1.9.1 because a 'changing module hash' causes more trouble. +* Wazero: bump to v1.8.0 and activate caching of precompiled wasm modules in a `wazero` folder under the system temp directory (e.g. `/tmp/wazero`; can be overridden with `TmpDir` in tier1/tier2 Configs) to decrease compilation time + +### Client * Add `substreams auth` command, to authenticate via `thegraph.market` and to get a dev API Key. * Rename `--discovery-endpoint` into `codegen-endpoint` in `substreams init` command. * Add `substreams codegen subgraph` command that takes a substreams `module` and an `spkg` and that generates a simple `subgraph` from the `module` output. * On `substreams init` command, if flag `--state-file` is provided, the state file is used by default for project generation. * In `substreams init` command, the state file is named using a `Date format` and not using `Unix` anymore. 
-* Added an execution timeout of 3 minutes per block by default (can be overriden in tier1/tier2 Configs) -- this is useful when an external (eth_call) is stuck on a forked block hash. * Tools->prometheus: added the possibility to override the start-block on an endpoint -* Revert 'initialBlocks' changes from v1.9.1 because a 'changing module hash' causes more trouble. ## v1.9.3 diff --git a/go.mod b/go.mod index d14bafad..661dfe12 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/streamingfast/substreams-sdk-go v0.0.0-20240110154316-5fb21a7a330b github.com/streamingfast/substreams-sink-sql v1.0.1-0.20231127153906-acf5f3e34330 github.com/test-go/testify v1.1.4 - github.com/tetratelabs/wazero v1.7.1 + github.com/tetratelabs/wazero v1.8.0 github.com/tidwall/pretty v1.2.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 go.opentelemetry.io/otel v1.24.0 diff --git a/go.sum b/go.sum index 6e420adb..23b74c35 100644 --- a/go.sum +++ b/go.sum @@ -585,8 +585,8 @@ github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ7 github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= -github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= -github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= +github.com/tetratelabs/wazero v1.8.0 h1:iEKu0d4c2Pd+QSRieYbnQC9yiFlMS9D+Jr0LsRmcF4g= +github.com/tetratelabs/wazero v1.8.0/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo= github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 
h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= diff --git a/wasm/wazero/module.go b/wasm/wazero/module.go index 4277a26d..b74aa47f 100644 --- a/wasm/wazero/module.go +++ b/wasm/wazero/module.go @@ -3,6 +3,8 @@ package wazero import ( "context" "fmt" + "os" + "path" "sync" "github.com/streamingfast/substreams/reqctx" @@ -24,19 +26,27 @@ type Module struct { runtimeExtensions wasm.RuntimeExtensions } +var wazeroTmpDir = path.Join(os.TempDir(), "wazero") // default value can be overridden by setting this variable from the app + +func SetTempDir(dir string) { + wazeroTmpDir = path.Join(dir, "wazero") +} + func init() { wasm.RegisterModuleFactory("wazero", wasm.ModuleFactoryFunc(newModule)) } func newModule(ctx context.Context, wasmCode []byte, wasmCodeType string, registry *wasm.Registry) (wasm.Module, error) { - // What's the effect of `ctx` here? Will it kill all the WASM if it cancels? - // TODO: try with: wazero.NewRuntimeConfigCompiler() - // TODO: try config := wazero.NewRuntimeConfig().WithCompilationCache(cache) - runtimeConfig := wazero.NewRuntimeConfigCompiler() - // TODO: can we use some caching in the RuntimeConfig so perhaps we reuse - // things across runtimes creations? - runtime := wazero.NewRuntimeWithConfig(ctx, runtimeConfig) + // The CacheWithDir offers a way to share the cache between runtimes concurrently + // we can't just share the 'cache': we would get concurrency issues + cache, err := wazero.NewCompilationCacheWithDir(wazeroTmpDir) + if err != nil { + return nil, err + } + + // What's the effect of `ctx` here? Will it kill all the WASM if it cancels? + runtime := wazero.NewRuntimeWithConfig(ctx, wazero.NewRuntimeConfig().WithCompilationCache(cache)) wasmCodeTypeID, runtimeExtensions, err := wasm.ParseWASMCodeType(wasmCodeType) if err != nil {