diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 067134db07ca0..6867b0e1519c2 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1173,7 +1173,7 @@ func TestInstance_Volume(t *testing.T) { volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1192,7 +1192,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, AggregateBy: seriesvolume.Series, @@ -1208,7 +1208,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1242,7 +1242,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"log_stream"}, @@ -1260,7 +1260,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1277,7 +1277,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, @@ -1297,7 +1297,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1317,7 +1317,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="worker"}`, Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1338,7 +1338,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1373,7 +1373,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1390,7 +1390,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1407,7 +1407,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 0b5061d8d60b8..3c9acdfa5a638 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -219,8 +219,7 @@ func getLocalStore(path string, cm ClientMetrics) Store { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 168, - }, - }, + }}, RowShards: 16, }, }, @@ -922,6 +921,7 @@ func Test_PipelineWrapper(t *testing.T) { s.SetPipelineWrapper(wrapper) ctx = user.InjectOrgID(context.Background(), "test-user") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) + if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -952,6 +952,7 @@ func Test_PipelineWrapper_disabled(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "test-user") ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) + if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -1291,8 +1292,7 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, } schemaConfig := config.SchemaConfig{ @@ -1366,8 +1366,7 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_tsdb_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, } schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2) @@ -1472,8 +1471,7 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, } periodConfigV11 := config.PeriodConfig{ @@ -1485,8 +1483,7 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, } @@ -1565,6 +1562,7 @@ func TestStore_MultiPeriod(t *testing.T) { } }) } + } func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry { @@ -1831,8 +1829,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, }, { @@ -1845,8 +1842,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, }, }, } @@ -1984,8 +1980,7 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, }, { @@ -1998,8 +1993,7 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, }, }, } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index e12ae5718ed7c..68528e905b8f4 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -74,6 +74,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp } return NewPrefixedIdentifier(id, parentDir, "") }) + if err != nil { return nil, err } @@ -194,6 +195,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, _ string, fpFilter index.Fing } return p.Err() }) + } func (i *TSDBIndex) forPostings( @@ -218,6 +220,7 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu if err := i.ForSeries(ctx, "", fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { for _, chk := range chks { + res = append(res, ChunkRef{ User: userID, // assumed to be the same, will be enforced by caller. Fingerprint: fp, @@ -365,15 +368,6 @@ func (i *TSDBIndex) Volume( for k := range labelsToMatch { by[k] = struct{}{} } - - // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. - if i.chunkFilter != nil { - if filterer := i.chunkFilter.ForRequest(ctx); filterer != nil { - for _, k := range filterer.RequiredLabelNames() { - by[k] = struct{}{} - } - } - } } return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {