diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go
index 5b8535cdea179..928f4a4b12a15 100644
--- a/pkg/pattern/drain/drain.go
+++ b/pkg/pattern/drain/drain.go
@@ -255,7 +255,7 @@ func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, inclu
 	}
 
 	// handle case of empty log string - return the single cluster in that group
-	if tokenCount == 0 {
+	if tokenCount < 2 {
 		return d.idToCluster.Get(curNode.clusterIDs[0])
 	}
 
diff --git a/pkg/pattern/flush.go b/pkg/pattern/flush.go
new file mode 100644
index 0000000000000..40f75de43822e
--- /dev/null
+++ b/pkg/pattern/flush.go
@@ -0,0 +1,71 @@
+package pattern
+
+import (
+	"fmt"
+
+	"github.com/go-kit/log/level"
+	"github.com/grafana/loki/v3/pkg/util"
+	"github.com/prometheus/common/model"
+)
+
+func (i *Ingester) initFlushQueues() {
+	i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
+	for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
+		i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
+		// for now we don't flush, we only prune old samples.
+		// go i.flushLoop(j)
+	}
+}
+
+func (i *Ingester) Flush() {
+	i.flush(true)
+}
+
+func (i *Ingester) flush(mayRemoveStreams bool) {
+	i.sweepUsers(true, mayRemoveStreams)
+
+	// Close the flush queues, to unblock waiting workers.
+	for _, flushQueue := range i.flushQueues {
+		flushQueue.Close()
+	}
+
+	i.flushQueuesDone.Wait()
+	level.Debug(i.logger).Log("msg", "flush queues have drained")
+}
+
+type flushOp struct {
+	from      model.Time
+	userID    string
+	fp        model.Fingerprint
+	immediate bool
+}
+
+func (o *flushOp) Key() string {
+	return fmt.Sprintf("%s-%s-%v", o.userID, o.fp, o.immediate)
+}
+
+func (o *flushOp) Priority() int64 {
+	return -int64(o.from)
+}
+
+// sweepUsers periodically schedules series for flushing and garbage collects users with no series
+func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) {
+	instances := i.getInstances()
+
+	for _, instance := range instances {
+		i.sweepInstance(instance, immediate, mayRemoveStreams)
+	}
+}
+
+func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
+	_ = instance.streams.ForEach(func(s *stream) (bool, error) {
+		if mayRemoveStreams {
+			instance.streams.WithLock(func() {
+				if s.prune() {
+					instance.removeStream(s)
+				}
+			})
+		}
+		return true, nil
+	})
+}
diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go
index f2851c5703b89..af2e842c28b83 100644
--- a/pkg/pattern/ingester.go
+++ b/pkg/pattern/ingester.go
@@ -5,10 +5,13 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"math/rand"
 	"net/http"
 	"sync"
+	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/dskit/tenant"
@@ -27,9 +30,11 @@ import (
 const readBatchSize = 1024
 
 type Config struct {
-	Enabled          bool                  `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester is enabled."`
-	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the pattern ingester will operate and where it will register for discovery."`
-	ClientConfig     clientpool.Config     `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
+	Enabled           bool                  `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester is enabled."`
+	LifecyclerConfig  ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the pattern ingester will operate and where it will register for discovery."`
+	ClientConfig      clientpool.Config     `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
+	ConcurrentFlushes int                   `yaml:"concurrent_flushes"`
+	FlushCheckPeriod  time.Duration         `yaml:"flush_check_period"`
 
 	// For testing.
 	factory ring_client.PoolFactory `yaml:"-"`
@@ -40,6 +45,8 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 	cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger)
 	cfg.ClientConfig.RegisterFlags(fs)
 	fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
+	fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
+	fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 20% jitter added to the interval.")
 }
 
 func (cfg *Config) Validate() error {
@@ -61,6 +68,15 @@ type Ingester struct {
 
 	instancesMtx sync.RWMutex
 	instances    map[string]*instance
+
+	// One queue per flush thread. Fingerprint is used to
+	// pick a queue.
+	flushQueues     []*util.PriorityQueue
+	flushQueuesDone sync.WaitGroup
+	loopDone        sync.WaitGroup
+	loopQuit        chan struct{}
+
+	metrics *ingesterMetrics
 }
 
 func New(
@@ -69,13 +85,17 @@ func New(
 	registerer prometheus.Registerer,
 	logger log.Logger,
 ) (*Ingester, error) {
+	metrics := newIngesterMetrics(registerer, metricsNamespace)
 	registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)
 
 	i := &Ingester{
-		cfg:        cfg,
-		logger:     log.With(logger, "component", "pattern-ingester"),
-		registerer: registerer,
-		instances:  make(map[string]*instance),
+		cfg:         cfg,
+		logger:      log.With(logger, "component", "pattern-ingester"),
+		registerer:  registerer,
+		metrics:     metrics,
+		instances:   make(map[string]*instance),
+		flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
+		loopQuit:    make(chan struct{}),
 	}
 	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
 	var err error
@@ -106,18 +126,74 @@ func (i *Ingester) starting(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	// todo: start flush queue.
+	i.initFlushQueues()
+	// start our loop
+	i.loopDone.Add(1)
+	go i.loop()
 	return nil
 }
 
 func (i *Ingester) running(ctx context.Context) error {
-	<-ctx.Done()
-	return nil
+	var serviceError error
+	select {
+	// wait until service is asked to stop
+	case <-ctx.Done():
+		// stop
+	case err := <-i.lifecyclerWatcher.Chan():
+		serviceError = fmt.Errorf("lifecycler failed: %w", err)
+	}
+
+	close(i.loopQuit)
+	i.loopDone.Wait()
+	return serviceError
}
 
 func (i *Ingester) stopping(_ error) error {
-	// todo: stop flush queue
-	return nil
+	err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
+	for _, flushQueue := range i.flushQueues {
+		flushQueue.Close()
+	}
+	i.flushQueuesDone.Wait()
+	return err
+}
+
+func (i *Ingester) loop() {
+	defer i.loopDone.Done()
+
+	// Delay the first flush operation by up to 0.8x the flush check period.
+	// This ensures that multiple ingesters started at the same time do not
+	// flush at the same time. Flushing at the same time can cause concurrent
+	// writes of the same chunk to object storage, which in AWS S3 leads to
+	// rate limiting.
+	jitter := time.Duration(rand.Int63n(int64(float64(i.cfg.FlushCheckPeriod.Nanoseconds()) * 0.8)))
+	initialDelay := time.NewTimer(jitter)
+	defer initialDelay.Stop()
+
+	level.Info(i.logger).Log("msg", "sleeping for initial delay before starting periodic flushing", "delay", jitter)
+
+	select {
+	case <-initialDelay.C:
+		// do nothing and continue with flush loop
+	case <-i.loopQuit:
+		// ingester stopped while waiting for initial delay
+		return
+	}
+
+	// Add +/- 20% of flush interval as jitter.
+	// The default flush check period is 30s so max jitter will be 6s.
+	j := i.cfg.FlushCheckPeriod / 5
+	flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
+	defer flushTicker.Stop()
+
+	for {
+		select {
+		case <-flushTicker.C:
+			i.sweepUsers(false, true)
+
+		case <-i.loopQuit:
+			return
+		}
+	}
 }
 
 // Watch implements grpc_health_v1.HealthCheck.
@@ -135,10 +211,6 @@ func (i *Ingester) CheckReady(ctx context.Context) error {
 	return i.lifecycler.CheckReady(ctx)
 }
 
-func (i *Ingester) Flush() {
-	// todo flush or use transfer out
-}
-
 func (i *Ingester) TransferOut(_ context.Context) error {
 	// todo may be.
 	return ring.ErrTransferDisabled
@@ -217,3 +289,14 @@ func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
 	inst, ok := i.instances[id]
 	return inst, ok
 }
+
+func (i *Ingester) getInstances() []*instance {
+	i.instancesMtx.RLock()
+	defer i.instancesMtx.RUnlock()
+
+	instances := make([]*instance, 0, len(i.instances))
+	for _, instance := range i.instances {
+		instances = append(instances, instance)
+	}
+	return instances
+}
diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go
index cf319cd2a946e..4b270a04ca391 100644
--- a/pkg/pattern/instance.go
+++ b/pkg/pattern/instance.go
@@ -154,3 +154,10 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
 	}
 	return s.labels
 }
+
+// removeStream removes a stream from the instance.
+func (i *instance) removeStream(s *stream) {
+	if i.streams.Delete(s) {
+		i.index.Delete(s.labels, s.fp)
+	}
+}
diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go
new file mode 100644
index 0000000000000..e4a9c146c36f6
--- /dev/null
+++ b/pkg/pattern/metrics.go
@@ -0,0 +1,21 @@
+package pattern
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type ingesterMetrics struct {
+	flushQueueLength prometheus.Gauge
+}
+
+func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
+	return &ingesterMetrics{
+		flushQueueLength: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Namespace: metricsNamespace,
+			Subsystem: "pattern_ingester",
+			Name:      "flush_queue_length",
+			Help:      "The total number of series pending in the flush queue.",
+		}),
+	}
+}
diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go
index 3bbe649d9aecc..8e80628e6e194 100644
--- a/pkg/pattern/stream.go
+++ b/pkg/pattern/stream.go
@@ -72,10 +72,19 @@ func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Ite
 
 	iters := make([]iter.Iterator, 0, len(clusters))
 	for _, cluster := range clusters {
-		if cluster.Size < minClusterSize {
+		if cluster.Size < minClusterSize || cluster.String() == "" {
 			continue
 		}
 		iters = append(iters, cluster.Iterator(from, through))
 	}
 	return iter.NewMerge(iters...), nil
 }
+
+func (s *stream) prune() bool {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	// todo first prune all Volume Chunks.
+	// todo then prune all clusters.
+
+	return false
+}
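
A note on the ordering encoded by the new flushOp type in pkg/pattern/flush.go: Priority() returns the negated `from` timestamp, so a queue that pops the highest priority first hands out the oldest data first. The sketch below is an illustration written for this review, not the PR's code path: it uses plain sort instead of util.PriorityQueue (whose API is not shown in this diff), and the tenants and timestamps are made up.

```go
package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/common/model"
)

// flushOp mirrors the struct added in pkg/pattern/flush.go.
type flushOp struct {
	from      model.Time
	userID    string
	fp        model.Fingerprint
	immediate bool
}

// Priority matches the PR: a smaller (older) `from` negates to a larger value,
// so older data sorts ahead of newer data.
func (o *flushOp) Priority() int64 { return -int64(o.from) }

func main() {
	// Hypothetical ops; tenants and timestamps are illustrative only.
	ops := []*flushOp{
		{from: model.Time(3000), userID: "tenant-a"},
		{from: model.Time(1000), userID: "tenant-b"},
		{from: model.Time(2000), userID: "tenant-c"},
	}

	// A max-priority queue pops the highest Priority() first; sorting in
	// descending Priority() order shows the same sequence it would produce.
	sort.Slice(ops, func(a, b int) bool { return ops[a].Priority() > ops[b].Priority() })

	for _, op := range ops {
		fmt.Println(op.userID, op.from) // tenant-b, tenant-c, tenant-a: oldest first
	}
}
```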
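The timing in Ingester.loop() is also worth spelling out: the first sweep is delayed by a uniform random duration in [0, 0.8 × FlushCheckPeriod), and subsequent sweeps run on a ticker with roughly ±20% jitter (FlushCheckPeriod / 5, i.e. ±6s at the 30s default). The sketch below reproduces that arithmetic using only the standard library; it approximates util.NewTickerWithJitter with a hand-rolled random offset and is not Loki's implementation.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	flushCheckPeriod := 30 * time.Second // default from the new flag

	// Initial delay: uniform in [0, 0.8 * period), as in Ingester.loop().
	initialDelay := time.Duration(rand.Int63n(int64(float64(flushCheckPeriod.Nanoseconds()) * 0.8)))

	// Per-tick jitter bound: +/- 20% of the period (30s -> +/- 6s).
	jitter := flushCheckPeriod / 5

	// Rough stand-in for util.NewTickerWithJitter: a tick lands somewhere in
	// [period-jitter, period+jitter]. Not the real implementation.
	nextTick := flushCheckPeriod - jitter + time.Duration(rand.Int63n(int64(2*jitter)))

	fmt.Println("initial delay:", initialDelay)
	fmt.Println("jitter bound: +/-", jitter)
	fmt.Println("example jittered interval:", nextTick)
}
```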