diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index fcfd017884fed..9308ab99a511d 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -30,6 +30,7 @@ import ( "unsafe" "github.com/hashicorp/golang-lru/v2/simplelru" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/grafana/loki/v3/pkg/logproto" @@ -211,12 +212,29 @@ func (d *Drain) Train(content string, ts int64) *LogCluster { if !d.limiter.Allow() { return nil } - d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state) + var linesSkipped *prometheus.CounterVec + if d.metrics != nil { + linesSkipped = d.metrics.LinesSkipped + } + d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state, linesSkipped) + if d.tokens == nil && d.state == nil { + return nil + } + return d.train(d.tokens, d.state, ts) } func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster { if len(tokens) < 4 { + if d.metrics != nil && d.metrics.LinesSkipped != nil { + d.metrics.LinesSkipped.WithLabelValues(TooFewTokens).Inc() + } + return nil + } + if len(tokens) > 80 { + if d.metrics != nil && d.metrics.LinesSkipped != nil { + d.metrics.LinesSkipped.WithLabelValues(TooManyTokens).Inc() + } return nil } if d.metrics != nil { @@ -255,7 +273,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster } func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster { - tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state) + tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state, d.metrics.LinesSkipped) matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true) // Match no existing log cluster if matchCluster == nil { diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index c2beda4b44d5f..b9045080d2c11 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -24,25 +24,23 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { // Set this so the test will print the patterns found, in string slice format for easy copy-paste outputPatternsForTestUpdate := false tests := []struct { - drain *Drain - inputFile string - patterns []string - format string + drain *Drain + inputFile string + patterns []string + tooManyTokens []string + format string }{ { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/agent-logfmt.txt`, format: FormatLogfmt, patterns: []string{ - `ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", batch_kubernetes_io_job_name=\"testcoordinator-job-2665838\", container=\"testcoordinator\", controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", job=\"k6-cloud/testcoordinator\", job_name=\"testcoordinator-job-2665838\", name=\"testcoordinator\", namespace=\"k6-cloud\", pod=\"testcoordinator-job-2665838-9g8ds\"}"`, `ts=2024-04-16T15:10:43.551543875Z caller=filetargetmanager.go:397 level=info component=logs logs_config=default msg="Removing target" key="/var/log/pods/*35649bfd-52ff-4281-9294-5f65fd5a89fc/marketplaces-api/*.log:{container=\"marketplaces-api\", job=\"grafana-com/marketplaces-api\", name=\"marketplaces-api\", namespace=\"grafana-com\", pod=\"marketplaces-api-f67ff7567-gqrvb\", pod_template_hash=\"f67ff7567\"}"`, `ts=<_> caller=filetarget.go:192 level=info component=logs logs_config=default msg="filetarget: watcher closed, tailer stopped, positions saved" path=/var/log/pods/*<_>*.log`, `ts=<_> caller=filetarget.go:313 level=info component=logs logs_config=default msg="watching new directory" directory=<_>`, `ts=<_> caller=filetarget.go:326 level=info component=logs logs_config=default msg="removing directory from watcher" directory=<_>`, `ts=<_> caller=filetargetmanager.go:181 level=info component=logs logs_config=default msg="received file watcher event" name=<_> op=CREATE`, - `ts=<_> caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*<_>*.log:{app=\"grafana\", conprof=\"true\", container=\"<_>\", instanceId=\"i1111\", job=\"hosted-grafana/grafana\", name=\"grafana\", namespace=\"hosted-grafana\", org=\"orgnamehere\", plan=\"free\", pod=\"orgnamehere-grafana-7c65678f86-9zhlb\", pod_template_hash=\"7c65678f86\", resource_version=\"143638246\", slug=\"orgnamehere\", stackId=\"866772\"}"`, `ts=<_> caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*<_>*.log:{component=\"kube-proxy\", container=\"kube-proxy\", job=\"<_>\", namespace=\"kube-system\", pod=\"kube-proxy-gke-ops-us-east-0-main-n2s32-1-1dd39c-32ae1dde-hmhw\", tier=\"node\"}"`, - `ts=<_> caller=filetargetmanager.go:397 level=info component=logs logs_config=default msg="Removing target" key="/var/log/pods/*<_>*.log:{app=\"grafana\", conprof=\"true\", container=\"<_>\", instanceId=\"<_>\", job=\"hosted-grafana/grafana\", name=\"grafana\", namespace=\"hosted-grafana\", org=\"<_>\", plan=\"free\", pod=\"<_>\", pod_template_hash=\"<_>\", resource_version=\"<_>\", slug=<_>`, `ts=<_> caller=log.go:168 component=logs logs_config=default level=info msg="Re-opening moved/deleted file <_> ..."`, `ts=<_> caller=log.go:168 component=logs logs_config=default level=info msg="Seeked <_> - &{Offset:0 Whence:0}"`, `ts=<_> caller=log.go:168 component=logs logs_config=default level=info msg="Successfully reopened <_>"`, @@ -57,6 +55,11 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `ts=<_> caller=tailer.go:245 level=info component=logs logs_config=default component=tailer msg="stopped tailing file" path=<_>`, `ts=<_> level=info msg="finished node evaluation" controller_id=module.http.cloudwatch_pipelines node_id=<_> duration=<_>`, }, + tooManyTokens: []string{ + `ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", batch_kubernetes_io_job_name=\"testcoordinator-job-2665838\", container=\"testcoordinator\", controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", job=\"k6-cloud/testcoordinator\", job_name=\"testcoordinator-job-2665838\", name=\"testcoordinator\", namespace=\"k6-cloud\", pod=\"testcoordinator-job-2665838-9g8ds\"}"`, + `ts=<_> caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*<_>*.log:{app=\"grafana\", conprof=\"true\", container=\"<_>\", instanceId=\"i1111\", job=\"hosted-grafana/grafana\", name=\"grafana\", namespace=\"hosted-grafana\", org=\"orgnamehere\", plan=\"free\", pod=\"orgnamehere-grafana-7c65678f86-9zhlb\", pod_template_hash=\"7c65678f86\", resource_version=\"143638246\", slug=\"orgnamehere\", stackId=\"866772\"}"`, + `ts=<_> caller=filetargetmanager.go:397 level=info component=logs logs_config=default msg="Removing target" key="/var/log/pods/*<_>*.log:{app=\"grafana\", conprof=\"true\", container=\"<_>\", instanceId=\"<_>\", job=\"hosted-grafana/grafana\", name=\"grafana\", namespace=\"hosted-grafana\", org=\"<_>\", plan=\"free\", pod=\"<_>\", pod_template_hash=\"<_>\", resource_version=\"<_>\", slug=<_>`, + }, }, { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), @@ -67,19 +70,22 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `ts=<_> caller=head.go:216 level=debug tenant=987678 msg="profile is empty after delta computation" metricName=memory`, `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=<_> msg="POST /ingester.v1.IngesterService/Push (200) <_>"`, }, + tooManyTokens: []string{}, }, { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/drone-json.txt`, format: FormatJSON, patterns: []string{ - `{"duration"<_>,"level":"debug","method":"GET","msg":"request completed","referer":"","remote":"10.136.105.40:52702","request":"/metrics","status":200,"time":"<_>","user-agent":"GrafanaAgent/v0.40.3 (flow; linux; helm)"}`, `{"id":"<_>","level":"debug","max-pool":4,"min-pool":0,"msg":"check capacity","pending-builds":0,"running-builds":0,"server-buffer":0,"server-capacity":0,"server-count":0,"time":"<_>"}`, `{"id":"<_>","level":"debug","msg":"calculate server capacity","time":"<_>"}`, `{"id":"<_>","level":"debug","msg":"calculate unfinished jobs","time":"<_>"}`, `{"id":"<_>","level":"debug","msg":"check capacity complete","time":"<_>"}`, `{"id":"<_>","level":"debug","msg":"no capacity changes required","time":"<_>"}`, }, + tooManyTokens: []string{ + `{"duration"<_>,"level":"debug","method":"GET","msg":"request completed","referer":"","remote":"10.136.105.40:52702","request":"/metrics","status":200,"time":"<_>","user-agent":"GrafanaAgent/v0.40.3 (flow; linux; helm)"}`, + }, }, { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), @@ -88,10 +94,12 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { patterns: []string{ `ts=2024-05-02T12:17:22.115385619Z caller=http.go:194 level=debug traceID=7836a12bb7f1964e orgID=75 msg="POST /ingest?aggregationType=sum&from=1714652227107641016&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=100&spyName=gospy&units=samples&until=1714652242109516917 (200) 1.562143ms"`, `ts=2024-05-02T12:17:22.242343806Z caller=http.go:194 level=debug traceID=404c6a83a18e66a4 orgID=75 msg="POST /ingest?aggregationType=average&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=0&spyName=gospy&units=goroutines&until=1714652242232506798 (200) 2.902485ms"`, - `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=1819 msg="POST /pyroscope/ingest?aggregationType=sum&from=1714652230&name=<_>%7Bapp_kubernetes_io_instance%3Dflamegraph-com%2Capp_kubernetes_io_name%3Dflamegraph-com%2Ccluster%3Dflamegraph.com%2Cinstance%<_>%<_>%2Cjob%3Dkubernetes-pods%2Cnamespace%3Dflamegraph-com%2Cpod%<_>%2Cpod_template_hash%<_>%2Cpyroscope_tenant%3Dpyroscope%2Ctier%<_>%7D&sampleRate=0&spyName=scrape&units=samples&until=1714652240 (200) <_>"`, `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=75 msg="POST /ingest?aggregationType=&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=<_>&spyName=gospy&units=&until=1714652242232506798 (200) <_>"`, `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=<_> msg="POST /push.v1.PusherService/Push (<_>) <_>"`, }, + tooManyTokens: []string{ + `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=1819 msg="POST /pyroscope/ingest?aggregationType=sum&from=1714652230&name=<_>%7Bapp_kubernetes_io_instance%3Dflamegraph-com%2Capp_kubernetes_io_name%3Dflamegraph-com%2Ccluster%3Dflamegraph.com%2Cinstance%<_>%<_>%2Cjob%3Dkubernetes-pods%2Cnamespace%3Dflamegraph-com%2Cpod%<_>%2Cpod_template_hash%<_>%2Cpyroscope_tenant%3Dpyroscope%2Ctier%<_>%7D&sampleRate=0&spyName=scrape&units=samples&until=1714652240 (200) <_>"`, + }, }, { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), @@ -110,7 +118,6 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `E0507 11:59:31.554203 4531 pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"frontend\" with CrashLoopBackOff: \"back-off 5m0s restarting failed container=frontend pod=otel-demo-alt-dev-frontend-79ccf98858-mbj4x_otel-demo-alt(d08e620e-00d0-49f1-a195-820a62e8de8f)\"" pod="otel-demo-alt/otel-demo-alt-dev-frontend-79ccf98858-mbj4x" podUID="d08e620e-00d0-49f1-a195-820a62e8de8f"`, `E0507 11:59:31.928148 4734 pod_workers.go:1300] "Error syncing pod, skipping" err="unmounted volumes=[terraform-drift-detector-data], unattached volumes=[terraform-drift-detector-data], failed to process volumes=[]: context deadline exceeded" pod="terraform-drift-detector/terraform-drift-detector-d68b4c545-jg2vj" podUID="6c607496-ef26-454e-b2f2-4cb75b233fa3"`, `E0507 11:59:34.856101 4727 pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"grafana-render-security\" with ImagePullBackOff: \"Back-off pulling image \\\"us.gcr.io/hosted-grafana/hosted-grafana-security:0.1.181\\\"\"" pod="integration/grafana-render-service-cbff479fc-cj9tp" podUID="0e3114d1-2f3a-49d6-a71d-dbc75050d8e0"`, - `E0507 11:59:34.923938 3027 kuberuntime_manager.go:1261] container &Container{Name:mysqld-exporter,Image:prom/mysqld-exporter:v0.13.0,Command:[],Args:[--collect.info_schema.innodb_metrics],WorkingDir:,Ports:[]ContainerPort{ContainerPort{Name:http-metrics,HostPort:0,ContainerPort:9104,Protocol:TCP,HostIP:,},},Env:[]EnvVar{EnvVar{Name:MYSQL_USER,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:username,Optional:nil,},},},EnvVar{Name:MYSQL_PASSWORD,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:password,Optional:nil,},},},EnvVar{Name:MYSQL_HOST,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:endpoint,Optional:nil,},},},EnvVar{Name:MYSQL_PORT,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:port,Optional:nil,},},},EnvVar{Name:MYSQL_TLS_MODE,Value:preferred,ValueFrom:nil,},EnvVar{Name:DATA_SOURCE_NAME,Value:$(MYSQL_USER):$(MYSQL_PASSWORD)@tcp($(MYSQL_HOST):$(MYSQL_PORT))/?tls=$(MYSQL_TLS_MODE),ValueFrom:nil,},},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{VolumeMount{Name:kube-api-access-dzx7d,ReadOnly:true,MountPath:/var/run/secrets/kubernetes.io/serviceaccount,SubPath:,MountPropagation:nil,SubPathExpr:,},},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:IfNotPresent,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:File,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},RestartPolicy:nil,} start failed in pod testcrossplane-exporter-c67cfc58f-vbzl4_crossplane-playground(3d49134d-3378-4ec3-824c-5ff4ea2590a5): CreateContainerConfigError: secret "testcrossplane-user-exporter" not found`, `E0507 11:59:34.923984 3027 pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"mysqld-exporter\" with CreateContainerConfigError: \"secret \\\"testcrossplane-user-exporter\\\" not found\"" pod="crossplane-playground/testcrossplane-exporter-c67cfc58f-vbzl4" podUID="3d49134d-3378-4ec3-824c-5ff4ea2590a5"`, `E0507 11:59:35.928465 4734 pod_workers.go:1300] "Error syncing pod, skipping" err="unmounted volumes=[custom-grafana-agent], unattached volumes=[], failed to process volumes=[]: context deadline exceeded" pod="loki-dev-010/custom-grafana-agent-856948968f-6jfks" podUID="17b244cc-ecb9-4fbc-beaa-8fa47fafe013"`, `E0507 11:59:37.252214 4736 pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"ksm\" with CrashLoopBackOff: \"back-off 5m0s restarting failed container=ksm pod=new-relic-nri-bundle-nrk8s-ksm-6c785668f5-jcxh2_integration(f7cc3cca-2ffb-4fde-a73e-a4ba8b0f6b3c)\"" pod="integration/new-relic-nri-bundle-nrk8s-ksm-6c785668f5-jcxh2" podUID="f7cc3cca-2ffb-4fde-a73e-a4ba8b0f6b3c"`, @@ -123,7 +130,6 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"gcom-sync\" with ImagePullBackOff: \"Back-off pulling image \\\"us.gcr.io/kubernetes-dev/frontend-monitoring:6a8eb5a\\\"\"" pod="<_>" podUID="<_>"`, `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"goldpinger\" with CrashLoopBackOff: \"back-off 5m0s restarting failed container=goldpinger pod=<_>(<_>)\"" pod="<_>" podUID="<_>"`, `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"grafana\" with CrashLoopBackOff: \"back-off <_> restarting failed container=grafana pod=<_>(<_>)\"" pod="<_>" podUID="<_>"`, - `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"grafana\" with ErrImagePull: \"[rpc error: code = NotFound desc = failed to pull and unpack image \\\"<_>\\\": failed to resolve reference \\\"<_>\\\": <_> not found, failed to pull and unpack image \\\"<_>\\\": failed to resolve reference \\\"<_>\\\": unexpected status from HEAD request to <_> 403 Forbidden]\"" pod="<_>" podUID="<_>"`, `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"grafana\" with ImagePullBackOff: \"Back-off pulling image \\\"<_>\\\"\"" pod="<_>" podUID="<_>"`, `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"pdc\" with ErrImageNeverPull: \"Container image \\\"us.gcr.io/hosted-grafana/pdc:0.1.415\\\" is not present with pull policy of Never\"" pod="<_>" podUID="<_>"`, `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"ruler\" with CreateContainerConfigError: \"secret \\\"ruler-alertmanager-token\\\" not found\"" pod="<_>" podUID="<_>"`, @@ -212,13 +218,16 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `time="<_>" level=warning msg="cleaning up after shim disconnected" id=<_> namespace=k8s.io`, `var-lib-containerd-tmpmounts-containerd\<_> Deactivated successfully.`, }, + tooManyTokens: []string{ + `E0507 11:59:34.923938 3027 kuberuntime_manager.go:1261] container &Container{Name:mysqld-exporter,Image:prom/mysqld-exporter:v0.13.0,Command:[],Args:[--collect.info_schema.innodb_metrics],WorkingDir:,Ports:[]ContainerPort{ContainerPort{Name:http-metrics,HostPort:0,ContainerPort:9104,Protocol:TCP,HostIP:,},},Env:[]EnvVar{EnvVar{Name:MYSQL_USER,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:username,Optional:nil,},},},EnvVar{Name:MYSQL_PASSWORD,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:password,Optional:nil,},},},EnvVar{Name:MYSQL_HOST,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:endpoint,Optional:nil,},},},EnvVar{Name:MYSQL_PORT,Value:,ValueFrom:&EnvVarSource{FieldRef:nil,ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:&SecretKeySelector{LocalObjectReference:LocalObjectReference{Name:testcrossplane-user-exporter,},Key:port,Optional:nil,},},},EnvVar{Name:MYSQL_TLS_MODE,Value:preferred,ValueFrom:nil,},EnvVar{Name:DATA_SOURCE_NAME,Value:$(MYSQL_USER):$(MYSQL_PASSWORD)@tcp($(MYSQL_HOST):$(MYSQL_PORT))/?tls=$(MYSQL_TLS_MODE),ValueFrom:nil,},},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{VolumeMount{Name:kube-api-access-dzx7d,ReadOnly:true,MountPath:/var/run/secrets/kubernetes.io/serviceaccount,SubPath:,MountPropagation:nil,SubPathExpr:,},},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:IfNotPresent,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:File,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},RestartPolicy:nil,} start failed in pod testcrossplane-exporter-c67cfc58f-vbzl4_crossplane-playground(3d49134d-3378-4ec3-824c-5ff4ea2590a5): CreateContainerConfigError: secret "testcrossplane-user-exporter" not found`, + `E0507 <_> <_> pod_workers.go:1300] "Error syncing pod, skipping" err="failed to \"StartContainer\" for \"grafana\" with ErrImagePull: \"[rpc error: code = NotFound desc = failed to pull and unpack image \\\"<_>\\\": failed to resolve reference \\\"<_>\\\": <_> not found, failed to pull and unpack image \\\"<_>\\\": failed to resolve reference \\\"<_>\\\": unexpected status from HEAD request to <_> 403 Forbidden]\"" pod="<_>" podUID="<_>"`, + }, }, { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/kafka.txt", format: FormatUnknown, patterns: []string{ - `[2024-05-07 10:55:40,626] INFO [LocalLog partition=ingest-6, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=180391157, size=16991045, lastModifiedTime=1715075754780, largestRecordTimestamp=Some(1715075754774)),LogSegment(baseOffset=180393429, size=16997692, lastModifiedTime=1715075760206, largestRecordTimestamp=Some(1715075760186)),LogSegment(baseOffset=180395889, size=16998200, lastModifiedTime=1715075765542, largestRecordTimestamp=Some(1715075765526)),LogSegment(baseOffset=180398373, size=16977347, lastModifiedTime=1715075770515, largestRecordTimestamp=Some(1715075770504)) (kafka.log.LocalLog$)`, `[2024-05-07 10:55:53,038] INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-1, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=447957, size=948, lastModifiedTime=1715059232052, largestRecordTimestamp=Some(1715059232002)),LogSegment(baseOffset=447969, size=948, lastModifiedTime=1715059424352, largestRecordTimestamp=Some(1715059424301)) (kafka.log.LocalLog$)`, `[2024-05-07 10:55:53,<_>] INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-0, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=<_>, size=948, lastModifiedTime=<_>, largestRecordTimestamp=Some(<_>)) (kafka.log.LocalLog$)`, `[2024-05-07 <_>,<_>] INFO Deleted log <_> (kafka.log.LogSegment)`, @@ -233,6 +242,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `[2024-05-07 <_>,<_>] INFO [UnifiedLog partition=<_>, dir=/bitnami/kafka/data] Incremented log start offset to <_> due to leader offset increment (kafka.log.UnifiedLog)`, `[2024-05-07 <_>,<_>] INFO [UnifiedLog partition=<_>, dir=/bitnami/kafka/data] Incremented log start offset to <_> due to segment deletion (kafka.log.UnifiedLog)`, }, + tooManyTokens: []string{ + `[2024-05-07 10:55:40,626] INFO [LocalLog partition=ingest-6, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=180391157, size=16991045, lastModifiedTime=1715075754780, largestRecordTimestamp=Some(1715075754774)),LogSegment(baseOffset=180393429, size=16997692, lastModifiedTime=1715075760206, largestRecordTimestamp=Some(1715075760186)),LogSegment(baseOffset=180395889, size=16998200, lastModifiedTime=1715075765542, largestRecordTimestamp=Some(1715075765526)),LogSegment(baseOffset=180398373, size=16977347, lastModifiedTime=1715075770515, largestRecordTimestamp=Some(1715075770504)) (kafka.log.LocalLog$)`, + }, }, { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), @@ -287,6 +299,13 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/calico.txt", format: FormatUnknown, + tooManyTokens: []string{ + `2024-05-08 15:23:57.942 [WARNING][56] felix/table.go 654: Detected out-of-sync inserts, marking for resync actualRuleIDs=[]string{"", "", "", "", "", "", "", "", "", "", "", "", "tVnHkvAo15HuiPy0", "", ""} chainName="OUTPUT" expectedRuleIDs=[]string{"tVnHkvAo15HuiPy0", "", "", "", "", "", "", "", "", "", "", "", "", "", ""} ipVersion=0x4 table="raw"`, + `2024-05-08 15:23:57.969 [WARNING][56] felix/table.go 654: Detected out-of-sync inserts, marking for resync actualRuleIDs=[]string{"", "", "", "", "Cz_u1IQiXIMmKD4c", "", "", "", "", "", "", "", "", "", "", "", ""} chainName="INPUT" expectedRuleIDs=[]string{"Cz_u1IQiXIMmKD4c", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""} ipVersion=0x4 table="filter"`, + `2024-05-08 15:23:57.969 [WARNING][56] felix/table.go 654: Detected out-of-sync inserts, marking for resync actualRuleIDs=[]string{"", "", "", "", "tVnHkvAo15HuiPy0", "", "", "", "", ""} chainName="OUTPUT" expectedRuleIDs=[]string{"tVnHkvAo15HuiPy0", "", "", "", "", "", "", "", "", ""} ipVersion=0x4 table="filter"`, + `2024-05-08 <_> [DEBUG][<_>] felix/health.go 245: Calculated health summary healthResult=&health.HealthReport{Live:true, Ready:true, Detail:"+------------------+---------+----------------+-----------------+--------+\n| COMPONENT | TIMEOUT | LIVENESS | READINESS | DETAIL |\n+------------------+---------+----------------+-----------------+--------+\n| async_calc_graph | 20s | reporting live | reporting ready | |\n| felix-startup | 0s | reporting live | reporting ready | |\n| int_dataplane | 1m30s | reporting live | reporting ready | |\n+------------------+---------+----------------+-----------------+--------+"}`, + `2024-05-08 <_> [DEBUG][<_>] felix/xdp_state.go 1270: Finished processing pending diff state. bpfActions=intdataplane.xdpBPFActions{CreateMap:set.Typed[string]{}, RemoveMap:set.Typed[string]{}, AddToMap:map[string]map[string]uint32{}, RemoveFromMap:map[string]map[string]uint32{}, InstallXDP:set.Typed[string]{}, UninstallXDP:set.Typed[string]{}, MembersToDrop:map[string]map[string]uint32{}, MembersToAdd:map[string]map[string]uint32{}} family=4 newCS=&intdataplane.xdpSystemState{IfaceNameToData:map[string]intdataplane.xdpIfaceData{}, XDPEligiblePolicies:map[proto.PolicyID]intdataplane.xdpRules{}}`, + }, patterns: []string{ `2024-05-08 15:23:56.403 [DEBUG][615489] felix/table.go 699: Finished loading iptables state ipVersion=0x4 table="filter"`, `2024-05-08 15:23:56.614 [DEBUG][76] felix/int_dataplane.go 1777: Refreshing routes`, @@ -302,10 +321,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `2024-05-08 15:23:56.619 [DEBUG][76] felix/route_table.go 661: Syncing interface routes ifaceName="*NoOIF*" ifaceRegex="^wireguard.cali$" ipVersion=0x4 tableIndex=1`, `2024-05-08 15:23:56.619 [DEBUG][76] felix/route_table.go 686: Reconcile against kernel programming ifaceName="*NoOIF*" ifaceRegex="^wireguard.cali$" ipVersion=0x4 tableIndex=1`, `2024-05-08 15:23:56.624 [INFO][76] felix/summary.go 100: Summarising 1 dataplane reconciliation loops over 200ms: avg=10ms longest=10ms (resync-routes-v4,resync-routes-v4,resync-rules-v4,resync-wg)`, - `2024-05-08 15:23:57.942 [WARNING][56] felix/table.go 654: Detected out-of-sync inserts, marking for resync actualRuleIDs=[]string{"", "", "", "", "", "", "", "", "", "", "", "", "tVnHkvAo15HuiPy0", "", ""} chainName="OUTPUT" expectedRuleIDs=[]string{"tVnHkvAo15HuiPy0", "", "", "", "", "", "", "", "", "", "", "", "", "", ""} ipVersion=0x4 table="raw"`, `2024-05-08 15:23:57.942 [WARNING][56] felix/table.go 654: Detected out-of-sync inserts, marking for resync actualRuleIDs=[]string{"", "", "", "", "6gwbT8clXdHdC1b1"} chainName="PREROUTING" expectedRuleIDs=[]string{"6gwbT8clXdHdC1b1", "", "", "", ""} ipVersion=0x4 table="raw"`, - `2024-05-08 15:23:57.969 [WARNING][56] felix/table.go 654: Detected out-of-sync inserts, marking for resync actualRuleIDs=[]string{"", "", "", "", "Cz_u1IQiXIMmKD4c", "", "", "", "", "", "", "", "", "", "", "", ""} chainName="INPUT" expectedRuleIDs=[]string{"Cz_u1IQiXIMmKD4c", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""} ipVersion=0x4 table="filter"`, - `2024-05-08 15:23:57.969 [WARNING][56] felix/table.go 654: Detected out-of-sync inserts, marking for resync actualRuleIDs=[]string{"", "", "", "", "tVnHkvAo15HuiPy0", "", "", "", "", ""} chainName="OUTPUT" expectedRuleIDs=[]string{"tVnHkvAo15HuiPy0", "", "", "", "", "", "", "", "", ""} ipVersion=0x4 table="filter"`, `2024-05-08 15:23:58.169 [INFO][2333] felix/summary.go 100: Summarising 35 dataplane reconciliation loops over 1m2s: avg=12ms longest=46ms (resync-filter-v4,resync-filter-v6,resync-mangle-v4,resync-mangle-v6,update-filter-v4,update-filter-v6)`, `2024-05-08 15:23:58.566 [DEBUG][3576126] felix/int_dataplane.go 957: Examining link for MTU calculation mtu=1500 name="eth0"`, `2024-05-08 15:23:58.680 [DEBUG][216945] felix/int_dataplane.go 1785: Reschedule kick received`, @@ -341,7 +357,6 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `2024-05-08 <_> [DEBUG][<_>] felix/health.go 196: Checking state of reporter reporter=&health.reporterState{name:"async_calc_graph", reports:health.HealthReport{Live:true, Ready:true, Detail:""}, timeout:20000000000, latest:health.HealthReport{Live:true, Ready:true, Detail:""}, timestamp:time.Time{<_>, <_>, loc:(*time.Location)(0x4ce3aa0)}}`, `2024-05-08 <_> [DEBUG][<_>] felix/health.go 196: Checking state of reporter reporter=&health.reporterState{name:"felix-startup", reports:health.HealthReport{Live:true, Ready:true, Detail:""}, timeout:0, latest:health.HealthReport{Live:true, Ready:true, Detail:""}, timestamp:time.Time{<_>, <_>, loc:(*time.Location)(0x4ce3aa0)}}`, `2024-05-08 <_> [DEBUG][<_>] felix/health.go 196: Checking state of reporter reporter=&health.reporterState{name:"int_dataplane", reports:health.HealthReport{Live:true, Ready:true, Detail:""}, timeout:90000000000, latest:health.HealthReport{Live:true, Ready:true, Detail:""}, timestamp:time.Time{<_>, <_>, loc:(*time.Location)(0x4ce3aa0)}}`, - `2024-05-08 <_> [DEBUG][<_>] felix/health.go 245: Calculated health summary healthResult=&health.HealthReport{Live:true, Ready:true, Detail:"+------------------+---------+----------------+-----------------+--------+\n| COMPONENT | TIMEOUT | LIVENESS | READINESS | DETAIL |\n+------------------+---------+----------------+-----------------+--------+\n| async_calc_graph | 20s | reporting live | reporting ready | |\n| felix-startup | 0s | reporting live | reporting ready | |\n| int_dataplane | 1m30s | reporting live | reporting ready | |\n+------------------+---------+----------------+-----------------+--------+"}`, `2024-05-08 <_> [DEBUG][<_>] felix/health.go <_> GET <_>`, `2024-05-08 <_> [DEBUG][<_>] felix/int_dataplane.go 1773: Refreshing IP sets state`, `2024-05-08 <_> [DEBUG][<_>] felix/int_dataplane.go 1807: Applying dataplane updates`, @@ -367,7 +382,6 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `2024-05-08 <_> [DEBUG][<_>] felix/wireguard.go 652: Wireguard is not enabled, skipping sync ipVersion=0x4`, `2024-05-08 <_> [DEBUG][<_>] felix/xdp_state.go 1004: Updating ipsetIDsToMembers cache. family=4`, `2024-05-08 <_> [DEBUG][<_>] felix/xdp_state.go 1043: Processing pending diff state. cs=&intdataplane.xdpSystemState{IfaceNameToData:map[string]intdataplane.xdpIfaceData{}, XDPEligiblePolicies:map[proto.PolicyID]intdataplane.xdpRules{}} family=4`, - `2024-05-08 <_> [DEBUG][<_>] felix/xdp_state.go 1270: Finished processing pending diff state. bpfActions=intdataplane.xdpBPFActions{CreateMap:set.Typed[string]{}, RemoveMap:set.Typed[string]{}, AddToMap:map[string]map[string]uint32{}, RemoveFromMap:map[string]map[string]uint32{}, InstallXDP:set.Typed[string]{}, UninstallXDP:set.Typed[string]{}, MembersToDrop:map[string]map[string]uint32{}, MembersToAdd:map[string]map[string]uint32{}} family=4 newCS=&intdataplane.xdpSystemState{IfaceNameToData:map[string]intdataplane.xdpIfaceData{}, XDPEligiblePolicies:map[proto.PolicyID]intdataplane.xdpRules{}}`, `2024-05-08 <_> [DEBUG][<_>] felix/xdp_state.go 1605: Getting member changes. family=4 oldMembers=map[string]set.Set[string]{}`, `2024-05-08 <_> [DEBUG][<_>] felix/xdp_state.go 1798: Processing BPF actions. family="ipv4"`, `2024-05-08 <_> [DEBUG][<_>] felix/xdp_state.go 1932: Finished processing BPF actions. family="ipv4"`, @@ -459,7 +473,12 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { } require.Equal(t, tt.patterns, output) - require.Falsef(t, outputPatternsForTestUpdate, `outputPatternsForTestUpdate should only be used locally to update test patterns.`) + require.NotContains(t, tt.tooManyTokens, output) + require.Falsef( + t, + outputPatternsForTestUpdate, + `outputPatternsForTestUpdate should only be used locally to update test patterns.`, + ) }) } } diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 4d758181399b4..41b667b854837 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -8,12 +8,13 @@ import ( "github.com/buger/jsonparser" gologfmt "github.com/go-logfmt/logfmt" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/v3/pkg/logql/log/logfmt" ) type LineTokenizer interface { - Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) + Tokenize(line string, tokens []string, state interface{}, linesDropped *prometheus.CounterVec) ([]string, interface{}) Join(tokens []string, state interface{}) string Clone(tokens []string, state interface{}) ([]string, interface{}) } @@ -56,8 +57,16 @@ func newPunctuationTokenizer(maxLineLength int) *punctuationTokenizer { } } -func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { +func (p *punctuationTokenizer) Tokenize( + line string, + tokens []string, + state interface{}, + linesDropped *prometheus.CounterVec, +) ([]string, interface{}) { if len(line) > p.maxLineLength { + if linesDropped != nil { + linesDropped.WithLabelValues(LineTooLong).Inc() + } return nil, nil } @@ -131,7 +140,12 @@ func (p *punctuationTokenizer) Clone(tokens []string, state interface{}) ([]stri type splittingTokenizer struct{} -func (splittingTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { +func (splittingTokenizer) Tokenize( + line string, + tokens []string, + state interface{}, + _ *prometheus.CounterVec, +) ([]string, interface{}) { numEquals := strings.Count(line, "=") numColons := strings.Count(line, ":") numSpaces := strings.Count(line, " ") @@ -209,8 +223,16 @@ func newLogfmtTokenizer(varReplace string, maxLineLength int) *logfmtTokenizer { } } -func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) { +func (t *logfmtTokenizer) Tokenize( + line string, + tokens []string, + _ interface{}, + linesDropped *prometheus.CounterVec, +) ([]string, interface{}) { if len(line) > t.maxLineLength { + if linesDropped != nil { + linesDropped.WithLabelValues(LineTooLong).Inc() + } return nil, nil } @@ -277,7 +299,12 @@ func newJSONTokenizer(varReplace string, maxLineLength int, fieldsToTokenize []s } } -func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { +func (t *jsonTokenizer) Tokenize( + line string, + tokens []string, + state interface{}, + linesDropped *prometheus.CounterVec, +) ([]string, interface{}) { var found []byte for _, key := range t.fieldsToTokenize { msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key) @@ -297,7 +324,7 @@ func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{} return nil, nil } - return t.punctuationTokenizer.Tokenize(foundLine, tokens, state) + return t.punctuationTokenizer.Tokenize(foundLine, tokens, state, linesDropped) } func (t *jsonTokenizer) Join(tokens []string, state interface{}) string { diff --git a/pkg/pattern/drain/line_tokenizer_test.go b/pkg/pattern/drain/line_tokenizer_test.go index a2c8013b14c3b..0666747cd418f 100644 --- a/pkg/pattern/drain/line_tokenizer_test.go +++ b/pkg/pattern/drain/line_tokenizer_test.go @@ -143,7 +143,7 @@ func TestTokenizer_Tokenize(t *testing.T) { for _, tt := range tests { for _, tc := range testCases { t.Run(tt.name+":"+tc.name, func(t *testing.T) { - got, _ := tt.tokenizer.Tokenize(tc.line, nil, nil) + got, _ := tt.tokenizer.Tokenize(tc.line, nil, nil, nil) require.Equal(t, tc.want[tt.name], got) }) } @@ -168,7 +168,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) { for _, tt := range tests { for _, tc := range testCases { t.Run(tt.name+":"+tc.name, func(t *testing.T) { - got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line, nil, nil)) + got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line, nil, nil, nil)) require.Equal(t, tc.line, got) }) } @@ -184,7 +184,7 @@ func BenchmarkSplittingTokenizer(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - tokenizer.Tokenize(tc.line, nil, nil) + tokenizer.Tokenize(tc.line, nil, nil, nil) } }) } @@ -231,7 +231,7 @@ func TestLogFmtTokenizer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, _ := tokenizer.Tokenize(tt.line, nil, nil) + got, _ := tokenizer.Tokenize(tt.line, nil, nil, nil) require.Equal(t, tt.want, got) }) } @@ -330,7 +330,7 @@ func TestJsonTokenizer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, state := tokenizer.Tokenize(tt.line, nil, nil) + got, state := tokenizer.Tokenize(tt.line, nil, nil, nil) require.Equal(t, tt.want, got) if len(got) == len(tt.want) && len(tt.want) != 0 { pattern := tokenizer.Join(got, state) diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index 0b9e5bff2a43e..552a212273390 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -10,6 +10,9 @@ const ( FormatLogfmt = "logfmt" FormatJSON = "json" FormatUnknown = "unknown" + TooFewTokens = "too_few_tokens" + TooManyTokens = "too_many_tokens" + LineTooLong = "line_too_long" ) var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$") @@ -31,6 +34,7 @@ type Metrics struct { PatternsEvictedTotal prometheus.Counter PatternsPrunedTotal prometheus.Counter PatternsDetectedTotal prometheus.Counter + LinesSkipped *prometheus.CounterVec TokensPerLine prometheus.Observer StatePerLine prometheus.Observer } diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index 25bbd1fd1f6e1..3ebc9c195b9b1 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -9,6 +9,7 @@ type ingesterMetrics struct { flushQueueLength prometheus.Gauge patternsDiscardedTotal *prometheus.CounterVec patternsDetectedTotal *prometheus.CounterVec + linesSkipped *prometheus.CounterVec tokensPerLine *prometheus.HistogramVec statePerLine *prometheus.HistogramVec samples *prometheus.CounterVec @@ -34,6 +35,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "patterns_detected_total", Help: "The total number of patterns detected from incoming log lines.", }, []string{"tenant", "format"}), + linesSkipped: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "patterns_dropped_total", + Help: "The total number of log lines skipped for pattern recognition.", + }, []string{"tenant", "reason"}), tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 7f53c5777cfeb..6e506b3ef2fca 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" ) @@ -37,6 +38,10 @@ func newStream( drainCfg *drain.Config, drainLimits drain.Limits, ) (*stream, error) { + linesSkipped, err := metrics.linesSkipped.CurryWith(prometheus.Labels{"tenant": instanceID}) + if err != nil { + return nil, err + } return &stream{ fp: fp, labels: labels, @@ -47,6 +52,7 @@ func newStream( PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"), PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"), PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat), + LinesSkipped: linesSkipped, TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat), StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat), }),