diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 27d84a0a..27bd8dea 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * fix missing error handling when writing output data to files. This could result in tier1 request just "hanging" waiting for the file never produced by tier2. * fix handling of dstore error in tier1 'execout walker' causing stalling issues on S3 or on unexpected storage errors * increase number of retries on storage when writing states or execouts (5 -> 10) +* prevent slow squashing when loading each segment from full KV store (can happen when a stage contains multiple stores) ### Gui diff --git a/orchestrator/stage/squash.go b/orchestrator/stage/squash.go index 2513aa6f..e5cb0c2f 100644 --- a/orchestrator/stage/squash.go +++ b/orchestrator/stage/squash.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" + "github.com/hashicorp/go-multierror" "github.com/streamingfast/substreams/reqctx" "github.com/streamingfast/substreams/storage/store" ) @@ -46,6 +47,12 @@ func (s *Stages) multiSquash(stage *Stage, mergeUnit Unit) error { return stage.syncWork.Wait() } +type Result struct { + partialKVStore *store.PartialKV + fullKVStore *store.FullKV + error error +} + // The singleSquash operation's goal is to take the up-most contiguous unit // tha is compete, and take the very next partial, squash it and produce a FullKV // store. @@ -63,7 +70,6 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni rng := modState.segmenter.Range(mergeUnit.Segment) metrics.blockRange = rng partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock) - partialKV := modState.derivePartialKV(rng.StartBlock) segmentEndsOnInterval := modState.segmenter.EndsOnInterval(mergeUnit.Segment) // Retrieve store to merge, from cache or load from storage. Allows skipping of segments @@ -75,19 +81,46 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni // Load metrics.loadStart = time.Now() - if err := partialKV.Load(s.ctx, partialFile); err != nil { - if nextFull, err := modState.getStore(s.ctx, rng.ExclusiveEndBlock); err == nil { // try to load an already-merged file - - modState.cachedStore = nextFull - modState.lastBlockInStore = rng.ExclusiveEndBlock - metrics.loadEnd = time.Now() - - s.logger.Info("squashing time metrics", metrics.logFields()...) - return nil + ctx, cancel := context.WithCancel(s.ctx) + + results := make(chan Result, 2) + go func() { + partial := modState.derivePartialKV(rng.StartBlock) + err := partial.Load(ctx, partialFile) + results <- Result{partialKVStore: partial, error: err} + }() + + go func() { + nextFull, err := modState.getStore(ctx, rng.ExclusiveEndBlock) + results <- Result{fullKVStore: nextFull, error: err} + }() + + var partialKV *store.PartialKV +loop: + for i := 0; i < 2; i++ { + select { + case <-ctx.Done(): + break loop + case result := <-results: + if result.error != nil { + err = multierror.Append(err, result.error) + continue loop + } + if result.fullKVStore != nil { + modState.cachedStore = result.fullKVStore + modState.lastBlockInStore = rng.ExclusiveEndBlock + metrics.loadEnd = time.Now() + s.logger.Info("squashing time metrics", metrics.logFields()...) + cancel() + return nil + } + partialKV = result.partialKVStore + break loop } - return fmt.Errorf("loading partial: %q: %w", partialFile.Filename, err) } + cancel() + metrics.loadEnd = time.Now() // Merge