Skip to content

Commit

Permalink
Resolve circular imports in test
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Nov 2, 2023
1 parent b334c96 commit 2509281
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 36 deletions.
9 changes: 5 additions & 4 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
)

var NilMetrics = NewChunkMetrics(nil, 0)
Expand Down Expand Up @@ -1010,7 +1011,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
t.Fatalf("error reading batch %s", err)
}

assertStream(t, tt.expected, streams.Streams)
AssertStream(t, tt.expected, streams.Streams)
})
}
}
Expand Down Expand Up @@ -1429,7 +1430,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
t.Fatalf("error reading batch %s", err)
}

assertSeries(t, tt.expected, series.Series)
AssertSeries(t, tt.expected, series.Series)
})
}
}
Expand Down Expand Up @@ -1660,7 +1661,7 @@ func TestBuildHeapIterator(t *testing.T) {
if err != nil {
t.Fatalf("error reading batch %s", err)
}
assertStream(t, tc.expected, streams.Streams)
AssertStream(t, tc.expected, streams.Streams)
})
}
}
Expand Down Expand Up @@ -1765,7 +1766,7 @@ func Benchmark_store_OverlappingChunks(b *testing.B) {
cfg: Config{
MaxChunkBatchSize: 50,
},
store: newMockChunkStore(chunkfmt, headfmt, newOverlappingStreams(200, 200)),
store: stores.NewMockChunkStore(chunkfmt, headfmt, newOverlappingStreams(200, 200)),
}
b.ResetTimer()
statsCtx, ctx := stats.NewContext(user.InjectOrgID(context.Background(), "fake"))
Expand Down
40 changes: 12 additions & 28 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
Expand Down Expand Up @@ -507,7 +508,7 @@ func Test_store_SelectLogs(t *testing.T) {
if err != nil {
t.Fatalf("error reading batch %s", err)
}
assertStream(t, tt.expected, streams.Streams)
AssertStream(t, tt.expected, streams.Streams)
})
}
}
Expand Down Expand Up @@ -831,7 +832,7 @@ func Test_store_SelectSample(t *testing.T) {
if err != nil {
t.Fatalf("error reading batch %s", err)
}
assertSeries(t, tt.expected, series.Series)
AssertSeries(t, tt.expected, series.Series)
})
}
}
Expand Down Expand Up @@ -938,7 +939,7 @@ func Test_store_GetSeries(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &LokiStore{
store: newMockChunkStore(chunkfmt, headfmt, streamsFixture),
store: stores.NewMockChunkStore(chunkfmt, headfmt, streamsFixture),
cfg: Config{
MaxChunkBatchSize: tt.batchSize,
},
Expand Down Expand Up @@ -1075,7 +1076,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
chunkfmt, headfmt, err := periodConfig.ChunkFormat()
require.NoError(t, err)

chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))

err = store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
Expand Down Expand Up @@ -1145,7 +1146,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
chunkfmt, headfmt, err := periodConfig.ChunkFormat()
require.NoError(t, err)

chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))

err = store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
Expand Down Expand Up @@ -1269,7 +1270,7 @@ func TestStore_MultiPeriod(t *testing.T) {
chunkfmt, headfmt, err := periodConfig.ChunkFormat()
require.NoError(t, err)

chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))

err = store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
Expand Down Expand Up @@ -1325,23 +1326,6 @@ func parseDate(in string) time.Time {
return t
}

// buildTestStreams builds a logproto.Stream labeled with the given label set,
// containing one entry per second across the half-open range [tr.from, tr.to);
// each entry's line is the textual form of its own timestamp.
func buildTestStreams(labels labels.Labels, tr timeRange) logproto.Stream {
	entries := []logproto.Entry{}
	for ts := tr.from; ts.Before(tr.to); ts = ts.Add(time.Second) {
		entries = append(entries, logproto.Entry{
			Timestamp: ts,
			Line:      ts.String(),
		})
	}
	return logproto.Stream{
		Labels:  labels.String(),
		Hash:    labels.Hash(),
		Entries: entries,
	}
}

// timeToModelTime converts a stdlib time.Time into a Prometheus model.Time
// via model.TimeFromUnixNano on the time's Unix-nanosecond value.
func timeToModelTime(t time.Time) model.Time {
	return model.TimeFromUnixNano(t.UnixNano())
}
Expand All @@ -1356,14 +1340,14 @@ func Test_OverlappingChunks(t *testing.T) {
require.NoError(t, err)

chunks := []chunk.Chunk{
newChunk(chunkfmt, headfmt, logproto.Stream{
stores.NewTestChunk(chunkfmt, headfmt, logproto.Stream{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 4), Line: "4"},
},
}),
newChunk(chunkfmt, headfmt, logproto.Stream{
stores.NewTestChunk(chunkfmt, headfmt, logproto.Stream{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 2), Line: "2"},
Expand All @@ -1372,7 +1356,7 @@ func Test_OverlappingChunks(t *testing.T) {
}),
}
s := &LokiStore{
store: &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}},
store: stores.NewMockChunkStoreWithChunks(chunks),
cfg: Config{
MaxChunkBatchSize: 10,
},
Expand Down Expand Up @@ -1414,7 +1398,7 @@ func Test_GetSeries(t *testing.T) {

var (
store = &LokiStore{
store: newMockChunkStore(chunkfmt, headfmt, []*logproto.Stream{
store: stores.NewMockChunkStore(chunkfmt, headfmt, []*logproto.Stream{
{
Labels: `{foo="bar",buzz="boo"}`,
Entries: []logproto.Entry{
Expand Down Expand Up @@ -1600,7 +1584,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
chunkfmt, headfmt, err := periodConfig.ChunkFormat()
require.NoError(t, err)

chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, tr))
chk := stores.NewTestChunk(chunkfmt, headfmt, stores.BuildTestStream(fooLabelsWithName, tr.from, tr.to))

err = store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/stores/async_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/grafana/loki/pkg/util"
)

// fooLabelsWithName is a fixture label set — {foo="bar"} plus the
// __name__="logs" label — used by tests in this package.
var (
	fooLabelsWithName = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}}
)

// storeMock is a mockable version of Loki's storage, used in querier unit tests
// to control the behaviour of the store without really hitting any storage backend
type storeMock struct {
Expand Down Expand Up @@ -95,10 +99,10 @@ func buildMockChunkRef(t *testing.T, num int) []chunk.Chunk {
require.NoError(t, err)

for i := 0; i < num; i++ {
chk := newChunk(chunkfmt, headfmt, buildTestStreams(fooLabelsWithName, timeRange{
from: now.Add(time.Duration(i) * time.Minute),
to: now.Add(time.Duration(i+1) * time.Minute),
}))
chk := NewTestChunk(chunkfmt, headfmt, BuildTestStream(fooLabelsWithName,
now.Add(time.Duration(i)*time.Minute),
now.Add(time.Duration(i+1)*time.Minute),
))

chunkRef, err := chunk.ParseExternalKey(chk.UserID, s.ExternalKey(chk.ChunkRef))
require.NoError(t, err)
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# TODOs for bloomshipper package

* avoid conversion between logproto and bloom structs
* reduce allocations in bloom querier by using a chunkref pool
* cache responses from bloom querier

# TODOs for bloomgateway package

* implement a more efficient algorithm to determine bloom gateway server addresses
* shuffle sharding
Loading

0 comments on commit 2509281

Please sign in to comment.