Skip to content

Commit

Permalink
[pkg/stanza] Add NewConfigWithID func to each operator (open-telemetr…
Browse files Browse the repository at this point in the history
…y#13293)

The notion of operator IDs in this module was once very prominent.
No operator could be specified without an ID, in any context,
including in a config file. Usability improvements have since
made operator IDs optional in the config file, in most cases.
This change does the same for development, where often operator
IDs are irrelevant to the scneario.
  • Loading branch information
djaglowski authored Aug 15, 2022
1 parent 090af80 commit e26ca3b
Show file tree
Hide file tree
Showing 72 changed files with 415 additions and 234 deletions.
2 changes: 1 addition & 1 deletion internal/components/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestDefaultReceivers(t *testing.T) {
receiver: "syslog",
getConfigFn: func() config.Receiver {
cfg := rcvrFactories["syslog"].CreateDefaultConfig().(*syslogreceiver.SysLogConfig)
cfg.TCP = &tcpop.NewConfig("tcp_input").BaseConfig
cfg.TCP = &tcpop.NewConfig().BaseConfig
cfg.TCP.ListenAddress = "0.0.0.0:0"
cfg.Protocol = "rfc5424"
return cfg
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func createNoopReceiver(workerCount int, nextConsumer consumer.Logs) (*receiver,
pipe, err := pipeline.Config{
Operators: []operator.Config{
{
Builder: noop.NewConfig(""),
Builder: noop.NewConfig(),
},
},
}.Build(zap.NewNop().Sugar())
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (f TestReceiverType) DecodeInputConfig(cfg config.Receiver) (*operator.Conf

// Allow tests to run without implementing input config
if testConfig.Input["type"] == nil {
return &operator.Config{Builder: noop.NewConfig("nop")}, nil
return &operator.Config{Builder: noop.NewConfig()}, nil
}

// Allow tests to explicitly prompt a failure
Expand Down
12 changes: 6 additions & 6 deletions pkg/stanza/operator/input/file/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func BenchmarkFileInput(b *testing.B) {
"file0.log",
},
config: func() *Config {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.Include = []string{
"file0.log",
}
Expand All @@ -72,7 +72,7 @@ func BenchmarkFileInput(b *testing.B) {
"file3.log",
},
config: func() *Config {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.Include = []string{"file*.log"}
return cfg
},
Expand All @@ -86,7 +86,7 @@ func BenchmarkFileInput(b *testing.B) {
"log1.log",
},
config: func() *Config {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.Include = []string{
"file*.log",
"log*.log",
Expand All @@ -103,7 +103,7 @@ func BenchmarkFileInput(b *testing.B) {
"file3.log",
},
config: func() *Config {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.Include = []string{
"file*.log",
}
Expand All @@ -117,7 +117,7 @@ func BenchmarkFileInput(b *testing.B) {
"file0.log",
},
config: func() *Config {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.Include = []string{
"file*.log",
}
Expand All @@ -131,7 +131,7 @@ func BenchmarkFileInput(b *testing.B) {
"file0.log",
},
config: func() *Config {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.Include = []string{
"file*.log",
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/stanza/operator/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,21 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "file_input"

func init() {
operator.Register("file_input", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new input config with default values
func NewConfig(operatorID string) *Config {
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfigWithID creates a new input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
InputConfig: helper.NewInputConfig(operatorID, operatorType),
Config: *fileconsumer.NewConfig(),
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/operator/input/file/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestUnmarshal(t *testing.T) {
{
Name: "id_custom",
ExpectErr: false,
Expect: NewConfig("test_id"),
Expect: NewConfigWithID("test_id"),
},
{
Name: "include_one",
Expand Down Expand Up @@ -520,7 +520,7 @@ func TestBuild(t *testing.T) {
fakeOutput := testutil.NewMockOperator("fake")

basicConfig := func() *Config {
cfg := NewConfig("testfile")
cfg := NewConfigWithID("testfile")
cfg.OutputIDs = []string{"fake"}
cfg.Include = []string{"/var/log/testpath.*"}
cfg.Exclude = []string{"/var/log/testpath.ex*"}
Expand Down Expand Up @@ -747,11 +747,11 @@ func requireSamePreEmitOptions(t *testing.T, expect, actual []preEmitOption) {
}

func defaultCfg() *Config {
return NewConfig("file_input")
return NewConfig()
}

func NewTestConfig() *Config {
cfg := NewConfig("config_test")
cfg := NewConfigWithID("config_test")
cfg.Include = []string{"i1", "i2"}
cfg.Exclude = []string{"e1", "e2"}
cfg.Splitter = helper.NewSplitterConfig()
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/input/file/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

func newDefaultConfig(tempDir string) *Config {
cfg := NewConfig("testfile")
cfg := NewConfigWithID("testfile")
cfg.PollInterval = helper.Duration{Duration: 200 * time.Millisecond}
cfg.StartAt = "beginning"
cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)}
Expand Down
14 changes: 11 additions & 3 deletions pkg/stanza/operator/input/journald/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "journald_input"

func init() {
operator.Register("journald_input", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new input config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

func NewConfig(operatorID string) *Config {
// NewConfigWithID creates a new input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, "journald_input"),
InputConfig: helper.NewInputConfig(operatorID, operatorType),
StartAt: "end",
Priority: "info",
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/operator/input/journald/journald_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (f *fakeJournaldCmd) StdoutPipe() (io.ReadCloser, error) {
}

func TestInputJournald(t *testing.T) {
cfg := NewConfig("my_journald_input")
cfg := NewConfigWithID("my_journald_input")
cfg.OutputIDs = []string{"output"}

op, err := cfg.Build(testutil.Logger(t))
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestInputJournald(t *testing.T) {
}

func TestConfig(t *testing.T) {
expect := NewConfig("my_journald_input")
expect := NewConfigWithID("my_journald_input")

input := map[string]interface{}{
"id": "my_journald_input",
Expand Down
21 changes: 15 additions & 6 deletions pkg/stanza/operator/input/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,21 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/syslog"
)

const operatorType = "syslog_input"

func init() {
operator.Register("syslog_input", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new input config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}
func NewConfig(operatorID string) *Config {

// NewConfigWithID creates a new input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, "syslog_input"),
InputConfig: helper.NewInputConfig(operatorID, operatorType),
}
}

Expand All @@ -48,7 +57,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
return nil, err
}

syslogParserCfg := syslog.NewConfig(inputBase.ID() + "_internal_tcp")
syslogParserCfg := syslog.NewConfigWithID(inputBase.ID() + "_internal_tcp")
syslogParserCfg.BaseConfig = c.BaseConfig
syslogParserCfg.SetID(inputBase.ID() + "_internal_parser")
syslogParserCfg.OutputIDs = c.OutputIDs
Expand All @@ -58,7 +67,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

if c.TCP != nil {
tcpInputCfg := tcp.NewConfig(inputBase.ID() + "_internal_tcp")
tcpInputCfg := tcp.NewConfigWithID(inputBase.ID() + "_internal_tcp")
tcpInputCfg.BaseConfig = *c.TCP

tcpInput, err := tcpInputCfg.Build(logger)
Expand All @@ -79,7 +88,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

if c.UDP != nil {
udpInputCfg := udp.NewConfig(inputBase.ID() + "_internal_udp")
udpInputCfg := udp.NewConfigWithID(inputBase.ID() + "_internal_udp")
udpInputCfg.BaseConfig = *c.UDP

udpInput, err := udpInputCfg.Build(logger)
Expand Down
12 changes: 6 additions & 6 deletions pkg/stanza/operator/input/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func TestInput(t *testing.T) {
basicConfig := func() *syslog.Config {
cfg := syslog.NewConfig("test_syslog_parser")
cfg := syslog.NewConfigWithID("test_syslog_parser")
return cfg
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func InputTest(t *testing.T, cfg *Config, tc syslog.Case) {

func TestSyslogIDs(t *testing.T) {
basicConfig := func() *syslog.BaseConfig {
cfg := syslog.NewConfig("test_syslog_parser")
cfg := syslog.NewConfigWithID("test_syslog_parser")
cfg.Protocol = "RFC3164"
return &cfg.BaseConfig
}
Expand Down Expand Up @@ -128,18 +128,18 @@ func TestSyslogIDs(t *testing.T) {
}

func NewConfigWithTCP(syslogCfg *syslog.BaseConfig) *Config {
cfg := NewConfig("test_syslog")
cfg := NewConfigWithID("test_syslog")
cfg.BaseConfig = *syslogCfg
cfg.TCP = &tcp.NewConfig("test_syslog_tcp").BaseConfig
cfg.TCP = &tcp.NewConfigWithID("test_syslog_tcp").BaseConfig
cfg.TCP.ListenAddress = ":14201"
cfg.OutputIDs = []string{"fake"}
return cfg
}

func NewConfigWithUDP(syslogCfg *syslog.BaseConfig) *Config {
cfg := NewConfig("test_syslog")
cfg := NewConfigWithID("test_syslog")
cfg.BaseConfig = *syslogCfg
cfg.UDP = &udp.NewConfig("test_syslog_udp").BaseConfig
cfg.UDP = &udp.NewConfigWithID("test_syslog_udp").BaseConfig
cfg.UDP.ListenAddress = ":12032"
cfg.OutputIDs = []string{"fake"}
return cfg
Expand Down
15 changes: 11 additions & 4 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
)

const (
operatorType = "tcp_input"

// minMaxLogSize is the minimal size which can be used for buffering
// TCP input
minMaxLogSize = 64 * 1024
Expand All @@ -43,13 +45,18 @@ const (
)

func init() {
operator.Register("tcp_input", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfigWithID creates a new TCP input config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfig creates a new TCP input config with default values
func NewConfig(operatorID string) *Config {
// NewConfigWithID creates a new TCP input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, "tcp_input"),
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/stanza/operator/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ zv9WEy+9p05Aet+12x3dzRu93+yRIEYbSZ35NOUWfQ+gspF5rGgpxA==

func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.ListenAddress = ":0"

op, err := cfg.Build(testutil.Logger(t))
Expand Down Expand Up @@ -135,7 +135,7 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {

func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.ListenAddress = ":0"
cfg.AddAttributes = true

Expand Down Expand Up @@ -216,7 +216,7 @@ func tlsInputTest(input []byte, expected []string) func(t *testing.T) {
require.NoError(t, err)
f.Close()

cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.ListenAddress = ":0"
cfg.TLS = helper.NewTLSServerConfig(&configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestBuild(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.ListenAddress = tc.inputBody.ListenAddress
cfg.MaxLogSize = tc.inputBody.MaxLogSize
cfg.TLS = tc.inputBody.TLS
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestFailToBind(t *testing.T) {
}

var startTCP = func(int) (*Input, error) {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.ListenAddress = net.JoinHostPort(ip, strconv.Itoa(port))
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)
Expand All @@ -419,7 +419,7 @@ func TestFailToBind(t *testing.T) {
}

func BenchmarkTCPInput(b *testing.B) {
cfg := NewConfig("test_id")
cfg := NewConfigWithID("test_id")
cfg.ListenAddress = ":0"

op, err := cfg.Build(testutil.Logger(b))
Expand Down
13 changes: 10 additions & 3 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,25 @@ import (
)

const (
operatorType = "udp_input"

// Maximum UDP packet size
MaxUDPSize = 64 * 1024
)

func init() {
operator.Register("udp_input", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new UDP input config with default values
func NewConfig(operatorID string) *Config {
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfigWithID creates a new UDP input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, "udp_input"),
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Encoding: helper.NewEncodingConfig(),
Multiline: helper.MultilineConfig{
Expand Down
Loading

0 comments on commit e26ca3b

Please sign in to comment.