feat(kafka): Replay kafka from last commit before becoming ready #14330

Merged 8 commits on Oct 4, 2024
Changes from 7 commits
14 changes: 14 additions & 0 deletions docs/sources/shared/configuration.md
@@ -839,6 +839,20 @@ kafka_config:
# CLI flag: -kafka.producer-max-buffered-bytes
[producer_max_buffered_bytes: <int> | default = 1073741824]

# The best-effort maximum lag a consumer tries to achieve at startup. Set both
# -kafka.target-consumer-lag-at-startup and -kafka.max-consumer-lag-at-startup
# to 0 to disable waiting for the maximum consumer lag to be honored at
# startup.
# CLI flag: -kafka.target-consumer-lag-at-startup
[target_consumer_lag_at_startup: <duration> | default = 2s]

# The guaranteed maximum lag before a consumer is considered to have caught up
# reading from a partition at startup, becomes ACTIVE in the hash ring, and
# passes the readiness check. Set both -kafka.target-consumer-lag-at-startup
# and -kafka.max-consumer-lag-at-startup to 0 to disable waiting for the
# maximum consumer lag to be honored at startup.
# CLI flag: -kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]

# Configuration for 'runtime config' module, responsible for reloading runtime
# configuration file.
[runtime_config: <runtime_config>]
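For context, a minimal sketch of how these two settings could appear in a Loki configuration file. The `kafka_config` block and the default values are taken from the documentation snippet above; setting both durations to 0 disables the startup lag check entirely.

```yaml
kafka_config:
  # Best-effort lag the consumer tries to reach while replaying at startup.
  target_consumer_lag_at_startup: 2s
  # Hard ceiling: once lag is below this, the ingester becomes ACTIVE in the
  # hash ring and passes its readiness check.
  max_consumer_lag_at_startup: 15s
```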
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -300,7 +300,7 @@ type Ingester struct {
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher ring.PartitionRingReader) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
2 changes: 1 addition & 1 deletion pkg/ingester/kafka_consumer.go
@@ -3,7 +3,7 @@ package ingester
import (
"context"
"errors"
math "math"
"math"
"sync"
"time"

23 changes: 17 additions & 6 deletions pkg/kafka/config.go
@@ -36,18 +36,14 @@ const (
// in the worst case scenario, which is expected to be way above the actual one.
maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384
minProducerRecordDataBytesLimit = 1024 * 1024

kafkaConfigFlagPrefix = "ingest-storage.kafka"
targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".target-consumer-lag-at-startup"
maxConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".max-consumer-lag-at-startup"
)

var (
ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured")
ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured")
ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must be greater than or equal to the configured target consumer lag")
ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)

consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp}
)

// Config holds the generic config for the Kafka backend.
@@ -68,6 +64,9 @@ type Config struct {

ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"`
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`

TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"`
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -91,6 +90,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {

f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend not changing this setting except for testing purposes.")
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")

targetConsumerLagAtStartupFlag := prefix + ".target-consumer-lag-at-startup"
maxConsumerLagAtStartupFlag := prefix + ".max-consumer-lag-at-startup"
howToDisableConsumerLagAtStartup := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for the maximum consumer lag to be honored at startup.", targetConsumerLagAtStartupFlag, maxConsumerLagAtStartupFlag)
f.DurationVar(&cfg.TargetConsumerLagAtStartup, targetConsumerLagAtStartupFlag, 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+howToDisableConsumerLagAtStartup)
f.DurationVar(&cfg.MaxConsumerLagAtStartup, maxConsumerLagAtStartupFlag, 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+howToDisableConsumerLagAtStartup)
}

func (cfg *Config) Validate() error {
@@ -103,6 +108,12 @@ func (cfg *Config) Validate() error {
if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit {
return ErrInvalidProducerMaxRecordSizeBytes
}
if (cfg.TargetConsumerLagAtStartup != 0) != (cfg.MaxConsumerLagAtStartup != 0) {
return ErrInconsistentConsumerLagAtStartup
}
if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup {
return ErrInvalidMaxConsumerLagAtStartup
}

return nil
}
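To make the new validation rules concrete, here is a small, self-contained sketch that mirrors the two checks added to Config.Validate above: the two settings must be enabled or disabled together, and the hard ceiling can never be below the best-effort target. The helper name validateLag and the local error values are hypothetical, for illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errInconsistentLag = errors.New("target and max consumer lag at startup must be either both 0 or both greater than 0")
	errInvalidMaxLag   = errors.New("max consumer lag at startup must be greater than or equal to the target consumer lag")
)

// validateLag mirrors the startup-lag checks in Config.Validate.
func validateLag(target, max time.Duration) error {
	// Enabled/disabled together: exactly one of them being zero is invalid.
	if (target != 0) != (max != 0) {
		return errInconsistentLag
	}
	// The guaranteed ceiling must be at least the best-effort target.
	if max < target {
		return errInvalidMaxLag
	}
	return nil
}

func main() {
	fmt.Println(validateLag(2*time.Second, 15*time.Second)) // <nil> (the defaults)
	fmt.Println(validateLag(0, 0))                          // <nil> (startup lag check disabled)
	fmt.Println(validateLag(2*time.Second, 0))              // error: inconsistent settings
	fmt.Println(validateLag(15*time.Second, 2*time.Second)) // error: max below target
}
```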