From a27615cc20f7ebe28560cd6ed7cb7c1ff8d105cb Mon Sep 17 00:00:00 2001 From: Vincent Miszczak Date: Mon, 5 Feb 2024 15:05:18 +0100 Subject: [PATCH] fix(loki.source.syslog): UDP path returns EOF --- .../internal/syslogtarget/syslogtarget_test.go | 6 +++--- .../syslog/internal/syslogtarget/transport.go | 18 +++++------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go b/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go index 64eda40836a8..0c866e132e67 100644 --- a/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go +++ b/component/loki/source/syslog/internal/syslogtarget/syslogtarget_test.go @@ -298,6 +298,7 @@ type formatFunc func(string) string var ( fmtOctetCounting = func(s string) string { return fmt.Sprintf("%d %s", len(s), s) } fmtNewline = func(s string) string { return s + "\n" } + fmtIdentity = func(s string) string { return s } ) func Benchmark_SyslogTarget(b *testing.B) { @@ -307,7 +308,7 @@ func Benchmark_SyslogTarget(b *testing.B) { formatFunc formatFunc }{ {"tcp", protocolTCP, fmtOctetCounting}, - {"udp", protocolUDP, fmtOctetCounting}, + {"udp", protocolUDP, fmtIdentity}, } { tt := tt b.Run(tt.name, func(b *testing.B) { @@ -366,8 +367,7 @@ func TestSyslogTarget(t *testing.T) { }{ {"tcp newline separated", protocolTCP, fmtNewline}, {"tcp octetcounting", protocolTCP, fmtOctetCounting}, - {"udp newline separated", protocolUDP, fmtNewline}, - {"udp octetcounting", protocolUDP, fmtOctetCounting}, + {"udp", protocolUDP, fmtIdentity}, } { tt := tt t.Run(tt.name, func(t *testing.T) { diff --git a/component/loki/source/syslog/internal/syslogtarget/transport.go b/component/loki/source/syslog/internal/syslogtarget/transport.go index 3348f27b15d3..b9bcdc4aa7fe 100644 --- a/component/loki/source/syslog/internal/syslogtarget/transport.go +++ b/component/loki/source/syslog/internal/syslogtarget/transport.go @@ -5,7 +5,6 @@ package syslogtarget // to other loki components. import ( - "bytes" "context" "crypto/tls" "crypto/x509" @@ -23,6 +22,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/influxdata/go-syslog/v3" + "github.com/influxdata/go-syslog/v3/rfc5424" "github.com/prometheus/common/config" "github.com/prometheus/prometheus/model/labels" @@ -444,18 +444,10 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) { continue } - r := bytes.NewReader(datagram[:n]) - - err = syslogparser.ParseStream(r, func(result *syslog.Result) { - if err := result.Error; err != nil { - t.handleMessageError(err) - } else { - t.handleMessage(lbs.Copy(), result.Message) - } - }, t.maxMessageLength()) - - if err != nil { - level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + if parsed, err := rfc5424.NewParser(rfc5424.WithBestEffort()).Parse(datagram[:n]); err != nil { + t.handleMessageError(err) + } else { + t.handleMessage(lbs.Copy(), parsed) } } }