Skip to content

Commit

Permalink
fix(loki.source.syslog): UDP path returns EOF
Browse files Browse the repository at this point in the history
  • Loading branch information
github-vincent-miszczak committed Feb 6, 2024
1 parent 755e1b7 commit a27615c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ type formatFunc func(string) string
var (
fmtOctetCounting = func(s string) string { return fmt.Sprintf("%d %s", len(s), s) }
fmtNewline = func(s string) string { return s + "\n" }
fmtIdentity = func(s string) string { return s }
)

func Benchmark_SyslogTarget(b *testing.B) {
Expand All @@ -307,7 +308,7 @@ func Benchmark_SyslogTarget(b *testing.B) {
formatFunc formatFunc
}{
{"tcp", protocolTCP, fmtOctetCounting},
{"udp", protocolUDP, fmtOctetCounting},
{"udp", protocolUDP, fmtIdentity},
} {
tt := tt
b.Run(tt.name, func(b *testing.B) {
Expand Down Expand Up @@ -366,8 +367,7 @@ func TestSyslogTarget(t *testing.T) {
}{
{"tcp newline separated", protocolTCP, fmtNewline},
{"tcp octetcounting", protocolTCP, fmtOctetCounting},
{"udp newline separated", protocolUDP, fmtNewline},
{"udp octetcounting", protocolUDP, fmtOctetCounting},
{"udp", protocolUDP, fmtIdentity},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
Expand Down
18 changes: 5 additions & 13 deletions component/loki/source/syslog/internal/syslogtarget/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package syslogtarget
// to other loki components.

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/influxdata/go-syslog/v3"
"github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/prometheus/common/config"
"github.com/prometheus/prometheus/model/labels"

Expand Down Expand Up @@ -444,18 +444,10 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) {
continue
}

r := bytes.NewReader(datagram[:n])

err = syslogparser.ParseStream(r, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
} else {
t.handleMessage(lbs.Copy(), result.Message)
}
}, t.maxMessageLength())

if err != nil {
level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
if parsed, err := rfc5424.NewParser(rfc5424.WithBestEffort()).Parse(datagram[:n]); err != nil {
t.handleMessageError(err)
} else {
t.handleMessage(lbs.Copy(), parsed)
}
}
}
Expand Down

0 comments on commit a27615c

Please sign in to comment.