chore(storage): Simplify store interfaces and abstractions (pt 2) (grafana#10454)

**What this PR does / why we need it**:

* Remove the preemptive interface from indexReaderWriter and make it an exported struct (see the first sketch after this list)
    
* Compose IngesterStore and QuerierStore
    
    The ingester and querier do not require a store that implements the full
    store interface; they can accept stores that implement only a subset of it.

    Therefore we can define narrower, purpose-specific interfaces that are
    accepted by the ingester and querier constructors (see the second sketch
    after this list).
    

* Split up ChunkFetcher interface
    
    This commit splits the ChunkFetcher interface into a ChunkFetcher and a
    ChunkFetcherProvider interface.
    
    This has the advantage that methods can accept smaller interfaces (see the
    third sketch after this list).
    
    This commit also renames the `GetChunkRefs()` method of the ChunkFetcher
    interface to `GetChunks()`, because it returns `[][]chunk.Chunk`,
    not `[]chunk.ChunkRef`.
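A rough, self-contained illustration of the first point, using toy names rather than the real type behind `series.NewIndexReaderWriter`: the package exports the concrete struct instead of declaring an interface next to it, and each consumer defines the small interface it actually needs.

```go
package main

import "fmt"

// IndexReaderWriter is the exported concrete type; no interface is
// declared preemptively alongside it.
type IndexReaderWriter struct {
	entries []string
}

func NewIndexReaderWriter() *IndexReaderWriter {
	return &IndexReaderWriter{}
}

func (rw *IndexReaderWriter) IndexChunk(id string) error {
	rw.entries = append(rw.entries, id)
	return nil
}

func (rw *IndexReaderWriter) LabelNames() []string {
	return rw.entries
}

// A consumer that only writes index entries declares its own narrow interface.
type indexWriter interface {
	IndexChunk(id string) error
}

func writeAll(w indexWriter, ids ...string) error {
	for _, id := range ids {
		if err := w.IndexChunk(id); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	rw := NewIndexReaderWriter()
	_ = writeAll(rw, "chunk-1", "chunk-2")
	fmt.Println(rw.LabelNames())
}
```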

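A minimal sketch of the interface-composition idea behind the second point. The names below are simplified stand-ins, not the real Loki types; the actual composed interfaces are the `Store` types added to `pkg/ingester/ingester.go` and `pkg/querier/querier.go` in this diff.

```go
package main

import "fmt"

// Small, single-purpose capabilities.
type ChunkWriter interface {
	Put(chunks []string) error
}

type ChunkFetcher interface {
	GetChunks(userID string) ([]string, error)
}

// The ingester needs to write and fetch chunks, so its store interface is
// composed from just those capabilities.
type IngesterStore interface {
	ChunkWriter
	ChunkFetcher
}

// The querier only reads, so its store interface omits the writer.
type QuerierStore interface {
	ChunkFetcher
}

// fullStore is a concrete store that satisfies both composed interfaces.
type fullStore struct{}

func (fullStore) Put([]string) error { return nil }

func (fullStore) GetChunks(string) ([]string, error) {
	return []string{"chunk-1"}, nil
}

// Constructors accept only the capabilities they need.
func newIngester(s IngesterStore) string { _ = s.Put(nil); return "ingester" }

func newQuerier(s QuerierStore) string { _, _ = s.GetChunks("fake"); return "querier" }

func main() {
	s := fullStore{}
	fmt.Println(newIngester(s), newQuerier(s))
}
```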

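And a similarly simplified sketch of the third point: after the split, a caller that only needs to resolve the fetcher for a point in time can depend on `ChunkFetcherProvider` alone, and the chunk-querying method is named `GetChunks` because it returns chunks rather than chunk refs. Toy types stand in for `chunk.Chunk`, `fetcher.Fetcher`, and `model.Time`; the real definitions are in `pkg/storage/stores/composite_store.go` in this diff.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Toy stand-ins for fetcher.Fetcher and chunk.Chunk.
type Fetcher struct{ period string }

type Chunk struct{ id string }

// ChunkFetcherProvider only resolves the fetcher responsible for a point in time.
type ChunkFetcherProvider interface {
	GetChunkFetcher(tm time.Time) *Fetcher
}

// ChunkFetcher queries chunks; the method is GetChunks because the result
// is chunks, not chunk references.
type ChunkFetcher interface {
	GetChunks(ctx context.Context, userID string, from, through time.Time) ([][]Chunk, []*Fetcher, error)
}

// compositeStore implements both interfaces, but callers can ask for just one.
type compositeStore struct{}

func (compositeStore) GetChunkFetcher(time.Time) *Fetcher {
	return &Fetcher{period: "2023-09"}
}

func (compositeStore) GetChunks(context.Context, string, time.Time, time.Time) ([][]Chunk, []*Fetcher, error) {
	return [][]Chunk{{{id: "chunk-1"}}}, []*Fetcher{{period: "2023-09"}}, nil
}

// fetcherFor needs only the provider half of the old, larger interface.
func fetcherFor(p ChunkFetcherProvider, t time.Time) *Fetcher {
	return p.GetChunkFetcher(t)
}

func main() {
	var s compositeStore
	fmt.Println(fetcherFor(s, time.Now()).period)
}
```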
Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Sep 12, 2023
1 parent ebef2fe commit 2b6bdc2
Showing 22 changed files with 115 additions and 85 deletions.
1 change: 1 addition & 0 deletions clients/pkg/promtail/client/client_test.go
@@ -21,6 +21,7 @@ import (

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/utils"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/push"
lokiflag "github.com/grafana/loki/pkg/util/flagext"
2 changes: 1 addition & 1 deletion cmd/migrate/main.go
@@ -310,7 +310,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
var totalBytes uint64
var totalChunks uint64
//log.Printf("%d processing sync range %d - Start: %v, End: %v\n", threadID, sr.number, time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC())
schemaGroups, fetchers, err := m.source.GetChunkRefs(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), m.matchers...)
schemaGroups, fetchers, err := m.source.GetChunks(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), m.matchers...)
if err != nil {
log.Println(threadID, "Error querying index for chunk refs:", err)
errCh <- err
1 change: 1 addition & 0 deletions integration/loki_micro_services_delete_test.go
@@ -13,6 +13,7 @@ import (

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/push"
10 changes: 9 additions & 1 deletion pkg/ingester/flush_test.go
@@ -339,6 +339,10 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
return nil
}

func (s *testStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return nil
}

func (s *testStore) IsLocal() bool {
return false
}
@@ -351,7 +355,11 @@ func (s *testStore) SelectSamples(_ context.Context, _ logql.SelectSampleParams)
return nil, nil
}

func (s *testStore) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *testStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
return nil, nil
}

func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}

25 changes: 12 additions & 13 deletions pkg/ingester/ingester.go
@@ -40,8 +40,9 @@ import (
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
indexstore "github.com/grafana/loki/pkg/storage/stores/index"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util"
@@ -166,15 +167,13 @@ type Wrapper interface {
Wrap(wrapped Interface) Interface
}

// ChunkStore is the interface we need to store chunks.
type ChunkStore interface {
Put(ctx context.Context, chunks []chunk.Chunk) error
SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error)
GetSchemaConfigs() []config.PeriodConfig
Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error)
Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error)
// Store is the store interface we need on the ingester.
type Store interface {
stores.ChunkWriter
stores.ChunkFetcher
storage.SelectStore
storage.SchemaConfigProvider
indexstore.StatsReader
}

// Interface is an interface for the Ingester
@@ -211,7 +210,7 @@ type Ingester struct {
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher

store ChunkStore
store Store
periodicConfigs []config.PeriodConfig

loopDone sync.WaitGroup
@@ -248,7 +247,7 @@ type Ingester struct {
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
@@ -1006,7 +1005,7 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq
}

// get chunk references
chunksGroups, _, err := i.store.GetChunkRefs(ctx, orgID, start, end, matchers...)
chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, matchers...)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_test.go
@@ -433,7 +433,7 @@ func (s *mockStore) SelectSamples(_ context.Context, _ logql.SelectSampleParams)
return nil, nil
}

func (s *mockStore) GetSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
func (s *mockStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
return nil, nil
}

@@ -449,7 +449,7 @@ func (s *mockStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) er
return nil
}

func (s *mockStore) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *mockStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}

12 changes: 10 additions & 2 deletions pkg/querier/querier.go
@@ -8,6 +8,7 @@ import (

"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/pkg/storage/stores/index"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"

"github.com/go-kit/log/level"
@@ -100,10 +101,17 @@ type Limits interface {
MaxEntriesLimitPerQuery(context.Context, string) int
}

// Store is the store interface we need on the querier.
type Store interface {
storage.SelectStore
index.BaseReader
index.StatsReader
}

// SingleTenantQuerier handles single tenant queries.
type SingleTenantQuerier struct {
cfg Config
store storage.Store
store Store
limits Limits
ingesterQuerier *IngesterQuerier
deleteGetter deleteGetter
@@ -115,7 +123,7 @@ type deleteGetter interface {
}

// New makes a new Querier.
func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) {
func New(cfg Config, store Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) {
return &SingleTenantQuerier{
cfg: cfg,
store: store,
2 changes: 1 addition & 1 deletion pkg/querier/querier_mock_test.go
@@ -319,7 +319,7 @@ func (s *storeMock) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return res.(iter.SampleIterator), args.Error(1)
}

func (s *storeMock) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
args := s.Called(ctx, userID, from, through, matchers)
return args.Get(0).([][]chunk.Chunk), args.Get(0).([]*fetcher.Fetcher), args.Error(2)
}
4 changes: 2 additions & 2 deletions pkg/storage/async_store.go
@@ -63,7 +63,7 @@ func (a *AsyncStore) shouldQueryIngesters(through, now model.Time) bool {
return a.queryIngestersWithin == 0 || through.After(now.Add(-a.queryIngestersWithin))
}

func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (a *AsyncStore) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
spanLogger := spanlogger.FromContext(ctx)

errs := make(chan error)
@@ -72,7 +72,7 @@ func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, thro
var fetchers []*fetcher.Fetcher
go func() {
var err error
storeChunks, fetchers, err = a.Store.GetChunkRefs(ctx, userID, from, through, matchers...)
storeChunks, fetchers, err = a.Store.GetChunks(ctx, userID, from, through, matchers...)
errs <- err
}()

10 changes: 5 additions & 5 deletions pkg/storage/async_store_test.go
@@ -29,7 +29,7 @@ func newStoreMock() *storeMock {
return &storeMock{}
}

func (s *storeMock) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
args := s.Called(ctx, userID, from, through, matchers)
return args.Get(0).([][]chunk.Chunk), args.Get(1).([]*fetcher.Fetcher), args.Error(2)
}
@@ -233,7 +233,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
store := newStoreMock()
store.On("GetChunkRefs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.storeChunks, tc.storeFetcher, nil)
store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.storeChunks, tc.storeFetcher, nil)
store.On("GetChunkFetcher", mock.Anything).Return(tc.ingesterFetcher)

ingesterQuerier := newIngesterQuerierMock()
@@ -242,7 +242,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
asyncStoreCfg := AsyncStoreCfg{IngesterQuerier: ingesterQuerier}
asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{})

chunks, fetchers, err := asyncStore.GetChunkRefs(context.Background(), "fake", model.Now(), model.Now(), nil)
chunks, fetchers, err := asyncStore.GetChunks(context.Background(), "fake", model.Now(), model.Now(), nil)
require.NoError(t, err)

require.Equal(t, tc.expectedChunks, chunks)
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {

store := newStoreMock()
store.On("GetChunkRefs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([][]chunk.Chunk{}, []*fetcher.Fetcher{}, nil)
store.On("GetChunks", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([][]chunk.Chunk{}, []*fetcher.Fetcher{}, nil)

ingesterQuerier := newIngesterQuerierMock()
ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)
@@ -304,7 +304,7 @@
}
asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{})

_, _, err := asyncStore.GetChunkRefs(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil)
_, _, err := asyncStore.GetChunks(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil)
require.NoError(t, err)

expectedNumCalls := 0
14 changes: 9 additions & 5 deletions pkg/storage/store.go
@@ -52,10 +52,14 @@ type SelectStore interface {
SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error)
}

type SchemaConfigProvider interface {
GetSchemaConfigs() []config.PeriodConfig
}

type Store interface {
stores.Store
SelectStore
GetSchemaConfigs() []config.PeriodConfig
SchemaConfigProvider
}

type LokiStore struct {
@@ -306,11 +310,11 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
}

indexReaderWriter := series.NewIndexReaderWriter(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize, s.writeDedupeCache)
indexReaderWriter = index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, indexReaderWriter, s.storeCfg.DisableIndexDeduplication)
monitoredReaderWriter := index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, monitoredReaderWriter, s.storeCfg.DisableIndexDeduplication)

return chunkWriter,
indexReaderWriter,
monitoredReaderWriter,
func() {
chunkClient.Stop()
f.Stop()
@@ -382,7 +386,7 @@ func (s *LokiStore) lazyChunks(ctx context.Context, matchers []*labels.Matcher,
stats := stats.FromContext(ctx)

start := time.Now()
chks, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, matchers...)
chks, fetchers, err := s.GetChunks(ctx, userID, from, through, matchers...)
stats.AddChunkRefsFetchTime(time.Since(start))

if err != nil {
8 changes: 4 additions & 4 deletions pkg/storage/store_test.go
@@ -1080,7 +1080,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
}

// get all the chunks from the first period
chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate), newMatchers(fooLabelsWithName.String())...)
chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate), newMatchers(fooLabelsWithName.String())...)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {
@@ -1148,7 +1148,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
}

// get all the chunks from both the stores
chunks, _, err = store.GetChunkRefs(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
chunks, _, err = store.GetChunks(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
require.NoError(t, err)

totalChunks = 0
@@ -1281,7 +1281,7 @@ func TestStore_MultiPeriod(t *testing.T) {
defer store.Stop()

// get all the chunks from both the stores
chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {
@@ -1627,7 +1627,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
defer store.Stop()

// get all the chunks from both the stores
chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
chunks, _, err := store.GetChunks(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {
13 changes: 9 additions & 4 deletions pkg/storage/stores/composite_store.go
@@ -22,16 +22,21 @@ type ChunkWriter interface {
PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error
}

type ChunkFetcher interface {
type ChunkFetcherProvider interface {
GetChunkFetcher(tm model.Time) *fetcher.Fetcher
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error)
}

type ChunkFetcher interface {
GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error)
}

type Store interface {
index.BaseReader
index.StatsReader
index.Filterable
ChunkWriter
ChunkFetcher
ChunkFetcherProvider
Stop()
}

@@ -149,11 +154,11 @@ func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID stri
return result.Strings(), err
}

func (c CompositeStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (c CompositeStore) GetChunks(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
chunkIDs := [][]chunk.Chunk{}
fetchers := []*fetcher.Fetcher{}
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
ids, fetcher, err := store.GetChunkRefs(innerCtx, userID, from, through, matchers...)
ids, fetcher, err := store.GetChunks(innerCtx, userID, from, through, matchers...)
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store_entry.go
@@ -42,11 +42,11 @@ type storeEntry struct {
ChunkWriter
}

func (c *storeEntry) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (c *storeEntry) GetChunks(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "GetChunkRefs")
sp, ctx := opentracing.StartSpanFromContext(ctx, "GetChunks")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
2 changes: 1 addition & 1 deletion pkg/storage/stores/composite_store_test.go
@@ -36,7 +36,7 @@ func (m mockStore) LabelValuesForMetricName(_ context.Context, _ string, _, _ mo

func (m mockStore) SetChunkFilterer(_ chunk.RequestChunkFilterer) {}

func (m mockStore) GetChunkRefs(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
func (m mockStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}
