From 0f242e7861c8ce226fea0bca5d09a78c98fa4830 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 22 Nov 2024 16:24:09 +0100 Subject: [PATCH] chore(refactor): Avoid goroutine leak in case of fetching errors (#15018) --- pkg/storage/async_store.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go index 5d9752f8813ea..212b7a3776206 100644 --- a/pkg/storage/async_store.go +++ b/pkg/storage/async_store.go @@ -7,6 +7,7 @@ import ( "github.com/c2h5oh/datasize" "github.com/opentracing/opentracing-go" + "golang.org/x/sync/errgroup" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/stores" @@ -72,23 +73,23 @@ func (a *AsyncStore) GetChunks(ctx context.Context, predicate chunk.Predicate, storeChunksOverride *logproto.ChunkRefGroup, ) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { - errs := make(chan error) var storeChunks [][]chunk.Chunk + g, ctx := errgroup.WithContext(ctx) + var fetchers []*fetcher.Fetcher - go func() { + g.Go(func() error { var err error storeChunks, fetchers, err = a.Store.GetChunks(ctx, userID, from, through, predicate, storeChunksOverride) - errs <- err - }() + return err + }) var ingesterChunks []string - go func() { + g.Go(func() error { if !a.shouldQueryIngesters(through, model.Now()) { level.Debug(util_log.Logger).Log("msg", "skipping querying ingesters for chunk ids", "query-from", from, "query-through", through) - errs <- nil - return + return nil } var err error @@ -100,14 +101,11 @@ func (a *AsyncStore) GetChunks(ctx context.Context, } level.Debug(util_log.Logger).Log("msg", "got chunk ids from ingester", "count", len(ingesterChunks)) } - errs <- err - }() + return err + }) - for i := 0; i < 2; i++ { - err := <-errs - if err != nil { - return nil, nil, err - } + if err := g.Wait(); err != nil { + return nil, nil, err } if len(ingesterChunks) == 0 {