diff --git a/CHANGELOG.md b/CHANGELOG.md index 2040738..826be35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,14 +8,22 @@ Operators, you should copy/paste content of this content straight to your `fireh If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository. -## Next +## v0.1.3 +This release bumps substreams to v1.1.9 ### Highlights +#### Substreams Scheduler Improvements for Parallel Processing + +The `substreams` scheduler has been improved to reduce the number of required jobs for parallel processing. This affects `backprocessing` (preparing the states of modules up to a "start-block") and `forward processing` (preparing the states and the outputs to speed up streaming in production-mode). + +Jobs on `tier2` workers are now divided in "stages", each stage generating the partial states for all the modules that have the same dependencies. A `substreams` that has a single store won't be affected, but one that has 3 top-level stores, which used to run 3 jobs for every segment now only runs a single job per segment to get all the states ready. + + #### Substreams State Store Selection -The `substreams` server now accepts `X-Sf-Substreams-Cache-Tag` header to select which Substreams state store URL should be used by the request. When performing a Substreams request, the servers will pick the state store based on the header. This enable consumers to stay on the same cache version when the operators needs to bump the data version (reasons for this could be a bug in Substreams software that caused some cached data to be corrupted on invalid). +The `substreams` server now accepts `X-Sf-Substreams-Cache-Tag` header to select which Substreams state store URL should be used by the request. When performing a Substreams request, the servers will optionally pick the state store based on the header. 
This enables consumers to stay on the same cache version when the operators need to bump the data version (reasons for this could be a bug in Substreams software that caused some cached data to be corrupted or invalid). To benefit from this, operators that have a version currently in their state store URL should move the version part from `--substreams-state-store-url` to the new flag `--substreams-state-store-default-tag`. For example if today you have in your config: @@ -36,17 +44,11 @@ start: substreams-state-store-default-tag: v3 ``` -#### Substreams Scheduler Improvements for Parallel Processing - -The `substreams` scheduler has been improved to reduce the number of required jobs for parallel processing. This affects `backprocessing` (preparing the states of modules up to a "start-block") and `forward processing` (preparing the states and the outputs to speed up streaming in production-mode). - -Jobs on `tier2` workers are now divided in "stages", each stage generating the partial states for all the modules that have the same dependencies. A `substreams` that has a single store won't be affected, but one that has 3 top-level stores, which used to run 3 jobs for every segment now only runs a single job per segment to get all the states ready. - ### Operators Upgrade The app `substreams-tier1` and `substreams-tier2` should be upgraded concurrently. Some calls will fail while versions are misaligned. 
-### CLI Changes +### Backend Changes * Authentication plugin `trust` can now specify an exclusive list of `allowed` headers (all lowercase), ex: `trust://?allowed=x-sf-user-id,x-sf-api-key-id,x-real-ip,x-sf-substreams-cache-tag` diff --git a/go.mod b/go.mod index ad725fc..8b2694e 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/streamingfast/pbgo v0.0.6-0.20221020131607-255008258d28 github.com/streamingfast/relayer v0.0.2-0.20220909122435-e67fbc964fd9 github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 - github.com/streamingfast/substreams v1.1.9-0.20230720151436-6d47b7b88fc4 + github.com/streamingfast/substreams v1.1.9 github.com/stretchr/testify v1.8.3 go.uber.org/multierr v1.9.0 go.uber.org/zap v1.24.0 diff --git a/go.sum b/go.sum index 13dae36..cdd2613 100644 --- a/go.sum +++ b/go.sum @@ -616,8 +616,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.1.9-0.20230720151436-6d47b7b88fc4 h1:rywcKNxH1bR79/uYnhGq2kvodCsZMU1aTPC6oz+A+40= -github.com/streamingfast/substreams v1.1.9-0.20230720151436-6d47b7b88fc4/go.mod h1:U/wDfXapixXmpnBwzQRMGBXhXJGaLZe6XbFhyh5dF18= +github.com/streamingfast/substreams v1.1.9 h1:477zJWpvADeZL8s9gUuB830Di3mzx8b24AjN+o8Nrpk= +github.com/streamingfast/substreams v1.1.9/go.mod h1:U/wDfXapixXmpnBwzQRMGBXhXJGaLZe6XbFhyh5dF18= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff 
--git a/substreams_common.go b/substreams_common.go index a032ce0..17f6e67 100644 --- a/substreams_common.go +++ b/substreams_common.go @@ -16,6 +16,7 @@ func registerCommonSubstreamsFlags(cmd *cobra.Command) { registerSSOnce.Do(func() { cmd.Flags().Uint64("substreams-state-bundle-size", uint64(1_000), "Interval in blocks at which to save store snapshots and output caches") cmd.Flags().String("substreams-state-store-url", "{sf-data-dir}/localdata", "where substreams state data are stored") + cmd.Flags().String("substreams-state-store-default-tag", "", "If non-empty, will be appended to {substreams-state-store-url} (ex: 'v1'). Can be overriden per-request with 'X-Sf-Substreams-Cache-Tag' header") cmd.Flags().StringArray("substreams-rpc-endpoints", nil, "Remote endpoints to contact to satisfy Substreams 'eth_call's") cmd.Flags().String("substreams-rpc-cache-store-url", "{sf-data-dir}/rpc-cache", "where rpc cache will be store call responses") cmd.Flags().Uint64("substreams-rpc-cache-chunk-size", uint64(1_000), "RPC cache chunk size in block") diff --git a/substreams_tier1.go b/substreams_tier1.go index d123d51..a2b9dbb 100644 --- a/substreams_tier1.go +++ b/substreams_tier1.go @@ -75,6 +75,8 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) { grpcListenAddr := viper.GetString("substreams-tier1-grpc-listen-addr") stateStoreURL := MustReplaceDataDir(sfDataDir, viper.GetString("substreams-state-store-url")) + stateStoreDefaultTag := viper.GetString("substreams-state-store-default-tag") + stateBundleSize := viper.GetUint64("substreams-state-bundle-size") subrequestsEndpoint := viper.GetString("substreams-tier1-subrequests-endpoint") @@ -112,6 +114,7 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) { BlockStreamAddr: blockstreamAddr, StateStoreURL: stateStoreURL, + StateStoreDefaultTag: stateStoreDefaultTag, StateBundleSize: stateBundleSize, BlockType: getSubstreamsBlockMessageType(chain), MaxSubrequests: maxSubrequests, @@ -129,7 +132,7 @@ func 
registerSubstreamsTier1App[B Block](chain *Chain[B]) { GRPCListenAddr: grpcListenAddr, GRPCShutdownGracePeriod: time.Second, ServiceDiscoveryURL: serviceDiscoveryURL, - }, &app.Modules{ + }, &app.Tier1Modules{ Authenticator: authenticator, HeadTimeDriftMetric: ss1HeadTimeDriftmetric, HeadBlockNumberMetric: ss1HeadBlockNumMetric, diff --git a/substreams_tier2.go b/substreams_tier2.go index b96223a..8013cb3 100644 --- a/substreams_tier2.go +++ b/substreams_tier2.go @@ -21,7 +21,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - "github.com/streamingfast/dauth" discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service" "github.com/streamingfast/dlauncher/launcher" "github.com/streamingfast/logging" @@ -49,11 +48,6 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) { }, FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) { - authenticator, err := dauth.New(viper.GetString("common-auth-plugin")) - if err != nil { - return nil, fmt.Errorf("unable to initialize dauth: %w", err) - } - mergedBlocksStoreURL, _, _, err := GetCommonStoresURLs(runtime.AbsDataDir) if err != nil { return nil, err @@ -66,6 +60,8 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) { substreamsRequestsStats := viper.GetBool("substreams-tier2-request-stats") stateStoreURL := MustReplaceDataDir(sfDataDir, viper.GetString("substreams-state-store-url")) + stateStoreDefaultTag := viper.GetString("substreams-state-store-default-tag") + stateBundleSize := viper.GetUint64("substreams-state-bundle-size") tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec" @@ -91,9 +87,10 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) { &app.Tier2Config{ MergedBlocksStoreURL: mergedBlocksStoreURL, - StateStoreURL: stateStoreURL, - StateBundleSize: stateBundleSize, - BlockType: getSubstreamsBlockMessageType(chain), + StateStoreURL: stateStoreURL, + StateStoreDefaultTag: stateStoreDefaultTag, + StateBundleSize: stateBundleSize, + BlockType: 
getSubstreamsBlockMessageType(chain), WASMExtensions: wasmExtensions, PipelineOptions: pipelineOptioner, @@ -103,10 +100,6 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) { GRPCListenAddr: grpcListenAddr, ServiceDiscoveryURL: serviceDiscoveryURL, - }, &app.Modules{ - Authenticator: authenticator, - HeadTimeDriftMetric: ss2HeadTimeDriftmetric, - HeadBlockNumberMetric: ss2HeadBlockNumMetric, }), nil }, })