Skip to content

Commit

Permalink
[receiver/tcp|udplog] Migrate config unmarshaling from yaml to mapstr…
Browse files Browse the repository at this point in the history
…ucture (open-telemetry#13294)

* [receiver/tcp|udp] Migrate config unmarshaling from yaml to mapstructure

pkg/stanza historically used yaml for configuration unmarshaling.
The process of migrating to mapstructure was begun long ago but
was never completed. This PR switches tcplog and udplog underlying operators
to use mapstructure unmarshaling.
  • Loading branch information
djaglowski authored Aug 15, 2022
1 parent eb95d08 commit acfcfbe
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 56 deletions.
9 changes: 2 additions & 7 deletions internal/components/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
tcpop "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/chronyreceiver"
Expand Down Expand Up @@ -328,19 +327,15 @@ func TestDefaultReceivers(t *testing.T) {
receiver: "tcplog",
getConfigFn: func() config.Receiver {
cfg := rcvrFactories["tcplog"].CreateDefaultConfig().(*tcplogreceiver.TCPLogConfig)
cfg.Input = adapter.InputConfig{
"listen_address": "0.0.0.0:0",
}
cfg.ListenAddress = "0.0.0.0:0"
return cfg
},
},
{
receiver: "udplog",
getConfigFn: func() config.Receiver {
cfg := rcvrFactories["udplog"].CreateDefaultConfig().(*udplogreceiver.UDPLogConfig)
cfg.Input = adapter.InputConfig{
"listen_address": "0.0.0.0:0",
}
cfg.ListenAddress = "0.0.0.0:0"
return cfg
},
},
Expand Down
17 changes: 10 additions & 7 deletions pkg/stanza/operator/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,16 @@ func TestParserCSV(t *testing.T) {
},
[]entry.Entry{
{
Attributes: map[string]interface{}{
"columns": "name age height number",
"name": "stanza dev",
"age": "1",
"height": "400",
"number": "555-555-5555",
},
Attributes: func() map[string]interface{} {
m := map[string]interface{}{
"name": "stanza dev",
"age": "1",
"height": "400",
"number": "555-555-5555",
}
m["columns"] = "name age height number"
return m
}(),
Body: "stanza dev 1 400 555-555-5555",
},
},
Expand Down
2 changes: 1 addition & 1 deletion receiver/tcplogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.58.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.58.0
gopkg.in/yaml.v2 v2.4.0
)

require (
Expand Down Expand Up @@ -44,6 +43,7 @@ require (
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
14 changes: 3 additions & 11 deletions receiver/tcplogreceiver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package tcplogreceiver // import "github.com/open-telemetry/opentelemetry-collec
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"gopkg.in/yaml.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -50,7 +49,7 @@ func (f ReceiverType) CreateDefaultConfig() config.Receiver {
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Operators: adapter.OperatorConfigs{},
},
Input: adapter.InputConfig{},
Config: *tcp.NewConfig("tcp_input"),
}
}

Expand All @@ -61,19 +60,12 @@ func (f ReceiverType) BaseConfig(cfg config.Receiver) adapter.BaseConfig {

// TCPLogConfig defines configuration for the tcp receiver
type TCPLogConfig struct {
tcp.Config `mapstructure:",squash"`
adapter.BaseConfig `mapstructure:",squash"`
Input adapter.InputConfig `mapstructure:",remain"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*TCPLogConfig)
yamlBytes, _ := yaml.Marshal(logConfig.Input)
inputCfg := tcp.NewConfig("tcp_input")

if err := yaml.Unmarshal(yamlBytes, &inputCfg); err != nil {
return nil, err
}

return &operator.Config{Builder: inputCfg}, nil
return &operator.Config{Builder: &logConfig.Config}, nil
}
23 changes: 14 additions & 9 deletions receiver/tcplogreceiver/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
)

func TestTcp(t *testing.T) {
testTCP(t, testdataConfigYamlAsMap())
testTCP(t, testdataConfigYaml())
}

func testTCP(t *testing.T, cfg *TCPLogConfig) {
Expand Down Expand Up @@ -82,10 +83,10 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 1)
assert.Equal(t, testdataConfigYamlAsMap(), cfg.Receivers[config.NewComponentID(typeStr)])
assert.Equal(t, testdataConfigYaml(), cfg.Receivers[config.NewComponentID(typeStr)])
}

func testdataConfigYamlAsMap() *TCPLogConfig {
func testdataConfigYaml() *TCPLogConfig {
return &TCPLogConfig{
BaseConfig: adapter.BaseConfig{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Expand All @@ -94,9 +95,11 @@ func testdataConfigYamlAsMap() *TCPLogConfig {
WorkerCount: 1,
},
},
Input: adapter.InputConfig{
"listen_address": "0.0.0.0:29018",
},
Config: func() tcp.Config {
c := tcp.NewConfig("tcp_input")
c.ListenAddress = "0.0.0.0:29018"
return *c
}(),
}
}

Expand All @@ -107,9 +110,11 @@ func TestDecodeInputConfigFailure(t *testing.T) {
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Operators: adapter.OperatorConfigs{},
},
Input: adapter.InputConfig{
"max_buffer_size": "0.1.0.1-",
},
Config: func() tcp.Config {
c := tcp.NewConfig("tcp_input")
c.Encoding.Encoding = "fake"
return *c
}(),
}
receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), badCfg, consumertest.NewNop())
require.Error(t, err, "receiver creation should fail if input config isn't valid")
Expand Down
2 changes: 1 addition & 1 deletion receiver/udplogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.58.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.58.0
gopkg.in/yaml.v2 v2.4.0
)

require (
Expand Down Expand Up @@ -43,6 +42,7 @@ require (
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
14 changes: 3 additions & 11 deletions receiver/udplogreceiver/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package udplogreceiver // import "github.com/open-telemetry/opentelemetry-collec
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"gopkg.in/yaml.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -50,7 +49,7 @@ func (f ReceiverType) CreateDefaultConfig() config.Receiver {
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Operators: adapter.OperatorConfigs{},
},
Input: adapter.InputConfig{},
Config: *udp.NewConfig("udp_input"),
}
}

Expand All @@ -61,19 +60,12 @@ func (f ReceiverType) BaseConfig(cfg config.Receiver) adapter.BaseConfig {

// UDPLogConfig defines configuration for the udp receiver
type UDPLogConfig struct {
udp.Config `mapstructure:",squash"`
adapter.BaseConfig `mapstructure:",squash"`
Input adapter.InputConfig `mapstructure:",remain"`
}

// DecodeInputConfig unmarshals the input operator
func (f ReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Config, error) {
logConfig := cfg.(*UDPLogConfig)
yamlBytes, _ := yaml.Marshal(logConfig.Input)
inputCfg := udp.NewConfig("udp_input")

if err := yaml.Unmarshal(yamlBytes, &inputCfg); err != nil {
return nil, err
}

return &operator.Config{Builder: inputCfg}, nil
return &operator.Config{Builder: &logConfig.Config}, nil
}
23 changes: 14 additions & 9 deletions receiver/udplogreceiver/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
)

func TestUdp(t *testing.T) {
testUDP(t, testdataConfigYamlAsMap())
testUDP(t, testdataConfigYaml())
}

func testUDP(t *testing.T, cfg *UDPLogConfig) {
Expand Down Expand Up @@ -86,18 +87,20 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 1)
assert.Equal(t, testdataConfigYamlAsMap(), cfg.Receivers[config.NewComponentID("udplog")])
assert.Equal(t, testdataConfigYaml(), cfg.Receivers[config.NewComponentID("udplog")])
}

func testdataConfigYamlAsMap() *UDPLogConfig {
func testdataConfigYaml() *UDPLogConfig {
return &UDPLogConfig{
BaseConfig: adapter.BaseConfig{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("udplog")),
Operators: adapter.OperatorConfigs{},
},
Input: adapter.InputConfig{
"listen_address": "0.0.0.0:29018",
},
Config: func() udp.Config {
c := udp.NewConfig("udp_input")
c.ListenAddress = "0.0.0.0:29018"
return *c
}(),
}
}

Expand All @@ -109,9 +112,11 @@ func TestDecodeInputConfigFailure(t *testing.T) {
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("udplog")),
Operators: adapter.OperatorConfigs{},
},
Input: adapter.InputConfig{
"max_buffer_size": "0.1.0.1-",
},
Config: func() udp.Config {
c := udp.NewConfig("udp_input")
c.Encoding.Encoding = "fake"
return *c
}(),
}
receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), badCfg, sink)
require.Error(t, err, "receiver creation should fail if input config isn't valid")
Expand Down

0 comments on commit acfcfbe

Please sign in to comment.