Skip to content

Commit

Permalink
makes batchedLoader generic + removes unnecessary interfaces & adapters (grafana#11924)
Browse files Browse the repository at this point in the history

While reviewing grafana#11919, I figured
it'd be nice to make `batchedLoader` generic so we can reuse its logic.
This let me test it easier and remove a lot of now-unnecessary adapter
code (interfaces, types)
  • Loading branch information
owen-d authored and rhnasc committed Apr 12, 2024
1 parent 6c6e108 commit a5e562b
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 100 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(
c.metrics = NewMetrics(r, c.btMetrics)

chunkLoader := NewStoreChunkLoader(
NewFetcherProviderAdapter(fetcherProvider),
fetcherProvider,
c.metrics,
)

Expand Down
206 changes: 107 additions & 99 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
logql_log "github.com/grafana/loki/pkg/logql/log"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
Expand Down Expand Up @@ -235,39 +236,13 @@ type ChunkLoader interface {
Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error)
}

// interface modeled from `pkg/storage/stores/composite_store.ChunkFetcherProvider`
type fetcherProvider interface {
GetChunkFetcher(model.Time) chunkFetcher
}

// interface modeled from `pkg/storage/chunk/fetcher.Fetcher`
type chunkFetcher interface {
FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error)
}

// Adapter turning `stores.ChunkFetcherProvider` into `fetcherProvider`
// The former returns a concrete type and is heavily used externally
// while the latter returns an interface for better testing and
// is used internally
type FetcherProviderAdapter struct {
root stores.ChunkFetcherProvider
}

// NewFetcherProviderAdapter wraps a concrete stores.ChunkFetcherProvider
// so it can be consumed through the internal fetcherProvider interface.
func NewFetcherProviderAdapter(root stores.ChunkFetcherProvider) *FetcherProviderAdapter {
return &FetcherProviderAdapter{root: root}
}

// GetChunkFetcher delegates to the wrapped provider, narrowing the returned
// concrete *fetcher.Fetcher to the chunkFetcher interface.
// NOTE(review): this adapter (and both interfaces above) are the code this
// commit removes in favor of using stores.ChunkFetcherProvider directly.
func (f *FetcherProviderAdapter) GetChunkFetcher(t model.Time) chunkFetcher {
return f.root.GetChunkFetcher(t)
}

// StoreChunkLoader loads chunks from a store
type StoreChunkLoader struct {
fetcherProvider fetcherProvider
fetcherProvider stores.ChunkFetcherProvider
metrics *Metrics
}

func NewStoreChunkLoader(fetcherProvider fetcherProvider, metrics *Metrics) *StoreChunkLoader {
func NewStoreChunkLoader(fetcherProvider stores.ChunkFetcherProvider, metrics *Metrics) *StoreChunkLoader {
return &StoreChunkLoader{
fetcherProvider: fetcherProvider,
metrics: metrics,
Expand All @@ -278,7 +253,7 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
// NB(owen-d): This is probably unnecessary as we should only have one fetcher
// because we'll only be working on a single index period at a time, but this should protect
// us in the case of refactoring/changing this and likely isn't a perf bottleneck.
chksByFetcher := make(map[chunkFetcher][]chunk.Chunk)
chksByFetcher := make(map[*fetcher.Fetcher][]chunk.Chunk)
for _, chk := range series.Chunks {
fetcher := s.fetcherProvider.GetChunkFetcher(chk.Start)
chksByFetcher[fetcher] = append(chksByFetcher[fetcher], chunk.Chunk{
Expand All @@ -292,119 +267,152 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
})
}

work := make([]chunkWork, 0, len(chksByFetcher))
var (
fetchers = make([]Fetcher[chunk.Chunk, chunk.Chunk], 0, len(chksByFetcher))
inputs = make([][]chunk.Chunk, 0, len(chksByFetcher))
)
for fetcher, chks := range chksByFetcher {
work = append(work, chunkWork{
fetcher: fetcher,
chks: chks,
})
fn := FetchFunc[chunk.Chunk, chunk.Chunk](fetcher.FetchChunks)
fetchers = append(fetchers, fn)
inputs = append(inputs, chks)
}

return &ChunkItersByFingerprint{
fp: series.Fingerprint,
itr: newBatchedLoader(ctx, work, batchedLoaderDefaultBatchSize, s.metrics),
itr: newBatchedChunkLoader(ctx, fetchers, inputs, s.metrics, batchedLoaderDefaultBatchSize),
}, nil
}

type chunkWork struct {
fetcher chunkFetcher
chks []chunk.Chunk
type Fetcher[A, B any] interface {
Fetch(ctx context.Context, inputs []A) ([]B, error)
}

type FetchFunc[A, B any] func(ctx context.Context, inputs []A) ([]B, error)

func (f FetchFunc[A, B]) Fetch(ctx context.Context, inputs []A) ([]B, error) {
return f(ctx, inputs)
}

// batchedLoader implements `v1.Iterator[v1.ChunkRefWithIter]` in batches
// to ensure memory is bounded while loading chunks
// TODO(owen-d): testware
type batchedLoader struct {
type batchedLoader[A, B, C any] struct {
metrics *Metrics
batchSize int
ctx context.Context
work []chunkWork
fetchers []Fetcher[A, B]
work [][]A

cur v1.ChunkRefWithIter
batch []chunk.Chunk
err error
mapper func(B) (C, error)
cur C
batch []B
err error
}

const batchedLoaderDefaultBatchSize = 50

func newBatchedLoader(ctx context.Context, work []chunkWork, batchSize int, metrics *Metrics) *batchedLoader {
return &batchedLoader{
metrics: metrics,
batchSize: batchSize,
func newBatchedLoader[A, B, C any](
ctx context.Context,
fetchers []Fetcher[A, B],
inputs [][]A,
mapper func(B) (C, error),
batchSize int,
) *batchedLoader[A, B, C] {
return &batchedLoader[A, B, C]{
batchSize: max(batchSize, 1),
ctx: ctx,
work: work,
fetchers: fetchers,
work: inputs,
mapper: mapper,
}
}

func (b *batchedLoader) Next() bool {
if len(b.batch) > 0 {
return b.prepNext(false)
}
func (b *batchedLoader[A, B, C]) Next() bool {

if len(b.work) == 0 {
return false
}
// iterate work until we have non-zero length batch
for len(b.batch) == 0 {

// setup next batch
next := b.work[0]
batchSize := min(b.batchSize, len(next.chks))
toFetch := next.chks[:batchSize]
// update work
b.work[0].chks = next.chks[batchSize:]
if len(b.work[0].chks) == 0 {
b.work = b.work[1:]
}
// empty batch + no work remaining = we're done
if len(b.work) == 0 {
return false
}

if len(toFetch) == 0 {
return false
}
// setup next batch
next := b.work[0]
batchSize := min(b.batchSize, len(next))
toFetch := next[:batchSize]
fetcher := b.fetchers[0]

// update work
b.work[0] = b.work[0][batchSize:]
if len(b.work[0]) == 0 {
// if we've exhausted work from this set of inputs,
// set pointer to next set of inputs
// and their respective fetcher
b.work = b.work[1:]
b.fetchers = b.fetchers[1:]
}

b.batch, b.err = next.fetcher.FetchChunks(b.ctx, toFetch)
if b.err != nil {
return false
// there was no work in this batch; continue (should not happen)
if len(toFetch) == 0 {
continue
}

b.batch, b.err = fetcher.Fetch(b.ctx, toFetch)
// error fetching, short-circuit iteration
if b.err != nil {
return false
}
}

return b.prepNext(true)
return b.prepNext()
}

func (b *batchedLoader) prepNext(checkLen bool) bool {
if checkLen && len(b.batch) == 0 {
return false
}
b.cur, b.err = b.format(b.batch[0])
func (b *batchedLoader[_, B, C]) prepNext() bool {
b.cur, b.err = b.mapper(b.batch[0])
b.batch = b.batch[1:]
return b.err == nil
}

func (b *batchedLoader) format(c chunk.Chunk) (v1.ChunkRefWithIter, error) {
chk := c.Data.(*chunkenc.Facade).LokiChunk()
b.metrics.chunkSize.Observe(float64(chk.UncompressedSize()))
itr, err := chk.Iterator(
b.ctx,
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
logql_log.NewNoopPipeline().ForStream(c.Metric),
)
func newBatchedChunkLoader(
ctx context.Context,
fetchers []Fetcher[chunk.Chunk, chunk.Chunk],
inputs [][]chunk.Chunk,
metrics *Metrics,
batchSize int,
) *batchedLoader[chunk.Chunk, chunk.Chunk, v1.ChunkRefWithIter] {

mapper := func(c chunk.Chunk) (v1.ChunkRefWithIter, error) {
chk := c.Data.(*chunkenc.Facade).LokiChunk()
metrics.chunkSize.Observe(float64(chk.UncompressedSize()))
itr, err := chk.Iterator(
ctx,
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
logql_log.NewNoopPipeline().ForStream(c.Metric),
)

if err != nil {
return v1.ChunkRefWithIter{}, err
}
if err != nil {
return v1.ChunkRefWithIter{}, err
}

return v1.ChunkRefWithIter{
Ref: v1.ChunkRef{
Start: c.From,
End: c.Through,
Checksum: c.Checksum,
},
Itr: itr,
}, nil
return v1.ChunkRefWithIter{
Ref: v1.ChunkRef{
Start: c.From,
End: c.Through,
Checksum: c.Checksum,
},
Itr: itr,
}, nil
}
return newBatchedLoader(ctx, fetchers, inputs, mapper, batchSize)
}

func (b *batchedLoader) At() v1.ChunkRefWithIter {
func (b *batchedLoader[_, _, C]) At() C {
return b.cur
}

func (b *batchedLoader) Err() error {
func (b *batchedLoader[_, _, _]) Err() error {
return b.err
}
Loading

0 comments on commit a5e562b

Please sign in to comment.