Skip to content

Commit

Permalink
error group (#7822)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadavsteindler committed Jul 8, 2024
1 parent e89cab6 commit 67dbd52
Showing 1 changed file with 11 additions and 15 deletions.
26 changes: 11 additions & 15 deletions pkg/block/gs/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package gs

import (
"fmt"
"sync"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
)

const MaxPartsInCompose = 32

type ComposeFunc func(target string, parts []string) (*storage.ObjectAttrs, error)

func ComposeAll(target string, parts []string, composeFunc ComposeFunc) (*storage.ObjectAttrs, error) {
var wg sync.WaitGroup
group := errgroup.Group{}

for layer := 1; len(parts) > MaxPartsInCompose; layer++ {
var nextParts []string
for i := 0; i < len(parts); i += MaxPartsInCompose {
Expand All @@ -25,24 +26,19 @@ func ComposeAll(target string, parts []string, composeFunc ComposeFunc) (*storag
nextParts = append(nextParts, chunk...)
} else {
targetName := fmt.Sprintf("%s_%d", chunk[0], layer)
wg.Add(1)
go composeChunkConcurrent(targetName, chunk, composeFunc, &wg)
group.Go(func() error {
_, err := composeFunc(targetName, chunk)
return err
})
nextParts = append(nextParts, targetName)
}
}
parts = nextParts
}
wg.Wait()

// no compose the chunks we made
return composeFunc(target, parts)
}

func composeChunkConcurrent(target string, parts []string, composeFunc ComposeFunc, wg *sync.WaitGroup) {
// ctx context.Context, a *Adapter, target string, parts []string, bucketName string, bucket *storage.BucketHandle, wg *sync.WaitGroup) {
_, err := composeFunc(target, parts)
if err != nil {
fmt.Println("Compose error: ", err)
// wait for 1st round of composes to complete
if err := group.Wait(); err != nil {
return nil, err
}
wg.Done()
return composeFunc(target, parts) // 2nd round of composes
}

0 comments on commit 67dbd52

Please sign in to comment.