From 2b6bdc2a29de8d94d78d1c735104f124436a92e5 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 12 Sep 2023 12:00:15 +0200 Subject: [PATCH] chore(storage): Simplify store interfaces and abstractions (pt 2) (#10454) **What this PR does / why we need it**: * Remove preemptive interface from indexReaderWriter and make it an exported struct * Compose IngesterStore and QuerierStore The ingester and querier do not require a store that implements the full interface, but can accept stores that are a subset of the interfaces. Therefore we can make specific interfaces that are accepted by the ingester and querier constructors. * Split up ChunkFetcher interface This commit splits the ChunkFetcher interface into a ChunkFetcher and a ChunkFetcherProvider interface. This has the advantage that methods can allow accepting smaller interfaces. This commit also renames the `GetChunkRefs()` method of the ChunkFetcher interface into `GetChunks()`, because it returns a `[][]chunk.Chunk`, not a `[]chunk.ChunkRef` type. Signed-off-by: Christian Haudum --- clients/pkg/promtail/client/client_test.go | 1 + cmd/migrate/main.go | 2 +- .../loki_micro_services_delete_test.go | 1 + pkg/ingester/flush_test.go | 10 +++- pkg/ingester/ingester.go | 25 +++++----- pkg/ingester/ingester_test.go | 4 +- pkg/querier/querier.go | 12 ++++- pkg/querier/querier_mock_test.go | 2 +- pkg/storage/async_store.go | 4 +- pkg/storage/async_store_test.go | 10 ++-- pkg/storage/store.go | 14 ++++-- pkg/storage/store_test.go | 8 ++-- pkg/storage/stores/composite_store.go | 13 +++-- pkg/storage/stores/composite_store_entry.go | 4 +- pkg/storage/stores/composite_store_test.go | 2 +- pkg/storage/stores/index/index.go | 26 +++++----- .../series/series_index_gateway_store.go | 3 +- .../stores/series/series_index_store.go | 48 +++++++++---------- .../stores/series/series_store_test.go | 4 +- .../shipper/index/compactor/util_test.go | 2 +- .../stores/shipper/indexgateway/gateway.go | 3 +- pkg/storage/util_test.go | 2 +- 22 files changed, 115 insertions(+), 85 deletions(-) diff --git a/clients/pkg/promtail/client/client_test.go b/clients/pkg/promtail/client/client_test.go index 472b43cd0b241..01cbb87cc1116 100644 --- a/clients/pkg/promtail/client/client_test.go +++ b/clients/pkg/promtail/client/client_test.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/utils" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/push" lokiflag "github.com/grafana/loki/pkg/util/flagext" diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 2057cf4d06950..e70f0359a3dbd 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -310,7 +310,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh < var totalBytes uint64 var totalChunks uint64 //log.Printf("%d processing sync range %d - Start: %v, End: %v\n", threadID, sr.number, time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC()) - schemaGroups, fetchers, err := m.source.GetChunkRefs(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), m.matchers...) + schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), m.matchers...) if err != nil { log.Println(threadID, "Error querying index for chunk refs:", err) errCh <- err diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index 7744ad6fb4540..4643d3054e86b 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/integration/client" "github.com/grafana/loki/integration/cluster" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/push" diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 904b4d5824966..f7e61238036e1 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -339,6 +339,10 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil } +func (s *testStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error { + return nil +} + func (s *testStore) IsLocal() bool { return false } @@ -351,7 +355,11 @@ func (s *testStore) SelectSamples(_ context.Context, _ logql.SelectSampleParams) return nil, nil } -func (s *testStore) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (s *testStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { + return nil, nil +} + +func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { return nil, nil, nil } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dd89ece9527d7..1f0e870352c6b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -40,8 +40,9 @@ import ( "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores" + indexstore "github.com/grafana/loki/pkg/storage/stores/index" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/util" @@ -166,15 +167,13 @@ type Wrapper interface { Wrap(wrapped Interface) Interface } -// ChunkStore is the interface we need to store chunks. -type ChunkStore interface { - Put(ctx context.Context, chunks []chunk.Chunk) error - SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) - SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) - GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) - GetSchemaConfigs() []config.PeriodConfig - Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error) - Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) +// Store is the store interface we need on the ingester. +type Store interface { + stores.ChunkWriter + stores.ChunkFetcher + storage.SelectStore + storage.SchemaConfigProvider + indexstore.StatsReader } // Interface is an interface for the Ingester @@ -211,7 +210,7 @@ type Ingester struct { lifecycler *ring.Lifecycler lifecyclerWatcher *services.FailureWatcher - store ChunkStore + store Store periodicConfigs []config.PeriodConfig loopDone sync.WaitGroup @@ -248,7 +247,7 @@ type Ingester struct { } // New makes a new Ingester. -func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg) (*Ingester, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } @@ -1006,7 +1005,7 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq } // get chunk references - chunksGroups, _, err := i.store.GetChunkRefs(ctx, orgID, start, end, matchers...) + chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, matchers...) if err != nil { return nil, err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index b453e5a9ea0ab..5f2f788e6feb3 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -433,7 +433,7 @@ func (s *mockStore) SelectSamples(_ context.Context, _ logql.SelectSampleParams) return nil, nil } -func (s *mockStore) GetSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { +func (s *mockStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { return nil, nil } @@ -449,7 +449,7 @@ func (s *mockStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) er return nil } -func (s *mockStore) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (s *mockStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { return nil, nil, nil } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 0688b140261ee..df2664db5d484 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -8,6 +8,7 @@ import ( "github.com/opentracing/opentracing-go" + "github.com/grafana/loki/pkg/storage/stores/index" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" "github.com/go-kit/log/level" @@ -100,10 +101,17 @@ type Limits interface { MaxEntriesLimitPerQuery(context.Context, string) int } +// Store is the store interface we need on the querier. +type Store interface { + storage.SelectStore + index.BaseReader + index.StatsReader +} + // SingleTenantQuerier handles single tenant queries. type SingleTenantQuerier struct { cfg Config - store storage.Store + store Store limits Limits ingesterQuerier *IngesterQuerier deleteGetter deleteGetter @@ -115,7 +123,7 @@ type deleteGetter interface { } // New makes a new Querier. -func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) { +func New(cfg Config, store Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) { return &SingleTenantQuerier{ cfg: cfg, store: store, diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index c623e3391d0ce..0255ac90e7737 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -319,7 +319,7 @@ func (s *storeMock) SelectSamples(ctx context.Context, req logql.SelectSamplePar return res.(iter.SampleIterator), args.Error(1) } -func (s *storeMock) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { args := s.Called(ctx, userID, from, through, matchers) return args.Get(0).([][]chunk.Chunk), args.Get(0).([]*fetcher.Fetcher), args.Error(2) } diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go index 6f02c41cae44b..f41cc1b4e729a 100644 --- a/pkg/storage/async_store.go +++ b/pkg/storage/async_store.go @@ -63,7 +63,7 @@ func (a *AsyncStore) shouldQueryIngesters(through, now model.Time) bool { return a.queryIngestersWithin == 0 || through.After(now.Add(-a.queryIngestersWithin)) } -func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (a *AsyncStore) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { spanLogger := spanlogger.FromContext(ctx) errs := make(chan error) @@ -72,7 +72,7 @@ func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, thro var fetchers []*fetcher.Fetcher go func() { var err error - storeChunks, fetchers, err = a.Store.GetChunkRefs(ctx, userID, from, through, matchers...) + storeChunks, fetchers, err = a.Store.GetChunks(ctx, userID, from, through, matchers...) errs <- err }() diff --git a/pkg/storage/async_store_test.go b/pkg/storage/async_store_test.go index 18e14164be432..83aab239ea10d 100644 --- a/pkg/storage/async_store_test.go +++ b/pkg/storage/async_store_test.go @@ -29,7 +29,7 @@ func newStoreMock() *storeMock { return &storeMock{} } -func (s *storeMock) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { args := s.Called(ctx, userID, from, through, matchers) return args.Get(0).([][]chunk.Chunk), args.Get(1).([]*fetcher.Fetcher), args.Error(2) } @@ -233,7 +233,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { store := newStoreMock() - store.On("GetChunkRefs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.storeChunks, tc.storeFetcher, nil) + store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.storeChunks, tc.storeFetcher, nil) store.On("GetChunkFetcher", mock.Anything).Return(tc.ingesterFetcher) ingesterQuerier := newIngesterQuerierMock() @@ -242,7 +242,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { asyncStoreCfg := AsyncStoreCfg{IngesterQuerier: ingesterQuerier} asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{}) - chunks, fetchers, err := asyncStore.GetChunkRefs(context.Background(), "fake", model.Now(), model.Now(), nil) + chunks, fetchers, err := asyncStore.GetChunks(context.Background(), "fake", model.Now(), model.Now(), nil) require.NoError(t, err) require.Equal(t, tc.expectedChunks, chunks) @@ -293,7 +293,7 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) { t.Run(tc.name, func(t *testing.T) { store := newStoreMock() - store.On("GetChunkRefs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([][]chunk.Chunk{}, []*fetcher.Fetcher{}, nil) + store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([][]chunk.Chunk{}, []*fetcher.Fetcher{}, nil) ingesterQuerier := newIngesterQuerierMock() ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) @@ -304,7 +304,7 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) { } asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{}) - _, _, err := asyncStore.GetChunkRefs(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil) + _, _, err := asyncStore.GetChunks(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil) require.NoError(t, err) expectedNumCalls := 0 diff --git a/pkg/storage/store.go b/pkg/storage/store.go index d07c3ffe4ef0d..5cca9deb62048 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -52,10 +52,14 @@ type SelectStore interface { SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) } +type SchemaConfigProvider interface { + GetSchemaConfigs() []config.PeriodConfig +} + type Store interface { stores.Store SelectStore - GetSchemaConfigs() []config.PeriodConfig + SchemaConfigProvider } type LokiStore struct { @@ -306,11 +310,11 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl } indexReaderWriter := series.NewIndexReaderWriter(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize, s.writeDedupeCache) - indexReaderWriter = index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg) - chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, indexReaderWriter, s.storeCfg.DisableIndexDeduplication) + monitoredReaderWriter := index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg) + chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, monitoredReaderWriter, s.storeCfg.DisableIndexDeduplication) return chunkWriter, - indexReaderWriter, + monitoredReaderWriter, func() { chunkClient.Stop() f.Stop() @@ -382,7 +386,7 @@ func (s *LokiStore) lazyChunks(ctx context.Context, matchers []*labels.Matcher, stats := stats.FromContext(ctx) start := time.Now() - chks, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, matchers...) + chks, fetchers, err := s.GetChunks(ctx, userID, from, through, matchers...) stats.AddChunkRefsFetchTime(time.Since(start)) if err != nil { diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 13efd8ffa538f..eb9ea6306cc8f 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1080,7 +1080,7 @@ func TestStore_indexPrefixChange(t *testing.T) { } // get all the chunks from the first period - chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate), newMatchers(fooLabelsWithName.String())...) + chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate), newMatchers(fooLabelsWithName.String())...) require.NoError(t, err) var totalChunks int for _, chks := range chunks { @@ -1148,7 +1148,7 @@ func TestStore_indexPrefixChange(t *testing.T) { } // get all the chunks from both the stores - chunks, _, err = store.GetChunkRefs(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) + chunks, _, err = store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) require.NoError(t, err) totalChunks = 0 @@ -1281,7 +1281,7 @@ func TestStore_MultiPeriod(t *testing.T) { defer store.Stop() // get all the chunks from both the stores - chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) + chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) require.NoError(t, err) var totalChunks int for _, chks := range chunks { @@ -1627,7 +1627,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { defer store.Stop() // get all the chunks from both the stores - chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) + chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) require.NoError(t, err) var totalChunks int for _, chks := range chunks { diff --git a/pkg/storage/stores/composite_store.go b/pkg/storage/stores/composite_store.go index 81fb4eb58a4aa..f5bf6328de561 100644 --- a/pkg/storage/stores/composite_store.go +++ b/pkg/storage/stores/composite_store.go @@ -22,16 +22,21 @@ type ChunkWriter interface { PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error } -type ChunkFetcher interface { +type ChunkFetcherProvider interface { GetChunkFetcher(tm model.Time) *fetcher.Fetcher - GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) +} + +type ChunkFetcher interface { + GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) } type Store interface { index.BaseReader + index.StatsReader index.Filterable ChunkWriter ChunkFetcher + ChunkFetcherProvider Stop() } @@ -149,11 +154,11 @@ func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID stri return result.Strings(), err } -func (c CompositeStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (c CompositeStore) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { chunkIDs := [][]chunk.Chunk{} fetchers := []*fetcher.Fetcher{} err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { - ids, fetcher, err := store.GetChunkRefs(innerCtx, userID, from, through, matchers...) + ids, fetcher, err := store.GetChunks(innerCtx, userID, from, through, matchers...) if err != nil { return err } diff --git a/pkg/storage/stores/composite_store_entry.go b/pkg/storage/stores/composite_store_entry.go index 60a9eef38d01f..1bdc5e9013c5e 100644 --- a/pkg/storage/stores/composite_store_entry.go +++ b/pkg/storage/stores/composite_store_entry.go @@ -42,11 +42,11 @@ type storeEntry struct { ChunkWriter } -func (c *storeEntry) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (c *storeEntry) GetChunks(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { if ctx.Err() != nil { return nil, nil, ctx.Err() } - sp, ctx := opentracing.StartSpanFromContext(ctx, "GetChunkRefs") + sp, ctx := opentracing.StartSpanFromContext(ctx, "GetChunks") defer sp.Finish() log := spanlogger.FromContext(ctx) defer log.Span.Finish() diff --git a/pkg/storage/stores/composite_store_test.go b/pkg/storage/stores/composite_store_test.go index 3bb1c09e46b6f..903a3e54de6b4 100644 --- a/pkg/storage/stores/composite_store_test.go +++ b/pkg/storage/stores/composite_store_test.go @@ -36,7 +36,7 @@ func (m mockStore) LabelValuesForMetricName(_ context.Context, _ string, _, _ mo func (m mockStore) SetChunkFilterer(_ chunk.RequestChunkFilterer) {} -func (m mockStore) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (m mockStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { return nil, nil, nil } diff --git a/pkg/storage/stores/index/index.go b/pkg/storage/stores/index/index.go index 1bdce80b3cc9d..6883a841f0e84 100644 --- a/pkg/storage/stores/index/index.go +++ b/pkg/storage/stores/index/index.go @@ -23,12 +23,16 @@ type BaseReader interface { GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) +} + +type StatsReader interface { Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) } type Reader interface { BaseReader + StatsReader GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) Filterable } @@ -42,19 +46,19 @@ type ReaderWriter interface { Writer } -type monitoredReaderWriter struct { +type MonitoredReaderWriter struct { rw ReaderWriter metrics *metrics } -func NewMonitoredReaderWriter(rw ReaderWriter, reg prometheus.Registerer) ReaderWriter { - return &monitoredReaderWriter{ +func NewMonitoredReaderWriter(rw ReaderWriter, reg prometheus.Registerer) *MonitoredReaderWriter { + return &MonitoredReaderWriter{ rw: rw, metrics: newMetrics(reg), } } -func (m monitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) { +func (m MonitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) { var chunks []logproto.ChunkRef if err := loki_instrument.TimeRequest(ctx, "chunk_refs", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { @@ -68,7 +72,7 @@ func (m monitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, return chunks, nil } -func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { +func (m MonitoredReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { var lbls []labels.Labels if err := loki_instrument.TimeRequest(ctx, "series", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error @@ -81,7 +85,7 @@ func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, fro return lbls, nil } -func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { +func (m MonitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { var values []string if err := loki_instrument.TimeRequest(ctx, "label_values", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error @@ -94,7 +98,7 @@ func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, use return values, nil } -func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { +func (m MonitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { var values []string if err := loki_instrument.TimeRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error @@ -107,7 +111,7 @@ func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, user return values, nil } -func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { +func (m MonitoredReaderWriter) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { var sts *stats.Stats if err := loki_instrument.TimeRequest(ctx, "stats", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error @@ -120,7 +124,7 @@ func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, t return sts, nil } -func (m monitoredReaderWriter) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) { +func (m MonitoredReaderWriter) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) { var vol *logproto.VolumeResponse if err := loki_instrument.TimeRequest(ctx, "volume", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error @@ -133,11 +137,11 @@ func (m monitoredReaderWriter) Volume(ctx context.Context, userID string, from, return vol, nil } -func (m monitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { +func (m MonitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { m.rw.SetChunkFilterer(chunkFilter) } -func (m monitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { +func (m MonitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { return loki_instrument.TimeRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { return m.rw.IndexChunk(ctx, from, through, chk) }) diff --git a/pkg/storage/stores/series/series_index_gateway_store.go b/pkg/storage/stores/series/series_index_gateway_store.go index 0ab67464f624a..55639cdf67e39 100644 --- a/pkg/storage/stores/series/series_index_gateway_store.go +++ b/pkg/storage/stores/series/series_index_gateway_store.go @@ -12,7 +12,6 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/stores/index" "github.com/grafana/loki/pkg/storage/stores/index/stats" ) @@ -22,7 +21,7 @@ type IndexGatewayClientStore struct { logger log.Logger } -func NewIndexGatewayClientStore(client logproto.IndexGatewayClient, logger log.Logger) index.ReaderWriter { +func NewIndexGatewayClientStore(client logproto.IndexGatewayClient, logger log.Logger) *IndexGatewayClientStore { return &IndexGatewayClientStore{ client: client, logger: logger, diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index 1b445572bd99c..9498f3a16be42 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -24,7 +24,6 @@ import ( "github.com/grafana/loki/pkg/storage/config" storageerrors "github.com/grafana/loki/pkg/storage/errors" "github.com/grafana/loki/pkg/storage/stores" - "github.com/grafana/loki/pkg/storage/stores/index" "github.com/grafana/loki/pkg/storage/stores/index/stats" series_index "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/util" @@ -63,7 +62,8 @@ var ( }) ) -type indexReaderWriter struct { +// IndexReaderWriter implements pkg/storage/stores/index.ReaderWriter +type IndexReaderWriter struct { schema series_index.SeriesStoreSchema index series_index.Client schemaCfg config.SchemaConfig @@ -74,8 +74,8 @@ type indexReaderWriter struct { } func NewIndexReaderWriter(schemaCfg config.SchemaConfig, schema series_index.SeriesStoreSchema, index series_index.Client, - fetcher *fetcher.Fetcher, chunkBatchSize int, writeDedupeCache cache.Cache) index.ReaderWriter { - return &indexReaderWriter{ + fetcher *fetcher.Fetcher, chunkBatchSize int, writeDedupeCache cache.Cache) *IndexReaderWriter { + return &IndexReaderWriter{ schema: schema, index: index, schemaCfg: schemaCfg, @@ -85,7 +85,7 @@ func NewIndexReaderWriter(schemaCfg config.SchemaConfig, schema series_index.Ser } } -func (c *indexReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { +func (c *IndexReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { writeReqs, keysToCache, err := c.calculateIndexEntries(ctx, from, through, chk) if err != nil { return err @@ -104,7 +104,7 @@ func (c *indexReaderWriter) IndexChunk(ctx context.Context, from, through model. } // calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given. -func (c *indexReaderWriter) calculateIndexEntries(ctx context.Context, from, through model.Time, chunk chunk.Chunk) (series_index.WriteBatch, []string, error) { +func (c *IndexReaderWriter) calculateIndexEntries(ctx context.Context, from, through model.Time, chunk chunk.Chunk) (series_index.WriteBatch, []string, error) { seenIndexEntries := map[string]struct{}{} entries := []series_index.Entry{} @@ -149,7 +149,7 @@ func (c *indexReaderWriter) calculateIndexEntries(ctx context.Context, from, thr return result, missing, nil } -func (c *indexReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) { +func (c *IndexReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) { log := util_log.WithContext(ctx, util_log.Logger) // Check there is a metric name matcher of type equal, metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers) @@ -192,7 +192,7 @@ func (c *indexReaderWriter) GetChunkRefs(ctx context.Context, userID string, fro return chunks, nil } -func (c *indexReaderWriter) SetChunkFilterer(f chunk.RequestChunkFilterer) { +func (c *IndexReaderWriter) SetChunkFilterer(f chunk.RequestChunkFilterer) { c.chunkFilterer = f } @@ -209,7 +209,7 @@ func (c chunkGroup) Less(i, j int) bool { return c.schema.ExternalKey(c.chunks[i].ChunkRef) < c.schema.ExternalKey(c.chunks[j].ChunkRef) } -func (c *indexReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { +func (c *IndexReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { chks, err := c.GetChunkRefs(ctx, userID, from, through, matchers...) if err != nil { return nil, err @@ -218,7 +218,7 @@ func (c *indexReaderWriter) GetSeries(ctx context.Context, userID string, from, return c.chunksToSeries(ctx, chks, matchers) } -func (c *indexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.ChunkRef, matchers []*labels.Matcher) ([]labels.Labels, error) { +func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.ChunkRef, matchers []*labels.Matcher) ([]labels.Labels, error) { // download one per series and merge // group chunks by series chunksBySeries := filterChunkRefsByUniqueFingerprint(in) @@ -313,7 +313,7 @@ func (c *indexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch } // LabelNamesForMetricName retrieves all label names for a metric name. -func (c *indexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { +func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName") defer sp.Finish() log := spanlogger.FromContext(ctx) @@ -341,7 +341,7 @@ func (c *indexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID return labelNames, nil } -func (c *indexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { +func (c *IndexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName") defer sp.Finish() log := spanlogger.FromContext(ctx) @@ -377,7 +377,7 @@ func (c *indexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID } // LabelValuesForMetricName retrieves all label values for a single label name and metric name. -func (c *indexReaderWriter) labelValuesForMetricNameWithMatchers(ctx context.Context, userID string, from, through model.Time, metricName, labelName string, matchers ...*labels.Matcher) ([]string, error) { +func (c *IndexReaderWriter) labelValuesForMetricNameWithMatchers(ctx context.Context, userID string, from, through model.Time, metricName, labelName string, matchers ...*labels.Matcher) ([]string, error) { // Otherwise get series which include other matchers seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers) if err != nil { @@ -419,7 +419,7 @@ func (c *indexReaderWriter) labelValuesForMetricNameWithMatchers(ctx context.Con return result.Strings(), nil } -func (c *indexReaderWriter) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) { +func (c *IndexReaderWriter) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) { // Check if one of the labels is a shard annotation, pass that information to lookupSeriesByMetricNameMatcher, // and remove the label. shard, shardLabelIndex, err := astmapper.ShardFromMatchers(matchers) @@ -502,13 +502,13 @@ func (c *indexReaderWriter) lookupSeriesByMetricNameMatchers(ctx context.Context return ids, nil } -func (c *indexReaderWriter) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, shard *astmapper.ShardAnnotation) ([]string, error) { +func (c *IndexReaderWriter) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, shard *astmapper.ShardAnnotation) ([]string, error) { return c.lookupIdsByMetricNameMatcher(ctx, from, through, userID, metricName, matcher, func(queries []series_index.Query) []series_index.Query { return c.schema.FilterReadQueries(queries, shard) }) } -func (c *indexReaderWriter) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]series_index.Query) []series_index.Query) ([]string, error) { +func (c *IndexReaderWriter) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]series_index.Query) []series_index.Query) ([]string, error) { var err error var queries []series_index.Query var labelName string @@ -600,7 +600,7 @@ var entriesPool = sync.Pool{ }, } -func (c *indexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries []series_index.Query, entries *[]series_index.Entry) error { +func (c *IndexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries []series_index.Query, entries *[]series_index.Entry) error { *entries = (*entries)[:0] // Nothing to do if there are no queries. if len(queries) == 0 { @@ -628,7 +628,7 @@ func (c *indexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries return err } -func (c *indexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { +func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesBySeries") defer sp.Finish() log := spanlogger.FromContext(ctx) @@ -665,7 +665,7 @@ func (c *indexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, return result.Strings(), nil } -func (c *indexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { +func (c *IndexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesByChunks") defer sp.Finish() log := spanlogger.FromContext(ctx) @@ -701,7 +701,7 @@ func (c *indexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, return labelNamesFromChunks(allChunks), nil } -func (c *indexReaderWriter) lookupChunksBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { +func (c *IndexReaderWriter) lookupChunksBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { queries := make([]series_index.Query, 0, len(seriesIDs)) for _, seriesID := range seriesIDs { qs, err := c.schema.GetChunksForSeries(from, through, userID, []byte(seriesID)) @@ -722,7 +722,7 @@ func (c *indexReaderWriter) lookupChunksBySeries(ctx context.Context, from, thro return result, err } -func (c *indexReaderWriter) convertChunkIDsToChunks(_ context.Context, userID string, chunkIDs []string) ([]chunk.Chunk, error) { +func (c *IndexReaderWriter) convertChunkIDsToChunks(_ context.Context, userID string, chunkIDs []string) ([]chunk.Chunk, error) { chunkSet := make([]chunk.Chunk, 0, len(chunkIDs)) for _, chunkID := range chunkIDs { chunk, err := chunk.ParseExternalKey(userID, chunkID) @@ -735,7 +735,7 @@ func (c *indexReaderWriter) convertChunkIDsToChunks(_ context.Context, userID st return chunkSet, nil } -func (c *indexReaderWriter) convertChunkIDsToChunkRefs(_ context.Context, userID string, chunkIDs []string) ([]logproto.ChunkRef, error) { +func (c *IndexReaderWriter) convertChunkIDsToChunkRefs(_ context.Context, userID string, chunkIDs []string) ([]logproto.ChunkRef, error) { chunkSet := make([]logproto.ChunkRef, 0, len(chunkIDs)) for _, chunkID := range chunkIDs { chunk, err := chunk.ParseExternalKey(userID, chunkID) @@ -749,11 +749,11 @@ func (c *indexReaderWriter) convertChunkIDsToChunkRefs(_ context.Context, userID } // old index stores do not implement stats -- skip -func (c *indexReaderWriter) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) { +func (c *IndexReaderWriter) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) { return nil, nil } // old index stores do not implement label volume -- skip -func (c *indexReaderWriter) Volume(_ context.Context, _ string, _, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) { +func (c *IndexReaderWriter) Volume(_ context.Context, _ string, _, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) { return nil, nil } diff --git a/pkg/storage/stores/series/series_store_test.go b/pkg/storage/stores/series/series_store_test.go index ed7e45e9ccd67..fd68299a86223 100644 --- a/pkg/storage/stores/series/series_store_test.go +++ b/pkg/storage/stores/series/series_store_test.go @@ -395,7 +395,7 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { t.Fatal(err) } - chunks, fetchers, err := store.GetChunkRefs(ctx, userID, now.Add(-time.Hour), now, matchers...) + chunks, fetchers, err := store.GetChunks(ctx, userID, now.Add(-time.Hour), now, matchers...) require.NoError(t, err) fetchedChunk := []chunk.Chunk{} for _, f := range fetchers { @@ -655,7 +655,7 @@ func TestChunkStoreError(t *testing.T) { require.NoError(t, err) // Query with ordinary time-range - _, _, err = store.GetChunkRefs(ctx, userID, tc.from, tc.through, matchers...) + _, _, err = store.GetChunks(ctx, userID, tc.from, tc.through, matchers...) require.EqualError(t, err, tc.err) }) } diff --git a/pkg/storage/stores/shipper/index/compactor/util_test.go b/pkg/storage/stores/shipper/index/compactor/util_test.go index b6e2299ff76ed..f8e83595e61c8 100644 --- a/pkg/storage/stores/shipper/index/compactor/util_test.go +++ b/pkg/storage/stores/shipper/index/compactor/util_test.go @@ -168,7 +168,7 @@ func (t *testStore) GetChunks(userID string, from, through model.Time, metric la matchers = append(matchers, labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value)) } ctx := user.InjectOrgID(context.Background(), userID) - chunks, fetchers, err := t.Store.GetChunkRefs(ctx, userID, from, through, matchers...) + chunks, fetchers, err := t.Store.GetChunks(ctx, userID, from, through, matchers...) require.NoError(t.t, err) fetchedChunk := []chunk.Chunk{} for _, f := range fetchers { diff --git a/pkg/storage/stores/shipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexgateway/gateway.go index 223aca888c7d2..565712c59ca0f 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway.go @@ -33,6 +33,7 @@ const ( type IndexQuerier interface { stores.ChunkFetcher index.BaseReader + index.StatsReader Stop() } @@ -195,7 +196,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ if err != nil { return nil, err } - chunks, _, err := g.indexQuerier.GetChunkRefs(ctx, instanceID, req.From, req.Through, matchers...) + chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, matchers...) if err != nil { return nil, err } diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 915509a1cfa35..72ae05d4e99ba 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -232,7 +232,7 @@ func (m *mockChunkStore) GetChunkFetcher(_ model.Time) *fetcher.Fetcher { return nil } -func (m *mockChunkStore) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { +func (m *mockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { refs := make([]chunk.Chunk, 0, len(m.chunks)) // transform real chunks into ref chunks. for _, c := range m.chunks {