Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Add support for syslog rfc 5424 #15467

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,28 @@ import (
type config struct {
harvester.ForwarderConfig `config:",inline"`
Protocol common.ConfigNamespace `config:"protocol"`
Format syslogFormat `config:"format"`
}

type syslogFormat int

const (
syslogFormatRFC3164 = iota
syslogFormatRFC5424
)

var (
syslogFormats = map[string]syslogFormat{
"rfc3164": syslogFormatRFC3164,
"rfc5424": syslogFormatRFC5424,
}
)

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "syslog",
},
Format: syslogFormatRFC3164,
}

type syslogTCP struct {
Expand Down Expand Up @@ -90,3 +106,13 @@ func factory(
return nil, fmt.Errorf("you must choose between TCP or UDP")
}
}

// Unpack validates and unpack the "format" config option
func (f *syslogFormat) Unpack(value string) error {
format, ok := syslogFormats[value]
if !ok {
return fmt.Errorf("invalid format '%s'", value)
}
*f = format
return nil
}
164 changes: 147 additions & 17 deletions filebeat/input/syslog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"

"github.com/influxdata/go-syslog"
"github.com/influxdata/go-syslog/rfc5424"
)

// Parser is generated from a ragel state machine using the following command:
// RFC 3164 parser is generated from a ragel state machine using the following command:
//go:generate ragel -Z -G2 parser.rl -o parser.go
//go:generate go fmt parser.go

Expand Down Expand Up @@ -126,8 +129,33 @@ func NewInput(
return nil, err
}

var cb inputsource.NetworkFunc
forwarder := harvester.NewForwarder(out)
cb := func(data []byte, metadata inputsource.NetworkMetadata) {
switch config.Format {
case syslogFormatRFC3164:
cb = networkFuncFor3164(log, forwarder)
case syslogFormatRFC5424:
cb = networkFuncFor5424(log, forwarder)
}

server, err := factory(cb, config.Protocol)
if err != nil {
return nil, err
}

return &Input{
outlet: out,
started: false,
server: server,
config: &config,
log: log,
}, nil
}

func networkFuncFor3164(
log *logp.Logger, forwarder *harvester.Forwarder,
) inputsource.NetworkFunc {
return (func(data []byte, metadata inputsource.NetworkMetadata) {
ev := newEvent()
Parse(data, ev)
if !ev.IsValid() {
Expand All @@ -144,22 +172,38 @@ func NewInput(
},
})
} else {
forwarder.Send(createEvent(ev, metadata, time.Local, log))
forwarder.Send(createEvent3164(ev, metadata, time.Local, log))
}
}

server, err := factory(cb, config.Protocol)
if err != nil {
return nil, err
}
})
}

return &Input{
outlet: out,
started: false,
server: server,
config: &config,
log: log,
}, nil
// This is very similar to the network function for RFC 3164, but hard to
// combine because the underlying library API is different, so for now there is
// some duplication.
func networkFuncFor5424(
log *logp.Logger, forwarder *harvester.Forwarder,
) inputsource.NetworkFunc {
parser := rfc5424.NewParser()
return (func(data []byte, metadata inputsource.NetworkMetadata) {
message, err := parser.Parse(data)
if err != nil {
log.Errorw("can't parse event as syslog rfc5424",
"message", string(data), "error", err)
// On error revert to the raw bytes content, we need a better way to communicate this kind of
// error upstream this should be a global effort.
forwarder.Send(beat.Event{
Timestamp: time.Now(),
Meta: common.MapStr{
"truncated": metadata.Truncated,
},
Fields: common.MapStr{
"message": string(data),
},
})
} else {
forwarder.Send(createEvent5424(message, metadata, time.Local, log))
}
})
}

// Run starts listening for Syslog events over the network.
Expand Down Expand Up @@ -198,7 +242,7 @@ func (p *Input) Wait() {
p.Stop()
}

func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time.Location, log *logp.Logger) beat.Event {
func createEvent3164(ev *event, metadata inputsource.NetworkMetadata, timezone *time.Location, log *logp.Logger) beat.Event {
f := common.MapStr{
"message": strings.TrimRight(ev.Message(), "\n"),
"log": common.MapStr{
Expand Down Expand Up @@ -263,6 +307,92 @@ func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time
}
}

func createEvent5424(
message syslog.Message,
metadata inputsource.NetworkMetadata,
timezone *time.Location,
log *logp.Logger,
) beat.Event {
if message.Message() == nil {
}
f := common.MapStr{
"log": common.MapStr{
"source": common.MapStr{
"address": metadata.RemoteAddr.String(),
},
},
}
if message.Message() != nil {
f["message"] = strings.TrimRight(*message.Message(), "\n")
}

syslog := common.MapStr{}
event := common.MapStr{}
process := common.MapStr{}

if message.Hostname() != nil {
hostname := *message.Hostname()
if hostname != "" {
f["hostname"] = hostname
}
}
if message.ProcID() != nil {
process["pid"] = *message.ProcID()
}
if message.Appname() != nil {
process["program"] = *message.Appname()
}
if message.Priority() != nil {
syslog["priority"] = *message.Priority()
}

// Severity and facility meanings are shared between rfc 3264 and 5424, so we
// can reuse the same string tables. go-syslog also allows reading the
// string directly, but the strings differ slightly from the ones we used
// so for output consistency we still use our own lookups here.
if message.Severity() != nil {
severity := *message.Severity()
event["severity"] = severity
v, err := mapValueToName(int(severity), severityLabels)
if err != nil {
log.Debugw("could not find severity label", "error", err)
} else {
syslog["severity_label"] = v
}
}

if message.Facility() != nil {
facility := *message.Facility()
v, err := mapValueToName(int(facility), facilityLabels)
if err != nil {
log.Debugw("could not find facility label", "error", err)
} else {
syslog["facility_label"] = v
}
}
f["syslog"] = syslog
f["event"] = event
if len(process) > 0 {
f["process"] = process
}

var timestamp time.Time
timestampPtr := message.Timestamp()
if timestampPtr != nil {
timestamp = *timestampPtr
} else {
// Fall back to the current time if no timestamp is included.
timestamp = time.Now()
}
return beat.Event{
Timestamp: timestamp,
Meta: common.MapStr{
"truncated": metadata.Truncated,
},
Fields: f,
}
}

func mapValueToName(v int, m mapper) (string, error) {
if v < 0 || v >= len(m) {
return "", errors.Errorf("value out of bound: %d", v)
Expand Down
20 changes: 10 additions & 10 deletions filebeat/input/syslog/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestWhenPriorityIsSet(t *testing.T) {
e.SetPid([]byte("123"))

m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))

expected := common.MapStr{
"log": common.MapStr{
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestWhenPriorityIsNotSet(t *testing.T) {
e.SetPid([]byte("123"))

m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))
expected := common.MapStr{
"log": common.MapStr{
"source": common.MapStr{
Expand All @@ -96,7 +96,7 @@ func TestPid(t *testing.T) {
e.SetMessage([]byte("hello world"))
e.SetPid([]byte("123"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))
v, err := event.GetValue("process")
if !assert.NoError(t, err) {
return
Expand All @@ -108,7 +108,7 @@ func TestPid(t *testing.T) {
e := newEvent()
e.SetMessage([]byte("hello world"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))

_, err := event.GetValue("process")
assert.Equal(t, common.ErrKeyNotFound, err)
Expand All @@ -121,7 +121,7 @@ func TestHostname(t *testing.T) {
e.SetMessage([]byte("hello world"))
e.SetHostname([]byte("wopr"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))
v, err := event.GetValue("hostname")
if !assert.NoError(t, err) {
return
Expand All @@ -133,7 +133,7 @@ func TestHostname(t *testing.T) {
e := newEvent()
e.SetMessage([]byte("hello world"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))

_, err := event.GetValue("hostname")
if !assert.Error(t, err) {
Expand All @@ -148,7 +148,7 @@ func TestProgram(t *testing.T) {
e.SetMessage([]byte("hello world"))
e.SetProgram([]byte("sudo"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))
v, err := event.GetValue("process")
if !assert.NoError(t, err) {
return
Expand All @@ -160,7 +160,7 @@ func TestProgram(t *testing.T) {
e := newEvent()
e.SetMessage([]byte("hello world"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))

_, err := event.GetValue("process")
assert.Equal(t, common.ErrKeyNotFound, err)
Expand All @@ -174,7 +174,7 @@ func TestSequence(t *testing.T) {
e.SetProgram([]byte("sudo"))
e.SetSequence([]byte("123"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))
v, err := event.GetValue("event.sequence")
if !assert.NoError(t, err) {
return
Expand All @@ -186,7 +186,7 @@ func TestSequence(t *testing.T) {
e := newEvent()
e.SetMessage([]byte("hello world"))
m := dummyMetadata()
event := createEvent(e, m, time.Local, logp.NewLogger("syslog"))
event := createEvent3164(e, m, time.Local, logp.NewLogger("syslog"))

_, err := event.GetValue("event.sequence")
assert.Error(t, err)
Expand Down
21 changes: 21 additions & 0 deletions vendor/github.com/influxdata/go-syslog/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading