Skip to content

Commit

Permalink
fix: do not retain span logger created with index set initialized at …
Browse files Browse the repository at this point in the history
…query time (#14027)
  • Loading branch information
sandeepsukhani authored Sep 2, 2024
1 parent bf60455 commit 4e41744
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 14 deletions.
11 changes: 4 additions & 7 deletions pkg/storage/stores/shipper/indexshipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

Expand All @@ -32,7 +31,7 @@ const (
var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")

type IndexSet interface {
Init(forQuerying bool) error
Init(forQuerying bool, logger log.Logger) error
Close()
ForEach(ctx context.Context, callback index.ForEachIndexCallback) error
ForEachConcurrent(ctx context.Context, callback index.ForEachIndexCallback) error
Expand Down Expand Up @@ -94,14 +93,12 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
}

// Init downloads all the db files for the table from object storage.
func (t *indexSet) Init(forQuerying bool) (err error) {
func (t *indexSet) Init(forQuerying bool, logger log.Logger) (err error) {
// Using background context to avoid cancellation of download when request times out.
// We would anyways need the files for serving next requests.
ctx := context.Background()
ctx, t.cancelFunc = context.WithTimeout(ctx, downloadTimeout)

logger, ctx := spanlogger.NewWithLogger(ctx, t.logger, "indexSet.Init")

defer func() {
if err != nil {
level.Error(logger).Log("msg", "failed to initialize table, cleaning it up", "table", t.tableName, "err", err)
Expand Down Expand Up @@ -186,7 +183,7 @@ func (t *indexSet) ForEach(ctx context.Context, callback index.ForEachIndexCallb
}
defer t.indexMtx.rUnlock()

logger := util_log.WithContext(ctx, t.logger)
logger := spanlogger.FromContextWithFallback(ctx, t.logger)
level.Debug(logger).Log("index-files-count", len(t.index))

for _, idx := range t.index {
Expand All @@ -205,7 +202,7 @@ func (t *indexSet) ForEachConcurrent(ctx context.Context, callback index.ForEach
}
defer t.indexMtx.rUnlock()

logger := util_log.WithContext(ctx, t.logger)
logger := spanlogger.FromContextWithFallback(ctx, t.logger)
level.Debug(logger).Log("index-files-count", len(t.index))

if len(t.index) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc)
}, util_log.Logger)
require.NoError(t, err)

require.NoError(t, idxSet.Init(false))
require.NoError(t, idxSet.Init(false, util_log.Logger))

return idxSet.(*indexSet), idxSet.Close
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/storage/stores/shipper/indexshipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
}

userID := entry.Name()
logger := loggerWithUserID(table.logger, userID)
userIndexSet, err := NewIndexSet(name, userID, filepath.Join(cacheLocation, userID),
table.baseUserIndexSet, openIndexFileFunc, loggerWithUserID(table.logger, userID))
table.baseUserIndexSet, openIndexFileFunc, logger)
if err != nil {
return nil, err
}

err = userIndexSet.Init(false)
err = userIndexSet.Init(false, logger)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +130,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}

err = commonIndexSet.Init(false)
err = commonIndexSet.Init(false, table.logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,7 +288,7 @@ func (t *table) Sync(ctx context.Context) error {
// forQuerying must be set to true only getting the index for querying since
// it captures the amount of time it takes to download the index at query time.
func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) {
logger := spanlogger.FromContextWithFallback(ctx, log.With(t.logger, "user", id, "table", t.name))
logger := spanlogger.FromContextWithFallback(ctx, loggerWithUserID(t.logger, id))

t.indexSetsMtx.RLock()
indexSet, ok := t.indexSets[id]
Expand All @@ -311,7 +312,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
}

// instantiate the index set, add it to the map
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, logger)
indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, loggerWithUserID(t.logger, id))
if err != nil {
return nil, err
}
Expand All @@ -321,7 +322,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
// it is up to the caller to wait for its readiness using IndexSet.AwaitReady()
go func() {
start := time.Now()
err := indexSet.Init(forQuerying)
err := indexSet.Init(forQuerying, logger)
duration := time.Since(start)

level.Info(logger).Log("msg", "init index set", "duration", duration, "success", err == nil)
Expand Down

0 comments on commit 4e41744

Please sign in to comment.