diff --git a/pkg/storage/chunk/cache/background_test.go b/pkg/storage/chunk/cache/background_test.go index af9827865ee7a..ffc2a7bf1712a 100644 --- a/pkg/storage/chunk/cache/background_test.go +++ b/pkg/storage/chunk/cache/background_test.go @@ -24,17 +24,13 @@ func TestBackground(t *testing.T) { WriteBackSizeLimit: flagext.ByteSize(limit), }, cache.NewMockCache(), nil) - s := config.SchemaConfig{ - Configs: []config.PeriodConfig{ - { - From: config.DayTime{Time: 0}, - Schema: "v11", - RowShards: 16, - }, - }, + p := config.PeriodConfig{ + From: config.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, } - keys, chunks := fillCache(t, s, c) + keys, chunks := fillCache(t, p, c) cache.Flush(c) testCacheSingle(t, c, keys, chunks) diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index e65339066ad44..f4d5e04f19ae2 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -24,7 +24,7 @@ import ( const userID = "1" -func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]string, []chunk.Chunk) { +func fillCache(t *testing.T, p config.PeriodConfig, cache cache.Cache) ([]string, []chunk.Chunk) { const chunkLen = 13 * 3600 // in seconds // put a set of chunks, larger than background batch size, with varying timestamps and values @@ -74,7 +74,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str err = cleanChunk.Decode(chunk.NewDecodeContext(), buf) require.NoError(t, err) - keys = append(keys, scfg.ExternalKey(c.ChunkRef)) + keys = append(keys, p.ExternalKey(c.ChunkRef)) bufs = append(bufs, buf) chunks = append(chunks, cleanChunk) } @@ -120,37 +120,28 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks [] require.Equal(t, chunks, result) } -func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) { - s := config.SchemaConfig{ - Configs: []config.PeriodConfig{ - { - From: config.DayTime{Time: 0}, - Schema: "v11", - RowShards: 16, - }, - }, - } +func testChunkFetcher(t *testing.T, p config.PeriodConfig, c cache.Cache, chunks []chunk.Chunk) { - fetcher, err := fetcher.New(c, nil, false, s, nil, 10, 100, 0) + fetcher, err := fetcher.New(c, nil, false, p, nil, 10, 100, 0) require.NoError(t, err) defer fetcher.Stop() found, err := fetcher.FetchChunks(context.Background(), chunks) require.NoError(t, err) - sort.Sort(byExternalKey{found, s}) - sort.Sort(byExternalKey{chunks, s}) + sort.Sort(byExternalKey{found, p}) + sort.Sort(byExternalKey{chunks, p}) require.Equal(t, chunks, found) } type byExternalKey struct { chunks []chunk.Chunk - scfg config.SchemaConfig + cfg config.PeriodConfig } func (a byExternalKey) Len() int { return len(a.chunks) } func (a byExternalKey) Swap(i, j int) { a.chunks[i], a.chunks[j] = a.chunks[j], a.chunks[i] } func (a byExternalKey) Less(i, j int) bool { - return a.scfg.ExternalKey(a.chunks[i].ChunkRef) < a.scfg.ExternalKey(a.chunks[j].ChunkRef) + return a.cfg.ExternalKey(a.chunks[i].ChunkRef) < a.cfg.ExternalKey(a.chunks[j].ChunkRef) } func testCacheMiss(t *testing.T, cache cache.Cache) { @@ -164,16 +155,13 @@ func testCacheMiss(t *testing.T, cache cache.Cache) { } func testCache(t *testing.T, cache cache.Cache) { - s := config.SchemaConfig{ - Configs: []config.PeriodConfig{ - { - From: config.DayTime{Time: 0}, - Schema: "v11", - RowShards: 16, - }, - }, + p := config.PeriodConfig{ + From: config.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, } - keys, chunks := fillCache(t, s, cache) + + keys, chunks := fillCache(t, p, cache) t.Run("Single", func(t *testing.T) { testCacheSingle(t, cache, keys, chunks) }) @@ -184,7 +172,7 @@ func testCache(t *testing.T, cache cache.Cache) { testCacheMiss(t, cache) }) t.Run("Fetcher", func(t *testing.T) { - testChunkFetcher(t, cache, chunks) + testChunkFetcher(t, p, cache, chunks) }) } diff --git a/pkg/storage/chunk/fetcher/fetcher.go b/pkg/storage/chunk/fetcher/fetcher.go index fd90f685e981e..1c6679a2155d2 100644 --- a/pkg/storage/chunk/fetcher/fetcher.go +++ b/pkg/storage/chunk/fetcher/fetcher.go @@ -58,7 +58,7 @@ const chunkDecodeParallelism = 16 // and writing back any misses to the cache. Also responsible for decoding // chunks from the cache, in parallel. type Fetcher struct { - schema config.SchemaConfig + schema config.PeriodConfig storage client.Client cache cache.Cache cachel2 cache.Cache @@ -89,7 +89,7 @@ type decodeResponse struct { } // New makes a new ChunkFetcher. -func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) { +func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.PeriodConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) { c := &Fetcher{ schema: schema, storage: storage, diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index d451372eec124..3504a467bc42e 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -193,7 +193,7 @@ func Test(t *testing.T) { assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart)) // Build fetcher - f, err := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff) + f, err := New(c1, c2, false, sc.Configs[0], chunkClient, 1, 1, test.handoff) assert.NoError(t, err) // Run the test @@ -290,7 +290,7 @@ func BenchmarkFetch(b *testing.B) { _ = chunkClient.PutChunks(context.Background(), test.storeStart) // Build fetcher - f, _ := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff) + f, _ := New(c1, c2, false, sc.Configs[0], chunkClient, 1, 1, test.handoff) for i := 0; i < b.N; i++ { _, err := f.FetchChunks(context.Background(), test.fetch) diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index f2eaa9f3733db..d4a9fe380eb8b 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -241,6 +241,20 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.fileName, "schema-config-file", "", "The path to the schema config file. The schema config is used only when running Cortex with the chunks storage.") } +// Load the yaml file, or build the config from legacy command-line flags +func (cfg *SchemaConfig) Load() error { + if len(cfg.Configs) > 0 { + return nil + } + + // Load config from file. + if err := cfg.loadFromFile(); err != nil { + return err + } + + return cfg.Validate() +} + // loadFromFile loads the schema config from a yaml file func (cfg *SchemaConfig) loadFromFile() error { if cfg.fileName == "" { @@ -448,20 +462,6 @@ func (cfg PeriodConfig) validate() error { return nil } -// Load the yaml file, or build the config from legacy command-line flags -func (cfg *SchemaConfig) Load() error { - if len(cfg.Configs) > 0 { - return nil - } - - // Load config from file. - if err := cfg.loadFromFile(); err != nil { - return err - } - - return cfg.Validate() -} - func (cfg *PeriodConfig) VersionAsInt() (int, error) { // Read memoized schema version. This is called during unmarshaling, // but may be nil in the case of testware. @@ -507,6 +507,15 @@ func ValidatePathPrefix(prefix string) error { return nil } +// Generate the appropriate external key based on cfg.Schema, chunk.Checksum, and chunk.From +func (cfg *PeriodConfig) ExternalKey(ref logproto.ChunkRef) string { + v, err := cfg.VersionAsInt() + if err == nil && v >= 12 { + return newerExternalKey(ref) + } + return newExternalKey(ref) +} + // PeriodicTableConfig is configuration for a set of time-sharded tables. type PeriodicTableConfig struct { Prefix string `yaml:"prefix" doc:"description=Table prefix for all period tables."` @@ -672,11 +681,10 @@ func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string { // Generate the appropriate external key based on cfg.Schema, chunk.Checksum, and chunk.From func (cfg SchemaConfig) ExternalKey(ref logproto.ChunkRef) string { p, err := cfg.SchemaForTime(ref.From) - v, _ := p.VersionAsInt() - if err == nil && v >= 12 { - return newerExternalKey(ref) + if err != nil { + return newExternalKey(ref) } - return newExternalKey(ref) + return p.ExternalKey(ref) } // VersionForChunk will return the schema version associated with the `From` timestamp of a chunk. diff --git a/pkg/storage/factory_test.go b/pkg/storage/factory_test.go index 2588c9dc69dd1..8916791fa1ddf 100644 --- a/pkg/storage/factory_test.go +++ b/pkg/storage/factory_test.go @@ -161,6 +161,7 @@ func TestNamedStores(t *testing.T) { t.Run("period config referring to unrecognized store", func(t *testing.T) { schemaConfig := schemaConfig + cfg := cfg schemaConfig.Configs[0].ObjectType = "not-found" _, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger, constants.Loki) require.Error(t, err) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index cb07929eeabd1..4a425f20ef65e 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -190,7 +190,7 @@ func (s *LokiStore) init() error { if err != nil { return err } - f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff) + f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), p, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff) if err != nil { return err } @@ -294,7 +294,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl } indexReaderWriter = index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg) - chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, indexReaderWriter, s.storeCfg.DisableIndexDeduplication) + chunkWriter := stores.NewChunkWriter(f, p, indexReaderWriter, s.storeCfg.DisableIndexDeduplication) return chunkWriter, indexReaderWriter, func() { @@ -320,7 +320,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl indexReaderWriter := series.NewIndexReaderWriter(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize, s.writeDedupeCache) monitoredReaderWriter := index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg) - chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, monitoredReaderWriter, s.storeCfg.DisableIndexDeduplication) + chunkWriter := stores.NewChunkWriter(f, p, monitoredReaderWriter, s.storeCfg.DisableIndexDeduplication) return chunkWriter, monitoredReaderWriter, diff --git a/pkg/storage/stores/series_store_write.go b/pkg/storage/stores/series_store_write.go index db22c5caa1202..a9445673ef1d2 100644 --- a/pkg/storage/stores/series_store_write.go +++ b/pkg/storage/stores/series_store_write.go @@ -38,17 +38,18 @@ var ( }) ) +// Writer implements ChunkWriter type Writer struct { - schemaCfg config.SchemaConfig + periodCfg config.PeriodConfig DisableIndexDeduplication bool indexWriter index.Writer fetcher *fetcher.Fetcher } -func NewChunkWriter(fetcher *fetcher.Fetcher, schemaCfg config.SchemaConfig, indexWriter index.Writer, disableIndexDeduplication bool) ChunkWriter { +func NewChunkWriter(fetcher *fetcher.Fetcher, periodCfg config.PeriodConfig, indexWriter index.Writer, disableIndexDeduplication bool) *Writer { return &Writer{ - schemaCfg: schemaCfg, + periodCfg: periodCfg, DisableIndexDeduplication: disableIndexDeduplication, fetcher: fetcher, indexWriter: indexWriter, @@ -83,7 +84,7 @@ func (c *Writer) PutOne(ctx context.Context, from, through model.Time, chk chunk } // If this chunk is in cache it must already be in the database so we don't need to write it again - found, _, _, _ := c.fetcher.Cache().Fetch(ctx, []string{c.schemaCfg.ExternalKey(chk.ChunkRef)}) + found, _, _, _ := c.fetcher.Cache().Fetch(ctx, []string{c.periodCfg.ExternalKey(chk.ChunkRef)}) if len(found) > 0 && !overlap { writeChunk = false diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index 9c8c2f4069333..d2ce3f5c10fe0 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -81,10 +81,6 @@ func TestChunkWriter_PutOne(t *testing.T) { Schema: "v13", } - schemaConfig := config.SchemaConfig{ - Configs: []config.PeriodConfig{periodConfig}, - } - chunkfmt, headfmt, err := periodConfig.ChunkFormat() require.NoError(t, err) @@ -144,7 +140,7 @@ func TestChunkWriter_PutOne(t *testing.T) { t.Run(name, func(t *testing.T) { cache := &mockCache{} if tc.populateCache { - cacheKey := schemaConfig.ExternalKey(chk.ChunkRef) + cacheKey := periodConfig.ExternalKey(chk.ChunkRef) cache = &mockCache{ data: map[string]string{ cacheKey: "foo", @@ -155,10 +151,10 @@ func TestChunkWriter_PutOne(t *testing.T) { idx := &mockIndexWriter{} client := &mockChunksClient{} - f, err := fetcher.New(cache, nil, false, schemaConfig, client, 1, 1, 0) + f, err := fetcher.New(cache, nil, false, periodConfig, client, 1, 1, 0) require.NoError(t, err) - cw := NewChunkWriter(f, schemaConfig, idx, true) + cw := NewChunkWriter(f, periodConfig, idx, true) err = cw.PutOne(context.Background(), tc.from, tc.through, chk) require.NoError(t, err) diff --git a/pkg/storage/stores/testutils.go b/pkg/storage/stores/testutils.go index 1a721b19ce626..54a4c392e1417 100644 --- a/pkg/storage/stores/testutils.go +++ b/pkg/storage/stores/testutils.go @@ -46,10 +46,18 @@ func NewMockChunkStore(chunkFormat byte, headfmt chunkenc.HeadBlockFmt, streams } func NewMockChunkStoreWithChunks(chunks []chunk.Chunk) *MockChunkStore { + schemas := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + {From: config.DayTime{Time: 0}, Schema: "v13"}, + }, + } return &MockChunkStore{ - schemas: config.SchemaConfig{}, + schemas: schemas, chunks: chunks, - client: &mockChunkStoreClient{chunks: chunks, scfg: config.SchemaConfig{}}, + client: &mockChunkStoreClient{ + chunks: chunks, + schemas: schemas, + }, } } @@ -113,10 +121,11 @@ func (m *MockChunkStore) GetChunkFetcher(_ model.Time) *fetcher.Fetcher { } func (m *MockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { + periodCfg := m.schemas.Configs[0] refs := make([]chunk.Chunk, 0, len(m.chunks)) // transform real chunks into ref chunks. for _, c := range m.chunks { - r, err := chunk.ParseExternalKey("fake", m.schemas.ExternalKey(c.ChunkRef)) + r, err := chunk.ParseExternalKey("fake", periodCfg.ExternalKey(c.ChunkRef)) if err != nil { panic(err) } @@ -128,7 +137,7 @@ func (m *MockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, panic(err) } - f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 10, 100, 0) + f, err := fetcher.New(cache, nil, false, periodCfg, m.client, 10, 100, 0) if err != nil { panic(err) } @@ -144,8 +153,8 @@ func (m *MockChunkStore) Volume(_ context.Context, _ string, _, _ model.Time, _ } type mockChunkStoreClient struct { - chunks []chunk.Chunk - scfg config.SchemaConfig + schemas config.SchemaConfig + chunks []chunk.Chunk } func (m mockChunkStoreClient) Stop() { @@ -161,7 +170,7 @@ func (m mockChunkStoreClient) GetChunks(_ context.Context, chunks []chunk.Chunk) for _, c := range chunks { for _, sc := range m.chunks { // only returns chunks requested using the external key - if m.scfg.ExternalKey(c.ChunkRef) == m.scfg.ExternalKey(sc.ChunkRef) { + if m.schemas.ExternalKey(c.ChunkRef) == m.schemas.ExternalKey(sc.ChunkRef) { res = append(res, sc) } }