feat: blockbuilder component #14621

Merged · 61 commits · Nov 22, 2024

Commits
54b9b7e
blockbuilder pkg init
owen-d Oct 21, 2024
4edccef
specing out blockbuilder pkg
owen-d Oct 21, 2024
cd65c1f
unexport
owen-d Oct 22, 2024
838f95f
job building
owen-d Oct 22, 2024
62ca1d7
polling/process
owen-d Oct 22, 2024
79ff4da
sequential polling handling
owen-d Oct 22, 2024
a45e5be
writermetrics kafka pkg
owen-d Oct 22, 2024
8299cf7
blockbuilder partition committer
owen-d Oct 22, 2024
324ed0f
speccing out slimgester
owen-d Oct 23, 2024
754d961
slimgester instance work
owen-d Oct 23, 2024
2129bd3
tuning
owen-d Oct 24, 2024
239b517
flush queue start
owen-d Oct 24, 2024
543094b
fpmapper integration
owen-d Oct 24, 2024
901c159
svc scaffolding
owen-d Oct 24, 2024
56f4e41
workers, metrics integration
owen-d Oct 25, 2024
749455e
start working on slimgester tsdb creator
owen-d Oct 26, 2024
deba529
decoding, input alignment
owen-d Oct 28, 2024
7974367
pipeline
owen-d Oct 29, 2024
6ff0be8
commit offset
owen-d Oct 29, 2024
5e56f5a
minor refactoring + wiring up
owen-d Oct 29, 2024
ad28891
Merge remote-tracking branch 'upstream/main' into split-writes
owen-d Nov 5, 2024
9ae5bc5
merge cleanup
owen-d Nov 5, 2024
ef9f73d
cosmetic linting
owen-d Nov 5, 2024
51d5496
refactoring + CutRemainingChunks
owen-d Nov 6, 2024
9c96120
tsdb creator work + minor refactoring to tenant label address
owen-d Nov 6, 2024
52725e7
Merge remote-tracking branch 'upstream/main' into split-writes
owen-d Nov 6, 2024
312e273
exposing some tsdb shipper fns+types
owen-d Nov 6, 2024
96880be
plumbing + tsdb uploader + building multiple tsdbs per cycle
owen-d Nov 6, 2024
ee6a62d
better object store integration for writing indices
owen-d Nov 7, 2024
e3eefa3
some index bucket testing
owen-d Nov 7, 2024
684c50d
propagates lifecycler id
owen-d Nov 7, 2024
569f4e8
blockbuilder flag registration+deps defined
owen-d Nov 8, 2024
1ad155f
named pipeline stages
owen-d Nov 8, 2024
ba75ab7
dummy partition controller
owen-d Nov 8, 2024
bba20f0
use named stages, reconfig default values, always run one at beginning
owen-d Nov 8, 2024
2a218e9
parameterize interval
owen-d Nov 8, 2024
5590902
minor fixes: tenant heads resetting & mtx access
owen-d Nov 8, 2024
f80e060
parameterized chunk creation
owen-d Nov 8, 2024
08a5ee4
tenant tuning
owen-d Nov 8, 2024
63493fd
[revert-local] use kafka reader
owen-d Nov 12, 2024
edc24b7
isolates slimgester state across jobs
owen-d Nov 12, 2024
554d142
better pipelining
owen-d Nov 13, 2024
63f598a
cleanup uses pipeline ctx instead of stage ctx which is already cance…
owen-d Nov 13, 2024
6ad5e83
indexshipper.ModeDisabled for explicit indexing operations
owen-d Nov 13, 2024
029f91a
blockbuilder parameterized backoff
owen-d Nov 13, 2024
735ab08
hooks up blockbuilder backoff & disables indexshipper when block-buil…
owen-d Nov 13, 2024
b150592
move multistore into blockbuilder pkg
owen-d Nov 14, 2024
fb5b44f
Merge remote-tracking branch 'upstream/main' into split-writes
owen-d Nov 14, 2024
66bfd92
make format
owen-d Nov 14, 2024
295a310
config.md
owen-d Nov 14, 2024
7084e5a
Merge remote-tracking branch 'upstream/main' into split-writes
owen-d Nov 14, 2024
bebb944
more blockbuilder logging
owen-d Nov 14, 2024
6714603
lint
owen-d Nov 15, 2024
b734482
more instrumentation
owen-d Nov 15, 2024
26acba1
fixes to load job with valid offsets on the first run
ashwanthgoli Nov 21, 2024
812de32
Merge remote-tracking branch 'upstream/main' into split-writes
owen-d Nov 22, 2024
6a951c0
blockbuilder uses refactored partition lib
owen-d Nov 22, 2024
6c7233b
moves partition client creation into reader from readersvc + reintegr…
owen-d Nov 22, 2024
1df3297
make format
owen-d Nov 22, 2024
7c475e5
more linting
owen-d Nov 22, 2024
e8a93c9
removes unused file
owen-d Nov 22, 2024
51 changes: 51 additions & 0 deletions docs/sources/shared/configuration.md
@@ -137,6 +137,57 @@ Pass the `-config.expand-env` flag at the command line to enable this way of setting configs.
# itself to a key value store.
[ingester: <ingester>]

block_builder:
# How many flushes can happen concurrently
# CLI flag: -blockbuilder.concurrent-flushes
[concurrent_flushes: <int> | default = 1]

# How many workers to process writes; defaults to the number of available CPUs
# CLI flag: -blockbuilder.concurrent-writers
[concurrent_writers: <int> | default = 1]

# The targeted _uncompressed_ size in bytes of a chunk block. When this
# threshold is exceeded, the head block will be cut and compressed inside the
# chunk.
# CLI flag: -blockbuilder.chunks-block-size
[chunk_block_size: <int> | default = 256KB]

# A target _compressed_ size in bytes for chunks. This is a desired size, not
# an exact size; chunks may be slightly bigger or significantly smaller if
# they get flushed for other reasons (e.g. chunk_idle_period). A value of 0
# creates chunks with a fixed 10 blocks; a non-zero value will create chunks
# with a variable number of blocks to meet the target size.
# CLI flag: -blockbuilder.chunk-target-size
[chunk_target_size: <int> | default = 1536KB]

# The algorithm to use for compressing chunks. (none, gzip, lz4-64k, snappy,
# lz4-256k, lz4-1M, lz4, flate, zstd)
# CLI flag: -blockbuilder.chunk-encoding
[chunk_encoding: <string> | default = "snappy"]

# The maximum duration of a timeseries chunk in memory. If a timeseries runs
# for longer than this, the current chunk will be flushed to the store and a
# new chunk created.
# CLI flag: -blockbuilder.max-chunk-age
[max_chunk_age: <duration> | default = 2h]

# The interval at which to run.
# CLI flag: -blockbuilder.interval
[interval: <duration> | default = 10m]

backoff_config:
# Minimum delay when backing off.
# CLI flag: -blockbuilder.backoff..backoff-min-period
[min_period: <duration> | default = 100ms]

# Maximum delay when backing off.
# CLI flag: -blockbuilder.backoff..backoff-max-period
[max_period: <duration> | default = 10s]

# Number of times to backoff and retry before failing.
# CLI flag: -blockbuilder.backoff..backoff-retries
[max_retries: <int> | default = 10]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
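Read together, these options might be combined in a Loki configuration file roughly as follows. This is an illustrative sketch that mostly restates the defaults documented above; the concurrency values are placeholders, not recommendations:

```yaml
# Illustrative block_builder config; values mirror the documented defaults
# except the concurrency settings, which are placeholders.
block_builder:
  concurrent_flushes: 4        # parallel chunk flushes
  concurrent_writers: 8        # typically sized to available CPUs
  chunk_block_size: 256KB
  chunk_target_size: 1536KB
  chunk_encoding: snappy
  max_chunk_age: 2h
  interval: 10m                # how often a build cycle runs
  backoff_config:
    min_period: 100ms
    max_period: 10s
    max_retries: 10
```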
305 changes: 305 additions & 0 deletions pkg/blockbuilder/controller.go
@@ -0,0 +1,305 @@
package blockbuilder

import (
"context"
"fmt"
"time"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/dskit/backoff"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"

"github.com/grafana/loki/pkg/push"
)

// [min,max)
type Offsets struct {
Min, Max int64
}

type Job struct {
Partition int32
Offsets Offsets
}

// Interface required for interacting with queue partitions.
type PartitionController interface {
Topic() string
Partition() int32
// Returns the highest committed offset from the consumer group
HighestCommittedOffset(ctx context.Context) (int64, error)
// Returns the highest available offset in the partition
HighestPartitionOffset(ctx context.Context) (int64, error)
// Returns the earliest available offset in the partition
EarliestPartitionOffset(ctx context.Context) (int64, error)
// Commits the offset to the consumer group.
Commit(context.Context, int64) error
// Process loads batches of records and sends them to the channel,
// so it's advised not to buffer the channel for natural backpressure.
// As a convenience, it returns the last seen offset, which matches
// the final record sent on the channel.
Process(context.Context, Offsets, chan<- []AppendInput) (int64, error)

Close() error
}
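As the interface comment notes, `Process` expects an unbuffered channel so the partition read is naturally backpressured by the consumer. A minimal sketch of that calling pattern, under the assumption that the consumer simply hands batches to a downstream writer (the `drain` helper below is illustrative and not part of this PR):

```go
// drain is an illustrative helper showing how a caller might drive
// PartitionController.Process with an unbuffered channel.
func drain(ctx context.Context, ctl PartitionController, job Job) (int64, error) {
	ch := make(chan []AppendInput) // unbuffered: Process blocks until each batch is consumed

	done := make(chan struct{})
	go func() {
		defer close(done)
		for batch := range ch {
			_ = batch // in the real pipeline, batches feed the chunk/TSDB builder
		}
	}()

	lastOffset, err := ctl.Process(ctx, job.Offsets, ch)
	close(ch) // signal the consumer that no more batches are coming
	<-done
	return lastOffset, err
}
```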

// PartitionJobController loads a single job at a time, bound to a given
// * topic
// * partition
// * offset_step_len: the number of offsets each job should contain. e.g. "10" could yield a job w/ min=15, max=25
//
// At a high level, it watches a source topic/partition (where log data is ingested) and a "committed" topic/partition.
// The "committed" partition corresponds to the offsets from the source partition which have been committed to object storage.
// In essence, the following loop is performed:
// 1. Load the most recent record from the "committed" partition. This contains the highest msg offset in the "source" partition
// that has been committed to object storage. We'll call that $START_POS.
// 2. Create a job with `min=$START_POS+1,max=$START_POS+1+$STEP_LEN`
// 3. Sometime later, when the job has been processed, commit the final processed offset from the "source" partition (which
// will be <= $END_POS) to the "committed" partition.
//
// NB(owen-d): In our case, "source" is the partition containing log data and "committed" is the consumer group.
type PartitionJobController struct {
stepLen int64
part partition.ReaderIfc
backoff backoff.Config
decoder *kafka.Decoder
}

func NewPartitionJobController(
controller partition.ReaderIfc,
backoff backoff.Config,
) (*PartitionJobController, error) {
decoder, err := kafka.NewDecoder()
if err != nil {
return nil, err
}
return &PartitionJobController{
stepLen: 1000, // Default step length of 1000 offsets per job
part: controller,
backoff: backoff,
decoder: decoder,
}, nil
}

func (l *PartitionJobController) HighestCommittedOffset(ctx context.Context) (int64, error) {
return withBackoff(
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchLastCommittedOffset(ctx)
},
)
}

func (l *PartitionJobController) HighestPartitionOffset(ctx context.Context) (int64, error) {
return withBackoff(
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchPartitionOffset(ctx, partition.KafkaEndOffset)
},
)
}

func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (int64, error) {
return withBackoff(
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchPartitionOffset(ctx, partition.KafkaStartOffset)
},
)
}

func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
l.part.SetOffsetForConsumption(offsets.Min)

var (
lastOffset = offsets.Min - 1
boff = backoff.New(ctx, l.backoff)
err error
)

for boff.Ongoing() {
var records []partition.Record
records, err = l.part.Poll(ctx)
if err != nil {
boff.Wait()
continue
}

if len(records) == 0 {
// No more records available
break
}

// Reset backoff on successful poll
boff.Reset()

converted := make([]AppendInput, 0, len(records))
for _, record := range records {
// stop once we reach the end of the requested range
if record.Offset >= offsets.Max {
break
}
lastOffset = record.Offset

stream, labels, err := l.decoder.Decode(record.Content)
if err != nil {
return 0, fmt.Errorf("failed to decode record: %w", err)
}
if len(stream.Entries) == 0 {
continue
}

converted = append(converted, AppendInput{
tenant: record.TenantID,
labels: labels,
labelsStr: stream.Labels,
entries: stream.Entries,
})
}

// send the decoded batch downstream once per poll
if len(converted) > 0 {
select {
case ch <- converted:
case <-ctx.Done():
return 0, ctx.Err()
}
}
}

return lastOffset, err
}

// LoadJob(ctx) returns the next job by finding the most recent unconsumed offset in the partition
// Returns whether an applicable job exists, the job, and an error
func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) {
// Read the most recent committed offset
committedOffset, err := l.HighestCommittedOffset(ctx)
if err != nil {
return false, Job{}, err
}

earliestOffset, err := l.EarliestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
}

startOffset := committedOffset + 1
if startOffset < earliestOffset {
startOffset = earliestOffset
}

highestOffset, err := l.HighestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
}
if highestOffset == committedOffset {
return false, Job{}, nil
}

// Create the job with the calculated offsets
job := Job{
Partition: l.part.Partition(),
Offsets: Offsets{
Min: startOffset,
Max: min(startOffset+l.stepLen, highestOffset),
},
}

return true, job, nil
}
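Tying the pieces together, the loop described in the PartitionJobController comment (load a job, process it, then commit the last processed offset) might be driven roughly as follows. This is a simplified sketch: the commit step is passed in as a callback because, in the real service, committing only happens after chunks and indices have been flushed to object storage:

```go
// runOnce is an illustrative sketch of one cycle of the job loop:
// load the next job, stream its records to a consumer, then commit.
func runOnce(ctx context.Context, ctl *PartitionJobController, commit func(context.Context, int64) error) error {
	exists, job, err := ctl.LoadJob(ctx)
	if err != nil {
		return err
	}
	if !exists {
		return nil // nothing new past the committed offset this cycle
	}

	ch := make(chan []AppendInput)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for batch := range ch {
			_ = batch // the real service builds chunks/TSDBs and flushes them here
		}
	}()

	lastOffset, err := ctl.Process(ctx, job.Offsets, ch)
	close(ch)
	<-done
	if err != nil {
		return err
	}

	// Only after the processed data is durable should the offset be committed.
	return commit(ctx, lastOffset)
}
```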

// dummyPartitionController is a dummy controller which can be parameterized
// to deterministically simulate partitions.
type dummyPartitionController struct {
topic string
partition int32
committed int64
highest int64
numTenants int // number of unique tenants to simulate
streamsPerTenant int // number of streams per tenant
entriesPerOffset int // coefficient for entries per offset
}

// used in testing
// nolint:revive
func NewDummyPartitionController(topic string, partition int32, highest int64) *dummyPartitionController {
return &dummyPartitionController{
topic: topic,
partition: partition,
committed: 0, // always starts at zero
highest: highest,
numTenants: 2, // default number of tenants
streamsPerTenant: 2, // default streams per tenant
entriesPerOffset: 1, // default entries per offset coefficient
}
}

func (d *dummyPartitionController) Topic() string {
return d.topic
}

func (d *dummyPartitionController) Partition() int32 {
return d.partition
}

func (d *dummyPartitionController) HighestCommittedOffset(_ context.Context) (int64, error) {
return d.committed, nil
}

func (d *dummyPartitionController) HighestPartitionOffset(_ context.Context) (int64, error) {
return d.highest, nil
}

func (d *dummyPartitionController) Commit(_ context.Context, offset int64) error {
d.committed = offset
return nil
}

func (d *dummyPartitionController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
for i := int(offsets.Min); i < int(offsets.Max); i++ {
batch := d.createBatch(i)
select {
case <-ctx.Done():
return int64(i - 1), ctx.Err()
case ch <- batch:
}
}
return offsets.Max - 1, nil
}

// creates (tenants*streams) inputs
func (d *dummyPartitionController) createBatch(offset int) []AppendInput {
result := make([]AppendInput, 0, d.numTenants*d.streamsPerTenant)
for i := 0; i < d.numTenants; i++ {
tenant := fmt.Sprintf("tenant-%d", i)
for j := 0; j < d.streamsPerTenant; j++ {
lbls := labels.Labels{
{Name: "stream", Value: fmt.Sprintf("stream-%d", j)},
}
entries := make([]push.Entry, d.entriesPerOffset)
for k := 0; k < d.entriesPerOffset; k++ {
entries[k] = push.Entry{
Timestamp: time.Now(),
Line: fmt.Sprintf("tenant=%d stream=%d line=%d offset=%d", i, j, k, offset),
}
}
result = append(result, AppendInput{
tenant: tenant,
labels: lbls,
labelsStr: lbls.String(),
entries: entries,
})
}
}
return result
}

func (d *dummyPartitionController) Close() error {
return nil
}
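A hedged example of how the dummy controller might be exercised in a test, counting the batches produced for a half-open offset range (illustrative only; it assumes the usual `testing` and testify `require` imports within the blockbuilder package):

```go
// Illustrative test sketch for dummyPartitionController.
// Assumes: package blockbuilder, with "context", "testing", and
// "github.com/stretchr/testify/require" imported.
func TestDummyPartitionController_Process(t *testing.T) {
	d := NewDummyPartitionController("test-topic", 0, 10)

	ch := make(chan []AppendInput)
	done := make(chan struct{})
	var batches int
	go func() {
		defer close(done)
		for range ch {
			batches++
		}
	}()

	last, err := d.Process(context.Background(), Offsets{Min: 1, Max: 5}, ch)
	close(ch)
	<-done

	require.NoError(t, err)
	require.Equal(t, int64(4), last) // Max-1, matching the dummy's contract
	require.Equal(t, 4, batches)     // one batch per offset in [1,5)
}
```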