Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Gather aggregate per-line and per-tenant metrics for Drain patterns #13368

Merged
merged 6 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
if len(tokens) < 4 {
return nil
}
if d.metrics != nil {
d.metrics.TokensPerLine.Observe(float64(len(tokens)))
d.metrics.StatePerLine.Observe(float64(len(state.([]int))))
}
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
// Match no existing log cluster
if matchCluster == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
line := scanner.Text()
lines = append(lines, line)
}
drain := New(DefaultConfig(), nil)

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
drain := New(DefaultConfig(), nil)
for _, line := range lines {
drain.Train(line, 0)
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/pattern/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
drain *Drain
inputFile string
patterns []string
format string
}{
{
drain: New(DefaultConfig(), nil),
inputFile: `testdata/agent-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
`ts=2024-04-16T15:10:42.<_> level=info msg="finished node evaluation" controller_id=module.http.cloudwatch_pipelines node_id=prometheus.scrape.<_> duration=<_>.<_>`,
`ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", batch_kubernetes_io_job_name=\"testcoordinator-job-2665838\", container=\"testcoordinator\", controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", job=\"k6-cloud/testcoordinator\", job_name=\"testcoordinator-job-2665838\", name=\"testcoordinator\", namespace=\"k6-cloud\", pod=\"testcoordinator-job-2665838-9g8ds\"}"`,
Expand Down Expand Up @@ -61,6 +63,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: `testdata/ingester-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
`ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg="GET /debug/pprof/delta_mutex (200) 1.161082ms"`,
`ts=2024-04-17T09:52:46.<_> caller=head.go:216 level=debug tenant=987678 msg="profile is empty after delta computation" metricName=memory`,
Expand All @@ -70,6 +73,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: `testdata/drone-json.txt`,
format: FormatJSON,
patterns: []string{
`{"duration":<_>,"level":"debug","method":"GET","msg":"request completed","referer":"","remote":"10.136.105.40:52702","request":"/metrics","status":200,"time":"<_>:<_>:<_>","user-agent":"GrafanaAgent/v0.40.3 (flow; linux; helm)"}`,
`{"id":"<_>","level":"debug","max-pool":4,"min-pool":0,"msg":"check capacity","pending-builds":0,"running-builds":0,"server-buffer":0,"server-capacity":0,"server-count":0,"time":"<_>:<_>:<_>"}`,
Expand All @@ -82,6 +86,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/distributor-logfmt.txt",
format: FormatLogfmt,
patterns: []string{
`ts=2024-05-02T12:17:22.851228301Z caller=http.go:194 level=debug traceID=1e1fe5ba1756bc38 orgID=1819 msg="POST /pyroscope/ingest?aggregationType=sum&from=1714652230&name=flamegraph.com%7Bapp_kubernetes_io_instance%3Dflamegraph-com%2Capp_kubernetes_io_name%3Dflamegraph-com%2Ccluster%3Dflamegraph.com%2Cinstance%3D10.0.11.146%3A8001%2Cjob%3Dkubernetes-pods%2Cnamespace%3Dflamegraph-com%2Cpod%3Dflamegraph-com-backend-79c858c7bf-jw2hn%2Cpod_template_hash%3D79c858c7bf%2Cpyroscope_tenant%3Dpyroscope%2Ctier%3Dbackend%7D&sampleRate=0&spyName=scrape&units=samples&until=1714652240 (200) 22.345191ms"`,
`ts=2024-05-02T12:17:22.<_> caller=http.go:194 level=debug traceID=<_> orgID=75 msg="POST /ingest?aggregationType=&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=<_>&spyName=gospy&units=&until=1714652242232506798 (200) <_>.<_>"`,
Expand All @@ -93,6 +98,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/journald.txt",
format: FormatUnknown,
patterns: []string{
` ln --force -s /proc/$(pidof hgrun-pause)/root/bin/hgrun /bin/hgrun;`,
` while [ "$(pidof plugins-pause)" = "" ]; do sleep 0.5; done;`,
Expand Down Expand Up @@ -199,6 +205,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kafka.txt",
format: FormatUnknown,
patterns: []string{
`[2024-05-07 10:55:40,626] INFO [LocalLog partition=ingest-6, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=180391157, size=16991045, lastModifiedTime=1715075754780, largestRecordTimestamp=Some(1715075754774)),LogSegment(baseOffset=180393429, size=16997692, lastModifiedTime=1715075760206, largestRecordTimestamp=Some(1715075760186)),LogSegment(baseOffset=180395889, size=16998200, lastModifiedTime=1715075765542, largestRecordTimestamp=Some(1715075765526)),LogSegment(baseOffset=180398373, size=16977347, lastModifiedTime=1715075770515, largestRecordTimestamp=Some(1715075770504)) (kafka.log.LocalLog$)`,
`[2024-05-07 10:55:53,038] INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-1, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=447957, size=948, lastModifiedTime=1715059232052, largestRecordTimestamp=Some(1715059232002)),LogSegment(baseOffset=447969, size=948, lastModifiedTime=1715059424352, largestRecordTimestamp=Some(1715059424301)) (kafka.log.LocalLog$)`,
Expand All @@ -219,6 +226,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kubernetes.txt",
format: FormatUnknown,
patterns: []string{
`I0507 12:02:27.947830 1 nodeutilization.go:274] "Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers"`,
`I0507 12:02:27.<_> 1 defaultevictor.go:163] "pod does not fit on any other node because of nodeSelector(s), Taint(s), or nodes marked as unschedulable" pod="<_>/<_>"`,
Expand Down Expand Up @@ -268,6 +276,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/vault.txt",
format: FormatUnknown,
patterns: []string{
`2024-05-07T10:56:38.667Z [INFO] expiration: revoked lease: lease_id=auth/gcp/login/h4c031a99aa555040a0dd99864d828e946c6d4e31f4f5178757183def61f9d104`,
`2024-05-07T10:<_>:<_>.<_> [INFO] expiration: revoked lease: lease_id=auth/kubernetes/<_>/login/<_>`,
Expand All @@ -276,6 +285,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/calico.txt",
format: FormatUnknown,
patterns: []string{
`2024-05-08 15:23:56.403 [DEBUG][615489] felix/table.go 699: Finished loading iptables state ipVersion=0x4 table="filter"`,
`2024-05-08 15:23:56.403 [INFO][615489] felix/summary.go 100: Summarising 1 dataplane reconciliation loops over 600ms: avg=119ms longest=119ms (resync-filter-v4)`,
Expand Down Expand Up @@ -357,6 +367,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/grafana-ruler.txt",
format: FormatLogfmt,
patterns: []string{
`level=debug ts=2024-05-29T13:44:15.804597912Z caller=remote_instance_store.go:51 user=297794 slug=leanix msg="calling SaveAlertInstance"`,
`level=debug ts=2024-05-29T13:44:15.<_> caller=remote_instance_store.go:51 user=396586 slug=opengov msg="calling SaveAlertInstance"`,
Expand Down Expand Up @@ -411,10 +422,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
require.NoError(t, err)
defer file.Close()

detectedFormat := false
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
tt.drain.Train(line, 0)
if !detectedFormat {
require.Equal(t, tt.format, DetectLogFormat(line))
detectedFormat = true
}
}

var output []string
Expand Down Expand Up @@ -564,7 +580,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
for _, line := range tt.inputLines {
passes := matcher.Test([]byte(line))
require.Truef(t, passes, "Line should match extracted pattern: \nPatt[%q] \nLine[%q]", cluster.String(), line)

}
})
}
Expand Down
29 changes: 28 additions & 1 deletion pkg/pattern/drain/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,35 @@
package drain

import "github.com/prometheus/client_golang/prometheus"
import (
"regexp"

"github.com/prometheus/client_golang/prometheus"
)

const (
FormatLogfmt = "logfmt"
FormatJSON = "json"
FormatUnknown = "unknown"
)

var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$")

// DetectLogFormat guesses at how the logs are encoded based on some simple heuristics.
// It only runs on the first log line when a new stream is created, so it could do some more complex parsing or regex.
func DetectLogFormat(line string) string {
if len(line) < 2 {
return FormatUnknown
} else if line[0] == '{' && line[len(line)-1] == '}' {
return FormatJSON
} else if logfmtRegex.MatchString(line) {
return FormatLogfmt
}
return FormatUnknown
}

type Metrics struct {
PatternsEvictedTotal prometheus.Counter
PatternsDetectedTotal prometheus.Counter
TokensPerLine prometheus.Observer
StatePerLine prometheus.Observer
}
5 changes: 4 additions & 1 deletion pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/chunk"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/metric"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
Expand Down Expand Up @@ -208,7 +209,9 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger)
firstEntryLine := pushReqStream.Entries[0].Line
s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID)

if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
Expand Down
28 changes: 22 additions & 6 deletions pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
patternsDiscardedTotal prometheus.Counter
patternsDetectedTotal prometheus.Counter
patternsDiscardedTotal *prometheus.CounterVec
patternsDetectedTotal *prometheus.CounterVec
tokensPerLine *prometheus.HistogramVec
statePerLine *prometheus.HistogramVec
}

func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
Expand All @@ -19,18 +21,32 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "flush_queue_length",
Help: "The total number of series pending in the flush queue.",
}),
patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
patternsDiscardedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_evicted_total",
Help: "The total number of patterns evicted from the LRU cache.",
}),
patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
}, []string{"tenant", "format"}),
patternsDetectedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}),
}, []string{"tenant", "format"}),
tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "tokens_per_line",
Help: "The number of tokens an incoming logline is split into for pattern recognition.",
Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280},
}, []string{"tenant", "format"}),
statePerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "state_per_line",
Help: "The number of items of additional state returned alongside tokens for pattern recognition.",
Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280},
}, []string{"tenant", "format"}),
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/pattern/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,19 @@ func newStream(
chunkMetrics *metric.ChunkMetrics,
cfg metric.AggregationConfig,
logger log.Logger,
guessedFormat string,
instanceID string,
) (*stream, error) {
stream := &stream{
fp: fp,
labels: labels,
labelsString: labels.String(),
labelHash: labels.Hash(),
patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{
PatternsEvictedTotal: metrics.patternsDiscardedTotal,
PatternsDetectedTotal: metrics.patternsDetectedTotal,
PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat),
PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat),
TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat),
StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat),
}),
cfg: cfg,
logger: logger,
Expand Down
11 changes: 11 additions & 0 deletions pkg/pattern/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/pattern/metric"

Expand All @@ -28,6 +29,8 @@ func TestAddStream(t *testing.T) {
Enabled: false,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -65,6 +68,8 @@ func TestPruneStream(t *testing.T) {
Enabled: false,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -113,6 +118,8 @@ func TestSampleIterator(t *testing.T) {
Enabled: true,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -158,6 +165,8 @@ func TestSampleIterator(t *testing.T) {
Enabled: true,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -244,6 +253,8 @@ func TestSampleIterator(t *testing.T) {
Enabled: true,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down
Loading