Skip to content

Commit

Permalink
Revert "Introduces cache to TSDB postings (#9621)"
Browse files Browse the repository at this point in the history
This reverts commit 1221658.
  • Loading branch information
DylanGuedes committed Nov 7, 2023
1 parent cea2e0e commit abaf558
Show file tree
Hide file tree
Showing 25 changed files with 84 additions and 1,288 deletions.
5 changes: 0 additions & 5 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2229,11 +2229,6 @@ tsdb_shipper:

[ingesterdbretainperiod: <duration>]

# Experimental. Whether TSDB should cache postings or not. The
# index-read-cache will be used as the backend.
# CLI flag: -tsdb.enable-postings-cache
[enable_postings_cache: <boolean> | default = false]

# Configures Bloom Shipper.
bloom_shipper:
# Working directory to store downloaded Bloom Blocks.
Expand Down
137 changes: 0 additions & 137 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,142 +579,6 @@ func TestSchedulerRing(t *testing.T) {
})
}

func TestQueryTSDB_WithCachedPostings(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDB)

defer func() {
assert.NoError(t, clu.Cleanup())
}()

var (
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
"-tsdb.enable-postings-cache=true",
"-store.index-cache-read.embedded-cache.enabled=true",
)
)
require.NoError(t, clu.Run())

var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now

// initial cache state.
igwMetrics, err := cliIndexGateway.Metrics()
require.NoError(t, err)
assertCacheState(t, igwMetrics, &expectedCacheState{
cacheName: "store.index-cache-read.embedded-cache",
misses: 0,
added: 0,
})

t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLine("lineA", time.Now().Add(-72*time.Hour), nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineB", time.Now().Add(-48*time.Hour), nil, map[string]string{"job": "fake"}))
})

// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())

// Query lines
t.Run("query to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}

assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
})

igwMetrics, err = cliIndexGateway.Metrics()
require.NoError(t, err)
assertCacheState(t, igwMetrics, &expectedCacheState{
cacheName: "store.index-cache-read.embedded-cache",
misses: 1,
added: 1,
})

// ingest logs with ts=now.
require.NoError(t, cliDistributor.PushLogLine("lineC", now, nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", now, nil, map[string]string{"job": "fake"}))

// default length is 7 days.
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
// expect lines from both, ingesters memory and from the store.
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)

}

func TestOTLPLogsIngestQuery(t *testing.T) {
clu := cluster.New(nil, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
Expand Down Expand Up @@ -859,7 +723,6 @@ func TestCategorizedLabels(t *testing.T) {
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
"-tsdb.enable-postings-cache=true",
"-store.index-cache-read.embedded-cache.enabled=true",
)
)
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/gatewayclient"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
)
Expand Down Expand Up @@ -336,7 +335,7 @@ type Config struct {

MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."`
TSDBShipperConfig tsdb.IndexCfg `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."`
TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."`
BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures Bloom Shipper."`

// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String()))

if p.IndexType == config.TSDBType {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig.Config) {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger, s.metricsNamespace)
if err != nil {
Expand All @@ -284,7 +284,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
}

name := fmt.Sprintf("%s_%s", p.ObjectType, p.From.String())
indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(name, p.IndexTables.PathPrefix, s.cfg.TSDBShipperConfig, s.schemaCfg, f, objectClient, s.limits, tableRange, indexClientReg, indexClientLogger, s.indexReadCache)
indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(name, p.IndexTables.PathPrefix, s.cfg.TSDBShipperConfig, s.schemaCfg, f, objectClient, s.limits, tableRange, indexClientReg, indexClientLogger)
if err != nil {
return nil, nil, nil, err
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/util/constants"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
Expand Down Expand Up @@ -1016,7 +1015,7 @@ func TestStore_indexPrefixChange(t *testing.T) {

cfg := Config{
FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")},
TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig},
TSDBShipperConfig: shipperConfig,
NamedStores: NamedStores{
Filesystem: map[string]NamedFSConfig{
"named-store": {Directory: path.Join(tempDir, "named-store")},
Expand Down Expand Up @@ -1197,7 +1196,7 @@ func TestStore_MultiPeriod(t *testing.T) {
cfg := Config{
FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")},
BoltDBShipperConfig: boltdb.IndexCfg{Config: shipperConfig},
TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig, CachePostings: false},
TSDBShipperConfig: shipperConfig,
NamedStores: NamedStores{
Filesystem: map[string]NamedFSConfig{
"named-store": {Directory: path.Join(tempDir, "named-store")},
Expand Down Expand Up @@ -1521,7 +1520,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
boltdbShipperConfig.IngesterName = ingesterName

// config for tsdb Shipper
tsdbShipperConfig := tsdb.IndexCfg{}
tsdbShipperConfig := indexshipper.Config{}
flagext.DefaultValues(&tsdbShipperConfig)
tsdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "tsdb-index")
tsdbShipperConfig.CacheLocation = path.Join(tempDir, "tsdb-shipper-cache")
Expand Down
157 changes: 0 additions & 157 deletions pkg/storage/stores/shipper/indexshipper/tsdb/cached_postings_index.go

This file was deleted.

Loading

0 comments on commit abaf558

Please sign in to comment.