From 25092817f44d6a5c51a682460252e3295227d048 Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Fri, 8 Sep 2023 15:39:16 +0200
Subject: [PATCH] Resolve circular imports in test

Signed-off-by: Christian Haudum
---
 pkg/storage/batch_test.go                     |   9 +-
 pkg/storage/store_test.go                     |  40 +---
 pkg/storage/stores/async_store_test.go        |  12 +-
 .../stores/shipper/bloomshipper/TODO.md       |  10 +
 pkg/storage/stores/testutils.go               | 223 ++++++++++++++++++
 pkg/storage/testutils.go                      | 116 +++++++++
 6 files changed, 374 insertions(+), 36 deletions(-)
 create mode 100644 pkg/storage/stores/shipper/bloomshipper/TODO.md
 create mode 100644 pkg/storage/stores/testutils.go
 create mode 100644 pkg/storage/testutils.go

diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go
index 574b79e706400..22c964810ffbf 100644
--- a/pkg/storage/batch_test.go
+++ b/pkg/storage/batch_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/grafana/loki/pkg/logql/log"
 	"github.com/grafana/loki/pkg/logqlmodel/stats"
 	"github.com/grafana/loki/pkg/storage/config"
+	"github.com/grafana/loki/pkg/storage/stores"
 )
 
 var NilMetrics = NewChunkMetrics(nil, 0)
@@ -1010,7 +1011,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
 				t.Fatalf("error reading batch %s", err)
 			}
 
-			assertStream(t, tt.expected, streams.Streams)
+			AssertStream(t, tt.expected, streams.Streams)
 		})
 	}
 }
@@ -1429,7 +1430,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
 				t.Fatalf("error reading batch %s", err)
 			}
 
-			assertSeries(t, tt.expected, series.Series)
+			AssertSeries(t, tt.expected, series.Series)
 		})
 	}
 }
@@ -1660,7 +1661,7 @@ func TestBuildHeapIterator(t *testing.T) {
 			if err != nil {
 				t.Fatalf("error reading batch %s", err)
 			}
-			assertStream(t, tc.expected, streams.Streams)
+			AssertStream(t, tc.expected, streams.Streams)
 		})
 	}
 }
@@ -1765,7 +1766,7 @@ func Benchmark_store_OverlappingChunks(b *testing.B) {
 		cfg: Config{
 			MaxChunkBatchSize: 50,
 		},
-		store: newMockChunkStore(chunkfmt, headfmt, newOverlappingStreams(200, 200)),
+		store: stores.NewMockChunkStore(chunkfmt, headfmt, newOverlappingStreams(200, 200)),
 	}
 	b.ResetTimer()
 	statsCtx, ctx := stats.NewContext(user.InjectOrgID(context.Background(), "fake"))
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 3425c2999ec3f..09c23a22ba89c 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/grafana/loki/pkg/storage/chunk"
 	"github.com/grafana/loki/pkg/storage/chunk/client/local"
 	"github.com/grafana/loki/pkg/storage/config"
+	"github.com/grafana/loki/pkg/storage/stores"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
@@ -507,7 +508,7 @@ func Test_store_SelectLogs(t *testing.T) {
 			if err != nil {
 				t.Fatalf("error reading batch %s", err)
 			}
-			assertStream(t, tt.expected, streams.Streams)
+			AssertStream(t, tt.expected, streams.Streams)
 		})
 	}
 }
@@ -831,7 +832,7 @@ func Test_store_SelectSample(t *testing.T) {
 			if err != nil {
 				t.Fatalf("error reading batch %s", err)
 			}
-			assertSeries(t, tt.expected, series.Series)
+			AssertSeries(t, tt.expected, series.Series)
 		})
 	}
 }
@@ -938,7 +939,7 @@ func Test_store_GetSeries(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			s := &LokiStore{
-				store: newMockChunkStore(chunkfmt, headfmt, streamsFixture),
+				store: stores.NewMockChunkStore(chunkfmt, headfmt, streamsFixture),
 				cfg: Config{
 					MaxChunkBatchSize: tt.batchSize,
 				},
@@ -1075,7 +1076,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
 	chunkfmt, headfmt, err := periodConfig.ChunkFormat()
 	require.NoError(t, err)
 
-	chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
+	chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))
 	err = store.PutOne(ctx, chk.From, chk.Through, chk)
 	require.NoError(t, err)
 
@@ -1145,7 +1146,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
 	chunkfmt, headfmt, err := periodConfig.ChunkFormat()
 	require.NoError(t, err)
 
-	chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
+	chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))
 	err = store.PutOne(ctx, chk.From, chk.Through, chk)
 	require.NoError(t, err)
 
@@ -1269,7 +1270,7 @@ func TestStore_MultiPeriod(t *testing.T) {
 	chunkfmt, headfmt, err := periodConfig.ChunkFormat()
 	require.NoError(t, err)
 
-	chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
+	chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))
 	err = store.PutOne(ctx, chk.From, chk.Through, chk)
 	require.NoError(t, err)
 
@@ -1325,23 +1326,6 @@ func parseDate(in string) time.Time {
 	return t
 }
 
-func buildTestStreams(labels labels.Labels, tr timeRange) logproto.Stream {
-	stream := logproto.Stream{
-		Labels:  labels.String(),
-		Hash:    labels.Hash(),
-		Entries: []logproto.Entry{},
-	}
-
-	for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) {
-		stream.Entries = append(stream.Entries, logproto.Entry{
-			Timestamp: from,
-			Line:      from.String(),
-		})
-	}
-
-	return stream
-}
-
 func timeToModelTime(t time.Time) model.Time {
 	return model.TimeFromUnixNano(t.UnixNano())
 }
@@ -1356,14 +1340,14 @@ func Test_OverlappingChunks(t *testing.T) {
 	require.NoError(t, err)
 
 	chunks := []chunk.Chunk{
-		newChunk(chunkfmt, headfmt, logproto.Stream{
+		stores.NewTestChunk(chunkfmt, headfmt, logproto.Stream{
 			Labels: `{foo="bar"}`,
 			Entries: []logproto.Entry{
 				{Timestamp: time.Unix(0, 1), Line: "1"},
 				{Timestamp: time.Unix(0, 4), Line: "4"},
 			},
 		}),
-		newChunk(chunkfmt, headfmt, logproto.Stream{
+		stores.NewTestChunk(chunkfmt, headfmt, logproto.Stream{
 			Labels: `{foo="bar"}`,
 			Entries: []logproto.Entry{
 				{Timestamp: time.Unix(0, 2), Line: "2"},
@@ -1372,7 +1356,7 @@ func Test_OverlappingChunks(t *testing.T) {
 		}),
 	}
 	s := &LokiStore{
-		store: &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}},
+		store: stores.NewMockChunkStoreWithChunks(chunks),
 		cfg: Config{
 			MaxChunkBatchSize: 10,
 		},
@@ -1414,7 +1398,7 @@ func Test_GetSeries(t *testing.T) {
 
 	var (
 		store = &LokiStore{
-			store: newMockChunkStore(chunkfmt, headfmt, []*logproto.Stream{
+			store: stores.NewMockChunkStore(chunkfmt, headfmt, []*logproto.Stream{
 				{
 					Labels: `{foo="bar",buzz="boo"}`,
 					Entries: []logproto.Entry{
@@ -1600,7 +1584,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
 	chunkfmt, headfmt, err := periodConfig.ChunkFormat()
 	require.NoError(t, err)
 
-	chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
+	chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))
 	err = store.PutOne(ctx, chk.From, chk.Through, chk)
 	require.NoError(t, err)
 
diff --git a/pkg/storage/stores/async_store_test.go b/pkg/storage/stores/async_store_test.go
index a9d30ad701d7c..44c9dd61add07 100644
--- a/pkg/storage/stores/async_store_test.go
+++ b/pkg/storage/stores/async_store_test.go
@@ -18,6 +18,10 @@ import (
"github.com/grafana/loki/pkg/util" ) +var ( + fooLabelsWithName = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}} +) + // storeMock is a mockable version of Loki's storage, used in querier unit tests // to control the behaviour of the store without really hitting any storage backend type storeMock struct { @@ -95,10 +99,10 @@ func buildMockChunkRef(t *testing.T, num int) []chunk.Chunk { require.NoError(t, err) for i := 0; i < num; i++ { - chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, timeRange{ - from: now.Add(time.Duration(i) * time.Minute), - to: now.Add(time.Duration(i+1) * time.Minute), - })) + chk := NewTestChunk(chunkfmt, headfmt, BuildTestStream(fooLabelsWithName, + now.Add(time.Duration(i)*time.Minute), + now.Add(time.Duration(i+1)*time.Minute), + )) chunkRef, err := chunk.ParseExternalKey(chk.UserID, s.ExternalKey(chk.ChunkRef)) require.NoError(t, err) diff --git a/pkg/storage/stores/shipper/bloomshipper/TODO.md b/pkg/storage/stores/shipper/bloomshipper/TODO.md new file mode 100644 index 0000000000000..b131f9127140b --- /dev/null +++ b/pkg/storage/stores/shipper/bloomshipper/TODO.md @@ -0,0 +1,10 @@ +# TODOs for bloomshipper package + +* avoid conversion between logproto and bloom structs +* reduce allocations in bloom querier by using a chunkref pool +* cache responses from bloom querier + +# TODOs for bloomgateway package + +* implement more efficient algo to determine bloom gateway server addresses +* shuffle sharding diff --git a/pkg/storage/stores/testutils.go b/pkg/storage/stores/testutils.go new file mode 100644 index 0000000000000..1a721b19ce626 --- /dev/null +++ b/pkg/storage/stores/testutils.go @@ -0,0 +1,223 @@ +package stores + +import ( + "context" + "sort" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/cache" + chunkclient "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/chunk/fetcher" + "github.com/grafana/loki/pkg/storage/config" + indexstats "github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/util" + util_log "github.com/grafana/loki/pkg/util/log" +) + +type MockChunkStore struct { + schemas config.SchemaConfig + chunks []chunk.Chunk + client *mockChunkStoreClient + f chunk.RequestChunkFilterer +} + +// mockChunkStore cannot implement both chunk.Store and chunk.Client, +// since there is a conflict in signature for DeleteChunk method. 
+var (
+	_ Store              = &MockChunkStore{}
+	_ chunkclient.Client = &mockChunkStoreClient{}
+)
+
+func NewMockChunkStore(chunkFormat byte, headfmt chunkenc.HeadBlockFmt, streams []*logproto.Stream) *MockChunkStore {
+	chunks := make([]chunk.Chunk, 0, len(streams))
+	for _, s := range streams {
+		chunks = append(chunks, NewTestChunk(chunkFormat, headfmt, *s))
+	}
+	return NewMockChunkStoreWithChunks(chunks)
+}
+
+func NewMockChunkStoreWithChunks(chunks []chunk.Chunk) *MockChunkStore {
+	return &MockChunkStore{
+		schemas: config.SchemaConfig{},
+		chunks:  chunks,
+		client:  &mockChunkStoreClient{chunks: chunks, scfg: config.SchemaConfig{}},
+	}
+}
+
+func (m *MockChunkStore) Put(_ context.Context, _ []chunk.Chunk) error { return nil }
+func (m *MockChunkStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
+	return nil
+}
+
+func (m *MockChunkStore) GetSeries(ctx context.Context, _ string, _, _ model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
+	result := make([]labels.Labels, 0, len(m.chunks))
+	unique := map[uint64]struct{}{}
+Outer:
+	for _, c := range m.chunks {
+		if _, ok := unique[c.Fingerprint]; !ok {
+			for _, m := range matchers {
+				if !m.Matches(c.Metric.Get(m.Name)) {
+					continue Outer
+				}
+			}
+			l := labels.NewBuilder(c.Metric).Del(labels.MetricName).Labels()
+			if m.f != nil {
+				if m.f.ForRequest(ctx).ShouldFilter(l) {
+					continue
+				}
+			}
+
+			result = append(result, l)
+			unique[c.Fingerprint] = struct{}{}
+		}
+	}
+	sort.Slice(result, func(i, j int) bool { return labels.Compare(result[i], result[j]) < 0 })
+	return result, nil
+}
+
+func (m *MockChunkStore) LabelValuesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string, _ string, _ ...*labels.Matcher) ([]string, error) {
+	return nil, nil
+}
+
+func (m *MockChunkStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string) ([]string, error) {
+	return nil, nil
+}
+
+func (m *MockChunkStore) SetChunkFilterer(f chunk.RequestChunkFilterer) {
+	m.f = f
+}
+
+func (m *MockChunkStore) DeleteChunk(_ context.Context, _, _ model.Time, _, _ string, _ labels.Labels, _ *model.Interval) error {
+	return nil
+}
+
+func (m *MockChunkStore) DeleteSeriesIDs(_ context.Context, _, _ model.Time, _ string, _ labels.Labels) error {
+	return nil
+}
+func (m *MockChunkStore) Stop() {}
+func (m *MockChunkStore) Get(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([]chunk.Chunk, error) {
+	return nil, nil
+}
+
+func (m *MockChunkStore) GetChunkFetcher(_ model.Time) *fetcher.Fetcher {
+	return nil
+}
+
+func (m *MockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
+	refs := make([]chunk.Chunk, 0, len(m.chunks))
+	// transform real chunks into ref chunks.
+	for _, c := range m.chunks {
+		r, err := chunk.ParseExternalKey("fake", m.schemas.ExternalKey(c.ChunkRef))
+		if err != nil {
+			panic(err)
+		}
+		refs = append(refs, r)
+	}
+
+	cache, err := cache.New(cache.Config{Prefix: "chunks"}, nil, util_log.Logger, stats.ChunkCache)
+	if err != nil {
+		panic(err)
+	}
+
+	f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 10, 100, 0)
+	if err != nil {
+		panic(err)
+	}
+	return [][]chunk.Chunk{refs}, []*fetcher.Fetcher{f}, nil
+}
+
+func (m *MockChunkStore) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) (*indexstats.Stats, error) {
+	return nil, nil
+}
+
+func (m *MockChunkStore) Volume(_ context.Context, _ string, _, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) {
+	return nil, nil
+}
+
+type mockChunkStoreClient struct {
+	chunks []chunk.Chunk
+	scfg   config.SchemaConfig
+}
+
+func (m mockChunkStoreClient) Stop() {
+	panic("implement me")
+}
+
+func (m mockChunkStoreClient) PutChunks(_ context.Context, _ []chunk.Chunk) error {
+	return nil
+}
+
+func (m mockChunkStoreClient) GetChunks(_ context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
+	var res []chunk.Chunk
+	for _, c := range chunks {
+		for _, sc := range m.chunks {
+			// only returns chunks requested using the external key
+			if m.scfg.ExternalKey(c.ChunkRef) == m.scfg.ExternalKey(sc.ChunkRef) {
+				res = append(res, sc)
+			}
+		}
+	}
+	return res, nil
+}
+
+func (m mockChunkStoreClient) DeleteChunk(_ context.Context, _, _ string) error {
+	return nil
+}
+
+func (m mockChunkStoreClient) IsChunkNotFoundErr(_ error) bool {
+	return false
+}
+
+func (m mockChunkStoreClient) IsRetryableErr(_ error) bool {
+	return false
+}
+
+func NewTestChunk(chunkFormat byte, headBlockFmt chunkenc.HeadBlockFmt, stream logproto.Stream) chunk.Chunk {
+	lbs, err := syntax.ParseLabels(stream.Labels)
+	if err != nil {
+		panic(err)
+	}
+	if !lbs.Has(labels.MetricName) {
+		builder := labels.NewBuilder(lbs)
+		builder.Set(labels.MetricName, "logs")
+		lbs = builder.Labels()
+	}
+	from, through := util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp)
+	chk := chunkenc.NewMemChunk(chunkFormat, chunkenc.EncGZIP, headBlockFmt, 256*1024, 0)
+	for _, e := range stream.Entries {
+		_ = chk.Append(&e)
+	}
+	chk.Close()
+	c := chunk.NewChunk("fake", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through)
+	// force the checksum creation
+	if err := c.Encode(); err != nil {
+		panic(err)
+	}
+	return c
+}
+
+func BuildTestStream(labels labels.Labels, fromTs, toTs time.Time) logproto.Stream {
+	stream := logproto.Stream{
+		Labels:  labels.String(),
+		Hash:    labels.Hash(),
+		Entries: []logproto.Entry{},
+	}
+
+	for from := fromTs; from.Before(toTs); from = from.Add(time.Second) {
+		stream.Entries = append(stream.Entries, logproto.Entry{
+			Timestamp: from,
+			Line:      from.String(),
+		})
+	}
+
+	return stream
+}
diff --git a/pkg/storage/testutils.go b/pkg/storage/testutils.go
new file mode 100644
index 0000000000000..c2887fb19a86b
--- /dev/null
+++ b/pkg/storage/testutils.go
@@ -0,0 +1,116 @@
+package storage
+
+import (
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/davecgh/go-spew/spew"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql/syntax"
+	"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores" +) + +var ( + fooLabelsWithName = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}} + fooLabels = labels.Labels{{Name: "foo", Value: "bar"}} +) + +var from = time.Unix(0, time.Millisecond.Nanoseconds()) + +func AssertStream(t *testing.T, expected, actual []logproto.Stream) { + if len(expected) != len(actual) { + t.Fatalf("error stream length are different expected %d actual %d\n%s", len(expected), len(actual), spew.Sdump(expected, actual)) + return + } + sort.Slice(expected, func(i int, j int) bool { return expected[i].Labels < expected[j].Labels }) + sort.Slice(actual, func(i int, j int) bool { return actual[i].Labels < actual[j].Labels }) + for i := range expected { + assert.Equal(t, expected[i].Labels, actual[i].Labels) + if len(expected[i].Entries) != len(actual[i].Entries) { + t.Fatalf("error entries length are different expected %d actual %d\n%s", len(expected[i].Entries), len(actual[i].Entries), spew.Sdump(expected[i].Entries, actual[i].Entries)) + + return + } + for j := range expected[i].Entries { + assert.Equal(t, expected[i].Entries[j].Timestamp.UnixNano(), actual[i].Entries[j].Timestamp.UnixNano()) + assert.Equal(t, expected[i].Entries[j].Line, actual[i].Entries[j].Line) + } + } +} + +func AssertSeries(t *testing.T, expected, actual []logproto.Series) { + if len(expected) != len(actual) { + t.Fatalf("error stream length are different expected %d actual %d\n%s", len(expected), len(actual), spew.Sdump(expected, actual)) + return + } + sort.Slice(expected, func(i int, j int) bool { return expected[i].Labels < expected[j].Labels }) + sort.Slice(actual, func(i int, j int) bool { return actual[i].Labels < actual[j].Labels }) + for i := range expected { + assert.Equal(t, expected[i].Labels, actual[i].Labels) + if len(expected[i].Samples) != len(actual[i].Samples) { + t.Fatalf("error entries length are different expected %d actual%d\n%s", len(expected[i].Samples), len(actual[i].Samples), spew.Sdump(expected[i].Samples, actual[i].Samples)) + + return + } + for j := range expected[i].Samples { + assert.Equal(t, expected[i].Samples[j].Timestamp, actual[i].Samples[j].Timestamp) + assert.Equal(t, expected[i].Samples[j].Value, actual[i].Samples[j].Value) + assert.Equal(t, expected[i].Samples[j].Hash, actual[i].Samples[j].Hash) + } + } +} + +func newLazyChunk(chunkFormat byte, headfmt chunkenc.HeadBlockFmt, stream logproto.Stream) *LazyChunk { + return &LazyChunk{ + Fetcher: nil, + IsValid: true, + Chunk: stores.NewTestChunk(chunkFormat, headfmt, stream), + } +} + +func newLazyInvalidChunk(chunkFormat byte, headfmt chunkenc.HeadBlockFmt, stream logproto.Stream) *LazyChunk { + return &LazyChunk{ + Fetcher: nil, + IsValid: false, + Chunk: stores.NewTestChunk(chunkFormat, headfmt, stream), + } +} + +func newMatchers(matchers string) []*labels.Matcher { + res, err := syntax.ParseMatchers(matchers, true) + if err != nil { + panic(err) + } + return res +} + +func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnotation, deletes []*logproto.Delete) *logproto.QueryRequest { + req := &logproto.QueryRequest{ + Selector: query, + Start: start, + Limit: 1000, + End: end, + Direction: logproto.FORWARD, + Deletes: deletes, + } + for _, shard := range shards { + req.Shards = append(req.Shards, shard.String()) + } + return req +} + +func newSampleQuery(query string, start, end time.Time, deletes []*logproto.Delete) *logproto.SampleQueryRequest { + req := &logproto.SampleQueryRequest{ + Selector: query, + Start: 
start, + End: end, + Deletes: deletes, + } + return req +}