diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go index 8eae835b441c3..8edd121071c5e 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/index" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage" - util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/spanlogger" ) @@ -32,7 +31,7 @@ const ( var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale") type IndexSet interface { - Init(forQuerying bool) error + Init(forQuerying bool, logger log.Logger) error Close() ForEach(ctx context.Context, callback index.ForEachIndexCallback) error ForEachConcurrent(ctx context.Context, callback index.ForEachIndexCallback) error @@ -94,14 +93,12 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I } // Init downloads all the db files for the table from object storage. -func (t *indexSet) Init(forQuerying bool) (err error) { +func (t *indexSet) Init(forQuerying bool, logger log.Logger) (err error) { // Using background context to avoid cancellation of download when request times out. // We would anyways need the files for serving next requests. ctx := context.Background() ctx, t.cancelFunc = context.WithTimeout(ctx, downloadTimeout) - logger, ctx := spanlogger.NewWithLogger(ctx, t.logger, "indexSet.Init") - defer func() { if err != nil { level.Error(logger).Log("msg", "failed to initialize table, cleaning it up", "table", t.tableName, "err", err) @@ -186,7 +183,7 @@ func (t *indexSet) ForEach(ctx context.Context, callback index.ForEachIndexCallb } defer t.indexMtx.rUnlock() - logger := util_log.WithContext(ctx, t.logger) + logger := spanlogger.FromContextWithFallback(ctx, t.logger) level.Debug(logger).Log("index-files-count", len(t.index)) for _, idx := range t.index { @@ -205,7 +202,7 @@ func (t *indexSet) ForEachConcurrent(ctx context.Context, callback index.ForEach } defer t.indexMtx.rUnlock() - logger := util_log.WithContext(ctx, t.logger) + logger := spanlogger.FromContextWithFallback(ctx, t.logger) level.Debug(logger).Log("index-files-count", len(t.index)) if len(t.index) == 0 { diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/index_set_test.go b/pkg/storage/stores/shipper/indexshipper/downloads/index_set_test.go index 5a2f6522de9f2..988c0457fd190 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/index_set_test.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/index_set_test.go @@ -26,7 +26,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc) }, util_log.Logger) require.NoError(t, err) - require.NoError(t, idxSet.Init(false)) + require.NoError(t, idxSet.Init(false, util_log.Logger)) return idxSet.(*indexSet), idxSet.Close } diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/table.go b/pkg/storage/stores/shipper/indexshipper/downloads/table.go index f329c3b41dcd8..1bae83c51e0e9 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/table.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/table.go @@ -109,13 +109,14 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd } userID := entry.Name() + logger := loggerWithUserID(table.logger, userID) userIndexSet, err := NewIndexSet(name, userID, filepath.Join(cacheLocation, userID), - table.baseUserIndexSet, openIndexFileFunc, loggerWithUserID(table.logger, userID)) + table.baseUserIndexSet, openIndexFileFunc, logger) if err != nil { return nil, err } - err = userIndexSet.Init(false) + err = userIndexSet.Init(false, logger) if err != nil { return nil, err } @@ -129,7 +130,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd return nil, err } - err = commonIndexSet.Init(false) + err = commonIndexSet.Init(false, table.logger) if err != nil { return nil, err } @@ -287,7 +288,7 @@ func (t *table) Sync(ctx context.Context) error { // forQuerying must be set to true only getting the index for querying since // it captures the amount of time it takes to download the index at query time. func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying bool) (IndexSet, error) { - logger := spanlogger.FromContextWithFallback(ctx, log.With(t.logger, "user", id, "table", t.name)) + logger := spanlogger.FromContextWithFallback(ctx, loggerWithUserID(t.logger, id)) t.indexSetsMtx.RLock() indexSet, ok := t.indexSets[id] @@ -311,7 +312,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying } // instantiate the index set, add it to the map - indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, logger) + indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.openIndexFileFunc, loggerWithUserID(t.logger, id)) if err != nil { return nil, err } @@ -321,7 +322,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying // it is up to the caller to wait for its readiness using IndexSet.AwaitReady() go func() { start := time.Now() - err := indexSet.Init(forQuerying) + err := indexSet.Init(forQuerying, logger) duration := time.Since(start) level.Info(logger).Log("msg", "init index set", "duration", duration, "success", err == nil)