
Commit

feat: remove log lines with fewer than 3 tokens and start working on pruning old samples
cyriltovena committed Apr 3, 2024
1 parent a66f18b commit 44eb1c8
Showing 6 changed files with 209 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain.go
@@ -255,7 +255,7 @@ func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, inclu
}

// handle case of empty log string - return the single cluster in that group
if tokenCount == 0 {
if tokenCount < 2 {
return d.idToCluster.Get(curNode.clusterIDs[0])
}

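To illustrate the drain.go change above, here is a minimal standalone sketch (not the actual Drain API; the tokenizer and threshold are illustrative assumptions): log lines that tokenize into fewer than a minimum number of tokens skip the deeper prefix-tree search and map straight to the single catch-all cluster of their group.

package main

import (
	"fmt"
	"strings"
)

// minTokens is an illustrative threshold; the real guard in treeSearch uses
// tokenCount < 2.
const minTokens = 2

// routeLine sketches the routing decision: very short lines are not matched
// deeper in the token prefix tree and land in a single catch-all cluster.
func routeLine(line string) string {
	tokens := strings.Fields(line)
	if len(tokens) < minTokens {
		return "single catch-all cluster"
	}
	return "prefix-tree search starting at token: " + tokens[0]
}

func main() {
	for _, l := range []string{"", "panic", "failed to connect to db"} {
		fmt.Printf("%q -> %s\n", l, routeLine(l))
	}
}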
71 changes: 71 additions & 0 deletions pkg/pattern/flush.go
@@ -0,0 +1,71 @@
package pattern

import (
"fmt"

"github.com/go-kit/log/level"
"github.com/grafana/loki/v3/pkg/util"
"github.com/prometheus/common/model"
)

func (i *Ingester) initFlushQueues() {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
// For now we don't flush; we only prune old samples.
// go i.flushLoop(j)
}
}

func (i *Ingester) Flush() {
i.flush(true)
}

func (i *Ingester) flush(mayRemoveStreams bool) {
i.sweepUsers(true, mayRemoveStreams)

// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
flushQueue.Close()
}

i.flushQueuesDone.Wait()
level.Debug(i.logger).Log("msg", "flush queues have drained")
}

type flushOp struct {
from model.Time
userID string
fp model.Fingerprint
immediate bool
}

func (o *flushOp) Key() string {
return fmt.Sprintf("%s-%s-%v", o.userID, o.fp, o.immediate)
}

func (o *flushOp) Priority() int64 {
return -int64(o.from)
}

// sweepUsers periodically schedules series for flushing and garbage collects users with no series
func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) {
instances := i.getInstances()

for _, instance := range instances {
i.sweepInstance(instance, immediate, mayRemoveStreams)
}
}

func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
if mayRemoveStreams {
instance.streams.WithLock(func() {
if s.prune() {
instance.removeStream(s)
}
})
}
return true, nil
})
}
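On flushOp.Priority: returning the negated from timestamp means that, assuming util.PriorityQueue dequeues the op with the highest Priority() first, the oldest data is flushed first. A standalone container/heap sketch (not Loki's queue) showing the same ordering:

package main

import (
	"container/heap"
	"fmt"
)

// op mirrors the idea of flushOp: priority is the negated timestamp.
type op struct {
	key      string
	priority int64
}

// opHeap is a max-heap on priority, so the highest priority pops first.
type opHeap []op

func (h opHeap) Len() int            { return len(h) }
func (h opHeap) Less(i, j int) bool  { return h[i].priority > h[j].priority }
func (h opHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *opHeap) Push(x interface{}) { *h = append(*h, x.(op)) }
func (h *opHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	h := &opHeap{}
	// Timestamps in arbitrary units; priority = -timestamp, as in flushOp.
	for _, ts := range []int64{300, 100, 200} {
		heap.Push(h, op{key: fmt.Sprintf("ts=%d", ts), priority: -ts})
	}
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(op).key) // ts=100, then ts=200, then ts=300
	}
}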
115 changes: 99 additions & 16 deletions pkg/pattern/ingester.go
@@ -5,10 +5,13 @@ import (
"errors"
"flag"
"fmt"
"math/rand"
"net/http"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
@@ -27,9 +30,11 @@ import (
const readBatchSize = 1024

type Config struct {
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester is enabled."`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the pattern ingester will operate and where it will register for discovery."`
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester is enabled."`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the pattern ingester will operate and where it will register for discovery."`
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`

// For testing.
factory ring_client.PoolFactory `yaml:"-"`
@@ -40,6 +45,8 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger)
cfg.ClientConfig.RegisterFlags(fs)
fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
}

func (cfg *Config) Validate() error {
@@ -61,6 +68,15 @@ type Ingester struct {

instancesMtx sync.RWMutex
instances map[string]*instance

// One queue per flush thread. Fingerprint is used to
// pick a queue.
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup
loopDone sync.WaitGroup
loopQuit chan struct{}

metrics *ingesterMetrics
}

func New(
@@ -69,13 +85,17 @@ func New(
registerer prometheus.Registerer,
logger log.Logger,
) (*Ingester, error) {
metrics := newIngesterMetrics(registerer, metricsNamespace)
registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)

i := &Ingester{
cfg: cfg,
logger: log.With(logger, "component", "pattern-ingester"),
registerer: registerer,
instances: make(map[string]*instance),
cfg: cfg,
logger: log.With(logger, "component", "pattern-ingester"),
registerer: registerer,
metrics: metrics,
instances: make(map[string]*instance),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
loopQuit: make(chan struct{}),
}
i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
var err error
@@ -106,18 +126,74 @@ func (i *Ingester) starting(ctx context.Context) error {
if err != nil {
return err
}
// todo: start flush queue.
i.initFlushQueues()
// start our loop
i.loopDone.Add(1)
go i.loop()
return nil
}

func (i *Ingester) running(ctx context.Context) error {
<-ctx.Done()
return nil
var serviceError error
select {
// wait until service is asked to stop
case <-ctx.Done():
// stop
case err := <-i.lifecyclerWatcher.Chan():
serviceError = fmt.Errorf("lifecycler failed: %w", err)
}

close(i.loopQuit)
i.loopDone.Wait()
return serviceError
}

func (i *Ingester) stopping(_ error) error {
// todo: stop flush queue
return nil
err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
for _, flushQueue := range i.flushQueues {
flushQueue.Close()
}
i.flushQueuesDone.Wait()
return err
}

func (i *Ingester) loop() {
defer i.loopDone.Done()

// Delay the first flush operation by up to 0.8x the flush check period.
// This ensures that multiple ingesters started at the same time do not
// flush at the same time. Flushing at the same time can lead to concurrent
// writes of the same chunk to object storage, which in AWS S3 results in
// rate limiting.
jitter := time.Duration(rand.Int63n(int64(float64(i.cfg.FlushCheckPeriod.Nanoseconds()) * 0.8)))
initialDelay := time.NewTimer(jitter)
defer initialDelay.Stop()

level.Info(i.logger).Log("msg", "sleeping for initial delay before starting periodic flushing", "delay", jitter)

select {
case <-initialDelay.C:
// do nothing and continue with flush loop
case <-i.loopQuit:
// ingester stopped while waiting for initial delay
return
}

// Add +/- 20% of flush interval as jitter.
// The default flush check period is 30s so max jitter will be 6s.
j := i.cfg.FlushCheckPeriod / 5
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

case <-i.loopQuit:
return
}
}
}
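A standalone sketch of the two jitter mechanisms in loop(), under assumed values (30s check period, +/- 20% per-tick jitter as the comment describes); it uses plain math/rand and time rather than util.NewTickerWithJitter:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	period := 30 * time.Second // assumed flush check period

	// Initial delay: uniform in [0, 0.8*period), so ingesters started at the
	// same time do not all run their first flush check at the same instant.
	initialDelay := time.Duration(rand.Int63n(int64(float64(period.Nanoseconds()) * 0.8)))
	fmt.Println("initial delay:", initialDelay)

	// Per-tick jitter: each interval falls in [period-j, period+j) with
	// j = period/5, i.e. roughly +/- 20% of the check period.
	j := period / 5
	interval := period - j + time.Duration(rand.Int63n(int64(2*j)))
	fmt.Println("next interval:", interval)
}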

// Watch implements grpc_health_v1.HealthCheck.
Expand All @@ -135,10 +211,6 @@ func (i *Ingester) CheckReady(ctx context.Context) error {
return i.lifecycler.CheckReady(ctx)
}

func (i *Ingester) Flush() {
// todo flush or use transfer out
}

func (i *Ingester) TransferOut(_ context.Context) error {
// todo may be.
return ring.ErrTransferDisabled
@@ -217,3 +289,14 @@ func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
inst, ok := i.instances[id]
return inst, ok
}

func (i *Ingester) getInstances() []*instance {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()

instances := make([]*instance, 0, len(i.instances))
for _, instance := range i.instances {
instances = append(instances, instance)
}
return instances
}
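getInstances snapshots the map values under the read lock so that sweepUsers can iterate without holding instancesMtx; a minimal sketch of that snapshot-under-RLock pattern with hypothetical types:

package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the Ingester's instances map.
type registry struct {
	mtx   sync.RWMutex
	items map[string]string
}

// snapshot copies the values while holding the read lock; callers then iterate
// over the returned slice without the lock held.
func (r *registry) snapshot() []string {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	out := make([]string, 0, len(r.items))
	for _, v := range r.items {
		out = append(out, v)
	}
	return out
}

func main() {
	r := &registry{items: map[string]string{"tenant-a": "a", "tenant-b": "b"}}
	for _, v := range r.snapshot() {
		fmt.Println(v)
	}
}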
7 changes: 7 additions & 0 deletions pkg/pattern/instance.go
@@ -154,3 +154,10 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
}
return s.labels
}

// removeStream removes a stream from the instance.
func (i *instance) removeStream(s *stream) {
if i.streams.Delete(s) {
i.index.Delete(s.labels, s.fp)
}
}
21 changes: 21 additions & 0 deletions pkg/pattern/metrics.go
@@ -0,0 +1,21 @@
package pattern

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
}

func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
return &ingesterMetrics{
flushQueueLength: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "flush_queue_length",
Help: "The total number of series pending in the flush queue.",
}),
}
}
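This gauge is handed to util.NewPriorityQueue in initFlushQueues, presumably so the queue keeps the metric in sync as operations are enqueued and dequeued. A hedged sketch of that pattern using a hypothetical wrapper (not Loki's util.PriorityQueue):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// gaugedQueue is a hypothetical FIFO that mirrors its length into a gauge.
type gaugedQueue struct {
	items  []string
	length prometheus.Gauge
}

func (q *gaugedQueue) enqueue(item string) {
	q.items = append(q.items, item)
	q.length.Set(float64(len(q.items)))
}

func (q *gaugedQueue) dequeue() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	item := q.items[0]
	q.items = q.items[1:]
	q.length.Set(float64(len(q.items)))
	return item, true
}

func main() {
	g := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "flush_queue_length",
		Help: "Items pending in the queue.",
	})
	prometheus.MustRegister(g)

	q := &gaugedQueue{length: g}
	q.enqueue("op-1")
	q.enqueue("op-2")
	q.dequeue()
	fmt.Println("remaining:", len(q.items)) // remaining: 1
}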
11 changes: 10 additions & 1 deletion pkg/pattern/stream.go
@@ -72,10 +72,19 @@ func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Ite
iters := make([]iter.Iterator, 0, len(clusters))

for _, cluster := range clusters {
if cluster.Size < minClusterSize {
if cluster.Size < minClusterSize || cluster.String() == "" {
continue
}
iters = append(iters, cluster.Iterator(from, through))
}
return iter.NewMerge(iters...), nil
}

func (s *stream) prune() bool {
s.mtx.Lock()
defer s.mtx.Unlock()
// todo: first prune all volume chunks.
// todo: then prune all clusters.

return false
}
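prune is still a stub here (it always returns false). One possible shape, purely hypothetical and not part of this commit, is to drop clusters whose newest sample falls outside a retention window and report whether the stream became empty so the caller can remove it:

package main

import (
	"fmt"
	"time"
)

// cluster is a hypothetical stand-in carrying only a last-sample timestamp.
type cluster struct {
	pattern    string
	lastSample time.Time
}

// pruneClusters drops clusters whose newest sample is older than retention and
// reports whether nothing is left, i.e. the stream itself could be removed.
func pruneClusters(clusters []cluster, retention time.Duration, now time.Time) ([]cluster, bool) {
	kept := clusters[:0]
	for _, c := range clusters {
		if now.Sub(c.lastSample) <= retention {
			kept = append(kept, c)
		}
	}
	return kept, len(kept) == 0
}

func main() {
	now := time.Now()
	clusters := []cluster{
		{pattern: "old pattern", lastSample: now.Add(-2 * time.Hour)},
		{pattern: "fresh pattern", lastSample: now.Add(-5 * time.Minute)},
	}
	kept, empty := pruneClusters(clusters, time.Hour, now)
	fmt.Println(len(kept), empty) // 1 false
}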
