diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index 5cece24172526..ed1f50ae72582 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -90,7 +90,7 @@ func New(
 	c.metrics = NewMetrics(r, c.btMetrics)
 
 	chunkLoader := NewStoreChunkLoader(
-		NewFetcherProviderAdapter(fetcherProvider),
+		fetcherProvider,
 		c.metrics,
 	)
 
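Note: with FetcherProviderAdapter deleted (see the spec.go hunks below), New wires the store's provider straight into StoreChunkLoader. For orientation, this is roughly the shape the loader now consumes. It is inferred from the deleted adapter code in spec.go, not quoted from pkg/storage/stores/composite_store.go, so treat it as illustrative:

// Inferred sketch: GetChunkFetcher returns the concrete *fetcher.Fetcher
// for the schema period containing the given timestamp.
type ChunkFetcherProvider interface {
	GetChunkFetcher(tm model.Time) *fetcher.Fetcher
}

Since pointers are valid map keys, Load can group chunks by the concrete *fetcher.Fetcher directly; the interface indirection existed only to make the old, non-generic loader testable.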
diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go
index d9d9c68947a73..58dd2674895ed 100644
--- a/pkg/bloomcompactor/spec.go
+++ b/pkg/bloomcompactor/spec.go
@@ -16,6 +16,7 @@ import (
 	logql_log "github.com/grafana/loki/pkg/logql/log"
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/storage/chunk/fetcher"
 	"github.com/grafana/loki/pkg/storage/stores"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
@@ -235,39 +236,13 @@ type ChunkLoader interface {
 	Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error)
 }
 
-// interface modeled from `pkg/storage/stores/composite_store.ChunkFetcherProvider`
-type fetcherProvider interface {
-	GetChunkFetcher(model.Time) chunkFetcher
-}
-
-// interface modeled from `pkg/storage/chunk/fetcher.Fetcher`
-type chunkFetcher interface {
-	FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error)
-}
-
-// Adapter turning `stores.ChunkFetcherProvider` into `fetcherProvider`
-// The former returns a concrete type and is heavily used externally
-// while the latter returns an interface for better testing and
-// is used internally
-type FetcherProviderAdapter struct {
-	root stores.ChunkFetcherProvider
-}
-
-func NewFetcherProviderAdapter(root stores.ChunkFetcherProvider) *FetcherProviderAdapter {
-	return &FetcherProviderAdapter{root: root}
-}
-
-func (f *FetcherProviderAdapter) GetChunkFetcher(t model.Time) chunkFetcher {
-	return f.root.GetChunkFetcher(t)
-}
-
 // StoreChunkLoader loads chunks from a store
 type StoreChunkLoader struct {
-	fetcherProvider fetcherProvider
+	fetcherProvider stores.ChunkFetcherProvider
 	metrics         *Metrics
 }
 
-func NewStoreChunkLoader(fetcherProvider fetcherProvider, metrics *Metrics) *StoreChunkLoader {
+func NewStoreChunkLoader(fetcherProvider stores.ChunkFetcherProvider, metrics *Metrics) *StoreChunkLoader {
 	return &StoreChunkLoader{
 		fetcherProvider: fetcherProvider,
 		metrics:         metrics,
@@ -278,7 +253,7 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
 	// NB(owen-d): This is probably unnecessary as we should only have one fetcher
 	// because we'll only be working on a single index period at a time, but this should protect
 	// us in the case of refactoring/changing this and likely isn't a perf bottleneck.
-	chksByFetcher := make(map[chunkFetcher][]chunk.Chunk)
+	chksByFetcher := make(map[*fetcher.Fetcher][]chunk.Chunk)
 	for _, chk := range series.Chunks {
 		fetcher := s.fetcherProvider.GetChunkFetcher(chk.Start)
 		chksByFetcher[fetcher] = append(chksByFetcher[fetcher], chunk.Chunk{
@@ -292,119 +267,152 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
 		})
 	}
 
-	work := make([]chunkWork, 0, len(chksByFetcher))
+	var (
+		fetchers = make([]Fetcher[chunk.Chunk, chunk.Chunk], 0, len(chksByFetcher))
+		inputs   = make([][]chunk.Chunk, 0, len(chksByFetcher))
+	)
 	for fetcher, chks := range chksByFetcher {
-		work = append(work, chunkWork{
-			fetcher: fetcher,
-			chks:    chks,
-		})
+		fn := FetchFunc[chunk.Chunk, chunk.Chunk](fetcher.FetchChunks)
+		fetchers = append(fetchers, fn)
+		inputs = append(inputs, chks)
 	}
 
 	return &ChunkItersByFingerprint{
 		fp:  series.Fingerprint,
-		itr: newBatchedLoader(ctx, work, batchedLoaderDefaultBatchSize, s.metrics),
+		itr: newBatchedChunkLoader(ctx, fetchers, inputs, s.metrics, batchedLoaderDefaultBatchSize),
 	}, nil
 }
 
-type chunkWork struct {
-	fetcher chunkFetcher
-	chks    []chunk.Chunk
+type Fetcher[A, B any] interface {
+	Fetch(ctx context.Context, inputs []A) ([]B, error)
+}
+
+type FetchFunc[A, B any] func(ctx context.Context, inputs []A) ([]B, error)
+
+func (f FetchFunc[A, B]) Fetch(ctx context.Context, inputs []A) ([]B, error) {
+	return f(ctx, inputs)
 }
 
 // batchedLoader implements `v1.Iterator[v1.ChunkRefWithIter]` in batches
 // to ensure memory is bounded while loading chunks
 // TODO(owen-d): testware
-type batchedLoader struct {
+type batchedLoader[A, B, C any] struct {
 	metrics   *Metrics
 	batchSize int
 	ctx       context.Context
-	work      []chunkWork
+	fetchers  []Fetcher[A, B]
+	work      [][]A
 
-	cur   v1.ChunkRefWithIter
-	batch []chunk.Chunk
-	err   error
+	mapper func(B) (C, error)
+	cur    C
+	batch  []B
+	err    error
 }
 
 const batchedLoaderDefaultBatchSize = 50
 
-func newBatchedLoader(ctx context.Context, work []chunkWork, batchSize int, metrics *Metrics) *batchedLoader {
-	return &batchedLoader{
-		metrics:   metrics,
-		batchSize: batchSize,
+func newBatchedLoader[A, B, C any](
+	ctx context.Context,
+	fetchers []Fetcher[A, B],
+	inputs [][]A,
+	mapper func(B) (C, error),
+	batchSize int,
+) *batchedLoader[A, B, C] {
+	return &batchedLoader[A, B, C]{
+		batchSize: max(batchSize, 1),
 		ctx:       ctx,
-		work:      work,
+		fetchers:  fetchers,
+		work:      inputs,
+		mapper:    mapper,
 	}
 }
 
-func (b *batchedLoader) Next() bool {
-	if len(b.batch) > 0 {
-		return b.prepNext(false)
-	}
+func (b *batchedLoader[A, B, C]) Next() bool {
 
-	if len(b.work) == 0 {
-		return false
-	}
+	// iterate work until we have non-zero length batch
+	for len(b.batch) == 0 {
 
-	// setup next batch
-	next := b.work[0]
-	batchSize := min(b.batchSize, len(next.chks))
-	toFetch := next.chks[:batchSize]
-	// update work
-	b.work[0].chks = next.chks[batchSize:]
-	if len(b.work[0].chks) == 0 {
-		b.work = b.work[1:]
-	}
+		// empty batch + no work remaining = we're done
+		if len(b.work) == 0 {
+			return false
+		}
 
-	if len(toFetch) == 0 {
-		return false
-	}
+		// setup next batch
+		next := b.work[0]
+		batchSize := min(b.batchSize, len(next))
+		toFetch := next[:batchSize]
+		fetcher := b.fetchers[0]
+
+		// update work
+		b.work[0] = b.work[0][batchSize:]
+		if len(b.work[0]) == 0 {
+			// if we've exhausted work from this set of inputs,
+			// set pointer to next set of inputs
+			// and their respective fetcher
+			b.work = b.work[1:]
+			b.fetchers = b.fetchers[1:]
+		}
 
-	b.batch, b.err = next.fetcher.FetchChunks(b.ctx, toFetch)
-	if b.err != nil {
-		return false
+		// there was no work in this batch; continue (should not happen)
+		if len(toFetch) == 0 {
+			continue
+		}
+
+		b.batch, b.err = fetcher.Fetch(b.ctx, toFetch)
+		// error fetching, short-circuit iteration
+		if b.err != nil {
+			return false
+		}
 	}
 
-	return b.prepNext(true)
+	return b.prepNext()
 }
 
-func (b *batchedLoader) prepNext(checkLen bool) bool {
-	if checkLen && len(b.batch) == 0 {
-		return false
-	}
-	b.cur, b.err = b.format(b.batch[0])
+func (b *batchedLoader[_, B, C]) prepNext() bool {
+	b.cur, b.err = b.mapper(b.batch[0])
 	b.batch = b.batch[1:]
 	return b.err == nil
 }
 
-func (b *batchedLoader) format(c chunk.Chunk) (v1.ChunkRefWithIter, error) {
-	chk := c.Data.(*chunkenc.Facade).LokiChunk()
-	b.metrics.chunkSize.Observe(float64(chk.UncompressedSize()))
-	itr, err := chk.Iterator(
-		b.ctx,
-		time.Unix(0, 0),
-		time.Unix(0, math.MaxInt64),
-		logproto.FORWARD,
-		logql_log.NewNoopPipeline().ForStream(c.Metric),
-	)
+func newBatchedChunkLoader(
+	ctx context.Context,
+	fetchers []Fetcher[chunk.Chunk, chunk.Chunk],
+	inputs [][]chunk.Chunk,
+	metrics *Metrics,
+	batchSize int,
+) *batchedLoader[chunk.Chunk, chunk.Chunk, v1.ChunkRefWithIter] {
+
+	mapper := func(c chunk.Chunk) (v1.ChunkRefWithIter, error) {
+		chk := c.Data.(*chunkenc.Facade).LokiChunk()
+		metrics.chunkSize.Observe(float64(chk.UncompressedSize()))
+		itr, err := chk.Iterator(
+			ctx,
+			time.Unix(0, 0),
+			time.Unix(0, math.MaxInt64),
+			logproto.FORWARD,
+			logql_log.NewNoopPipeline().ForStream(c.Metric),
+		)
 
-	if err != nil {
-		return v1.ChunkRefWithIter{}, err
-	}
+		if err != nil {
+			return v1.ChunkRefWithIter{}, err
+		}
 
-	return v1.ChunkRefWithIter{
-		Ref: v1.ChunkRef{
-			Start:    c.From,
-			End:      c.Through,
-			Checksum: c.Checksum,
-		},
-		Itr: itr,
-	}, nil
+		return v1.ChunkRefWithIter{
+			Ref: v1.ChunkRef{
+				Start:    c.From,
+				End:      c.Through,
+				Checksum: c.Checksum,
+			},
+			Itr: itr,
+		}, nil
+	}
+	return newBatchedLoader(ctx, fetchers, inputs, mapper, batchSize)
 }
 
-func (b *batchedLoader) At() v1.ChunkRefWithIter {
+func (b *batchedLoader[_, _, C]) At() C {
 	return b.cur
 }
 
-func (b *batchedLoader) Err() error {
+func (b *batchedLoader[_, _, _]) Err() error {
 	return b.err
 }
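The core of the refactor is the Fetcher/FetchFunc pair: a generic interface plus a function type that satisfies it, the same trick net/http uses with Handler and HandlerFunc. It lets a method value such as fetcher.FetchChunks be passed wherever a Fetcher is expected, which is what made the hand-written FetcherProviderAdapter deletable. A self-contained sketch of the pattern follows; Fetcher and FetchFunc are re-declared so the snippet compiles on its own, and the store type, FetchInts, and the doubling logic are invented for illustration:

package main

import (
	"context"
	"fmt"
)

// Re-declarations of the generic pieces from spec.go above.
type Fetcher[A, B any] interface {
	Fetch(ctx context.Context, inputs []A) ([]B, error)
}

// FetchFunc lets a plain function (or method value) satisfy Fetcher.
type FetchFunc[A, B any] func(ctx context.Context, inputs []A) ([]B, error)

func (f FetchFunc[A, B]) Fetch(ctx context.Context, inputs []A) ([]B, error) {
	return f(ctx, inputs)
}

// store stands in for the concrete *fetcher.Fetcher; FetchInts
// plays the role of its FetchChunks method.
type store struct{}

func (s *store) FetchInts(_ context.Context, xs []int) ([]int, error) {
	out := make([]int, 0, len(xs))
	for _, x := range xs {
		out = append(out, x*2)
	}
	return out, nil
}

func main() {
	s := &store{}
	// Method value converted to FetchFunc, mirroring
	// FetchFunc[chunk.Chunk, chunk.Chunk](fetcher.FetchChunks) in Load.
	var f Fetcher[int, int] = FetchFunc[int, int](s.FetchInts)

	got, err := f.Fetch(context.Background(), []int{1, 2, 3})
	fmt.Println(got, err) // [2 4 6] <nil>
}

The conversion is legal because a method value has an ordinary function type; no adapter struct or extra interface is needed, and test doubles become one-line FetchFunc literals, as the new test below shows.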
diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go
index 798d65e2f2bcd..44b1fa26a4d1f 100644
--- a/pkg/bloomcompactor/spec_test.go
+++ b/pkg/bloomcompactor/spec_test.go
@@ -3,6 +3,7 @@ package bloomcompactor
 import (
 	"bytes"
 	"context"
+	"errors"
 	"testing"
 
 	"github.com/go-kit/log"
@@ -155,3 +156,129 @@ func TestSimpleBloomGenerator(t *testing.T) {
 		})
 	}
 }
+
+func TestBatchedLoader(t *testing.T) {
+	errMapper := func(i int) (int, error) {
+		return 0, errors.New("bzzt")
+	}
+	successMapper := func(i int) (int, error) {
+		return i, nil
+	}
+
+	expired, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	for _, tc := range []struct {
+		desc      string
+		ctx       context.Context
+		batchSize int
+		mapper    func(int) (int, error)
+		err       bool
+		inputs    [][]int
+		exp       []int
+	}{
+		{
+			desc:      "OneBatch",
+			ctx:       context.Background(),
+			batchSize: 2,
+			mapper:    successMapper,
+			err:       false,
+			inputs:    [][]int{{0, 1}},
+			exp:       []int{0, 1},
+		},
+		{
+			desc:      "ZeroBatchSizeStillWorks",
+			ctx:       context.Background(),
+			batchSize: 0,
+			mapper:    successMapper,
+			err:       false,
+			inputs:    [][]int{{0, 1}},
+			exp:       []int{0, 1},
+		},
+		{
+			desc:      "OneBatchLessThanFull",
+			ctx:       context.Background(),
+			batchSize: 2,
+			mapper:    successMapper,
+			err:       false,
+			inputs:    [][]int{{0}},
+			exp:       []int{0},
+		},
+		{
+			desc:      "TwoBatches",
+			ctx:       context.Background(),
+			batchSize: 2,
+			mapper:    successMapper,
+			err:       false,
+			inputs:    [][]int{{0, 1, 2, 3}},
+			exp:       []int{0, 1, 2, 3},
+		},
+		{
+			desc:      "MultipleBatchesMultipleLoaders",
+			ctx:       context.Background(),
+			batchSize: 2,
+			mapper:    successMapper,
+			err:       false,
+			inputs:    [][]int{{0, 1}, {2}, {3, 4, 5}},
+			exp:       []int{0, 1, 2, 3, 4, 5},
+		},
+		{
+			desc:      "HandlesEmptyInputs",
+			ctx:       context.Background(),
+			batchSize: 2,
+			mapper:    successMapper,
+			err:       false,
+			inputs:    [][]int{{0, 1, 2, 3}, nil, {4}},
+			exp:       []int{0, 1, 2, 3, 4},
+		},
+		{
+			desc:      "Timeout",
+			ctx:       expired,
+			batchSize: 2,
+			mapper:    successMapper,
+			err:       true,
+			inputs:    [][]int{{0}},
+		},
+		{
+			desc:      "MappingFailure",
+			ctx:       context.Background(),
+			batchSize: 2,
+			mapper:    errMapper,
+			err:       true,
+			inputs:    [][]int{{0}},
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			fetchers := make([]Fetcher[int, int], 0, len(tc.inputs))
+			for range tc.inputs {
+				fetchers = append(
+					fetchers,
+					FetchFunc[int, int](func(ctx context.Context, xs []int) ([]int, error) {
+						if ctx.Err() != nil {
+							return nil, ctx.Err()
+						}
+						return xs, nil
+					}),
+				)
+			}
+
+			loader := newBatchedLoader[int, int, int](
+				tc.ctx,
+				fetchers,
+				tc.inputs,
+				tc.mapper,
+				tc.batchSize,
+			)
+
+			got, err := v1.Collect[int](loader)
+			if tc.err {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+			require.Equal(t, tc.exp, got)
+
+		})
+	}
+
+}
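The test drains the loader through v1.Collect; for reference, here is the same iteration contract driven by hand. This is a package-internal sketch (batchedLoader is unexported, so it would have to live in package bloomcompactor), and exampleDrain, identity, and the "item-%d" mapper are invented for illustration; it also assumes "fmt" is imported:

// exampleDrain shows the contract: each input slice is paired with its
// fetcher, batches of at most batchSize are fetched lazily inside Next,
// the mapper runs on one element per Next call, and batch boundaries
// are invisible to the consumer.
func exampleDrain(ctx context.Context) ([]string, error) {
	identity := FetchFunc[int, int](func(_ context.Context, xs []int) ([]int, error) {
		return xs, nil
	})

	itr := newBatchedLoader(
		ctx,
		[]Fetcher[int, int]{identity, identity},
		[][]int{{1, 2, 3}, {4}}, // one input slice per fetcher
		func(i int) (string, error) { return fmt.Sprintf("item-%d", i), nil },
		2, // fetches arrive as [1,2], then [3], then [4]
	)

	var out []string
	for itr.Next() {
		out = append(out, itr.At())
	}
	// on success: out == ["item-1", "item-2", "item-3", "item-4"]
	return out, itr.Err()
}

Two details worth noting from the implementation above: newBatchedLoader clamps the batch size with max(batchSize, 1), which is what the ZeroBatchSizeStillWorks case relies on, and the rewritten Next loops until it has a non-empty batch, which is how empty or nil input slices (HandlesEmptyInputs) are skipped without terminating iteration early.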