Skip to content

Commit

Permalink
more blockbuilder logging
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d committed Nov 14, 2024
1 parent 7084e5a commit bebb944
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 6 deletions.
28 changes: 27 additions & 1 deletion pkg/blockbuilder/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewPartitionReader(
writerMetrics := partition.NewCommitterMetrics(r, partitionID)
group := kafkaCfg.GetConsumerGroup(instanceID, partitionID)

logger = log.With(logger, "component", "partition_reader")
decoder, err := kafka.NewDecoder()
if err != nil {
return nil, err
Expand Down Expand Up @@ -110,6 +111,14 @@ func (r *partitionReader) fetchPartitionOffset(ctx context.Context, position int

// Ensure no error occurred.
res := resps[0]

level.Debug(r.logger).Log(
"msg", "fetched partition offset",
"partition", r.partitionID,
"position", position,
"topic", r.topic,
"err", res.Err,
)
if res.Err != nil {
return 0, res.Err
}
Expand Down Expand Up @@ -194,7 +203,18 @@ func (r *partitionReader) fetchLastCommittedOffset(ctx context.Context) int64 {
return kafkaStartOffset
}

return fetchRes.Groups[0].Topics[0].Partitions[0].Offset
position := fetchRes.Groups[0].Topics[0].Partitions[0].Offset

level.Debug(r.logger).Log(
"msg", "fetched last committed offset",
"partition", r.partitionID,
"position", position,
"topic", r.topic,
"group", r.group,
"err", res.Err,
)

return position
}

func (r *partitionReader) updateReaderOffset(offset int64) {
Expand Down Expand Up @@ -313,6 +333,12 @@ func (r *partitionReader) Process(ctx context.Context, offsets Offsets, ch chan<

for boff.Ongoing() {
fetches, done := r.poll(ctx, offsets.Max)
level.Debug(r.logger).Log(
"msg", "polling kafka",
"records", len(fetches),
"done", done,
"max_offset", offsets.Max,
)
if len(fetches) > 0 {
lastOffset = fetches[len(fetches)-1].Offset
converted := make([]AppendInput, 0, len(fetches))
Expand Down
53 changes: 48 additions & 5 deletions pkg/blockbuilder/slimgester.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ func (i *BlockBuilder) running(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-ticker.C:
_, err := i.runOne(ctx)
skipped, err := i.runOne(ctx)
level.Info(i.logger).Log(
"msg", "completed block builder run", "skipped",
"skipped", skipped,
"err", err,
)
if err != nil {
return err
}
Expand All @@ -171,13 +176,20 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
return true, nil
}

logger := log.With(
i.logger,
"partition", job.Partition,
"job_min_offset", job.Offsets.Min,
"job_max_offset", job.Offsets.Max,
)

indexer := newTsdbCreator()
appender := newAppender(i.id,
i.cfg,
i.periodicConfigs,
i.store,
i.objStore,
i.logger,
logger,
i.metrics,
)

Expand All @@ -195,7 +207,11 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
lastOffset, err = i.jobController.part.Process(ctx, job.Offsets, inputCh)
return err
},
func(_ context.Context) error {
func(ctx context.Context) error {
level.Debug(logger).Log(
"msg", "finished loading records",
"ctx_error", ctx.Err(),
)
close(inputCh)
return nil
},
Expand Down Expand Up @@ -224,7 +240,7 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
for _, input := range inputs {
cut, err := appender.Append(ctx, input)
if err != nil {
level.Error(i.logger).Log("msg", "failed to append records", "err", err)
level.Error(logger).Log("msg", "failed to append records", "err", err)
return err
}

Expand All @@ -239,7 +255,14 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
}
}
},
func(ctx context.Context) error {
func(ctx context.Context) (err error) {
defer func() {
level.Debug(logger).Log(
"msg", "finished appender",
"err", err,
"ctx_error", ctx.Err(),
)
}()
defer close(flush)

// once we're done appending, cut all remaining chunks.
Expand Down Expand Up @@ -306,6 +329,10 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
)

err = p.Run()
level.Debug(logger).Log(
"msg", "finished chunk creation",
"err", err,
)
if err != nil {
return false, err
}
Expand All @@ -330,12 +357,28 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {

return false, err
}

level.Debug(logger).Log(
"msg", "uploaded tsdb",
"name", db.id.Name(),
)
}

if err = i.jobController.part.Commit(ctx, lastOffset); err != nil {
level.Error(logger).Log(
"msg", "failed to commit offset",
"last_offset", lastOffset,
"err", err,
)
return false, err
}

// log success
level.Info(logger).Log(
"msg", "successfully processed and committed batch",
"last_offset", lastOffset,
)

return false, nil
}

Expand Down

0 comments on commit bebb944

Please sign in to comment.