Skip to content

Commit

Permalink
[receiver/syslog] Migrate config unmarshaling from yaml to mapstructu…
Browse files Browse the repository at this point in the history
…re (open-telemetry#13253)

* [receiver/syslog] Migrate config unmarshaling from yaml to mapstructure

pkg/stanza historically used yaml for configuration unmarshaling.
The process of migrating to mapstructure was begun long ago but
was never completed. This PR switches syslog's underlying operators
to use mapstructure unmarshaling.
  • Loading branch information
djaglowski authored Aug 12, 2022
1 parent 9d56ebd commit dba69bb
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 47 deletions.
9 changes: 3 additions & 6 deletions internal/components/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,9 @@ func TestDefaultReceivers(t *testing.T) {
receiver: "syslog",
getConfigFn: func() config.Receiver {
cfg := rcvrFactories["syslog"].CreateDefaultConfig().(*syslogreceiver.SysLogConfig)
cfg.Input = adapter.InputConfig{
"tcp": map[string]interface{}{
"listen_address": "0.0.0.0:0",
},
"protocol": "rfc5424",
}
cfg.TCP = syslogreceiver.CreateDefaultTCPConfigBase()
cfg.TCP.ListenAddress = "0.0.0.0:0"
cfg.Protocol = "rfc5424"
return cfg
},
},
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/operator/input/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func NewConfig(operatorID string) *Config {
}

type Config struct {
helper.InputConfig `yaml:",inline"`
syslog.BaseConfig `yaml:",inline"`
TCP *tcp.BaseConfig `json:"tcp" yaml:"tcp"`
UDP *udp.BaseConfig `json:"udp" yaml:"udp"`
helper.InputConfig `mapstructure:",squash" yaml:",inline"`
syslog.BaseConfig `mapstructure:",squash" yaml:",inline"`
TCP *tcp.BaseConfig `mapstructure:"tcp" json:"tcp" yaml:"tcp"`
UDP *udp.BaseConfig `mapstructure:"udp" json:"udp" yaml:"udp"`
}

func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func NewConfig(operatorID string) *Config {

// Config is the configuration of a tcp input operator.
type Config struct {
helper.InputConfig `yaml:",inline"`
BaseConfig `yaml:",inline"`
helper.InputConfig `mapstructure:",squash" yaml:",inline"`
BaseConfig `mapstructure:",squash" yaml:",inline"`
}

// BaseConfig is the detailed configuration of a tcp input operator.
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func NewConfig(operatorID string) *Config {

// Config is the configuration of a udp input operator.
type Config struct {
helper.InputConfig `yaml:",inline"`
BaseConfig `yaml:",inline"`
helper.InputConfig `mapstructure:",squash" yaml:",inline"`
BaseConfig `mapstructure:",squash" yaml:",inline"`
}

// BaseConfig is the details configuration of a udp input operator.
Expand Down
2 changes: 1 addition & 1 deletion receiver/syslogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.58.0
go.opentelemetry.io/collector/pdata v0.58.0
gopkg.in/yaml.v2 v2.4.0
)

require (
Expand Down Expand Up @@ -45,6 +44,7 @@ require (
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
37 changes: 27 additions & 10 deletions receiver/syslogreceiver/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package syslogreceiver // import "github.com/open-telemetry/opentelemetry-collec
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"gopkg.in/yaml.v2"
"go.opentelemetry.io/collector/confmap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/syslog"
syslogparser "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/syslog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
)

const (
Expand Down Expand Up @@ -51,7 +52,7 @@ func (f ReceiverType) CreateDefaultConfig() config.Receiver {
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Operators: adapter.OperatorConfigs{},
},
Input: adapter.InputConfig{},
Config: *syslog.NewConfig("syslog_input"),
}
}

Expand All @@ -62,19 +63,35 @@ func (f ReceiverType) BaseConfig(cfg config.Receiver) adapter.BaseConfig {

// SysLogConfig defines configuration for the syslog receiver
type SysLogConfig struct {
syslog.Config `mapstructure:",squash"`
adapter.BaseConfig `mapstructure:",squash"`
Input adapter.InputConfig `mapstructure:",remain"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*SysLogConfig)
yamlBytes, _ := yaml.Marshal(logConfig.Input)
inputCfg := syslog.NewConfig("syslog_input")
inputCfg.BaseConfig = syslogparser.NewConfig("syslog_parser").BaseConfig
return &operator.Config{Builder: &logConfig.Config}, nil
}

func (cfg *SysLogConfig) Unmarshal(componentParser *confmap.Conf) error {
if componentParser == nil {
// Nothing to do if there is no config given.
return nil
}

if err := yaml.Unmarshal(yamlBytes, &inputCfg); err != nil {
return nil, err
if componentParser.IsSet("tcp") {
cfg.TCP = CreateDefaultTCPConfigBase()
} else if componentParser.IsSet("udp") {
cfg.UDP = CreateDefaultUDPConfigBase()
}
return &operator.Config{Builder: inputCfg}, nil

return componentParser.UnmarshalExact(cfg)
}

func CreateDefaultTCPConfigBase() *tcp.BaseConfig {
return &tcp.NewConfig("tcp_input").BaseConfig
}

func CreateDefaultUDPConfigBase() *udp.BaseConfig {
return &udp.NewConfig("udp_input").BaseConfig
}
49 changes: 27 additions & 22 deletions receiver/syslogreceiver/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ import (
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/syslog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
)

func TestSyslogWithTcp(t *testing.T) {
testSyslog(t, testdataConfigYamlAsMap())
testSyslog(t, testdataConfigYaml())
}

func TestSyslogWithUdp(t *testing.T) {
Expand All @@ -51,7 +54,7 @@ func testSyslog(t *testing.T, cfg *SysLogConfig) {
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))

var conn net.Conn
if cfg.Input["tcp"] != nil {
if cfg.Config.TCP != nil {
conn, err = net.Dial("tcp", "0.0.0.0:29018")
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -94,10 +97,10 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 1)
assert.Equal(t, testdataConfigYamlAsMap(), cfg.Receivers[config.NewComponentID(typeStr)])
assert.Equal(t, testdataConfigYaml(), cfg.Receivers[config.NewComponentID(typeStr)])
}

func testdataConfigYamlAsMap() *SysLogConfig {
func testdataConfigYaml() *SysLogConfig {
return &SysLogConfig{
BaseConfig: adapter.BaseConfig{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Expand All @@ -107,12 +110,13 @@ func testdataConfigYamlAsMap() *SysLogConfig {
WorkerCount: 1,
},
},
Input: adapter.InputConfig{
"tcp": map[string]interface{}{
"listen_address": "0.0.0.0:29018",
},
"protocol": "rfc5424",
},
Config: func() syslog.Config {
c := syslog.NewConfig("syslog_input")
c.TCP = &tcp.NewConfig("tcp_input").BaseConfig
c.TCP.ListenAddress = "0.0.0.0:29018"
c.Protocol = "rfc5424"
return *c
}(),
}
}

Expand All @@ -126,12 +130,13 @@ func testdataUDPConfig() *SysLogConfig {
WorkerCount: 1,
},
},
Input: adapter.InputConfig{
"udp": map[string]interface{}{
"listen_address": "0.0.0.0:29018",
},
"protocol": "rfc5424",
},
Config: func() syslog.Config {
c := syslog.NewConfig("syslog_input")
c.UDP = &udp.NewConfig("udp_input").BaseConfig
c.UDP.ListenAddress = "0.0.0.0:29018"
c.Protocol = "rfc5424"
return *c
}(),
}
}

Expand All @@ -143,12 +148,12 @@ func TestDecodeInputConfigFailure(t *testing.T) {
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Operators: adapter.OperatorConfigs{},
},
Input: adapter.InputConfig{
"tcp": map[string]interface{}{
"max_buffer_size": "0.1.0.1-",
},
"protocol": "rfc5424",
},
Config: func() syslog.Config {
c := syslog.NewConfig("syslog_input")
c.TCP = &tcp.NewConfig("tcp_input").BaseConfig
c.Protocol = "fake"
return *c
}(),
}
receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), badCfg, sink)
require.Error(t, err, "receiver creation should fail if input config isn't valid")
Expand Down

0 comments on commit dba69bb

Please sign in to comment.