diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 349b7016a37a8..bc8ebc0b8d8dc 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2229,11 +2229,6 @@ tsdb_shipper: [ingesterdbretainperiod: ] - # Experimental. Whether TSDB should cache postings or not. The - # index-read-cache will be used as the backend. - # CLI flag: -tsdb.enable-postings-cache - [enable_postings_cache: | default = false] - # Configures Bloom Shipper. bloom_shipper: # Working directory to store downloaded Bloom Blocks. diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index 1e1cb36044176..4fd9852234022 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -579,142 +579,6 @@ func TestSchedulerRing(t *testing.T) { }) } -func TestQueryTSDB_WithCachedPostings(t *testing.T) { - clu := cluster.New(nil, cluster.SchemaWithTSDB) - - defer func() { - assert.NoError(t, clu.Cleanup()) - }() - - var ( - tDistributor = clu.AddComponent( - "distributor", - "-target=distributor", - ) - tIndexGateway = clu.AddComponent( - "index-gateway", - "-target=index-gateway", - "-tsdb.enable-postings-cache=true", - "-store.index-cache-read.embedded-cache.enabled=true", - ) - ) - require.NoError(t, clu.Run()) - - var ( - tIngester = clu.AddComponent( - "ingester", - "-target=ingester", - "-ingester.flush-on-shutdown=true", - "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), - ) - tQueryScheduler = clu.AddComponent( - "query-scheduler", - "-target=query-scheduler", - "-query-scheduler.use-scheduler-ring=false", - "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), - ) - tCompactor = clu.AddComponent( - "compactor", - "-target=compactor", - "-boltdb.shipper.compactor.compaction-interval=1s", - "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), - ) - ) - require.NoError(t, clu.Run()) - - // finally, run the query-frontend and querier. - var ( - tQueryFrontend = clu.AddComponent( - "query-frontend", - "-target=query-frontend", - "-frontend.scheduler-address="+tQueryScheduler.GRPCURL(), - "-frontend.default-validity=0s", - "-common.compactor-address="+tCompactor.HTTPURL(), - "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), - ) - _ = clu.AddComponent( - "querier", - "-target=querier", - "-querier.scheduler-address="+tQueryScheduler.GRPCURL(), - "-common.compactor-address="+tCompactor.HTTPURL(), - "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), - ) - ) - require.NoError(t, clu.Run()) - - tenantID := randStringRunes() - - now := time.Now() - cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL()) - cliDistributor.Now = now - cliIngester := client.New(tenantID, "", tIngester.HTTPURL()) - cliIngester.Now = now - cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL()) - cliQueryFrontend.Now = now - cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL()) - cliIndexGateway.Now = now - - // initial cache state. - igwMetrics, err := cliIndexGateway.Metrics() - require.NoError(t, err) - assertCacheState(t, igwMetrics, &expectedCacheState{ - cacheName: "store.index-cache-read.embedded-cache", - misses: 0, - added: 0, - }) - - t.Run("ingest-logs", func(t *testing.T) { - require.NoError(t, cliDistributor.PushLogLine("lineA", time.Now().Add(-72*time.Hour), nil, map[string]string{"job": "fake"})) - require.NoError(t, cliDistributor.PushLogLine("lineB", time.Now().Add(-48*time.Hour), nil, map[string]string{"job": "fake"})) - }) - - // restart ingester which should flush the chunks and index - require.NoError(t, tIngester.Restart()) - - // Query lines - t.Run("query to verify logs being served from storage", func(t *testing.T) { - resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`) - require.NoError(t, err) - assert.Equal(t, "streams", resp.Data.ResultType) - - var lines []string - for _, stream := range resp.Data.Stream { - for _, val := range stream.Values { - lines = append(lines, val[1]) - } - } - - assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines) - }) - - igwMetrics, err = cliIndexGateway.Metrics() - require.NoError(t, err) - assertCacheState(t, igwMetrics, &expectedCacheState{ - cacheName: "store.index-cache-read.embedded-cache", - misses: 1, - added: 1, - }) - - // ingest logs with ts=now. - require.NoError(t, cliDistributor.PushLogLine("lineC", now, nil, map[string]string{"job": "fake"})) - require.NoError(t, cliDistributor.PushLogLine("lineD", now, nil, map[string]string{"job": "fake"})) - - // default length is 7 days. - resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`) - require.NoError(t, err) - assert.Equal(t, "streams", resp.Data.ResultType) - - var lines []string - for _, stream := range resp.Data.Stream { - for _, val := range stream.Values { - lines = append(lines, val[1]) - } - } - // expect lines from both, ingesters memory and from the store. - assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines) - -} - func TestOTLPLogsIngestQuery(t *testing.T) { clu := cluster.New(nil, func(c *cluster.Cluster) { c.SetSchemaVer("v13") @@ -859,7 +723,6 @@ func TestCategorizedLabels(t *testing.T) { tIndexGateway = clu.AddComponent( "index-gateway", "-target=index-gateway", - "-tsdb.enable-postings-cache=true", "-store.index-cache-read.embedded-cache.enabled=true", ) ) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 931f33234ea87..da687c5ea9c7b 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -38,7 +38,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/gatewayclient" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" ) @@ -336,7 +335,7 @@ type Config struct { MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."` - TSDBShipperConfig tsdb.IndexCfg `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."` + TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."` BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures Bloom Shipper."` // Config for using AsyncStore when using async index stores like `boltdb-shipper`. diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 0bd679a361984..6781dbbff8a3b 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -264,7 +264,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String())) if p.IndexType == config.TSDBType { - if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig.Config) { + if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) { // inject the index-gateway client into the index store gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger, s.metricsNamespace) if err != nil { @@ -284,7 +284,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl } name := fmt.Sprintf("%s_%s", p.ObjectType, p.From.String()) - indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(name, p.IndexTables.PathPrefix, s.cfg.TSDBShipperConfig, s.schemaCfg, f, objectClient, s.limits, tableRange, indexClientReg, indexClientLogger, s.indexReadCache) + indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(name, p.IndexTables.PathPrefix, s.cfg.TSDBShipperConfig, s.schemaCfg, f, objectClient, s.limits, tableRange, indexClientReg, indexClientLogger) if err != nil { return nil, nil, nil, err } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 380c410b7b54b..9311de2090bd5 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -31,7 +31,6 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/pkg/util/constants" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/marshal" @@ -1016,7 +1015,7 @@ func TestStore_indexPrefixChange(t *testing.T) { cfg := Config{ FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")}, - TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig}, + TSDBShipperConfig: shipperConfig, NamedStores: NamedStores{ Filesystem: map[string]NamedFSConfig{ "named-store": {Directory: path.Join(tempDir, "named-store")}, @@ -1197,7 +1196,7 @@ func TestStore_MultiPeriod(t *testing.T) { cfg := Config{ FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")}, BoltDBShipperConfig: boltdb.IndexCfg{Config: shipperConfig}, - TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig, CachePostings: false}, + TSDBShipperConfig: shipperConfig, NamedStores: NamedStores{ Filesystem: map[string]NamedFSConfig{ "named-store": {Directory: path.Join(tempDir, "named-store")}, @@ -1521,7 +1520,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { boltdbShipperConfig.IngesterName = ingesterName // config for tsdb Shipper - tsdbShipperConfig := tsdb.IndexCfg{} + tsdbShipperConfig := indexshipper.Config{} flagext.DefaultValues(&tsdbShipperConfig) tsdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "tsdb-index") tsdbShipperConfig.CacheLocation = path.Join(tempDir, "tsdb-shipper-cache") diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/cached_postings_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/cached_postings_index.go deleted file mode 100644 index 71058a0359efa..0000000000000 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/cached_postings_index.go +++ /dev/null @@ -1,157 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only -// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/postings_codec.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: The Thanos Authors. - -package tsdb - -import ( - "context" - "fmt" - "sort" - "strings" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - - "github.com/grafana/loki/pkg/storage/chunk/cache" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -type PostingsReader interface { - ForPostings(ctx context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error -} - -// NewCachedPostingsReader uses the cache defined by `index_read_cache` to store and read Postings. -// -// The cache key is stored/read as `matchers:reader_checksum`. -// -// The cache value is stored as: `[n, refs...]`, where n is how many series references this entry has, and refs is -// a sequence of series references encoded as the diff between the current series and the previous one. -// -// Example: if the postings for stream "app=kubernetes,env=production" is `[1,7,30,50]` and its reader has `checksum=12345`: -// - The cache key for the entry will be: `app=kubernetes,env=production:12345` -// - The cache value for the entry will be: [4, 1, 6, 23, 20]. -func NewCachedPostingsReader(reader IndexReader, logger log.Logger, cacheClient cache.Cache) PostingsReader { - return &cachedPostingsReader{ - reader: reader, - cacheClient: cacheClient, - log: logger, - } -} - -type cachedPostingsReader struct { - reader IndexReader - - cacheClient cache.Cache - - log log.Logger -} - -func (c *cachedPostingsReader) ForPostings(ctx context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error { - checksum := c.reader.Checksum() - key := fmt.Sprintf("%s:%d", CanonicalLabelMatchersKey(matchers), checksum) - if postings, got := c.fetchPostings(ctx, key); got { - return fn(postings) - } - - p, err := PostingsForMatchers(c.reader, nil, matchers...) - if err != nil { - return fmt.Errorf("failed to evaluate postings for matchers: %w", err) - } - - expandedPosts, err := index.ExpandPostings(p) - if err != nil { - return fmt.Errorf("failed to expand postings: %w", err) - } - - if err := c.storePostings(ctx, expandedPosts, key); err != nil { - level.Error(c.log).Log("msg", "failed to cache postings", "err", err, "matchers", key) - } - - // because `index.ExpandPostings` walks the iterator, we have to reset it current index by instantiating a new ListPostings. - return fn(index.NewListPostings(expandedPosts)) -} - -func (c *cachedPostingsReader) storePostings(ctx context.Context, expandedPostings []storage.SeriesRef, canonicalMatchers string) error { - buf, err := diffVarintSnappyEncode(index.NewListPostings(expandedPostings), len(expandedPostings)) - if err != nil { - return fmt.Errorf("couldn't encode postings: %w", err) - } - - return c.cacheClient.Store(ctx, []string{canonicalMatchers}, [][]byte{buf}) -} - -func (c *cachedPostingsReader) fetchPostings(ctx context.Context, key string) (index.Postings, bool) { - found, bufs, _, err := c.cacheClient.Fetch(ctx, []string{key}) - - if err != nil { - level.Error(c.log).Log("msg", "error on fetching postings", "err", err, "matchers", key) - return nil, false - } - - if len(found) > 0 { - // we only use a single key so we only care about index=0. - p, err := decodeToPostings(bufs[0]) - if err != nil { - level.Error(c.log).Log("msg", "failed to fetch postings", "err", err) - return nil, false - } - return p, true - } - - return nil, false -} - -func decodeToPostings(b []byte) (index.Postings, error) { - p, err := diffVarintSnappyDecode(b) - if err != nil { - return nil, fmt.Errorf("couldn't decode postings: %w", err) - } - - return p, nil -} - -// CanonicalLabelMatchersKey creates a canonical version of LabelMatchersKey -func CanonicalLabelMatchersKey(ms []*labels.Matcher) string { - sorted := make([]labels.Matcher, len(ms)) - for i := range ms { - sorted[i] = labels.Matcher{Type: ms[i].Type, Name: ms[i].Name, Value: ms[i].Value} - } - sort.Sort(sorteableLabelMatchers(sorted)) - - const ( - typeLen = 2 - sepLen = 1 - ) - var size int - for _, m := range sorted { - size += len(m.Name) + len(m.Value) + typeLen + sepLen - } - sb := strings.Builder{} - sb.Grow(size) - for _, m := range sorted { - sb.WriteString(m.Name) - sb.WriteString(m.Type.String()) - sb.WriteString(m.Value) - sb.WriteByte(',') - } - return sb.String() -} - -type sorteableLabelMatchers []labels.Matcher - -func (c sorteableLabelMatchers) Less(i, j int) bool { - if c[i].Name != c[j].Name { - return c[i].Name < c[j].Name - } - if c[i].Type != c[j].Type { - return c[i].Type < c[j].Type - } - return c[i].Value < c[j].Value -} - -func (c sorteableLabelMatchers) Len() int { return len(c) } -func (c sorteableLabelMatchers) Swap(i, j int) { c[i], c[j] = c[j], c[i] } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/cached_postings_index_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/cached_postings_index_test.go deleted file mode 100644 index 4a7b0f8ee1fec..0000000000000 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/cached_postings_index_test.go +++ /dev/null @@ -1,505 +0,0 @@ -package tsdb - -import ( - "context" - "math/rand" - "sort" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/pkg/storage/chunk/cache" - "github.com/grafana/loki/pkg/storage/stores/index/stats" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -func TestSingleIdxCached(t *testing.T) { - // setup cache. - cfg := cache.EmbeddedCacheConfig{MaxSizeMB: 1} - c := cache.NewEmbeddedCache("test-cache", cfg, nil, log.NewNopLogger(), "test") - defer c.Stop() - - cases := []LoadableSeries{ - { - Labels: mustParseLabels(`{foo="bar"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 0, - MaxTime: 3, - Checksum: 0, - }, - { - MinTime: 1, - MaxTime: 4, - Checksum: 1, - }, - { - MinTime: 2, - MaxTime: 5, - Checksum: 2, - }, - }, - }, - { - Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 1, - MaxTime: 10, - Checksum: 3, - }, - }, - }, - { - Labels: mustParseLabels(`{foo="bard", bazz="bozz", bonk="borb"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 1, - MaxTime: 7, - Checksum: 4, - }, - }, - }, - } - - for _, variant := range []struct { - desc string - fn func() Index - }{ - { - desc: "file", - fn: func() Index { - return BuildIndex(t, t.TempDir(), cases, IndexOpts{PostingsCache: c}) - }, - }, - { - desc: "head", - fn: func() Index { - head := NewHead("fake", NewMetrics(nil), log.NewNopLogger()) - for _, x := range cases { - _, _ = head.Append(x.Labels, x.Labels.Hash(), x.Chunks) - } - reader := head.Index() - return NewTSDBIndex(reader, NewPostingsReader(reader)) - }, - }, - } { - t.Run(variant.desc, func(t *testing.T) { - idx := variant.fn() - t.Run("GetChunkRefs", func(t *testing.T) { - var err error - refs := make([]ChunkRef, 0, 8) - refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - require.Nil(t, err) - - expected := []ChunkRef{ - { - User: "fake", - Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), - Start: 0, - End: 3, - Checksum: 0, - }, - { - User: "fake", - Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), - Start: 1, - End: 4, - Checksum: 1, - }, - { - User: "fake", - Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), - Start: 2, - End: 5, - Checksum: 2, - }, - { - User: "fake", - Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash()), - Start: 1, - End: 10, - Checksum: 3, - }, - } - require.Equal(t, expected, refs) - }) - - t.Run("GetChunkRefsSharded", func(t *testing.T) { - shard := index.ShardAnnotation{ - Shard: 1, - Of: 2, - } - var err error - refs := make([]ChunkRef, 0, 8) - refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - - require.Nil(t, err) - - require.Equal(t, []ChunkRef{{ - User: "fake", - Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash()), - Start: 1, - End: 10, - Checksum: 3, - }}, refs) - - }) - - t.Run("Series", func(t *testing.T) { - xs, err := idx.Series(context.Background(), "fake", 8, 9, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - require.Nil(t, err) - - expected := []Series{ - { - Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`), - Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash()), - }, - } - require.Equal(t, expected, xs) - }) - - t.Run("SeriesSharded", func(t *testing.T) { - shard := index.ShardAnnotation{ - Shard: 0, - Of: 2, - } - - xs, err := idx.Series(context.Background(), "fake", 0, 10, nil, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - require.Nil(t, err) - - expected := []Series{ - { - Labels: mustParseLabels(`{foo="bar"}`), - Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), - }, - } - require.Equal(t, expected, xs) - }) - - t.Run("LabelNames", func(t *testing.T) { - // request data at the end of the tsdb range, but it should return all labels present - ls, err := idx.LabelNames(context.Background(), "fake", 9, 10) - require.Nil(t, err) - sort.Strings(ls) - require.Equal(t, []string{"bazz", "bonk", "foo"}, ls) - }) - - t.Run("LabelNamesWithMatchers", func(t *testing.T) { - // request data at the end of the tsdb range, but it should return all labels present - ls, err := idx.LabelNames(context.Background(), "fake", 9, 10, labels.MustNewMatcher(labels.MatchEqual, "bazz", "buzz")) - require.Nil(t, err) - sort.Strings(ls) - require.Equal(t, []string{"bazz", "foo"}, ls) - }) - - t.Run("LabelValues", func(t *testing.T) { - vs, err := idx.LabelValues(context.Background(), "fake", 9, 10, "foo") - require.Nil(t, err) - sort.Strings(vs) - require.Equal(t, []string{"bar", "bard"}, vs) - }) - - t.Run("LabelValuesWithMatchers", func(t *testing.T) { - vs, err := idx.LabelValues(context.Background(), "fake", 9, 10, "foo", labels.MustNewMatcher(labels.MatchEqual, "bazz", "buzz")) - require.Nil(t, err) - require.Equal(t, []string{"bar"}, vs) - }) - - }) - } - -} - -func BenchmarkCacheableTSDBIndex_GetChunkRefs(b *testing.B) { - // setup cache. - cfg := cache.EmbeddedCacheConfig{MaxSizeMB: 1} - c := cache.NewEmbeddedCache("test-cache", cfg, nil, log.NewNopLogger(), "test") - defer c.Stop() - - now := model.Now() - queryFrom, queryThrough := now.Add(3*time.Hour).Add(time.Millisecond), now.Add(5*time.Hour).Add(-time.Millisecond) - queryBounds := newBounds(queryFrom, queryThrough) - numChunksToMatch := 0 - - var chunkMetas []index.ChunkMeta - // build a chunk for every second with randomized chunk length - for from, through := now, now.Add(24*time.Hour); from <= through; from = from.Add(time.Second) { - // randomize chunk length between 1-120 mins - chunkLenMin := rand.Intn(120) - if chunkLenMin == 0 { - chunkLenMin = 1 - } - chunkMeta := index.ChunkMeta{ - MinTime: int64(from), - MaxTime: int64(from.Add(time.Duration(chunkLenMin) * time.Minute)), - Checksum: uint32(from), - Entries: 1, - } - chunkMetas = append(chunkMetas, chunkMeta) - if Overlap(chunkMeta, queryBounds) { - numChunksToMatch++ - } - } - - tempDir := b.TempDir() - tsdbIndex := BuildIndex(b, tempDir, []LoadableSeries{ - { - Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), - Chunks: chunkMetas, - }, - { - Labels: mustParseLabels(`{foo="bar", ping="pong"}`), - Chunks: chunkMetas, - }, - { - Labels: mustParseLabels(`{foo1="bar1", ping="pong"}`), - Chunks: chunkMetas, - }, - }, IndexOpts{PostingsCache: c}) - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - chkRefs := ChunkRefsPool.Get() - chkRefs, _ = tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, chkRefs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - ChunkRefsPool.Put(chkRefs) - } -} - -func TestCacheableTSDBIndex_Stats(t *testing.T) { - // setup cache. - cfg := cache.EmbeddedCacheConfig{MaxSizeMB: 1} - c := cache.NewEmbeddedCache("test-cache", cfg, nil, log.NewNopLogger(), "test") - defer c.Stop() - - series := []LoadableSeries{ - { - Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 0, - MaxTime: 10, - Checksum: 1, - Entries: 10, - KB: 10, - }, - { - MinTime: 10, - MaxTime: 20, - Checksum: 2, - Entries: 20, - KB: 20, - }, - }, - }, - { - Labels: mustParseLabels(`{foo="bar", ping="pong"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 0, - MaxTime: 10, - Checksum: 3, - Entries: 30, - KB: 30, - }, - { - MinTime: 10, - MaxTime: 20, - Checksum: 4, - Entries: 40, - KB: 40, - }, - }, - }, - } - - // Create the TSDB index - tempDir := t.TempDir() - - // Create the test cases - testCases := []struct { - name string - from model.Time - through model.Time - expected stats.Stats - expectedErr error - }{ - { - name: "from at the beginning of one chunk and through at the end of another chunk", - from: 0, - through: 20, - expected: stats.Stats{ - Streams: 2, - Chunks: 4, - Bytes: (10 + 20 + 30 + 40) * 1024, - Entries: 10 + 20 + 30 + 40, - }, - }, - { - name: "from inside one chunk and through inside another chunk", - from: 5, - through: 15, - expected: stats.Stats{ - Streams: 2, - Chunks: 4, - Bytes: (10*0.5 + 20*0.5 + 30*0.5 + 40*0.5) * 1024, - Entries: 10*0.5 + 20*0.5 + 30*0.5 + 40*0.5, - }, - }, - { - name: "from inside one chunk and through at the end of another chunk", - from: 5, - through: 20, - expected: stats.Stats{ - Streams: 2, - Chunks: 4, - Bytes: (10*0.5 + 20 + 30*0.5 + 40) * 1024, - Entries: 10*0.5 + 20 + 30*0.5 + 40, - }, - }, - { - name: "from at the beginning of one chunk and through inside another chunk", - from: 0, - through: 15, - expected: stats.Stats{ - Streams: 2, - Chunks: 4, - Bytes: (10 + 20*0.5 + 30 + 40*0.5) * 1024, - Entries: 10 + 20*0.5 + 30 + 40*0.5, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{PostingsCache: c}) - acc := &stats.Stats{} - err := tsdbIndex.Stats(context.Background(), "fake", tc.from, tc.through, acc, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - require.Equal(t, tc.expectedErr, err) - require.Equal(t, tc.expected, *acc) - }) - } -} - -func BenchmarkSeriesRepetitive(b *testing.B) { - // setup cache. - cfg := cache.EmbeddedCacheConfig{MaxSizeMB: 1} - c := cache.NewEmbeddedCache("test-cache", cfg, nil, log.NewNopLogger(), "test") - defer c.Stop() - - series := []LoadableSeries{ - { - Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 0, - MaxTime: 10, - Checksum: 1, - Entries: 10, - KB: 10, - }, - { - MinTime: 10, - MaxTime: 20, - Checksum: 2, - Entries: 20, - KB: 20, - }, - }, - }, - { - Labels: mustParseLabels(`{foo="bar", ping="pong"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 0, - MaxTime: 10, - Checksum: 3, - Entries: 30, - KB: 30, - }, - { - MinTime: 10, - MaxTime: 20, - Checksum: 4, - Entries: 40, - KB: 40, - }, - }, - }, - } - tempDir := b.TempDir() - tsdbIndex := BuildIndex(b, tempDir, series, IndexOpts{PostingsCache: c}) - acc := &stats.Stats{} - - for i := 0; i < b.N; i++ { - tsdbIndex.Stats(context.Background(), "fake", 5, 15, acc, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck - } -} - -func TestMultipleIndexesFiles(t *testing.T) { - // setup cache. - cfg := cache.EmbeddedCacheConfig{MaxSizeMB: 1} - c := cache.NewEmbeddedCache("test-cache", cfg, nil, log.NewNopLogger(), "test") - defer c.Stop() - - series := []LoadableSeries{ - { - Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 0, - MaxTime: 10, - Checksum: 1, - Entries: 10, - KB: 10, - }, - }, - }, - { - Labels: mustParseLabels(`{foo="bar", ping="pong"}`), - Chunks: []index.ChunkMeta{ - { - MinTime: 0, - MaxTime: 10, - Checksum: 3, - Entries: 30, - KB: 30, - }, - }, - }, - } - tempDir := t.TempDir() - tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{PostingsCache: c}) - - refs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", 5, 10, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck - require.NoError(t, err) - require.Len(t, refs, 2) - - // repeat the same index, it hits the cache. - refs, err = tsdbIndex.GetChunkRefs(context.Background(), "fake", 5, 10, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck - require.NoError(t, err) - require.Len(t, refs, 2) - - // completely change the index now - series = []LoadableSeries{ - { - Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), - Chunks: []index.ChunkMeta{}, - }, - { - Labels: mustParseLabels(`{foo="bar", ping="pong"}`), - Chunks: []index.ChunkMeta{}, - }, - } - - tempDir = t.TempDir() - tsdbIndex = BuildIndex(t, tempDir, series, IndexOpts{PostingsCache: c}) - refs, err = tsdbIndex.GetChunkRefs(context.Background(), "fake", 5, 10, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck - require.NoError(t, err) - require.Len(t, refs, 0) -} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go index dac072d7a34c4..15ee7f1a1d675 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go @@ -36,7 +36,7 @@ func (i indexProcessor) NewTableCompactor(ctx context.Context, commonIndexSet co } func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableName, userID, workingDir string, periodConfig config.PeriodConfig, logger log.Logger) (compactor.CompactedIndex, error) { - indexFile, err := OpenShippableTSDB(path, IndexOpts{}) + indexFile, err := OpenShippableTSDB(path) if err != nil { return nil, err } @@ -105,7 +105,7 @@ func (t *tableCompactor) CompactTable() error { } downloadPaths[job] = downloadedAt - idx, err := OpenShippableTSDB(downloadedAt, IndexOpts{}) + idx, err := OpenShippableTSDB(downloadedAt) if err != nil { return err } @@ -233,7 +233,7 @@ func setupBuilder(ctx context.Context, indexType int, userID string, sourceIndex } }() - indexFile, err := OpenShippableTSDB(path, IndexOpts{}) + indexFile, err := OpenShippableTSDB(path) if err != nil { return nil, err } @@ -402,7 +402,7 @@ func (c *compactedIndex) ToIndexFile() (shipperindex.Index, error) { return nil, err } - return NewShippableTSDBFile(id, IndexOpts{}) + return NewShippableTSDBFile(id) } func getUnsafeBytes(s string) []byte { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go index 47f102f17c77f..d9f6382c2d2b0 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go @@ -730,8 +730,7 @@ func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx return } - reader := tenant.indexRange(int64(from), int64(through)) - idx = NewTSDBIndex(reader, NewPostingsReader(reader)) + idx = NewTSDBIndex(tenant.indexRange(int64(from), int64(through))) if t.chunkFilter != nil { idx.SetChunkFilterer(t.chunkFilter) } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go index 3f8e3af64405c..4eba91e29b036 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go @@ -535,7 +535,7 @@ func TestBuildLegacyWALs(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - store, stop, err := NewStore(tc.store, "index/", IndexCfg{Config: shipperCfg}, schemaCfg, nil, fsObjectClient, &zeroValueLimits{}, tc.tableRange, nil, log.NewNopLogger(), nil) + store, stop, err := NewStore(tc.store, "index/", shipperCfg, schemaCfg, nil, fsObjectClient, &zeroValueLimits{}, tc.tableRange, nil, log.NewNopLogger()) require.Nil(t, err) refs, err := store.GetChunkRefs( context.Background(), diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index.go index 97a5bf8a60d8a..69f4c26765883 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index.go @@ -2,13 +2,11 @@ package tsdb import ( "context" - "flag" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) @@ -24,18 +22,6 @@ type ChunkRef struct { Checksum uint32 } -type IndexCfg struct { - indexshipper.Config `yaml:",inline"` - - CachePostings bool `yaml:"enable_postings_cache" category:"experimental"` -} - -func (cfg *IndexCfg) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.BoolVar(&cfg.CachePostings, prefix+"enable-postings-cache", false, "Experimental. Whether TSDB should cache postings or not. The index-read-cache will be used as the backend.") - - cfg.Config.RegisterFlagsWithPrefix(prefix, f) -} - // Compares by (Start, End) // Assumes User is equivalent func (r ChunkRef) Less(x ChunkRef) bool { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index_client_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index_client_test.go index 92e3aacdba4a2..596e53e62009f 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index_client_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index_client_test.go @@ -59,7 +59,7 @@ func BenchmarkIndexClient_Stats(b *testing.B) { Labels: mustParseLabels(`{foo="bar"}`), Chunks: buildChunkMetas(int64(indexStartToday), int64(indexStartToday+99)), }, - }, IndexOpts{}), + }), }, tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): { @@ -68,7 +68,7 @@ func BenchmarkIndexClient_Stats(b *testing.B) { Labels: mustParseLabels(`{foo="bar"}`), Chunks: buildChunkMetas(int64(indexStartYesterday), int64(indexStartYesterday+99)), }, - }, IndexOpts{}), + }), }, } @@ -118,7 +118,7 @@ func TestIndexClient_Stats(t *testing.T) { Labels: mustParseLabels(`{fizz="buzz"}`), Chunks: buildChunkMetas(int64(indexStartToday), int64(indexStartToday+99), 10), }, - }, IndexOpts{}), + }), }, tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): { @@ -135,7 +135,7 @@ func TestIndexClient_Stats(t *testing.T) { Labels: mustParseLabels(`{ping="pong"}`), Chunks: buildChunkMetas(int64(indexStartYesterday), int64(indexStartYesterday+99), 10), }, - }, IndexOpts{}), + }), }, } @@ -247,7 +247,7 @@ func TestIndexClient_Volume(t *testing.T) { Labels: mustParseLabels(`{fizz="buzz"}`), Chunks: buildChunkMetas(int64(indexStartToday), int64(indexStartToday+99), 10), }, - }, IndexOpts{}), + }), }, tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): { @@ -264,7 +264,7 @@ func TestIndexClient_Volume(t *testing.T) { Labels: mustParseLabels(`{ping="pong"}`), Chunks: buildChunkMetas(int64(indexStartYesterday), int64(indexStartYesterday+99), 10), }, - }, IndexOpts{}), + }), }, } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go b/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go index 3b9c10fc3fed2..78ef447169ccd 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/manager.go @@ -129,7 +129,7 @@ func (m *tsdbManager) Start() (err error) { indices++ prefixed := NewPrefixedIdentifier(id, filepath.Join(mulitenantDir, bucket), "") - loaded, err := NewShippableTSDBFile(prefixed, IndexOpts{}) + loaded, err := NewShippableTSDBFile(prefixed) if err != nil { level.Warn(m.log).Log( @@ -230,7 +230,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start)) - loaded, err := NewShippableTSDBFile(dst, IndexOpts{}) + loaded, err := NewShippableTSDBFile(dst) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index_test.go index 829c942e7b6d5..945402f954f5b 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index_test.go @@ -61,7 +61,7 @@ func TestMultiIndex(t *testing.T) { var indices []Index dir := t.TempDir() for i := 0; i < n; i++ { - indices = append(indices, BuildIndex(t, dir, cases, IndexOpts{})) + indices = append(indices, BuildIndex(t, dir, cases)) } idx := NewMultiIndex(IndexSlice(indices)) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/postings_codec.go b/pkg/storage/stores/shipper/indexshipper/tsdb/postings_codec.go deleted file mode 100644 index 629024c67728d..0000000000000 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/postings_codec.go +++ /dev/null @@ -1,137 +0,0 @@ -package tsdb - -import ( - "bytes" - - "github.com/golang/snappy" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/storage" - promEncoding "github.com/prometheus/prometheus/tsdb/encoding" - - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" - "github.com/grafana/loki/pkg/util/encoding" -) - -type codec string - -const ( - codecHeaderSnappy codec = "dvs" // As in "diff+varint+snappy". -) - -// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. -func isDiffVarintSnappyEncodedPostings(input []byte) bool { - return bytes.HasPrefix(input, []byte(codecHeaderSnappy)) -} - -func diffVarintSnappyDecode(input []byte) (index.Postings, error) { - if !isDiffVarintSnappyEncodedPostings(input) { - return nil, errors.New(string(codecHeaderSnappy) + " header not found") - } - - offset := len(codecHeaderSnappy) - - raw, err := snappy.Decode(nil, input[offset:]) - if err != nil { - return nil, errors.Wrap(err, "snappy decode") - } - - return newDiffVarintPostings(raw), nil -} - -func newDiffVarintPostings(input []byte) *diffVarintPostings { - return &diffVarintPostings{buf: &promEncoding.Decbuf{B: input}} -} - -// diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data. -type diffVarintPostings struct { - buf *promEncoding.Decbuf - cur storage.SeriesRef -} - -func (it *diffVarintPostings) At() storage.SeriesRef { - return it.cur -} - -func (it *diffVarintPostings) Next() bool { - if it.buf.Err() != nil || it.buf.Len() == 0 { - return false - } - - val := it.buf.Uvarint64() - if it.buf.Err() != nil { - return false - } - - it.cur = it.cur + storage.SeriesRef(val) - return true -} - -func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool { - if it.cur >= x { - return true - } - - // We cannot do any search due to how values are stored, - // so we simply advance until we find the right value. - for it.Next() { - if it.At() >= x { - return true - } - } - - return false -} - -func (it *diffVarintPostings) Err() error { - return it.buf.Err() -} - -// diffVarintSnappyEncode encodes postings into diff+varint representation, -// and applies snappy compression on the result. -// Returned byte slice starts with codecHeaderSnappy header. -// Length argument is expected number of postings, used for preallocating buffer. -func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) { - buf, err := diffVarintEncodeNoHeader(p, length) - if err != nil { - return nil, err - } - - // Make result buffer large enough to hold our header and compressed block. - result := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(len(buf))) - copy(result, codecHeaderSnappy) - - compressed := snappy.Encode(result[len(codecHeaderSnappy):], buf) - - // Slice result buffer based on compressed size. - result = result[:len(codecHeaderSnappy)+len(compressed)] - return result, nil -} - -// diffVarintEncodeNoHeader encodes postings into diff+varint representation. -// It doesn't add any header to the output bytes. -func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) { - buf := encoding.Encbuf{} - - // This encoding uses around ~1 bytes per posting, but let's use - // conservative 1.25 bytes per posting to avoid extra allocations. - if length > 0 { - buf.B = make([]byte, 0, 5*length/4) - } - - prev := storage.SeriesRef(0) - for p.Next() { - v := p.At() - if v < prev { - return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev) - } - - // This is the 'diff' part -- compute difference from previous value. - buf.PutUvarint64(uint64(v - prev)) - prev = v - } - if p.Err() != nil { - return nil, p.Err() - } - - return buf.B, nil -} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/postings_codec_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/postings_codec_test.go deleted file mode 100644 index e42724d512ba4..0000000000000 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/postings_codec_test.go +++ /dev/null @@ -1,207 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only -// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/postings_codec_test.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: The Thanos Authors. -// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/storepb/testutil/series.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: The Thanos Authors. - -package tsdb - -import ( - "context" - "os" - "testing" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -const ( - // labelLongSuffix is a label with ~50B in size, to emulate real-world high cardinality. - labelLongSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" -) - -func TestDiffVarintCodec(t *testing.T) { - chunksDir := t.TempDir() - - headOpts := tsdb.DefaultHeadOptions() - headOpts.ChunkDirRoot = chunksDir - headOpts.ChunkRange = 1000 - h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, h.Close()) - assert.NoError(t, os.RemoveAll(chunksDir)) - }) - - idx, err := h.Index() - assert.NoError(t, err) - t.Cleanup(func() { - assert.NoError(t, idx.Close()) - }) - - postingsMap := map[string]index.Postings{ - `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+labelLongSuffix)), - `j="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")), - `j!="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")), - `i=~".*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")), - `i=~".+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")), - `i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), - `i=~"^$"'`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), - `i!=""`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), - `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+labelLongSuffix)), - `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), - } - - codecs := map[string]struct { - codingFunction func(index.Postings, int) ([]byte, error) - decodingFunction func([]byte) (index.Postings, error) - }{ - "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { - return newDiffVarintPostings(bytes), nil - }}, - "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode}, - } - - for postingName, postings := range postingsMap { - p, err := toUint64Postings(postings) - require.NoError(t, err) - - for cname, codec := range codecs { - name := cname + "/" + postingName - - t.Run(name, func(t *testing.T) { - p.reset() // We reuse postings between runs, so we need to reset iterator. - - data, err := codec.codingFunction(p, p.len()) - require.NoError(t, err) - - t.Log("encoded size", len(data), "bytes") - t.Logf("ratio: %0.3f", float64(len(data))/float64(4*p.len())) - - decodedPostings, err := codec.decodingFunction(data) - require.NoError(t, err) - - p.reset() - comparePostings(t, p, decodedPostings) - }) - } - } -} - -func TestLabelMatchersTypeValues(t *testing.T) { - expectedValues := map[labels.MatchType]int{ - labels.MatchEqual: 0, - labels.MatchNotEqual: 1, - labels.MatchRegexp: 2, - labels.MatchNotRegexp: 3, - } - - for matcherType, val := range expectedValues { - require.Equal(t, int(labels.MustNewMatcher(matcherType, "", "").Type), val, - "diffVarintSnappyWithMatchersEncode relies on the number values of hte matchers not changing. "+ - "It caches each matcher type as these integer values. "+ - "If the integer values change, then the already cached values in the index cache will be improperly decoded.") - } -} - -func comparePostings(t *testing.T, p1, p2 index.Postings) { - for p1.Next() { - require.True(t, p2.Next()) - require.Equal(t, p1.At(), p2.At()) - } - - if p2.Next() { - t.Fatal("p2 has more values") - return - } - - require.NoError(t, p1.Err()) - require.NoError(t, p2.Err()) -} - -func matchPostings(t testing.TB, ix tsdb.IndexReader, m *labels.Matcher) index.Postings { - vals, err := ix.LabelValues(context.Background(), m.Name) - assert.NoError(t, err) - - matching := []string(nil) - for _, v := range vals { - if m.Matches(v) { - matching = append(matching, v) - } - } - - p, err := ix.Postings(context.Background(), m.Name, matching...) - assert.NoError(t, err) - return p -} - -func toUint64Postings(p index.Postings) (*uint64Postings, error) { - var vals []storage.SeriesRef - for p.Next() { - vals = append(vals, p.At()) - } - return &uint64Postings{vals: vals, ix: -1}, p.Err() -} - -// Postings with no decoding step. -type uint64Postings struct { - vals []storage.SeriesRef - ix int -} - -func (p *uint64Postings) At() storage.SeriesRef { - if p.ix < 0 || p.ix >= len(p.vals) { - return 0 - } - return p.vals[p.ix] -} - -func (p *uint64Postings) Next() bool { - if p.ix < len(p.vals)-1 { - p.ix++ - return true - } - return false -} - -func (p *uint64Postings) Seek(x storage.SeriesRef) bool { - if p.At() >= x { - return true - } - - // We cannot do any search due to how values are stored, - // so we simply advance until we find the right value. - for p.Next() { - if p.At() >= x { - return true - } - } - - return false -} - -func (p *uint64Postings) Err() error { - return nil -} - -func (p *uint64Postings) reset() { - p.ix = -1 -} - -func (p *uint64Postings) len() int { - return len(p.vals) -} - -func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { - k, v := index.AllPostingsKey() - p, err := ix.Postings(context.Background(), k, v) - assert.NoError(t, err) - return p -} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index e6c9866b0227e..448fd2d4bc3b2 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -3,7 +3,6 @@ package tsdb import ( "context" "errors" - "fmt" "io" "math" "path/filepath" @@ -16,7 +15,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" @@ -29,22 +27,17 @@ var ErrAlreadyOnDesiredVersion = errors.New("tsdb file already on desired versio // GetRawFileReaderFunc returns an io.ReadSeeker for reading raw tsdb file from disk type GetRawFileReaderFunc func() (io.ReadSeeker, error) -type IndexOpts struct { - PostingsCache cache.Cache -} - -func OpenShippableTSDB(p string, opts IndexOpts) (shipperindex.Index, error) { +func OpenShippableTSDB(p string) (shipperindex.Index, error) { id, err := identifierFromPath(p) if err != nil { return nil, err } - return NewShippableTSDBFile(id, opts) + return NewShippableTSDBFile(id) } func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipperindex.Index, error) { - opts := IndexOpts{} - indexFile, err := OpenShippableTSDB(path, opts) + indexFile, err := OpenShippableTSDB(path) if err != nil { return nil, err } @@ -83,7 +76,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp if err != nil { return nil, err } - return NewShippableTSDBFile(id, IndexOpts{}) + return NewShippableTSDBFile(id) } // nolint @@ -99,8 +92,8 @@ type TSDBFile struct { getRawFileReader GetRawFileReaderFunc } -func NewShippableTSDBFile(id Identifier, opts IndexOpts) (*TSDBFile, error) { - idx, getRawFileReader, err := NewTSDBIndexFromFile(id.Path(), opts) +func NewShippableTSDBFile(id Identifier) (*TSDBFile, error) { + idx, getRawFileReader, err := NewTSDBIndexFromFile(id.Path()) if err != nil { return nil, err } @@ -125,54 +118,26 @@ func (f *TSDBFile) Reader() (io.ReadSeeker, error) { // and translates the IndexReader to an Index implementation // It loads the file into memory and doesn't keep a file descriptor open type TSDBIndex struct { - reader IndexReader - chunkFilter chunk.RequestChunkFilterer - postingsReader PostingsReader + reader IndexReader + chunkFilter chunk.RequestChunkFilterer } // Return the index as well as the underlying raw file reader which isn't exposed as an index // method but is helpful for building an io.reader for the index shipper -func NewTSDBIndexFromFile(location string, opts IndexOpts) (Index, GetRawFileReaderFunc, error) { +func NewTSDBIndexFromFile(location string) (*TSDBIndex, GetRawFileReaderFunc, error) { reader, err := index.NewFileReader(location) if err != nil { return nil, nil, err } - postingsReader := getPostingsReader(reader, opts.PostingsCache) - tsdbIdx := NewTSDBIndex(reader, postingsReader) - - return tsdbIdx, func() (io.ReadSeeker, error) { + return NewTSDBIndex(reader), func() (io.ReadSeeker, error) { return reader.RawFileReader() }, nil } -func getPostingsReader(reader IndexReader, postingsCache cache.Cache) PostingsReader { - if postingsCache != nil { - return NewCachedPostingsReader(reader, util_log.Logger, postingsCache) - } - return NewPostingsReader(reader) -} - -func NewPostingsReader(reader IndexReader) PostingsReader { - return &defaultPostingsReader{reader: reader} -} - -type defaultPostingsReader struct { - reader IndexReader -} - -func (s *defaultPostingsReader) ForPostings(_ context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error { - p, err := PostingsForMatchers(s.reader, nil, matchers...) - if err != nil { - return err - } - return fn(p) -} - -func NewTSDBIndex(reader IndexReader, postingsReader PostingsReader) *TSDBIndex { +func NewTSDBIndex(reader IndexReader) *TSDBIndex { return &TSDBIndex{ - reader: reader, - postingsReader: postingsReader, + reader: reader, } } @@ -203,7 +168,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, filterer = i.chunkFilter.ForRequest(ctx) } - return i.postingsReader.ForPostings(ctx, matchers, func(p index.Postings) error { + return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error { for p.Next() { hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks) if err != nil { @@ -226,6 +191,20 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, } +func (i *TSDBIndex) forPostings( + _ context.Context, + shard *index.ShardAnnotation, + _, _ model.Time, + matchers []*labels.Matcher, + fn func(index.Postings) error, +) error { + p, err := PostingsForMatchers(i.reader, shard, matchers...) + if err != nil { + return err + } + return fn(p) +} + func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { if res == nil { res = ChunkRefsPool.Get() @@ -301,7 +280,7 @@ func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier { } func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, _ shouldIncludeChunk, matchers ...*labels.Matcher) error { - return i.postingsReader.ForPostings(ctx, matchers, func(p index.Postings) error { + return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error { // TODO(owen-d): use pool var ls labels.Labels var filterer chunk.Filterer @@ -310,10 +289,9 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim } for p.Next() { - seriesRef := p.At() - fp, stats, err := i.reader.ChunkStats(seriesRef, int64(from), int64(through), &ls) + fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls) if err != nil { - return fmt.Errorf("stats: chunk stats: %w, seriesRef: %d", err, seriesRef) + return err } // skip series that belong to different shards @@ -376,7 +354,7 @@ func (i *TSDBIndex) Volume( aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == "" - return i.postingsReader.ForPostings(ctx, matchers, func(p index.Postings) error { + return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error { var ls labels.Labels var filterer chunk.Filterer if i.chunkFilter != nil { @@ -386,7 +364,7 @@ func (i *TSDBIndex) Volume( for p.Next() { fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls) if err != nil { - return fmt.Errorf("series volume: %w", err) + return err } // skip series that belong to different shards diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go index ffc5a0871fed4..9d7b80ce161f6 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go @@ -70,7 +70,7 @@ func TestSingleIdx(t *testing.T) { { desc: "file", fn: func() Index { - return BuildIndex(t, t.TempDir(), cases, IndexOpts{}) + return BuildIndex(t, t.TempDir(), cases) }, }, { @@ -81,7 +81,7 @@ func TestSingleIdx(t *testing.T) { _, _ = head.Append(x.Labels, x.Labels.Hash(), x.Chunks) } reader := head.Index() - return NewTSDBIndex(reader, NewPostingsReader(reader)) + return NewTSDBIndex(reader) }, }, } { @@ -213,6 +213,7 @@ func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) { queryFrom, queryThrough := now.Add(3*time.Hour).Add(time.Millisecond), now.Add(5*time.Hour).Add(-time.Millisecond) queryBounds := newBounds(queryFrom, queryThrough) numChunksToMatch := 0 + var chunkMetas []index.ChunkMeta // build a chunk for every second with randomized chunk length for from, through := now, now.Add(24*time.Hour); from <= through; from = from.Add(time.Second) { @@ -247,14 +248,14 @@ func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) { Labels: mustParseLabels(`{foo1="bar1", ping="pong"}`), Chunks: chunkMetas, }, - }, IndexOpts{}) + }) b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - chkRefs := ChunkRefsPool.Get() - chkRefs, _ = tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, chkRefs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - ChunkRefsPool.Put(chkRefs) + chkRefs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.NoError(b, err) + require.Len(b, chkRefs, numChunksToMatch*2) } } @@ -302,7 +303,7 @@ func TestTSDBIndex_Stats(t *testing.T) { // Create the TSDB index tempDir := t.TempDir() - tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{}) + tsdbIndex := BuildIndex(t, tempDir, series) // Create the test cases testCases := []struct { @@ -436,7 +437,7 @@ func TestTSDBIndex_Volume(t *testing.T) { // Create the TSDB index tempDir := t.TempDir() - tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{}) + tsdbIndex := BuildIndex(t, tempDir, series) from := model.TimeFromUnixNano(t1.UnixNano()) through := model.TimeFromUnixNano(t2.UnixNano()) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/store.go b/pkg/storage/stores/shipper/indexshipper/tsdb/store.go index 14c6c08c534e9..8f97997c5d401 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/store.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/store.go @@ -14,14 +14,12 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" - shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) @@ -40,7 +38,7 @@ type store struct { // NewStore creates a new tsdb index ReaderWriter. func NewStore( name, prefix string, - indexShipperCfg IndexCfg, + indexShipperCfg indexshipper.Config, schemaCfg config.SchemaConfig, _ *fetcher.Fetcher, objectClient client.ObjectClient, @@ -48,7 +46,6 @@ func NewStore( tableRange config.TableRange, reg prometheus.Registerer, logger log.Logger, - idxCache cache.Cache, ) ( index.ReaderWriter, func(), @@ -59,38 +56,28 @@ func NewStore( logger: logger, } - if err := storeInstance.init(name, prefix, indexShipperCfg, schemaCfg, objectClient, limits, tableRange, reg, idxCache); err != nil { + if err := storeInstance.init(name, prefix, indexShipperCfg, schemaCfg, objectClient, limits, tableRange, reg); err != nil { return nil, nil, err } return storeInstance, storeInstance.Stop, nil } -func (s *store) init(name, prefix string, indexCfg IndexCfg, schemaCfg config.SchemaConfig, objectClient client.ObjectClient, - limits downloads.Limits, tableRange config.TableRange, reg prometheus.Registerer, idxCache cache.Cache) error { - - var sharedCache cache.Cache - if indexCfg.CachePostings && indexCfg.Mode == indexshipper.ModeReadOnly && idxCache != nil { - sharedCache = idxCache - } - - openFn := func(p string) (shipperindex.Index, error) { - return OpenShippableTSDB(p, IndexOpts{PostingsCache: sharedCache}) - } +func (s *store) init(name, prefix string, indexShipperCfg indexshipper.Config, schemaCfg config.SchemaConfig, objectClient client.ObjectClient, + limits downloads.Limits, tableRange config.TableRange, reg prometheus.Registerer) error { var err error s.indexShipper, err = indexshipper.NewIndexShipper( prefix, - indexCfg.Config, + indexShipperCfg, objectClient, limits, nil, - openFn, + OpenShippableTSDB, tableRange, prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", reg), s.logger, ) - if err != nil { return err } @@ -98,7 +85,7 @@ func (s *store) init(name, prefix string, indexCfg IndexCfg, schemaCfg config.Sc var indices []Index opts := DefaultIndexClientOptions() - if indexCfg.Mode == indexshipper.ModeWriteOnly { + if indexShipperCfg.Mode == indexshipper.ModeWriteOnly { // We disable bloom filters on write nodes // for the Stats() methods as it's of relatively little // benefit when compared to the memory cost. The bloom filters @@ -108,8 +95,8 @@ func (s *store) init(name, prefix string, indexCfg IndexCfg, schemaCfg config.Sc opts.UseBloomFilters = false } - if indexCfg.Mode != indexshipper.ModeReadOnly { - nodeName, err := indexCfg.GetUniqueUploaderName() + if indexShipperCfg.Mode != indexshipper.ModeReadOnly { + nodeName, err := indexShipperCfg.GetUniqueUploaderName() if err != nil { return err } @@ -118,7 +105,7 @@ func (s *store) init(name, prefix string, indexCfg IndexCfg, schemaCfg config.Sc tsdbManager := NewTSDBManager( name, nodeName, - indexCfg.ActiveIndexDirectory, + indexShipperCfg.ActiveIndexDirectory, s.indexShipper, tableRange, schemaCfg, @@ -129,7 +116,7 @@ func (s *store) init(name, prefix string, indexCfg IndexCfg, schemaCfg config.Sc headManager := NewHeadManager( name, s.logger, - indexCfg.ActiveIndexDirectory, + indexShipperCfg.ActiveIndexDirectory, tsdbMetrics, tsdbManager, ) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/util_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/util_test.go index 13e2c6f54e0ac..64827a926e466 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/util_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/util_test.go @@ -17,7 +17,7 @@ type LoadableSeries struct { Chunks index.ChunkMetas } -func BuildIndex(t testing.TB, dir string, cases []LoadableSeries, opts IndexOpts) *TSDBFile { +func BuildIndex(t testing.TB, dir string, cases []LoadableSeries) *TSDBFile { b := NewBuilder(index.FormatV3) for _, s := range cases { @@ -35,7 +35,7 @@ func BuildIndex(t testing.TB, dir string, cases []LoadableSeries, opts IndexOpts }) require.Nil(t, err) - idx, err := NewShippableTSDBFile(dst, opts) + idx, err := NewShippableTSDBFile(dst) require.Nil(t, err) return idx } diff --git a/tools/tsdb/bloom-tester/lib.go b/tools/tsdb/bloom-tester/lib.go index 433e8d6badb8b..7eefb56342c40 100644 --- a/tools/tsdb/bloom-tester/lib.go +++ b/tools/tsdb/bloom-tester/lib.go @@ -53,12 +53,12 @@ func execute() { chunkClient := client.NewClient(objectClient, nil, conf.SchemaConfig) openFn := func(p string) (shipperindex.Index, error) { - return tsdb.OpenShippableTSDB(p, tsdb.IndexOpts{}) + return tsdb.OpenShippableTSDB(p) } indexShipper, err := indexshipper.NewIndexShipper( periodCfg.IndexTables.PathPrefix, - conf.StorageConfig.TSDBShipperConfig.Config, + conf.StorageConfig.TSDBShipperConfig, objectClient, overrides, nil, diff --git a/tools/tsdb/bloom-tester/readlib.go b/tools/tsdb/bloom-tester/readlib.go index 77e0e74167ea2..ee0456a6e3ccd 100644 --- a/tools/tsdb/bloom-tester/readlib.go +++ b/tools/tsdb/bloom-tester/readlib.go @@ -72,12 +72,12 @@ func executeRead() { chunkClient := client.NewClient(objectClient, nil, conf.SchemaConfig) openFn := func(p string) (shipperindex.Index, error) { - return tsdb.OpenShippableTSDB(p, tsdb.IndexOpts{}) + return tsdb.OpenShippableTSDB(p) } indexShipper, err := indexshipper.NewIndexShipper( periodCfg.IndexTables.PathPrefix, - conf.StorageConfig.TSDBShipperConfig.Config, + conf.StorageConfig.TSDBShipperConfig, objectClient, overrides, nil, diff --git a/tools/tsdb/index-analyzer/main.go b/tools/tsdb/index-analyzer/main.go index 93aeabd2b895f..fd59bd4792fdf 100644 --- a/tools/tsdb/index-analyzer/main.go +++ b/tools/tsdb/index-analyzer/main.go @@ -7,7 +7,6 @@ import ( "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/tools/tsdb/helpers" @@ -28,17 +27,13 @@ func main() { objectClient, err := storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics) helpers.ExitErr("creating object client", err) - openFn := func(p string) (index.Index, error) { - return tsdb.OpenShippableTSDB(p, tsdb.IndexOpts{}) - } - shipper, err := indexshipper.NewIndexShipper( periodCfg.IndexTables.PathPrefix, - conf.StorageConfig.TSDBShipperConfig.Config, + conf.StorageConfig.TSDBShipperConfig, objectClient, overrides, nil, - openFn, + tsdb.OpenShippableTSDB, tableRange, prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", prometheus.DefaultRegisterer), util_log.Logger, diff --git a/tools/tsdb/migrate-versions/main_test.go b/tools/tsdb/migrate-versions/main_test.go index ec009457cd82a..2f4690fde0a7e 100644 --- a/tools/tsdb/migrate-versions/main_test.go +++ b/tools/tsdb/migrate-versions/main_test.go @@ -90,7 +90,7 @@ func TestMigrateTables(t *testing.T) { require.NoError(t, err) tableName := fmt.Sprintf("%s%d", indexPrefix, i) - idx, err := tsdb.NewShippableTSDBFile(id, tsdb.IndexOpts{}) + idx, err := tsdb.NewShippableTSDBFile(id) require.NoError(t, err) require.NoError(t, uploadFile(idx, indexStorageClient, tableName, userID)) diff --git a/tools/tsdb/tsdb-map/main_test.go b/tools/tsdb/tsdb-map/main_test.go index 25e358b51b67a..480c723431e22 100644 --- a/tools/tsdb/tsdb-map/main_test.go +++ b/tools/tsdb/tsdb-map/main_test.go @@ -93,7 +93,7 @@ func BenchmarkQuery_GetChunkRefs(b *testing.B) { if err != nil { panic(err) } - idx := tsdb.NewTSDBIndex(reader, tsdb.NewPostingsReader(reader)) + idx := tsdb.NewTSDBIndex(reader) b.Run(bm.name, func(b *testing.B) { refs := tsdb.ChunkRefsPool.Get() for i := 0; i < b.N; i++ { @@ -118,7 +118,7 @@ func BenchmarkQuery_GetChunkRefsSharded(b *testing.B) { if err != nil { panic(err) } - idx := tsdb.NewTSDBIndex(reader, tsdb.NewPostingsReader(reader)) + idx := tsdb.NewTSDBIndex(reader) shardFactor := 16 b.Run(bm.name, func(b *testing.B) {