Skip to content

Commit

Permalink
Make ChunkFetcher and ChunkWriter accept PeriodConfig
Browse files Browse the repository at this point in the history
instead of full SchemaConfig

Both stores only ever operate on a single schema period and the schema
was only used to determine the external key of a chunk. This can be done
by the PeriodConfig as well.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Nov 2, 2023
1 parent 2509281 commit d978e47
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 79 deletions.
14 changes: 5 additions & 9 deletions pkg/storage/chunk/cache/background_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@ func TestBackground(t *testing.T) {
WriteBackSizeLimit: flagext.ByteSize(limit),
}, cache.NewMockCache(), nil)

s := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
p := config.PeriodConfig{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
}

keys, chunks := fillCache(t, s, c)
keys, chunks := fillCache(t, p, c)
cache.Flush(c)

testCacheSingle(t, c, keys, chunks)
Expand Down
42 changes: 15 additions & 27 deletions pkg/storage/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

const userID = "1"

func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]string, []chunk.Chunk) {
func fillCache(t *testing.T, p config.PeriodConfig, cache cache.Cache) ([]string, []chunk.Chunk) {
const chunkLen = 13 * 3600 // in seconds

// put a set of chunks, larger than background batch size, with varying timestamps and values
Expand Down Expand Up @@ -74,7 +74,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str
err = cleanChunk.Decode(chunk.NewDecodeContext(), buf)
require.NoError(t, err)

keys = append(keys, scfg.ExternalKey(c.ChunkRef))
keys = append(keys, p.ExternalKey(c.ChunkRef))
bufs = append(bufs, buf)
chunks = append(chunks, cleanChunk)
}
Expand Down Expand Up @@ -120,37 +120,28 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []
require.Equal(t, chunks, result)
}

func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) {
s := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
}
func testChunkFetcher(t *testing.T, p config.PeriodConfig, c cache.Cache, chunks []chunk.Chunk) {

fetcher, err := fetcher.New(c, nil, false, s, nil, 10, 100, 0)
fetcher, err := fetcher.New(c, nil, false, p, nil, 10, 100, 0)
require.NoError(t, err)
defer fetcher.Stop()

found, err := fetcher.FetchChunks(context.Background(), chunks)
require.NoError(t, err)
sort.Sort(byExternalKey{found, s})
sort.Sort(byExternalKey{chunks, s})
sort.Sort(byExternalKey{found, p})
sort.Sort(byExternalKey{chunks, p})
require.Equal(t, chunks, found)
}

type byExternalKey struct {
chunks []chunk.Chunk
scfg config.SchemaConfig
cfg config.PeriodConfig
}

func (a byExternalKey) Len() int { return len(a.chunks) }
func (a byExternalKey) Swap(i, j int) { a.chunks[i], a.chunks[j] = a.chunks[j], a.chunks[i] }
func (a byExternalKey) Less(i, j int) bool {
return a.scfg.ExternalKey(a.chunks[i].ChunkRef) < a.scfg.ExternalKey(a.chunks[j].ChunkRef)
return a.cfg.ExternalKey(a.chunks[i].ChunkRef) < a.cfg.ExternalKey(a.chunks[j].ChunkRef)
}

func testCacheMiss(t *testing.T, cache cache.Cache) {
Expand All @@ -164,16 +155,13 @@ func testCacheMiss(t *testing.T, cache cache.Cache) {
}

func testCache(t *testing.T, cache cache.Cache) {
s := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
p := config.PeriodConfig{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
}
keys, chunks := fillCache(t, s, cache)

keys, chunks := fillCache(t, p, cache)
t.Run("Single", func(t *testing.T) {
testCacheSingle(t, cache, keys, chunks)
})
Expand All @@ -184,7 +172,7 @@ func testCache(t *testing.T, cache cache.Cache) {
testCacheMiss(t, cache)
})
t.Run("Fetcher", func(t *testing.T) {
testChunkFetcher(t, cache, chunks)
testChunkFetcher(t, p, cache, chunks)
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const chunkDecodeParallelism = 16
// and writing back any misses to the cache. Also responsible for decoding
// chunks from the cache, in parallel.
type Fetcher struct {
schema config.SchemaConfig
schema config.PeriodConfig
storage client.Client
cache cache.Cache
cachel2 cache.Cache
Expand Down Expand Up @@ -89,7 +89,7 @@ type decodeResponse struct {
}

// New makes a new ChunkFetcher.
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) {
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.PeriodConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) {
c := &Fetcher{
schema: schema,
storage: storage,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func Test(t *testing.T) {
assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart))

// Build fetcher
f, err := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff)
f, err := New(c1, c2, false, sc.Configs[0], chunkClient, 1, 1, test.handoff)
assert.NoError(t, err)

// Run the test
Expand Down Expand Up @@ -290,7 +290,7 @@ func BenchmarkFetch(b *testing.B) {
_ = chunkClient.PutChunks(context.Background(), test.storeStart)

// Build fetcher
f, _ := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff)
f, _ := New(c1, c2, false, sc.Configs[0], chunkClient, 1, 1, test.handoff)

for i := 0; i < b.N; i++ {
_, err := f.FetchChunks(context.Background(), test.fetch)
Expand Down
44 changes: 26 additions & 18 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,20 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.fileName, "schema-config-file", "", "The path to the schema config file. The schema config is used only when running Cortex with the chunks storage.")
}

// Load the yaml file, or build the config from legacy command-line flags.
// If period configs are already present (e.g. populated by flags or set
// programmatically — TODO confirm against callers), loading from file is
// skipped entirely and no validation is performed on the existing configs.
func (cfg *SchemaConfig) Load() error {
	if len(cfg.Configs) > 0 {
		return nil
	}

	// Load config from file.
	if err := cfg.loadFromFile(); err != nil {
		return err
	}

	return cfg.Validate()
}

// loadFromFile loads the schema config from a yaml file
func (cfg *SchemaConfig) loadFromFile() error {
if cfg.fileName == "" {
Expand Down Expand Up @@ -448,20 +462,6 @@ func (cfg PeriodConfig) validate() error {
return nil
}

// Load the yaml file, or build the config from legacy command-line flags
func (cfg *SchemaConfig) Load() error {
if len(cfg.Configs) > 0 {
return nil
}

// Load config from file.
if err := cfg.loadFromFile(); err != nil {
return err
}

return cfg.Validate()
}

func (cfg *PeriodConfig) VersionAsInt() (int, error) {
// Read memoized schema version. This is called during unmarshaling,
// but may be nil in the case of testware.
Expand Down Expand Up @@ -507,6 +507,15 @@ func ValidatePathPrefix(prefix string) error {
return nil
}

// ExternalKey generates the external chunk key for the given chunk ref based
// on this period's schema version: schema v12 and newer use the newer key
// format, while older schemas — or a version string that fails to parse —
// fall back to the legacy format.
func (cfg *PeriodConfig) ExternalKey(ref logproto.ChunkRef) string {
	v, err := cfg.VersionAsInt()
	// NOTE(review): a VersionAsInt error silently selects the legacy key
	// format rather than surfacing the error — presumably intentional for
	// backward compatibility; verify against callers.
	if err == nil && v >= 12 {
		return newerExternalKey(ref)
	}
	return newExternalKey(ref)
}

// PeriodicTableConfig is configuration for a set of time-sharded tables.
type PeriodicTableConfig struct {
Prefix string `yaml:"prefix" doc:"description=Table prefix for all period tables."`
Expand Down Expand Up @@ -672,11 +681,10 @@ func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string {
// Generate the appropriate external key based on cfg.Schema, chunk.Checksum, and chunk.From
func (cfg SchemaConfig) ExternalKey(ref logproto.ChunkRef) string {
p, err := cfg.SchemaForTime(ref.From)
v, _ := p.VersionAsInt()
if err == nil && v >= 12 {
return newerExternalKey(ref)
if err != nil {
return newExternalKey(ref)
}
return newExternalKey(ref)
return p.ExternalKey(ref)
}

// VersionForChunk will return the schema version associated with the `From` timestamp of a chunk.
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func TestNamedStores(t *testing.T) {

t.Run("period config referring to unrecognized store", func(t *testing.T) {
schemaConfig := schemaConfig
cfg := cfg
schemaConfig.Configs[0].ObjectType = "not-found"
_, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger, constants.Loki)
require.Error(t, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *LokiStore) init() error {
if err != nil {
return err
}
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff)
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), p, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff)
if err != nil {
return err
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
}

indexReaderWriter = index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, indexReaderWriter, s.storeCfg.DisableIndexDeduplication)
chunkWriter := stores.NewChunkWriter(f, p, indexReaderWriter, s.storeCfg.DisableIndexDeduplication)

return chunkWriter, indexReaderWriter,
func() {
Expand All @@ -320,7 +320,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl

indexReaderWriter := series.NewIndexReaderWriter(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize, s.writeDedupeCache)
monitoredReaderWriter := index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, monitoredReaderWriter, s.storeCfg.DisableIndexDeduplication)
chunkWriter := stores.NewChunkWriter(f, p, monitoredReaderWriter, s.storeCfg.DisableIndexDeduplication)

return chunkWriter,
monitoredReaderWriter,
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/stores/series_store_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ var (
})
)

// Writer implements ChunkWriter
type Writer struct {
schemaCfg config.SchemaConfig
periodCfg config.PeriodConfig
DisableIndexDeduplication bool

indexWriter index.Writer
fetcher *fetcher.Fetcher
}

func NewChunkWriter(fetcher *fetcher.Fetcher, schemaCfg config.SchemaConfig, indexWriter index.Writer, disableIndexDeduplication bool) ChunkWriter {
func NewChunkWriter(fetcher *fetcher.Fetcher, periodCfg config.PeriodConfig, indexWriter index.Writer, disableIndexDeduplication bool) *Writer {
return &Writer{
schemaCfg: schemaCfg,
periodCfg: periodCfg,
DisableIndexDeduplication: disableIndexDeduplication,
fetcher: fetcher,
indexWriter: indexWriter,
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c *Writer) PutOne(ctx context.Context, from, through model.Time, chk chunk
}

// If this chunk is in cache it must already be in the database so we don't need to write it again
found, _, _, _ := c.fetcher.Cache().Fetch(ctx, []string{c.schemaCfg.ExternalKey(chk.ChunkRef)})
found, _, _, _ := c.fetcher.Cache().Fetch(ctx, []string{c.periodCfg.ExternalKey(chk.ChunkRef)})

if len(found) > 0 && !overlap {
writeChunk = false
Expand Down
10 changes: 3 additions & 7 deletions pkg/storage/stores/series_store_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ func TestChunkWriter_PutOne(t *testing.T) {
Schema: "v13",
}

schemaConfig := config.SchemaConfig{
Configs: []config.PeriodConfig{periodConfig},
}

chunkfmt, headfmt, err := periodConfig.ChunkFormat()
require.NoError(t, err)

Expand Down Expand Up @@ -144,7 +140,7 @@ func TestChunkWriter_PutOne(t *testing.T) {
t.Run(name, func(t *testing.T) {
cache := &mockCache{}
if tc.populateCache {
cacheKey := schemaConfig.ExternalKey(chk.ChunkRef)
cacheKey := periodConfig.ExternalKey(chk.ChunkRef)
cache = &mockCache{
data: map[string]string{
cacheKey: "foo",
Expand All @@ -155,10 +151,10 @@ func TestChunkWriter_PutOne(t *testing.T) {
idx := &mockIndexWriter{}
client := &mockChunksClient{}

f, err := fetcher.New(cache, nil, false, schemaConfig, client, 1, 1, 0)
f, err := fetcher.New(cache, nil, false, periodConfig, client, 1, 1, 0)
require.NoError(t, err)

cw := NewChunkWriter(f, schemaConfig, idx, true)
cw := NewChunkWriter(f, periodConfig, idx, true)

err = cw.PutOne(context.Background(), tc.from, tc.through, chk)
require.NoError(t, err)
Expand Down
23 changes: 16 additions & 7 deletions pkg/storage/stores/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ func NewMockChunkStore(chunkFormat byte, headfmt chunkenc.HeadBlockFmt, streams
}

func NewMockChunkStoreWithChunks(chunks []chunk.Chunk) *MockChunkStore {
schemas := config.SchemaConfig{
Configs: []config.PeriodConfig{
{From: config.DayTime{Time: 0}, Schema: "v13"},
},
}
return &MockChunkStore{
schemas: config.SchemaConfig{},
schemas: schemas,
chunks: chunks,
client: &mockChunkStoreClient{chunks: chunks, scfg: config.SchemaConfig{}},
client: &mockChunkStoreClient{
chunks: chunks,
schemas: schemas,
},
}
}

Expand Down Expand Up @@ -113,10 +121,11 @@ func (m *MockChunkStore) GetChunkFetcher(_ model.Time) *fetcher.Fetcher {
}

func (m *MockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
periodCfg := m.schemas.Configs[0]
refs := make([]chunk.Chunk, 0, len(m.chunks))
// transform real chunks into ref chunks.
for _, c := range m.chunks {
r, err := chunk.ParseExternalKey("fake", m.schemas.ExternalKey(c.ChunkRef))
r, err := chunk.ParseExternalKey("fake", periodCfg.ExternalKey(c.ChunkRef))
if err != nil {
panic(err)
}
Expand All @@ -128,7 +137,7 @@ func (m *MockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time,
panic(err)
}

f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 10, 100, 0)
f, err := fetcher.New(cache, nil, false, periodCfg, m.client, 10, 100, 0)
if err != nil {
panic(err)
}
Expand All @@ -144,8 +153,8 @@ func (m *MockChunkStore) Volume(_ context.Context, _ string, _, _ model.Time, _
}

type mockChunkStoreClient struct {
chunks []chunk.Chunk
scfg config.SchemaConfig
schemas config.SchemaConfig
chunks []chunk.Chunk
}

func (m mockChunkStoreClient) Stop() {
Expand All @@ -161,7 +170,7 @@ func (m mockChunkStoreClient) GetChunks(_ context.Context, chunks []chunk.Chunk)
for _, c := range chunks {
for _, sc := range m.chunks {
// only returns chunks requested using the external key
if m.scfg.ExternalKey(c.ChunkRef) == m.scfg.ExternalKey(sc.ChunkRef) {
if m.schemas.ExternalKey(c.ChunkRef) == m.schemas.ExternalKey(sc.ChunkRef) {
res = append(res, sc)
}
}
Expand Down

0 comments on commit d978e47

Please sign in to comment.