[bloom-compactor] downloading chunks in batches #11649

Merged
4 changes: 4 additions & 0 deletions docs/sources/configure/_index.md
@@ -3094,6 +3094,10 @@ shard_streams:
# CLI flag: -bloom-compactor.enable-compaction
[bloom_compactor_enable_compaction: <boolean> | default = false]

# The batch size of the chunks the bloom-compactor downloads at once.
# CLI flag: -bloom-compactor.chunks-batch-size
[bloom_compactor_chunks_batch_size: <int> | default = 100]

# Length of the n-grams created when computing blooms from log lines.
# CLI flag: -bloom-compactor.ngram-length
[bloom_ngram_length: <int> | default = 4]
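For reference, a hedged example of how the new setting could be set. The value 250 is arbitrary, and the placement under `limits_config` is an assumption based on the surrounding per-tenant settings; the CLI flag `-bloom-compactor.chunks-batch-size` is the equivalent.

```yaml
# Sketch only: 250 is an example value, not a recommendation.
limits_config:
  bloom_compactor_chunks_batch_size: 250
```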
5 changes: 2 additions & 3 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -535,8 +535,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}

fpRate := c.limits.BloomFalsePositiveRate(job.tenantID)
resultingBlock, err = compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder)
resultingBlock, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
if err != nil {
return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
}
@@ -545,7 +544,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
// When already compacted metas exist, we need to merge all blocks, amending blooms with new series
level.Info(logger).Log("msg", "already compacted metas exists, use mergeBlockBuilder")

var populate = createPopulateFunc(ctx, logger, job, storeClient, bt)
var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits)
Contributor: Same here

Contributor (Author): I would prefer to pass the limits because it's extendable...

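An editorial sketch (not part of the diff) of the point being made: once `Limits` is threaded through, compactNewChunks can read any per-tenant setting without its signature changing again, whereas passing fpRate alone would need a new parameter for every setting. The signature mirrors the diff in chunkcompactor.go below, and the sketch assumes it lives alongside the real function in pkg/bloomcompactor.

```go
// Sketch only: mirrors the post-PR signature of compactNewChunks shown below.
func compactNewChunksSketch(ctx context.Context, logger log.Logger, job Job,
	bt compactorTokenizer, storeClient chunkClient, builder blockBuilder, limits Limits,
) (bloomshipper.Block, error) {
	// Every per-tenant limit is reachable here without another signature change.
	fpRate := limits.BloomFalsePositiveRate(job.tenantID)
	batchSize := limits.BloomCompactorChunksBatchSize(job.tenantID)
	_, _ = fpRate, batchSize
	// ... build and upload the block as the real compactNewChunks does ...
	return bloomshipper.Block{}, nil
}
```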

seriesIter := makeSeriesIterFromSeriesMeta(job)

53 changes: 26 additions & 27 deletions pkg/bloomcompactor/chunkcompactor.go
@@ -22,7 +22,7 @@ import (
)

type compactorTokenizer interface {
PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunks []chunk.Chunk) error
PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunkBatchesIterator v1.Iterator[[]chunk.Chunk]) error
vlad-diachenko marked this conversation as resolved.

Contributor: nit: I think we can avoid modifying the argument of this method as well as its implementation in bloom_tokenizer.go. We can call PopulateSeriesWithBloom for each call of the iterator's Next(), e.g. in createPopulateFunc:

batchesIterator, err := newChunkBatchesIterator(ctx, storeClient.chunk, chunkRefs, limits.BloomCompactorChunksBatchSize(job.tenantID))
if err != nil {
	return fmt.Errorf("error creating chunks batches iterator: %w", err)
}
for batchesIterator.Next() {
   if err := batchesIterator.Err() ...
   chunks := batchesIterator.At()
   err = bt.PopulateSeriesWithBloom(&bloomForChks, chunks)
}
err = bt.PopulateSeriesWithBloom(&bloomForChks, batchesIterator)

Contributor: This would affect the metrics we expose inside PopulateSeriesWithBloom, though.

So what about PopulateSeriesWithBloom receiving an iterator of chunks (Iterator[chunk.Chunk]) and doing the buffering inside the iterator implementation? That way the batching logic is present only in the iterator:
func (c *chunksBatchesIterator) loadNextBatch() error {
	batchSize := c.batchSize
	chunksToDownloadCount := len(c.chunksToDownload)
	if chunksToDownloadCount < batchSize {
		batchSize = chunksToDownloadCount
	}
	chunksToDownload := c.chunksToDownload[:batchSize]
	c.chunksToDownload = c.chunksToDownload[batchSize:]

	newBatch, err := c.client.GetChunks(c.context, chunksToDownload)
	if err != nil {
		return err
	}

	c.currentBatch = newBatch
	return nil
}

func (c *chunksBatchesIterator) Next() bool {
	if len(c.currentBatch) == 0 {
		if len(c.chunksToDownload) == 0 {
			return false
		}
		if c.err = c.loadNextBatch(); c.err != nil {
			return false
		}
	}

	// Pop the first chunk from the current batch and set it as the current chunk.
	c.currentChunk = c.currentBatch[0]
	c.currentBatch = c.currentBatch[1:]

	return true
}

func (c *chunksBatchesIterator) At() chunk.Chunk {
	return c.currentChunk
}

Contributor (Author): Yes, it will do the same... but I do not believe we win anything by changing it this way; it just increases the complexity of the iterator. Say we fail to download a batch: we report an error, but that error is not connected to the particular chunk returned by the At() function, so it might confuse somebody in the future... Not really against it, but it looks almost the same.

Contributor (Author): wdyt?

Contributor: Sure, not a strong opinion!

}

type chunkClient interface {
@@ -86,7 +86,7 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
return chunkRefs
}

func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks []chunk.Chunk) (v1.SeriesWithBloom, error) {
func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) {
// Create a bloom for this series
bloomForChks := v1.SeriesWithBloom{
Series: &v1.Series{
@@ -155,21 +155,20 @@ func createLocalDirName(workingDir string, job Job) string {
}

// Compacts given list of chunks, uploads them to storage and returns a list of bloomBlocks
func compactNewChunks(
ctx context.Context,
func compactNewChunks(ctx context.Context,
logger log.Logger,
job Job,
fpRate float64,
bt compactorTokenizer,
storeClient chunkClient,
builder blockBuilder,
limits Limits,
) (bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
}

bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, fpRate, logger)
bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits)

// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
@@ -182,13 +181,14 @@ func compactNewChunks(
}

type lazyBloomBuilder struct {
ctx context.Context
metas v1.Iterator[seriesMeta]
tenant string
client chunkClient
bt compactorTokenizer
fpRate float64
logger log.Logger
ctx context.Context
metas v1.Iterator[seriesMeta]
tenant string
client chunkClient
bt compactorTokenizer
fpRate float64
logger log.Logger
chunksBatchSize int

cur v1.SeriesWithBloom // returned by At()
err error // returned by Err()
@@ -198,15 +198,16 @@ type lazyBloomBuilder struct {
// which are used by the blockBuilder to write a bloom block.
// We use an iterator to avoid loading all blooms into memory first, before
// building the block.
func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, fpRate float64, logger log.Logger) *lazyBloomBuilder {
func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, logger log.Logger, limits Limits) *lazyBloomBuilder {
return &lazyBloomBuilder{
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: fpRate,
logger: logger,
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: limits.BloomFalsePositiveRate(job.tenantID),
logger: logger,
chunksBatchSize: limits.BloomCompactorChunksBatchSize(job.tenantID),
}
}

@@ -218,20 +219,18 @@ func (it *lazyBloomBuilder) Next() bool {
}
meta := it.metas.At()

// Get chunks data from list of chunkRefs
chks, err := it.client.GetChunks(it.ctx, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP))
batchesIterator, err := newChunkBatchesIterator(it.ctx, it.client, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP), it.chunksBatchSize)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
level.Debug(it.logger).Log("err in getChunks", err)
level.Debug(it.logger).Log("msg", "err creating chunks batches iterator", "err", err)
return false
}

it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, chks)
it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, batchesIterator)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
level.Debug(it.logger).Log("err in buildBloomFromSeries", err)
level.Debug(it.logger).Log("msg", "err in buildBloomFromSeries", "err", err)
return false
}
return true
12 changes: 7 additions & 5 deletions pkg/bloomcompactor/chunkcompactor_test.go
@@ -59,7 +59,7 @@ func TestChunkCompactor_BuildBloomFromSeries(t *testing.T) {
chunks := []chunk.Chunk{createTestChunk(fp, label)}

mbt := mockBloomTokenizer{}
bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, chunks)
bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, v1.NewSliceIter([][]chunk.Chunk{chunks}))
require.NoError(t, err)
require.Equal(t, seriesMeta.seriesFP, bloom.Series.Fingerprint)
require.Equal(t, chunks, mbt.chunks)
@@ -110,7 +110,7 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
pbb := mockPersistentBlockBuilder{}

// Run Compaction
compactedBlock, err := compactNewChunks(context.Background(), logger, job, fpRate, &mbt, &mcc, &pbb)
compactedBlock, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, &pbb, mockLimits{fpRate: fpRate})

// Validate Compaction Succeeds
require.NoError(t, err)
@@ -169,7 +169,7 @@ func TestLazyBloomBuilder(t *testing.T) {
mbt := &mockBloomTokenizer{}
mcc := &mockChunkClient{}

it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, fpRate, logger)
it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, logger, mockLimits{chunksDownloadingBatchSize: 10, fpRate: fpRate})

// first seriesMeta has 1 chunks
require.True(t, it.Next())
@@ -199,8 +199,10 @@ type mockBloomTokenizer struct {
chunks []chunk.Chunk
}

func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c []chunk.Chunk) error {
mbt.chunks = append(mbt.chunks, c...)
func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c v1.Iterator[[]chunk.Chunk]) error {
for c.Next() {
mbt.chunks = append(mbt.chunks, c.At()...)
}
return nil
}

48 changes: 48 additions & 0 deletions pkg/bloomcompactor/chunksbatchesiterator.go
@@ -0,0 +1,48 @@
package bloomcompactor

import (
"context"
"errors"

"github.com/grafana/loki/pkg/storage/chunk"
)

type chunksBatchesIterator struct {
context context.Context
client chunkClient
chunksToDownload []chunk.Chunk
batchSize int

currentBatch []chunk.Chunk
err error
}

func newChunkBatchesIterator(context context.Context, client chunkClient, chunksToDownload []chunk.Chunk, batchSize int) (*chunksBatchesIterator, error) {
if batchSize <= 0 {
vlad-diachenko marked this conversation as resolved.
return nil, errors.New("batchSize must be greater than 0")
}
return &chunksBatchesIterator{context: context, client: client, chunksToDownload: chunksToDownload, batchSize: batchSize}, nil
}

func (c *chunksBatchesIterator) Next() bool {
if len(c.chunksToDownload) == 0 {
return false
}
batchSize := c.batchSize
chunksToDownloadCount := len(c.chunksToDownload)
if chunksToDownloadCount < batchSize {
batchSize = chunksToDownloadCount
}
chunksToDownload := c.chunksToDownload[:batchSize]
c.chunksToDownload = c.chunksToDownload[batchSize:]
c.currentBatch, c.err = c.client.GetChunks(c.context, chunksToDownload)
return c.err == nil
}

func (c *chunksBatchesIterator) Err() error {
return c.err
}

func (c *chunksBatchesIterator) At() []chunk.Chunk {
return c.currentBatch
}
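A hedged usage sketch (not part of the PR) of how a caller is expected to drain this iterator — the same Next/At/Err pattern the mock tokenizer and the test below use. The helper `drainChunkBatches` and the `processBatch` callback are hypothetical names, and the snippet assumes it sits in the bloomcompactor package so chunkClient and newChunkBatchesIterator resolve.

```go
// Hypothetical helper illustrating the intended consumption pattern of chunksBatchesIterator.
func drainChunkBatches(ctx context.Context, client chunkClient, refs []chunk.Chunk,
	batchSize int, processBatch func([]chunk.Chunk) error,
) error {
	it, err := newChunkBatchesIterator(ctx, client, refs, batchSize)
	if err != nil {
		return err // only fails when batchSize <= 0
	}
	for it.Next() {
		// At() returns the batch fetched by the most recent GetChunks call.
		if err := processBatch(it.At()); err != nil {
			return err
		}
	}
	// Next() returns false when a download fails; Err() surfaces that error.
	return it.Err()
}
```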
96 changes: 96 additions & 0 deletions pkg/bloomcompactor/chunksbatchesiterator_test.go
@@ -0,0 +1,96 @@
package bloomcompactor

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk"
tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

func Test_chunksBatchesIterator(t *testing.T) {
tests := map[string]struct {
batchSize int
chunksToDownload []chunk.Chunk
constructorError error

hadNextCount int
}{
"expected error if batch size is set to 0": {
batchSize: 0,
constructorError: errors.New("batchSize must be greater than 0"),
},
"expected no error if there are no chunks": {
hadNextCount: 0,
batchSize: 10,
},
"expected 1 call to the client": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 1,
batchSize: 20,
},
"expected 1 call to the client(2)": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 1,
batchSize: 10,
},
"expected 2 calls to the client": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 2,
batchSize: 6,
},
"expected 10 calls to the client": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 10,
batchSize: 1,
},
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
client := &fakeClient{}
iterator, err := newChunkBatchesIterator(context.Background(), client, data.chunksToDownload, data.batchSize)
if data.constructorError != nil {
require.Equal(t, err, data.constructorError)
return
}
hadNextCount := 0
var downloadedChunks []chunk.Chunk
for iterator.Next() {
hadNextCount++
downloaded := iterator.At()
downloadedChunks = append(downloadedChunks, downloaded...)
require.LessOrEqual(t, len(downloaded), data.batchSize)
}
require.NoError(t, iterator.Err())
require.Equal(t, data.chunksToDownload, downloadedChunks)
require.Equal(t, data.hadNextCount, client.callsCount)
require.Equal(t, data.hadNextCount, hadNextCount)
})
}
}

func createFakeChunks(count int) []chunk.Chunk {
metas := make([]tsdbindex.ChunkMeta, 0, count)
for i := 0; i < count; i++ {
metas = append(metas, tsdbindex.ChunkMeta{
Checksum: uint32(i),
MinTime: int64(i),
MaxTime: int64(i + 100),
KB: uint32(i * 100),
Entries: uint32(i * 10),
})
}
return makeChunkRefs(metas, "fake", 0xFFFF)
}

type fakeClient struct {
callsCount int
}

func (f *fakeClient) GetChunks(_ context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
f.callsCount++
return chunks, nil
}
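These cases can be run on their own with `go test ./pkg/bloomcompactor -run Test_chunksBatchesIterator` from the repository root (assuming a standard Loki checkout).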
1 change: 1 addition & 0 deletions pkg/bloomcompactor/config.go
@@ -41,6 +41,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int
BloomCompactorChunksBatchSize(userID string) int
BloomCompactorMaxTableAge(tenantID string) time.Duration
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int
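The validation-package side of this new limit is not part of the diff; below is a hedged sketch of what the per-tenant field and flag registration might look like. The flag name and default value come from the docs change above; the package, struct, and field names are assumptions.

```go
package validation // assumed location; sketch only

import "flag"

type Limits struct {
	// Field name and yaml tag are assumptions; the yaml key matches the docs change above.
	BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
}

func (l *Limits) RegisterFlags(f *flag.FlagSet) {
	// Flag name and default (100) come from docs/sources/configure/_index.md in this PR.
	f.IntVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100,
		"The batch size of the chunks the bloom-compactor downloads at once.")
}
```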
10 changes: 5 additions & 5 deletions pkg/bloomcompactor/mergecompactor.go
@@ -2,6 +2,7 @@ package bloomcompactor

import (
"context"
"fmt"

"github.com/grafana/dskit/concurrency"

@@ -74,7 +75,7 @@ func makeBlockIterFromBlocks(ctx context.Context, logger log.Logger,
return blockIters, blockPaths, nil
}

func createPopulateFunc(ctx context.Context, logger log.Logger, job Job, storeClient storeClient, bt *v1.BloomTokenizer) func(series *v1.Series, bloom *v1.Bloom) error {
func createPopulateFunc(ctx context.Context, job Job, storeClient storeClient, bt *v1.BloomTokenizer, limits Limits) func(series *v1.Series, bloom *v1.Bloom) error {
return func(series *v1.Series, bloom *v1.Bloom) error {
bloomForChks := v1.SeriesWithBloom{
Series: series,
@@ -95,12 +96,11 @@ }
}
}

chks, err := storeClient.chunk.GetChunks(ctx, chunkRefs)
batchesIterator, err := newChunkBatchesIterator(ctx, storeClient.chunk, chunkRefs, limits.BloomCompactorChunksBatchSize(job.tenantID))
if err != nil {
level.Error(logger).Log("msg", "failed downloading chunks", "err", err)
return err
return fmt.Errorf("error creating chunks batches iterator: %w", err)
}
err = bt.PopulateSeriesWithBloom(&bloomForChks, chks)
err = bt.PopulateSeriesWithBloom(&bloomForChks, batchesIterator)
if err != nil {
return err
}
15 changes: 14 additions & 1 deletion pkg/bloomcompactor/sharding_test.go
@@ -128,9 +128,22 @@ func TestShuffleSharding(t *testing.T) {

type mockLimits struct {
*validation.Overrides
bloomCompactorShardSize int
bloomCompactorShardSize int
chunksDownloadingBatchSize int
fpRate float64
}

func (m mockLimits) BloomFalsePositiveRate(_ string) float64 {
return m.fpRate
}

func (m mockLimits) BloomCompactorShardSize(_ string) int {
return m.bloomCompactorShardSize
}

func (m mockLimits) BloomCompactorChunksBatchSize(_ string) int {
if m.chunksDownloadingBatchSize != 0 {
return m.chunksDownloadingBatchSize
}
return 1
}