Skip to content

Commit

Permalink
Update squasher loading logic (#454)
Browse files Browse the repository at this point in the history
* Update squasher loading logic
* improve flow for loading kv vs partial store

---------

Co-authored-by: arnaudberger <[email protected]>
Co-authored-by: Stéphane Duchesneau <[email protected]>
  • Loading branch information
3 people authored Apr 11, 2024
1 parent 43a8245 commit 0924f35
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* fix missing error handling when writing output data to files. This could result in a tier1 request just "hanging" while waiting for a file that tier2 never produced.
* fix handling of dstore error in tier1 'execout walker' causing stalling issues on S3 or on unexpected storage errors
* increase number of retries on storage when writing states or execouts (5 -> 10)
* prevent slow squashing when loading each segment from full KV store (can happen when a stage contains multiple stores)

### Gui

Expand Down
55 changes: 44 additions & 11 deletions orchestrator/stage/squash.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"go.uber.org/zap"

"github.com/hashicorp/go-multierror"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/store"
)
Expand Down Expand Up @@ -46,6 +47,12 @@ func (s *Stages) multiSquash(stage *Stage, mergeUnit Unit) error {
return stage.syncWork.Wait()
}

// Result carries the outcome of one of the two concurrent load attempts
// started by singleSquash: loading the partial store for the segment, or
// fetching an already-merged full store. Each attempt sends exactly one
// Result on the shared results channel.
type Result struct {
// partialKVStore is set by the partial-store loader. It is populated even
// when the load failed, so error must be checked before using it.
partialKVStore *store.PartialKV
// fullKVStore is set by the full-store loader. The consumer treats a
// non-nil value (with a nil error) as "an already-merged store is available".
fullKVStore *store.FullKV
// error is the error returned by the corresponding load call, or nil.
error error
}

// The singleSquash operation's goal is to take the up-most contiguous unit
// that is complete, and take the very next partial, squash it and produce a FullKV
// store.
Expand All @@ -63,7 +70,6 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni
rng := modState.segmenter.Range(mergeUnit.Segment)
metrics.blockRange = rng
partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock)
partialKV := modState.derivePartialKV(rng.StartBlock)
segmentEndsOnInterval := modState.segmenter.EndsOnInterval(mergeUnit.Segment)

// Retrieve store to merge, from cache or load from storage. Allows skipping of segments
Expand All @@ -75,19 +81,46 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni

// Load
metrics.loadStart = time.Now()
if err := partialKV.Load(s.ctx, partialFile); err != nil {
if nextFull, err := modState.getStore(s.ctx, rng.ExclusiveEndBlock); err == nil { // try to load an already-merged file

modState.cachedStore = nextFull
modState.lastBlockInStore = rng.ExclusiveEndBlock
metrics.loadEnd = time.Now()

s.logger.Info("squashing time metrics", metrics.logFields()...)

return nil
ctx, cancel := context.WithCancel(s.ctx)

results := make(chan Result, 2)
go func() {
partial := modState.derivePartialKV(rng.StartBlock)
err := partial.Load(ctx, partialFile)
results <- Result{partialKVStore: partial, error: err}
}()

go func() {
nextFull, err := modState.getStore(ctx, rng.ExclusiveEndBlock)
results <- Result{fullKVStore: nextFull, error: err}
}()

var partialKV *store.PartialKV
loop:
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
break loop
case result := <-results:
if result.error != nil {
err = multierror.Append(err, result.error)
continue loop
}
if result.fullKVStore != nil {
modState.cachedStore = result.fullKVStore
modState.lastBlockInStore = rng.ExclusiveEndBlock
metrics.loadEnd = time.Now()
s.logger.Info("squashing time metrics", metrics.logFields()...)
cancel()
return nil
}
partialKV = result.partialKVStore
break loop
}
return fmt.Errorf("loading partial: %q: %w", partialFile.Filename, err)
}
cancel()

metrics.loadEnd = time.Now()

// Merge
Expand Down

0 comments on commit 0924f35

Please sign in to comment.