Skip to content

Commit

Permalink
chore: remove unwanted changes
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 27, 2024
1 parent 4ad2bd8 commit 4ae23d4
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 39 deletions.
24 changes: 12 additions & 12 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ func TestInstance_Volume(t *testing.T) {

volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand All @@ -1192,7 +1192,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand All @@ -1208,7 +1208,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand Down Expand Up @@ -1242,7 +1242,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"log_stream"},
Expand All @@ -1260,7 +1260,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1277,7 +1277,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
Expand All @@ -1297,7 +1297,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand All @@ -1317,7 +1317,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="worker"}`,
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand All @@ -1338,7 +1338,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand Down Expand Up @@ -1373,7 +1373,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1390,7 +1390,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1407,7 +1407,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, // milliseconds
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
Expand Down
30 changes: 12 additions & 18 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ func getLocalStore(path string, cm ClientMetrics) Store {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
},
},
}},
RowShards: 16,
},
},
Expand Down Expand Up @@ -922,6 +921,7 @@ func Test_PipelineWrapper(t *testing.T) {
s.SetPipelineWrapper(wrapper)
ctx = user.InjectOrgID(context.Background(), "test-user")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
Expand Down Expand Up @@ -952,6 +952,7 @@ func Test_PipelineWrapper_disabled(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "test-user")
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
Expand Down Expand Up @@ -1291,8 +1292,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
}

schemaConfig := config.SchemaConfig{
Expand Down Expand Up @@ -1366,8 +1366,7 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_tsdb_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
}
schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2)
Expand Down Expand Up @@ -1472,8 +1471,7 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
}

periodConfigV11 := config.PeriodConfig{
Expand All @@ -1485,8 +1483,7 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
}

Expand Down Expand Up @@ -1565,6 +1562,7 @@ func TestStore_MultiPeriod(t *testing.T) {
}
})
}

}

func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry {
Expand Down Expand Up @@ -1831,8 +1829,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
},
{
Expand All @@ -1845,8 +1842,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
},
},
}
Expand Down Expand Up @@ -1984,8 +1980,7 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
RowShards: 2,
},
{
Expand All @@ -1998,8 +1993,7 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
}},
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp
}
return NewPrefixedIdentifier(id, parentDir, "")
})

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,6 +195,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, _ string, fpFilter index.Fing
}
return p.Err()
})

}

func (i *TSDBIndex) forPostings(
Expand All @@ -218,6 +220,7 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu

if err := i.ForSeries(ctx, "", fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
for _, chk := range chks {

res = append(res, ChunkRef{
User: userID, // assumed to be the same, will be enforced by caller.
Fingerprint: fp,
Expand Down Expand Up @@ -365,15 +368,6 @@ func (i *TSDBIndex) Volume(
for k := range labelsToMatch {
by[k] = struct{}{}
}

// If we are aggregating by series, we need to include all labels in the series required for filtering chunks.
if i.chunkFilter != nil {
if filterer := i.chunkFilter.ForRequest(ctx); filterer != nil {
for _, k := range filterer.RequiredLabelNames() {
by[k] = struct{}{}
}
}
}
}

return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
Expand Down

0 comments on commit 4ae23d4

Please sign in to comment.