[bloom-compactor] downloading chunks in batches #11649
Conversation
…f downloading all of them at once Signed-off-by: Vladyslav Diachenko <[email protected]>
Overall lgtm
@@ -545,7 +545,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 	// When already compacted metas exists, we need to merge all blocks with amending blooms with new series
 	level.Info(logger).Log("msg", "already compacted metas exists, use mergeBlockBuilder")

-	var populate = createPopulateFunc(ctx, logger, job, storeClient, bt)
+	var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits)
Same here
I would prefer to pass the limits because it's more extensible...
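To make that trade-off concrete, here is a minimal sketch of the limits-based approach. This Limits interface is an assumption pieced together from the only two methods visible in this PR (BloomFalsePositiveRate and BloomCompactorChunksBatchSize), not the actual definition:

// Limits gathers per-tenant settings behind one interface, so a new knob
// can be added here without changing every function signature that needs it.
// (Assumed subset; only these two methods appear in this PR.)
type Limits interface {
	BloomFalsePositiveRate(tenantID string) float64
	BloomCompactorChunksBatchSize(tenantID string) int
}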
pkg/bloomcompactor/bloomcompactor.go (outdated)
@@ -536,7 +536,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 	}

 	fpRate := c.limits.BloomFalsePositiveRate(job.tenantID)
-	resultingBlock, err = compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder)
+	resultingBlock, err = compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder, c.limits)
Instead of passing down the limits, should we just pass the batchSize?
We also resolve the fpRate per tenant just before.
I believe it's better to pass the limits because we will keep adding parameters over time, and passing each value individually would force us to keep extending the function's signature... I will remove fpRate and resolve it inside the function instead.
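A sketch of what "resolve it inside the function" could look like, reusing the assumed Limits interface sketched earlier; the signature and body here are simplified and hypothetical, not the real compactNewChunks:

// compactNewChunks receives the limits and resolves per-tenant values
// itself, so adding a new setting doesn't grow the parameter list.
func compactNewChunks(tenantID string, limits Limits) error {
	fpRate := limits.BloomFalsePositiveRate(tenantID)
	batchSize := limits.BloomCompactorChunksBatchSize(tenantID)
	// ... build blooms with fpRate, download chunks in batches of batchSize
	_, _ = fpRate, batchSize
	return nil
}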
@@ -22,7 +22,7 @@ import (
 )

 type compactorTokenizer interface {
-	PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunks []chunk.Chunk) error
+	PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunkBatchesIterator v1.Iterator[[]chunk.Chunk]) error
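For context, the v1.Iterator referenced in this diff follows Loki's usual pull-iterator shape; a minimal sketch (this may not match the exact source):

// Iterator is the generic pull-iterator contract: call Next until it
// returns false, read the current element with At, then check Err.
type Iterator[T any] interface {
	Next() bool
	Err() error
	At() T
}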
nit: I think we can avoid modifying the argument of this method, as well as its implementation in bloom_tokenizer.go.
We can call PopulateSeriesWithBloom for each call of the iterator's Next(), e.g. in createPopulateFunc:
batchesIterator, err := newChunkBatchesIterator(ctx, storeClient.chunk, chunkRefs, limits.BloomCompactorChunksBatchSize(job.tenantID))
if err != nil {
	return fmt.Errorf("error creating chunks batches iterator: %w", err)
}
for batchesIterator.Next() {
	if err := batchesIterator.Err(); err != nil {
		return err
	}
	chunks := batchesIterator.At()
	err = bt.PopulateSeriesWithBloom(&bloomForChks, chunks)
}

instead of:

err = bt.PopulateSeriesWithBloom(&bloomForChks, batchesIterator)
This would affect the metrics we expose inside PopulateSeriesWithBloom, though.
So what about PopulateSeriesWithBloom receiving an iterator of chunks (Iterator[chunk.Chunk]) and doing the buffering inside the iterator implementation? That way the batching logic is present only in the iterator:
func (c *chunksBatchesIterator) loadNextBatch() error {
	// Take up to batchSize chunks from the remaining download queue;
	// the final batch may be smaller.
	batchSize := c.batchSize
	chunksToDownloadCount := len(c.chunksToDownload)
	if chunksToDownloadCount < batchSize {
		batchSize = chunksToDownloadCount
	}
	chunksToDownload := c.chunksToDownload[:batchSize]
	c.chunksToDownload = c.chunksToDownload[batchSize:]
	newBatch, err := c.client.GetChunks(c.context, chunksToDownload)
	if err != nil {
		return err
	}
	c.currentBatch = newBatch
	return nil
}

func (c *chunksBatchesIterator) Next() bool {
	// Download the next batch only once the current one is fully consumed.
	if len(c.currentBatch) == 0 {
		if len(c.chunksToDownload) == 0 {
			return false
		}
		if c.err = c.loadNextBatch(); c.err != nil {
			return false
		}
	}
	// Pop the first chunk from the current batch and set it as the current chunk.
	c.currentChunk = c.currentBatch[0]
	c.currentBatch = c.currentBatch[1:]
	return true
}

func (c *chunksBatchesIterator) At() chunk.Chunk {
	return c.currentChunk
}
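The struct behind these methods isn't shown in the comment; here is a sketch of what it presumably looks like, with fields inferred from the usage above. The chunkClient interface is an assumption standing in for the real store client:

// chunkClient is an assumed stand-in for the store client used above.
type chunkClient interface {
	GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error)
}

type chunksBatchesIterator struct {
	context          context.Context
	client           chunkClient
	chunksToDownload []chunk.Chunk
	batchSize        int

	currentBatch []chunk.Chunk
	currentChunk chunk.Chunk
	err          error
}

// Err surfaces a failed batch download after Next returns false.
func (c *chunksBatchesIterator) Err() error { return c.err }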
Yes, it will do the same... but I don't believe we win anything by changing it this way; it just increases the complexity of the iterator.
Say we fail to download a batch: we will report an error, but that error is not connected to the particular chunk returned by the At() function, so it might confuse somebody in the future...
Not really against it, but it looks almost the same.
wdyt?
Sure, not a strong opinion!
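For reference, here is roughly how the batch-level interface from the diff above (v1.Iterator[[]chunk.Chunk]) gets consumed: the caller drains one downloaded batch at a time, so memory stays bounded by the batch size, and a download error surfaces from Err() rather than on a particular chunk. The populateFromBatches and addChunk names below are hypothetical stand-ins, not the actual PopulateSeriesWithBloom code:

// populateFromBatches drains the iterator batch by batch; only one
// batch of chunks is held in memory at a time.
func populateFromBatches(it v1.Iterator[[]chunk.Chunk], addChunk func(chunk.Chunk) error) error {
	for it.Next() {
		for _, chk := range it.At() {
			if err := addChunk(chk); err != nil {
				return err
			}
		}
	}
	// Err is checked once the iterator is exhausted; a download failure
	// surfaces here rather than being tied to a specific chunk.
	if err := it.Err(); err != nil {
		return fmt.Errorf("error downloading chunks batch: %w", err)
	}
	return nil
}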
What this PR does / why we need it:
Added a chunks batches iterator to download chunks in batches instead of downloading all of them at once. Otherwise, when a stream contains a lot of chunks, downloading them all up front can lead to OOM.

Special notes for your reviewer:

Checklist
- [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](grafana@d10549e)
- [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](grafana@0d4416a)