Feature/index executor (#453)
* First commit

* prevent panic on loadImports when imported package isn't reachable

* Sink Examples Update (#439)

* Minor docs improvements (#440)

* enable reflection api

* fix buf generation

* partial (unsquashed) stores no longer contain traceID in filename, so they can be reused, squasher will handle this gracefully (#441)

the execouts for stores are now only written on a PartialKV, containing
a list of operations to re-apply. They are only output as deltas when
applied to a fullKV.

* First working version of index query language (still in package sqe for now)

* fix dstore metering when reading output cache on tier2

* adjust scheduler to reduce reprocessing of same modules with new cached outputs

* fix tier1 storage read-bytes metering, fix 'gui' header

* Fix conflict

* fix byte copy in store outputs saving

* fix work plan: with new model, don't run all stages together on first segment

* fix encoding/decoding of bigint/bigdecimal

* fix corruption in new cached store implementation: keep deltas for proper ordinal lookup

* Conversion testing (#442)

* Conversion testing
* fix SET_MIN_INT64 not using new conversion helper

* touch changelog a bit

* bump changelog to v1.4.0

* fix broken scheduler test with new behavior

* when scanning store files, find old partial files with a traceID and delete them on sight (they are leftovers, possibly slowing down next runs)

* improve log message when retrying tier2 request

* Add sinks table (#443)

* Handling timeout when getting block type from stream factory

* Update decode tool

* prevent creating wrong store outputs on previous stages

* First commit

* Fixing tests

* ensure the 'decode' tool can skip package validation by default

* Handle BlockIndex type in computeStages

* WIP

* Add TODO

* Fix compilation error

* add blockfilter info on module in 'substreams info' command

* add tool to decode index files

* fix typo in decode tools

* fix index handling in outputgraph

* enable skipping some module executions when blockfilter exists (does not work when blockfilter is not already existing)

* Add blockKeys parser

* Add NewFromIndexKeys

* prevent blockfilter failure if index doesn't exist, prepare skipping the whole run, clean up

* add "on-the-fly" block filtering from index module's outputs

* Disable not operation in query

* remove println

* support modules with non-0 initial blocks with indexes

* remove checks of outputs on 'use' modules -> they're not accepted anyway

* Fix bitmap_test

* cleanup: most go linter warnings and remove dead code

* go mod tidy

* Handle index in integration test and improve test

* Remove not operation tests and fix tier1 sending output to client

* wip, take it away Arnaud

* WIP

* wip comments

* WIP move stuff around, prepare refactor

* some more comments

* linear handoff calculation updated according to new spec

* fix build and a few tests

* skip index module if it exists already, cleanup old stuff

* WIP

* Removing evaluateModuleRequireToRun

* fix different modules outputs

* fix read End Block on unbounded queries

* fix request plan when storeInitialBlock > linearHandoffBlock, also fix integration tests stalling

* fix some integration tests, remove useless ones

* Refactor test and add lowestInitBlock func in graph

* prevent executing modules before their initialBlock

* allow some nil inputs when running a module

* Add complex substreams

* fix complex_substreams

* safer skipping of inputs that have initialBlock in the future

* simplify blockindex lookups

* validate that each module has at least one valid input on its initialBlock

* Add complex_substreams testing

* Fix test

* Rename and refactor testing

* Fix test

* fix another test path

* Add Deltas into testing

* Renaming tests

* fix stages.getState on store modules with initialBlock in the future, fixing tests

* add a few test cases to complex integration substreams

* ensure tier2 "tries" to save fullKV if it has them

* WIP

* Fix test

* Fix test again

* fix test case stores_with_different_initial_blocks_on_the_same_stage

* tweak 'different-initial-blocks-on-same-stage' test to be between boundaries. It now fails on a case that should work

* Modify init block to 52

* Fix previous changes

* Fix again test

* Removing error when setting initial block to block index

* Fix test

* WIP

* Add testing and fix other tests

* Fix all the tests

* Change scheduler logic to reduce reexecution in the first segments by running later stages until the other jobs can complete

* fix new scheduler pattern in some cases

* refactor checkValidBlockFilter

* fix reader and run module func

* Fix tests

* Change loading file logic

* Fix indexes

* tier2: skip full block range when excluded by index

* improve tier1 max output speed by preloading next file(s) in parallel

* disable otelcol:// tracing which affected performance. Add 'clock' output to `run`

* Apply index skipping for output

* WIP

* add support for blockFilterQuery from params

* Change index logic

* WIP

* WIP

* Remove clocks

* Remove unnecessary changes

* Handle prefix for filtered imported modules

* fix misaligned readexec on tier1 reader

* make FileWalker on tier1 poll more aggressively on local filesystem

* fix varying hashmodule when blockFilter is imported with another name

* do not "meter" the cache store

* allow index to be queried directly and terminate when index is created

* fix total read bytes when data is already cached

* prevent panic on empty match in index bitmap

* fix panic on StreamTerminate for tier2

* allow skipping existing output ranges when running an index directly

* fix writer test

* remove debug println

* bump changelog for v1.6.0

---------

Co-authored-by: Stéphane Duchesneau <[email protected]>
Co-authored-by: Enol <[email protected]>
Co-authored-by: YaroShkvorets <[email protected]>
Co-authored-by: billettc <[email protected]>
Co-authored-by: Stéphane Duchesneau <[email protected]>
Co-authored-by: Matthieu Vachon <[email protected]>
Co-authored-by: colindickson <[email protected]>
Co-authored-by: Alexandre Bourget <[email protected]>
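
Several entries above mention filtering blocks "on the fly" from an index module's outputs and skipping a full block range when the index excludes it. The following is only a minimal sketch of that idea, not the implementation from this commit: the `blockIndex` type, `matchAny`, and `blocksToExecute` are hypothetical names, the index is a plain in-memory map rather than the on-disk index file, and the query is reduced to a simple OR over keys.

```go
package main

import "fmt"

// blockIndex maps a block number to the set of keys an index module
// emitted for that block. Hypothetical in-memory shape, for illustration only.
type blockIndex map[uint64]map[string]bool

// matchAny reports whether the block carries at least one of the queried
// keys, i.e. an OR query over keys.
func (idx blockIndex) matchAny(block uint64, keys []string) bool {
	for _, k := range keys {
		// Indexing a missing block yields a nil map, which reads as false.
		if idx[block][k] {
			return true
		}
	}
	return false
}

// blocksToExecute returns the blocks in [start, end) that match the query.
// An empty result means the whole range can be skipped.
func blocksToExecute(idx blockIndex, start, end uint64, keys []string) []uint64 {
	var out []uint64
	for b := start; b < end; b++ {
		if idx.matchAny(b, keys) {
			out = append(out, b)
		}
	}
	return out
}

func main() {
	idx := blockIndex{
		10: {"transfer": true},
		12: {"mint": true, "transfer": true},
		13: {"burn": true},
	}
	fmt.Println(blocksToExecute(idx, 10, 15, []string{"transfer", "mint"})) // prints [10 12]
}
```

In this sketch, a caller that gets an empty slice back for a range would not need to run the filtered module on any block of that range.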
9 people authored May 9, 2024
1 parent ceb2aa6 commit cc24a20
Showing 192 changed files with 5,022 additions and 2,270 deletions.
7 changes: 0 additions & 7 deletions bigdecimal/init_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion client/logging.go
@@ -4,4 +4,4 @@ import (
"github.com/streamingfast/logging"
)

var zlog, tracer = logging.PackageLogger("substreams-clients", "github.com/streamingfast/substreams/client")
var zlog, _ = logging.PackageLogger("substreams-clients", "github.com/streamingfast/substreams/client")
117 changes: 0 additions & 117 deletions cmd/substreams/flags.go

This file was deleted.

26 changes: 16 additions & 10 deletions cmd/substreams/gui.go
@@ -74,12 +74,18 @@ func runGui(cmd *cobra.Command, args []string) error {
}
}

productionMode := mustGetBool(cmd, "production-mode")
debugModulesOutput := mustGetStringSlice(cmd, "debug-modules-output")
productionMode := sflags.MustGetBool(cmd, "production-mode")
debugModulesOutput := sflags.MustGetStringSlice(cmd, "debug-modules-output")
if len(debugModulesOutput) == 0 {
debugModulesOutput = nil
}
if debugModulesOutput != nil && productionMode {
return fmt.Errorf("cannot set 'debug-modules-output' in 'production-mode'")
}
debugModulesInitialSnapshot := mustGetStringSlice(cmd, "debug-modules-initial-snapshot")
debugModulesInitialSnapshot := sflags.MustGetStringSlice(cmd, "debug-modules-initial-snapshot")
if len(debugModulesInitialSnapshot) == 0 {
debugModulesInitialSnapshot = nil
}

outputModule := args[0]
network := sflags.MustGetString(cmd, "network")
@@ -108,7 +114,7 @@ func runGui(cmd *cobra.Command, args []string) error {
return fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

endpoint, err := manifest.ExtractNetworkEndpoint(pkg.Network, mustGetString(cmd, "substreams-endpoint"), zlog)
endpoint, err := manifest.ExtractNetworkEndpoint(pkg.Network, sflags.MustGetString(cmd, "substreams-endpoint"), zlog)
if err != nil {
return fmt.Errorf("extracting endpoint: %w", err)
}
@@ -118,8 +124,8 @@ func runGui(cmd *cobra.Command, args []string) error {
endpoint,
authToken,
authType,
mustGetBool(cmd, "insecure"),
mustGetBool(cmd, "plaintext"),
sflags.MustGetBool(cmd, "insecure"),
sflags.MustGetBool(cmd, "plaintext"),
)

homeDir, err := os.UserHomeDir()
@@ -134,7 +140,7 @@ func runGui(cmd *cobra.Command, args []string) error {
homeDir = filepath.Join(homeDir, ".config", "substreams")
}

cursor := mustGetString(cmd, "cursor")
cursor := sflags.MustGetString(cmd, "cursor")

fmt.Println("Launching Substreams GUI...")

@@ -171,12 +177,12 @@ func runGui(cmd *cobra.Command, args []string) error {
OutputModule: outputModule,
SubstreamsClientConfig: substreamsClientConfig,
HomeDir: homeDir,
Vcr: mustGetBool(cmd, "replay"),
Headers: parseHeaders(mustGetStringSlice(cmd, "header")),
Vcr: sflags.MustGetBool(cmd, "replay"),
Headers: parseHeaders(sflags.MustGetStringSlice(cmd, "header")),
Cursor: cursor,
StartBlock: startBlock,
StopBlock: stopBlock,
FinalBlocksOnly: mustGetBool(cmd, "final-blocks-only"),
FinalBlocksOnly: sflags.MustGetBool(cmd, "final-blocks-only"),
Params: params,
ReaderOptions: readerOptions,
}
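
The `runGui` hunks above add a step that turns empty flag slices into `nil` before the `debugModulesOutput != nil && productionMode` check. A plausible reading, stated here as an assumption, is that the flag helper can return an empty but non-nil slice when the flag is unset, which would make the `!= nil` test fire by mistake. The sketch below illustrates that pattern with standard-library code only; `emptyToNil` and `validateDebugModules` are hypothetical helpers, not functions from the repository.

```go
package main

import (
	"errors"
	"fmt"
)

// emptyToNil collapses an empty slice to nil so that later `!= nil`
// checks mean "the user supplied at least one value".
func emptyToNil(values []string) []string {
	if len(values) == 0 {
		return nil
	}
	return values
}

// validateDebugModules mirrors the mutual-exclusion check shown in the diff:
// debug module outputs are rejected when production mode is enabled.
func validateDebugModules(debugModulesOutput []string, productionMode bool) error {
	if emptyToNil(debugModulesOutput) != nil && productionMode {
		return errors.New("cannot set 'debug-modules-output' in 'production-mode'")
	}
	return nil
}

func main() {
	// An empty, non-nil slice no longer triggers the error.
	fmt.Println(validateDebugModules([]string{}, true))
	// A real value in production mode is still rejected.
	fmt.Println(validateDebugModules([]string{"map_events"}, true))
}
```

Comparing with `len(values) > 0` at the call site would work equally well; normalizing once up front keeps every later `nil` check consistent.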
13 changes: 8 additions & 5 deletions cmd/substreams/info.go
@@ -49,14 +49,14 @@ func runInfo(cmd *cobra.Command, args []string) error {
outputModule = args[1]
}

outputSinkconfigFilesPath := mustGetString(cmd, "output-sinkconfig-files-path")
outputSinkconfigFilesPath := sflags.MustGetString(cmd, "output-sinkconfig-files-path")

info, err := info.Extended(manifestPath, outputModule, sflags.MustGetBool(cmd, "skip-package-validation"))
if err != nil {
return err
}

if mustGetBool(cmd, "json") {
if sflags.MustGetBool(cmd, "json") {
res, err := json.MarshalIndent(info, "", " ")
if err != nil {
return err
@@ -83,8 +83,13 @@ func runInfo(cmd *cobra.Command, args []string) error {
for _, input := range mod.Inputs {
fmt.Printf("Input: %s: %s\n", input.Type, input.Name)
}
if mod.BlockFilter != nil {
fmt.Printf("Block Filter: (using *%s*): `%s`\n", mod.BlockFilter.Module, mod.BlockFilter.Query)
}

switch mod.Kind {
case "index":
fmt.Println("Output Type:", *mod.OutputType)
case "map":
fmt.Println("Output Type:", *mod.OutputType)
case "store":
@@ -132,9 +137,7 @@ func runInfo(cmd *cobra.Command, args []string) error {
var layerDefs []string
for _, l := range layers {
var mods []string
for _, m := range l {
mods = append(mods, m)
}
mods = append(mods, l...)
layerDefs = append(layerDefs, fmt.Sprintf(`["%s"]`, strings.Join(mods, `","`)))
}
fmt.Printf("Stage %d: [%s]\n", i, strings.Join(layerDefs, `,`))
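
The last `runInfo` hunk replaces an element-by-element loop with the variadic form `append(mods, l...)`. As a small self-contained illustration of how the stage line is assembled, the sketch below wraps the same statements in a hypothetical `formatLayers` helper; the module names in `main` are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// formatLayers renders each layer as a quoted, bracketed list and joins
// the layers with commas, following the formatting used in the diff.
func formatLayers(layers [][]string) string {
	var layerDefs []string
	for _, l := range layers {
		var mods []string
		// Variadic append copies every element of l, replacing the
		// explicit `for _, m := range l` loop from the old code.
		mods = append(mods, l...)
		layerDefs = append(layerDefs, fmt.Sprintf(`["%s"]`, strings.Join(mods, `","`)))
	}
	return strings.Join(layerDefs, `,`)
}

func main() {
	layers := [][]string{{"map_a", "map_b"}, {"store_c"}}
	fmt.Printf("Stage %d: [%s]\n", 0, formatLayers(layers))
	// prints: Stage 0: [["map_a","map_b"],["store_c"]]
}
```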
7 changes: 5 additions & 2 deletions cmd/substreams/init.go
@@ -606,11 +606,14 @@ func prompt(label string, opts *promptOptions) (string, error) {
templates.Valid = `{{ "?" | blue}} {{ . | bold }} {{ "[y/N]" | faint}} `
templates.Invalid = templates.Valid
}

def := ""
if opts != nil {
def = opts.Default
}
prompt := promptui.Prompt{
Label: label,
Templates: templates,
Default: opts.Default,
Default: def,
}
if opts != nil && opts.Validate != nil {
prompt.Validate = opts.Validate
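
The `prompt` hunk above stops dereferencing `opts.Default` unconditionally: the rest of the function already treats `opts` as optional (`opts != nil && ...`), so reading a field from a nil pointer would panic. A minimal sketch of the guard follows, assuming a cut-down `promptOptions` struct with only the `Default` field; `defaultValue` is a hypothetical helper used to isolate the pattern.

```go
package main

import "fmt"

// promptOptions is a reduced stand-in for the options struct in the diff;
// only the Default field is modeled here.
type promptOptions struct {
	Default string
}

// defaultValue returns opts.Default when options were provided and the
// empty string otherwise, so callers may pass nil safely.
func defaultValue(opts *promptOptions) string {
	def := ""
	if opts != nil {
		def = opts.Default
	}
	return def
}

func main() {
	fmt.Printf("%q\n", defaultValue(nil))                          // prints ""
	fmt.Printf("%q\n", defaultValue(&promptOptions{Default: "y"})) // prints "y"
}
```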
4 changes: 3 additions & 1 deletion cmd/substreams/pack.go
@@ -8,6 +8,7 @@ import (

"github.com/spf13/cobra"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/substreams/manifest"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@@ -59,7 +60,8 @@ func runPack(cmd *cobra.Command, args []string) error {
return fmt.Errorf("reading manifest %q: %w", manifestPath, err)
}

originalOutputFile := maybeGetString(cmd, "output-file")
originalOutputFile, _ := sflags.GetString(cmd, "output-file")

resolvedOutputFile := resolveOutputFile(originalOutputFile, map[string]string{
"manifestDir": filepath.Dir(manifestPath),
"spkgDefaultName": fmt.Sprintf("%s-%s.spkg", strings.Replace(pkg.PackageMeta[0].Name, "_", "-", -1), pkg.PackageMeta[0].Version),
9 changes: 5 additions & 4 deletions cmd/substreams/protogen.go
@@ -6,6 +6,7 @@ import (

"github.com/spf13/cobra"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/substreams/codegen"
"github.com/streamingfast/substreams/manifest"
"go.uber.org/zap"
@@ -43,10 +44,10 @@ func init() {
}

func runProtogen(cmd *cobra.Command, args []string) error {
outputPath := mustGetString(cmd, "output-path")
excludePaths := mustGetStringArray(cmd, "exclude-paths")
generateMod := mustGetBool(cmd, "generate-mod-rs")
showGeneratedBufGen := mustGetBool(cmd, "show-generated-buf-gen")
outputPath := sflags.MustGetString(cmd, "output-path")
excludePaths := sflags.MustGetStringArray(cmd, "exclude-paths")
generateMod := sflags.MustGetBool(cmd, "generate-mod-rs")
showGeneratedBufGen := sflags.MustGetBool(cmd, "show-generated-buf-gen")

manifestPath := ""
if len(args) == 1 {
13 changes: 7 additions & 6 deletions cmd/substreams/proxy.go
@@ -11,6 +11,7 @@ import (
grpcreflect "connectrpc.com/grpcreflect"
"github.com/rs/cors"
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/substreams/client"
"github.com/streamingfast/substreams/manifest"
pbrpcsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
@@ -113,22 +114,22 @@ func init() {
}

func runProxy(cmd *cobra.Command, args []string) error {
addr := mustGetString(cmd, "listen-addr")
addr := sflags.MustGetString(cmd, "listen-addr")
fmt.Println("listening on", addr)

authToken, authType := tools.GetAuth(cmd, "substreams-api-key-envvar", "substreams-api-token-envvar")
substreamsClientConfig := client.NewSubstreamsClientConfig(
mustGetString(cmd, "substreams-endpoint"),
sflags.MustGetString(cmd, "substreams-endpoint"),
authToken,
authType,
mustGetBool(cmd, "insecure"),
mustGetBool(cmd, "plaintext"),
sflags.MustGetBool(cmd, "insecure"),
sflags.MustGetBool(cmd, "plaintext"),
)

cs := &ConnectServer{
Manifest: mustGetString(cmd, "force-manifest"),
Manifest: sflags.MustGetString(cmd, "force-manifest"),
SubstreamsClientConfig: substreamsClientConfig,
StartBlock: mustGetUint64(cmd, "force-start-block"),
StartBlock: sflags.MustGetUint64(cmd, "force-start-block"),
}

reflector := grpcreflect.NewStaticReflector(