diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index c7e483c6..681c6af9 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -9,17 +9,31 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## v1.9.0 -- Expose a new intrinsic to modules: `skip_empty_output`, which causes the module output to be skipped if it has zero bytes. Be careful, a protobuf object with all its default values will have zero bytes. +### Important BUG FIX + +* Fix a bug introduced in v1.6.0 that could result in corrupted store "state" file if all + the "outputs" were already cached for a module in a given segment (rare occurence) +* We recommend clearing your substreams cache after this upgrade and re-processing or + validating your data if you use stores. + +### Fixed + +* substreams 'tools decode state' now correctly prints the `kvops` when pointing to store output files + +### Added + +* Expose a new intrinsic to modules: `skip_empty_output`, which causes the module output to be skipped if it has zero bytes. (Watch out, a protobuf object with all its default values will have zero bytes) +* Improve schedule order (faster time to first block) for substreams with multiple stages when starting mid-chain ## v1.8.2 -- `substreams init` (code generation): fix displaying of saved path in filenames +* `substreams init` (code generation): fix displaying of saved path in filenames ## v1.8.1 -- Add a `NoopMode` to the `Tier1` enabling to avoid sending data back to requester while processing live. +* Add a `NoopMode` to the `Tier1` enabling to avoid sending data back to requester while processing live. ## v1.8.0 diff --git a/service/tier2.go b/service/tier2.go index 04130af6..c96c479b 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -337,8 +337,21 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P excludable: for _, stage := range pipe.ModuleExecutors { for _, executor := range stage { - if executionPlan.ExistingExecOuts[executor.Name()] != nil { - continue + switch executor := executor.(type) { + case *exec.MapperModuleExecutor: + if executionPlan.ExistingExecOuts[executor.Name()] != nil { + continue + } + case *exec.IndexModuleExecutor: + if executionPlan.ExistingIndices[executor.Name()] != nil { + continue + } + case *exec.StoreModuleExecutor: + if executionPlan.ExistingExecOuts[executor.Name()] != nil { + if _, ok := executionPlan.StoresToWrite[executor.Name()]; !ok { + continue + } + } } if !executor.BlockIndex().ExcludesAllBlocks() { allExecutorsExcludedByBlockIndex = false diff --git a/storage/execout/file.go b/storage/execout/file.go index 1c5f559e..7ac145a6 100644 --- a/storage/execout/file.go +++ b/storage/execout/file.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "path" "sort" "strconv" "sync" @@ -35,6 +36,9 @@ type File struct { loadedSize uint64 } +func (c *File) FullFilename() string { + return path.Join(c.store.BaseURL().String(), c.Filename()) +} func (c *File) Filename() string { return computeDBinFilename(c.Range.StartBlock, c.Range.ExclusiveEndBlock) } diff --git a/tools/decode.go b/tools/decode.go index 1e5f7724..decf18e8 100644 --- a/tools/decode.go +++ b/tools/decode.go @@ -3,6 +3,7 @@ package tools import ( "context" "encoding/hex" + "encoding/json" "fmt" "strconv" "strings" @@ -14,10 +15,12 @@ import ( "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dstore" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/descriptorpb" "github.com/streamingfast/substreams/block" "github.com/streamingfast/substreams/manifest" + pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "github.com/streamingfast/substreams/storage/execout" "github.com/streamingfast/substreams/storage/index" @@ -312,7 +315,7 @@ func runDecodeOutputsModuleRunE(cmd *cobra.Command, args []string) error { case *pbsubstreams.Module_KindMap_: return searchOutputsModule(ctx, requestedBlocks, startBlock, saveInterval, moduleHash, matchingModule, s, protoFiles) case *pbsubstreams.Module_KindStore_: - return searchOutputsModule(ctx, requestedBlocks, startBlock, saveInterval, moduleHash, matchingModule, s, protoFiles) + return searchOutputsModuleKvOps(ctx, requestedBlocks, startBlock, saveInterval, moduleHash, matchingModule, s) } return fmt.Errorf("module has an unknown") } @@ -340,6 +343,7 @@ func searchOutputsModule( rng := block.NewRange(startBlock, startBlock-startBlock%saveInterval+saveInterval) outputCache := modStore.NewFile(rng) + fmt.Println("filename:", outputCache.FullFilename()) zlog.Info("loading block from store", zap.Uint64("start_block", startBlock), zap.Stringer("requested_block_range", requestedBlocks)) if err := outputCache.Load(ctx); err != nil { if err == dstore.ErrNotFound { @@ -366,6 +370,55 @@ func searchOutputsModule( return nil } +func searchOutputsModuleKvOps( + ctx context.Context, + requestedBlocks *block.Range, + startBlock, + saveInterval uint64, + moduleHash string, + module *pbsubstreams.Module, + stateStore dstore.Store, +) error { + modStore, err := execout.NewConfig(module.Name, module.InitialBlock, pbsubstreams.ModuleKindMap, moduleHash, stateStore, zlog) + if err != nil { + return fmt.Errorf("execout new config: %w", err) + } + + moduleStore, err := stateStore.SubStore(moduleHash + "/outputs") + if err != nil { + return fmt.Errorf("can't find substore for hash %q: %w", moduleHash, err) + } + + rng := block.NewRange(startBlock, startBlock-startBlock%saveInterval+saveInterval) + + outputCache := modStore.NewFile(rng) + fmt.Println("filename:", outputCache.FullFilename()) + zlog.Info("loading block from store", zap.Uint64("start_block", startBlock), zap.Stringer("requested_block_range", requestedBlocks)) + if err := outputCache.Load(ctx); err != nil { + if err == dstore.ErrNotFound { + return fmt.Errorf("can't find cache at block %d storeURL %q", startBlock, moduleStore.BaseURL().String()) + } + + return fmt.Errorf("loading cache %s file %s : %w", moduleStore.BaseURL(), outputCache.String(), err) + } + + for i := requestedBlocks.StartBlock; i < requestedBlocks.ExclusiveEndBlock; i++ { + payloadBytes, found := outputCache.GetAtBlock(i) + if !found { + continue + } + + fmt.Println("Block", i) + if len(payloadBytes) == 0 { + continue + } + if err := printKVOps(payloadBytes); err != nil { + return fmt.Errorf("printing object: %w", err) + } + } + return nil +} + func searchStateModule( ctx context.Context, startBlock uint64, @@ -386,6 +439,8 @@ func searchStateModule( return fmt.Errorf("unable to load file: %w", err) } + fmt.Println("filename:", stateStore.BaseURL().JoinPath(moduleHash, "states", file.Filename+".zst").String()) + bytes, found := moduleStore.GetLast(key) if !found { return fmt.Errorf("no data found for %q", key) @@ -393,6 +448,20 @@ func searchStateModule( return printObject(module, protoFiles, bytes) } +func printKVOps(data []byte) error { + kvOps := &pbssinternal.Operations{} + if err := proto.Unmarshal(data, kvOps); err != nil { + return fmt.Errorf("unmarshalling kvOps: %w", err) + } + + asJSON, err := json.Marshal(kvOps) + if err != nil { + return fmt.Errorf("marshalling back as json: %w", err) + } + fmt.Println(string(asJSON)) + return nil +} + func printObject(module *pbsubstreams.Module, protoFiles []*descriptorpb.FileDescriptorProto, data []byte) error { protoDefinition := "" valuePrinted := false