diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 3e2a822d3eace..4086831bb33fb 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1177,7 +1177,7 @@ func TestInstance_Volume(t *testing.T) { volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1196,7 +1196,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, AggregateBy: seriesvolume.Series, @@ -1212,7 +1212,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1246,7 +1246,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"log_stream"}, @@ -1264,7 +1264,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1281,7 +1281,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, @@ -1301,7 +1301,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1321,7 +1321,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="worker"}`, Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1342,7 +1342,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1377,7 +1377,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1394,7 +1394,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1411,7 +1411,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, // milliseconds + Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index d31501893dc45..101c906b8b4fe 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -219,8 +219,7 @@ func getLocalStore(path string, cm ClientMetrics) Store { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 168, - }, - }, + }}, RowShards: 16, }, }, @@ -926,6 +925,7 @@ func Test_PipelineWrapper(t *testing.T) { s.SetPipelineWrapper(wrapper) ctx = user.InjectOrgID(context.Background(), "test-user") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) + if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -956,6 +956,7 @@ func Test_PipelineWrapper_disabled(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "test-user") ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) + if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -1295,8 +1296,7 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, } schemaConfig := config.SchemaConfig{ @@ -1370,8 +1370,7 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_tsdb_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, } schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2) @@ -1476,8 +1475,7 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, } periodConfigV11 := config.PeriodConfig{ @@ -1489,8 +1487,7 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, } @@ -1569,6 +1566,7 @@ func TestStore_MultiPeriod(t *testing.T) { } }) } + } func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry { @@ -1835,8 +1833,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, }, { @@ -1849,8 +1846,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, }, }, } @@ -1988,8 +1984,7 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, RowShards: 2, }, { @@ -2002,8 +1997,7 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }, - }, + }}, }, }, } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index a27f12382cc81..255425b286f22 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -74,6 +74,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp } return NewPrefixedIdentifier(id, parentDir, "") }) + if err != nil { return nil, err } @@ -194,6 +195,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, _ string, fpFilter index.Fing } return p.Err() }) + } func (i *TSDBIndex) forPostings( @@ -218,6 +220,7 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu if err := i.ForSeries(ctx, "", fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { for _, chk := range chks { + res = append(res, ChunkRef{ User: userID, // assumed to be the same, will be enforced by caller. Fingerprint: fp, @@ -292,12 +295,14 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim var filterer chunk.Filterer by := make(map[string]struct{}) if i.chunkFilter != nil { - if filterer = i.chunkFilter.ForRequest(ctx); filterer != nil { + filterer = i.chunkFilter.ForRequest(ctx) + if filterer != nil { for _, k := range filterer.RequiredLabelNames() { by[k] = struct{}{} } } } + for p.Next() { fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by) if err != nil { @@ -364,6 +369,10 @@ func (i *TSDBIndex) Volume( aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == "" var by map[string]struct{} + var filterer chunk.Filterer + if i.chunkFilter != nil { + filterer = i.chunkFilter.ForRequest(ctx) + } if !includeAll && (aggregateBySeries || len(targetLabels) > 0) { by = make(map[string]struct{}, len(labelsToMatch)) for k := range labelsToMatch { @@ -371,22 +380,15 @@ func (i *TSDBIndex) Volume( } // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. - if i.chunkFilter != nil { - if filterer := i.chunkFilter.ForRequest(ctx); filterer != nil { - for _, k := range filterer.RequiredLabelNames() { - by[k] = struct{}{} - } + if filterer != nil { + for _, k := range filterer.RequiredLabelNames() { + by[k] = struct{}{} } } } return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error { var ls labels.Labels - var filterer chunk.Filterer - if i.chunkFilter != nil { - filterer = i.chunkFilter.ForRequest(ctx) - } - for p.Next() { fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by) if err != nil {