Skip to content

Commit

Permalink
refactor: kafka offset signals (#15201)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Dec 2, 2024
1 parent a10140d commit 3a5bed4
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
)

const (
kafkaStartOffset = -2
kafkaEndOffset = -1

phaseStarting = "starting"
phaseRunning = "running"
)
Expand Down Expand Up @@ -111,7 +108,7 @@ func newReaderService(
consumerFactory: consumerFactory,
logger: log.With(logger, "partition", reader.Partition(), "consumer_group", reader.ConsumerGroup()),
metrics: newServiceMetrics(reg),
lastProcessedOffset: kafkaEndOffset,
lastProcessedOffset: int64(KafkaEndOffset),
}

// Create the committer
Expand All @@ -135,12 +132,12 @@ func (s *ReaderService) starting(ctx context.Context) error {
}

if lastCommittedOffset == int64(KafkaEndOffset) {
level.Warn(logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
level.Warn(logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", KafkaStartOffset))
} else {
level.Debug(logger).Log("msg", "last committed offset", "offset", lastCommittedOffset)
}

consumeOffset := int64(kafkaStartOffset)
consumeOffset := int64(KafkaStartOffset)
if lastCommittedOffset >= 0 {
// Read from the next offset.
consumeOffset = lastCommittedOffset + 1
Expand Down Expand Up @@ -222,7 +219,7 @@ func (s *ReaderService) fetchUntilLagSatisfied(

for b.Ongoing() {
// Send a direct request to the Kafka backend to fetch the partition start offset.
partitionStartOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaStartOffset)
partitionStartOffset, err := s.reader.FetchPartitionOffset(ctx, KafkaStartOffset)
if err != nil {
level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err)
b.Wait()
Expand All @@ -240,7 +237,7 @@ func (s *ReaderService) fetchUntilLagSatisfied(
// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
// latency.
lastProducedOffsetRequestedAt := time.Now()
lastProducedOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaEndOffset)
lastProducedOffset, err := s.reader.FetchPartitionOffset(ctx, KafkaEndOffset)
if err != nil {
level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err)
b.Wait()
Expand Down

0 comments on commit 3a5bed4

Please sign in to comment.