Skip to content

Commit

Permalink
Add partition ring to kafka ingest path
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Aug 28, 2024
1 parent 983d361 commit 6b2c69a
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 53 deletions.
78 changes: 70 additions & 8 deletions pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ import (
"net/http"
"os"
"path"
"regexp"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"

"github.com/grafana/dskit/kv"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
"github.com/grafana/loki/v3/pkg/ingester-rf1/kafka"
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
"github.com/grafana/loki/v3/pkg/ingester-rf1/partitionring"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/storage"
Expand Down Expand Up @@ -62,6 +68,7 @@ var (
compressionStats = analytics.NewString("ingester_compression")
targetSizeStats = analytics.NewInt("ingester_target_size_bytes")
activeTenantsStats = analytics.NewInt("ingester_active_tenants")
ingesterIdRegexp = regexp.MustCompile("ingester(-rf1)-([0-9]+)")

Check warning on line 71 in pkg/ingester-rf1/ingester.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

var-naming: var ingesterIdRegexp should be ingesterIDRegexp (revive)
)

// Config for an ingester.
Expand Down Expand Up @@ -98,6 +105,10 @@ type Config struct {
// Tee configs
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
factory ring_client.PoolFactory `yaml:"-"`

// Used for the kafka ingestion path
PartitionRingConfig partitionring.PartitionRingConfig `yaml:"partition_ring" category:"experimental"`
KafkaConfig kafka.Config
}

// RegisterFlags registers the flags.
Expand Down Expand Up @@ -213,9 +224,9 @@ type Ingester struct {

customStreamsTracker push.UsageTracker

// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
// recalculateOwnedStreams *recalculateOwnedStreams
readRing ring.ReadRing
ingestPartitionId int32

Check warning on line 228 in pkg/ingester-rf1/ingester.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

var-naming: struct field ingestPartitionId should be ingestPartitionID (revive)
partitionRingLifecycler *ring.PartitionInstanceLifecycler
}

// New makes a new Ingester.
Expand Down Expand Up @@ -271,6 +282,29 @@ func New(cfg Config, clientConfig client.Config,
wal: walManager,
}

if cfg.KafkaConfig.Enabled {
i.ingestPartitionId, err = ingesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
}

partitionRingKV := cfg.LifecyclerConfig.RingConfig.KVStore.Mock
if partitionRingKV == nil {
partitionRingKV, err = kv.NewClient(cfg.LifecyclerConfig.RingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, "partition-ring-lifecycler"), logger)
if err != nil {
return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err)
}
}

i.partitionRingLifecycler = ring.NewPartitionInstanceLifecycler(
cfg.PartitionRingConfig.ToLifecyclerConfig(i.ingestPartitionId, cfg.LifecyclerConfig.ID),
"partition-ring",
"partition-ring-key",
partitionRingKV,
logger,
prometheus.WrapRegistererWithPrefix("loki_", registerer))
}

// TODO: change flush on shutdown
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester-rf1", "ingester-rf1-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer))
if err != nil {
Expand All @@ -279,6 +313,9 @@ func New(cfg Config, clientConfig client.Config,

i.lifecyclerWatcher = services.NewFailureWatcher()
i.lifecyclerWatcher.WatchService(i.lifecycler)
if i.partitionRingLifecycler != nil {
i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler)
}

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
Expand All @@ -293,6 +330,25 @@ func New(cfg Config, clientConfig client.Config,
return i, nil
}

// ingesterPartitionID returns the partition ID owner the the given ingester.
func ingesterPartitionID(ingesterID string) (int32, error) {
if strings.Contains(ingesterID, "local") {
return 0, nil
}

match := ingesterIdRegexp.FindStringSubmatch(ingesterID)
if len(match) == 0 {
return 0, fmt.Errorf("ingester ID %s doesn't match regular expression %q", ingesterID, ingesterIdRegexp.String())
}
// Parse the ingester sequence number.
ingesterSeq, err := strconv.Atoi(match[1])
if err != nil {
return 0, fmt.Errorf("no ingester sequence number in ingester ID %s", ingesterID)
}

return int32(ingesterSeq), nil
}

// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
Expand Down Expand Up @@ -383,6 +439,17 @@ func (i *Ingester) starting(ctx context.Context) error {
return err
}

if i.cfg.KafkaConfig.Enabled {
err = i.partitionRingLifecycler.StartAsync(context.Background())
if err != nil {
return err
}
err = i.partitionRingLifecycler.AwaitRunning(ctx)
if err != nil {
return err
}
}

shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)
shutdownMarker, err := shutdownMarkerExists(shutdownMarkerPath)
if err != nil {
Expand All @@ -399,11 +466,6 @@ func (i *Ingester) starting(ctx context.Context) error {
// return fmt.Errorf("can not start recalculate owned streams service: %w", err)
//}

err = i.lifecycler.AwaitRunning(ctx)
if err != nil {
return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
}

go i.periodicStreamMaintenance()
return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ingesterrf1
package kafka

import (
"context"
"errors"
"flag"
"fmt"
"math"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -20,10 +22,22 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
)

const writeTimeout = time.Minute

type Config struct {
Enabled bool `yaml:"enabled" docs:"whether the kafka ingest path is enabled"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "enabled", false, "enables the use of the Kafka ingest path")
}

type KafkaTee struct {

Check warning on line 36 in pkg/ingester-rf1/kafka/kafka_tee.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exported: type name will be used as kafka.KafkaTee by other packages, and that stutters; consider calling this Tee (revive)
cfg Config
logger log.Logger
kafkaClient *kgo.Client
cfg Config
logger log.Logger
kafkaClient *kgo.Client
partitionRing *ring.PartitionInstanceRing

ingesterAppends *prometheus.CounterVec
}
Expand All @@ -33,6 +47,7 @@ func NewKafkaTee(
metricsNamespace string,
registerer prometheus.Registerer,
logger log.Logger,
partitionRing *ring.PartitionInstanceRing,
) (*KafkaTee, error) {
registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)

Expand Down Expand Up @@ -98,13 +113,14 @@ func NewKafkaTee(
}

t := &KafkaTee{
logger: log.With(logger, "component", "ingester-rf1-tee"),
logger: log.With(logger, "component", "kafka-tee"),
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "ingester_rf1_appends_total",
Help: "The total number of batch appends sent to rf1 ingesters.",
}, []string{"ingester", "status"}),
cfg: cfg,
kafkaClient: kafkaClient,
Name: "kafka_ingester_appends_total",
Help: "The total number of appends sent to kafka ingest path.",
}, []string{"partition", "status"}),
cfg: cfg,
kafkaClient: kafkaClient,
partitionRing: partitionRing,
}

return t, nil
Expand All @@ -122,10 +138,14 @@ func (t *KafkaTee) Duplicate(tenant string, streams []distributor.KeyedStream) {
}

func (t *KafkaTee) sendStream(tenant string, stream distributor.KeyedStream) error {
partitionID := stream.HashKey % 3
records, err := marshalWriteRequestToRecords(int32(partitionID), tenant, stream.Stream, 1024*1024)
partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
if err != nil {
t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc()
return fmt.Errorf("failed to find active partition for stream: %w", err)
}
records, err := marshalWriteRequestToRecords(partitionID, tenant, stream.Stream, 1024*1024)

ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), t.cfg.ClientConfig.RemoteTimeout)
ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout)
defer cancel()
produceResults := t.kafkaClient.ProduceSync(ctx, records...)

Expand Down
47 changes: 47 additions & 0 deletions pkg/ingester-rf1/partitionring/partition_ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package partitionring

import (
"flag"
"time"

"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
)

type PartitionRingConfig struct {

Check warning on line 11 in pkg/ingester-rf1/partitionring/partition_ring.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exported: type name will be used as partitionring.PartitionRingConfig by other packages, and that stutters; consider calling this Config (revive)
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances. This option needs be set on ingesters, distributors, queriers, and rulers when running in microservices mode."`

// MinOwnersCount maps to ring.PartitionInstanceLifecyclerConfig's WaitOwnersCountOnPending.
MinOwnersCount int `yaml:"min_partition_owners_count"`

// MinOwnersDuration maps to ring.PartitionInstanceLifecyclerConfig's WaitOwnersDurationOnPending.
MinOwnersDuration time.Duration `yaml:"min_partition_owners_duration"`

// DeleteInactivePartitionAfter maps to ring.PartitionInstanceLifecyclerConfig's DeleteInactivePartitionAfterDuration.
DeleteInactivePartitionAfter time.Duration `yaml:"delete_inactive_partition_after"`

// lifecyclerPollingInterval is the lifecycler polling interval. This setting is used to lower it in tests.
lifecyclerPollingInterval time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *PartitionRingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.Store = "memberlist" // Override default value.
cfg.KVStore.RegisterFlagsWithPrefix("ingester.partition-ring.", "collectors/", f)

f.IntVar(&cfg.MinOwnersCount, "ingester.partition-ring.min-partition-owners-count", 1, "Minimum number of owners to wait before a PENDING partition gets switched to ACTIVE.")
f.DurationVar(&cfg.MinOwnersDuration, "ingester.partition-ring.min-partition-owners-duration", 10*time.Second, "How long the minimum number of owners are enforced before a PENDING partition gets switched to ACTIVE.")
f.DurationVar(&cfg.DeleteInactivePartitionAfter, "ingester.partition-ring.delete-inactive-partition-after", 13*time.Hour, "How long to wait before an INACTIVE partition is eligible for deletion. The partition is deleted only if it has been in INACTIVE state for at least the configured duration and it has no owners registered. A value of 0 disables partitions deletion.")
}

func (cfg *PartitionRingConfig) ToLifecyclerConfig(partitionID int32, instanceID string) ring.PartitionInstanceLifecyclerConfig {
return ring.PartitionInstanceLifecyclerConfig{
PartitionID: partitionID,
InstanceID: instanceID,
WaitOwnersCountOnPending: cfg.MinOwnersCount,
WaitOwnersDurationOnPending: cfg.MinOwnersDuration,
DeleteInactivePartitionAfterDuration: cfg.DeleteInactivePartitionAfter,
PollingInterval: cfg.lifecyclerPollingInterval,
}
}
Loading

0 comments on commit 6b2c69a

Please sign in to comment.