
Commit

chore: refactor
trevorwhitney committed Aug 27, 2024
1 parent b476d05 commit 6ab0a22
Showing 3 changed files with 37 additions and 41 deletions.
24 changes: 12 additions & 12 deletions pkg/ingester/instance_test.go
@@ -1177,7 +1177,7 @@ func TestInstance_Volume(t *testing.T) {

volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
@@ -1196,7 +1196,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
AggregateBy: seriesvolume.Series,
@@ -1212,7 +1212,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
@@ -1246,7 +1246,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"log_stream"},
@@ -1264,7 +1264,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
@@ -1281,7 +1281,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
@@ -1301,7 +1301,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
@@ -1321,7 +1321,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="worker"}`,
Limit: 5,
AggregateBy: seriesvolume.Labels,
@@ -1342,7 +1342,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
@@ -1377,7 +1377,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"host"},
@@ -1394,7 +1394,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
@@ -1411,7 +1411,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
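
Note: the instance_test.go hunks above all exercise the same request shape. The lines below are a minimal sketch of that call pattern, not a copy of any single test; prepareInstance, logproto.VolumeRequest, and seriesvolume are the helpers and types referenced in the test file, and the error-handling style is assumed. Through is expressed in milliseconds; AggregateBy switches between per-series and per-label volumes, and other cases in the same test instead set TargetLabels (e.g. []string{"host", "job"}) to restrict which labels are aggregated.

	// Sketch only, assuming the helpers visible in instance_test.go.
	instance := prepareInstance(t)
	volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
		From:        0,
		Through:     1.1 * 1e3, // milliseconds
		Matchers:    `{log_stream="dispatcher"}`,
		Limit:       5,
		AggregateBy: seriesvolume.Series, // seriesvolume.Labels aggregates per label instead
	})
	if err != nil {
		t.Fatalf("GetVolume() error = %v", err)
	}
	_ = volumes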
30 changes: 12 additions & 18 deletions pkg/storage/store_test.go
@@ -219,8 +219,7 @@ func getLocalStore(path string, cm ClientMetrics) Store {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
},
},
}},
RowShards: 16,
},
},
@@ -926,6 +925,7 @@ func Test_PipelineWrapper(t *testing.T) {
s.SetPipelineWrapper(wrapper)
ctx = user.InjectOrgID(context.Background(), "test-user")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
@@ -956,6 +956,7 @@ func Test_PipelineWrapper_disabled(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "test-user")
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
@@ -1295,8 +1296,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
}

schemaConfig := config.SchemaConfig{
@@ -1370,8 +1370,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_tsdb_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
}
schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2)
@@ -1476,8 +1475,7 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
}

periodConfigV11 := config.PeriodConfig{
@@ -1489,8 +1487,7 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
}

@@ -1569,6 +1566,7 @@ func TestStore_MultiPeriod(t *testing.T) {
}
})
}

}

func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry {
@@ -1835,8 +1833,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
},
{
@@ -1849,8 +1846,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
},
},
}
@@ -1988,8 +1984,7 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
},
{
@@ -2002,8 +1997,7 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
},
},
}
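
Note: the store_test.go hunks are mostly mechanical: blank lines added after the SelectLogs calls, and nested config literals whose two closing braces are collapsed onto one line. A compilable illustration of that closing-brace style follows; IndexTableConfig, PeriodConfig, and their fields are hypothetical mirror types used only for the sketch, not the actual Loki config structs.

	package main

	import (
		"fmt"
		"time"
	)

	// Hypothetical stand-ins; only PeriodicTableConfig appears in the diff above.
	type PeriodicTableConfig struct {
		Prefix string
		Period time.Duration
	}

	type IndexTableConfig struct {
		PeriodicTableConfig PeriodicTableConfig
	}

	type PeriodConfig struct {
		IndexTables IndexTableConfig
		RowShards   uint32
	}

	func main() {
		// The refactor closes the nested literal with "}}" on a single line
		// instead of two separate closing braces.
		cfg := PeriodConfig{
			IndexTables: IndexTableConfig{
				PeriodicTableConfig: PeriodicTableConfig{
					Prefix: "index_",
					Period: time.Hour * 24,
				}},
			RowShards: 2,
		}
		fmt.Println(cfg.IndexTables.PeriodicTableConfig.Prefix, cfg.RowShards)
	}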
24 changes: 13 additions & 11 deletions pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go
@@ -74,6 +74,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp
}
return NewPrefixedIdentifier(id, parentDir, "")
})

if err != nil {
return nil, err
}
@@ -194,6 +195,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, _ string, fpFilter index.Fing
}
return p.Err()
})

}

func (i *TSDBIndex) forPostings(
@@ -218,6 +220,7 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu

if err := i.ForSeries(ctx, "", fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
for _, chk := range chks {

res = append(res, ChunkRef{
User: userID, // assumed to be the same, will be enforced by caller.
Fingerprint: fp,
@@ -292,12 +295,14 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim
var filterer chunk.Filterer
by := make(map[string]struct{})
if i.chunkFilter != nil {
if filterer = i.chunkFilter.ForRequest(ctx); filterer != nil {
filterer = i.chunkFilter.ForRequest(ctx)
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
@@ -364,29 +369,26 @@ func (i *TSDBIndex) Volume(

aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == ""
var by map[string]struct{}
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
}
if !includeAll && (aggregateBySeries || len(targetLabels) > 0) {
by = make(map[string]struct{}, len(labelsToMatch))
for k := range labelsToMatch {
by[k] = struct{}{}
}

// If we are aggregating by series, we need to include all labels in the series required for filtering chunks.
if i.chunkFilter != nil {
if filterer := i.chunkFilter.ForRequest(ctx); filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
if filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
}

for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
if err != nil {
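
Note: the single_file_index.go hunks in Stats and Volume hoist the chunk-filter construction out of the inner logic, so ForRequest runs once per call and the same filterer is reused both when building the `by` label set and when walking postings. The sketch below is a self-contained, simplified illustration of that hoisting pattern; Filterer, chunkFilterer, and requiredLabels are hypothetical stand-ins, not the Loki types.

	package main

	import (
		"context"
		"fmt"
	)

	// Filterer mirrors the one method this sketch needs from chunk.Filterer.
	type Filterer interface {
		RequiredLabelNames() []string
	}

	type labelFilterer []string

	func (f labelFilterer) RequiredLabelNames() []string { return f }

	// chunkFilterer plays the role of the per-request filterer factory.
	type chunkFilterer struct{}

	func (chunkFilterer) ForRequest(_ context.Context) Filterer {
		return labelFilterer{"host", "job"}
	}

	// requiredLabels builds the `by` set once, reusing a single filterer
	// instead of constructing it again inside the postings callback.
	func requiredLabels(ctx context.Context, cf *chunkFilterer, targetLabels []string) map[string]struct{} {
		var filterer Filterer
		if cf != nil {
			filterer = cf.ForRequest(ctx)
		}

		by := make(map[string]struct{}, len(targetLabels))
		for _, k := range targetLabels {
			by[k] = struct{}{}
		}

		// Include every label the filterer needs so chunk filtering still
		// works when series labels are reduced to the `by` set.
		if filterer != nil {
			for _, k := range filterer.RequiredLabelNames() {
				by[k] = struct{}{}
			}
		}
		return by
	}

	func main() {
		by := requiredLabels(context.Background(), &chunkFilterer{}, []string{"log_stream"})
		fmt.Println(by) // map[host:{} job:{} log_stream:{}]
	}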
