Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use 0.0.0.0 as otlp receiver default address #3325

Merged
merged 6 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/add_receiver_defaults.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: operator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use 0.0.0.0 as otlp receiver default address

# One or more tracking issues related to the change
issues: [3126]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion apis/v1beta1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error {
if len(otelcol.Spec.ManagementState) == 0 {
otelcol.Spec.ManagementState = ManagementStateManaged
}
return nil
return otelcol.Spec.Config.ApplyDefaults(c.logger)
}

func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
Expand Down
33 changes: 33 additions & 0 deletions apis/v1beta1/collector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,39 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
expected v1beta1.OpenTelemetryCollector
shouldFailSar bool
}{
{
name: "update config defaults",
otelcol: v1beta1.OpenTelemetryCollector{
Spec: v1beta1.OpenTelemetryCollectorSpec{
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":null,"http":null}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
expected: v1beta1.OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{},
},
Spec: v1beta1.OpenTelemetryCollectorSpec{
OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{
Args: map[string]string{"feature-gates": "-component.UseLocalHostAsDefaultHost"},
ManagementState: v1beta1.ManagementStateManaged,
Replicas: &one,
},
Mode: v1beta1.ModeDeployment,
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
},
{
name: "all fields default",
otelcol: v1beta1.OpenTelemetryCollector{},
Expand Down
71 changes: 71 additions & 0 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"reflect"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -225,6 +226,72 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
return ports, nil
}

// getPortsForComponentKinds gets the ports for the given ComponentKind(s).
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
enabledComponents := c.GetEnabledComponents()
for _, componentKind := range componentKinds {
var retriever components.ParserRetriever
var cfg AnyConfig
switch componentKind {
case KindReceiver:
retriever = receivers.ReceiverFor
cfg = c.Receivers
case KindExporter:
continue
case KindProcessor:
continue
case KindExtension:
continue
}
for componentName := range enabledComponents[componentKind] {
parser := retriever(componentName)
if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil {
return err
} else {
// NOTE: The internally used parser returns components.SingleEndpointConfig
// as a map value. The following lines normalize this value..
got, err := yaml.Marshal(newCfg)
frzifus marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
out := make(map[string]interface{}, 0)
if err := yaml.Unmarshal(got, out); err != nil {
return err
}
// NOTE: The underlying struct compoents.SingleEndpointConfig adds this listenaddress
// field. It is marshaled due to internal use. To avoid adding invalid fields to the
// collector config, this temporary workaround removes this field.
// TODO: Try to get rid of it or move it into the parser.GetDefaultConfig method.
removeKeysRecursively(out, "listenaddress")
frzifus marked this conversation as resolved.
Show resolved Hide resolved
cfg.Object[componentName] = out
}
}
}

return nil
}

func removeKeysRecursively(m map[string]interface{}, keysToRemove ...string) {
for k, v := range m {
if slices.Contains(keysToRemove, k) {
delete(m, k)
continue
}

if nm, ok := v.(map[string]interface{}); ok {
removeKeysRecursively(nm, keysToRemove...)
}

if ns, ok := v.([]interface{}); ok {
for _, item := range ns {
if nestedMap, ok := item.(map[string]interface{}); ok {
removeKeysRecursively(nestedMap, keysToRemove...)
}
}
}
}
}

func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentKinds(logger, KindReceiver)
}
Expand All @@ -241,6 +308,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error
return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor)
}

func (c *Config) ApplyDefaults(logger logr.Logger) error {
return c.applyDefaultForComponentKinds(logger, KindReceiver)
}

// GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled
// that provides the hinting for the liveness probe.
func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) {
Expand Down
46 changes: 30 additions & 16 deletions internal/components/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType])

type Settings[ComponentConfigType any] struct {
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
defaultRecAddr string
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
defaultsApplier Defaulter[ComponentConfigType]
}

func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] {
Expand Down Expand Up @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build
o.appProtocol = appProtocol
})
}
func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultRecAddr = defaultRecAddr
})
}
func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.targetPort = intstr.FromInt32(targetPort)
Expand Down Expand Up @@ -118,19 +125,26 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat
})
}

func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultsApplier = defaultsApplier
})
}

func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) {
o := NewEmptySettings[ComponentConfigType]()
o.Apply(b...)
if len(o.name) == 0 {
return nil, fmt.Errorf("invalid settings struct, no name specified")
}
return &GenericParser[ComponentConfigType]{
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
settings: o,
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
defaultsApplier: o.defaultsApplier,
settings: o,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config
// It's expected that type Config is the configuration used by a parser.
type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error)

// Defaulter is a function that applies given defaults to the passed Config.
// It's expected that type Config is the configuration used by a parser.
type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, port int32, config ComponentConfigType) (ComponentConfigType, error)
frzifus marked this conversation as resolved.
Show resolved Hide resolved

// ComponentType returns the type for a given component name.
// components have a name like:
// - mycomponent/custom
Expand Down Expand Up @@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) {
type ParserRetriever func(string) Parser

type Parser interface {
// GetDefaultConfig returns a config with set default values.
GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error)

// Ports returns the service ports parsed based on the component's configuration where name is the component's name
// of the form "name" or "type/name"
Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error)
Expand Down
29 changes: 23 additions & 6 deletions internal/components/generic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,29 @@ var (
// GenericParser serves as scaffolding for custom parsing logic by isolating
// functionality to idempotent functions.
type GenericParser[T any] struct {
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
defaultsApplier Defaulter[T]
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}

func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
if g.settings == nil || g.defaultsApplier == nil {
return config, nil
}

if g.settings.defaultRecAddr == "" {
return config, nil
}

var parsed T
if err := mapstructure.Decode(config, &parsed); err != nil {
return nil, err
}
return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.port, parsed)
frzifus marked this conversation as resolved.
Show resolved Hide resolved
}

func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
Expand Down
49 changes: 48 additions & 1 deletion internal/components/multi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type MultiPortOption func(parser *MultiPortReceiver)
type MultiPortReceiver struct {
name string

addrMappings map[string]string
portMappings map[string]*corev1.ServicePort
}

Expand Down Expand Up @@ -72,6 +73,37 @@ func (m *MultiPortReceiver) ParserName() string {
return fmt.Sprintf("__%s", m.name)
}

func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
frzifus marked this conversation as resolved.
Show resolved Hide resolved
multiProtoEndpointCfg := &MultiProtocolEndpointConfig{}
if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil {
return nil, err
}
tmp := make(map[string]*SingleEndpointConfig, len(multiProtoEndpointCfg.Protocols))
for protocol, ec := range multiProtoEndpointCfg.Protocols {
var port int32
if defaultSvc, ok := m.portMappings[protocol]; ok {
port = defaultSvc.Port
if ec != nil {
port = ec.GetPortNumOrDefault(logger, port)
}
}
var addr string
if defaultAddr, ok := m.addrMappings[protocol]; ok {
addr = defaultAddr
}
res, err := AddressDefaulter(logger, addr, port, ec)
if err != nil {
return nil, err
}
tmp[protocol] = res
}

for protocol, ec := range tmp {
multiProtoEndpointCfg.Protocols[protocol] = ec
}
return config, mapstructure.Decode(multiProtoEndpointCfg, &config)
frzifus marked this conversation as resolved.
Show resolved Hide resolved

}
func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
return nil, nil
}
Expand All @@ -91,7 +123,7 @@ func NewMultiPortReceiverBuilder(name string) MultiPortBuilder[*MultiProtocolEnd
}

func NewProtocolBuilder(name string, port int32) Builder[*MultiProtocolEndpointConfig] {
return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port)
return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port).WithDefaultsApplier(MultiAddressDefaulter)
}

func (mp MultiPortBuilder[ComponentConfigType]) AddPortMapping(builder Builder[ComponentConfigType]) MultiPortBuilder[ComponentConfigType] {
Expand All @@ -104,6 +136,7 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err
}
multiReceiver := &MultiPortReceiver{
name: mp[0].MustBuild().name,
addrMappings: map[string]string{},
portMappings: map[string]*corev1.ServicePort{},
}
for _, bu := range mp[1:] {
Expand All @@ -112,6 +145,9 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err
return nil, err
}
multiReceiver.portMappings[built.name] = built.settings.GetServicePort()
if built.settings != nil {
multiReceiver.addrMappings[built.name] = built.settings.defaultRecAddr
}
}
return multiReceiver, nil
}
Expand All @@ -123,3 +159,14 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver {
return p
}
}

func MultiAddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *MultiProtocolEndpointConfig) (*MultiProtocolEndpointConfig, error) {
for protocol, ec := range config.Protocols {
res, err := AddressDefaulter(logger, defaultRecAddr, port, ec)
if err != nil {
return nil, err
}
config.Protocols[protocol].Endpoint = res.Endpoint
}
return config, nil
}
2 changes: 2 additions & 0 deletions internal/components/receivers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ var (
components.NewMultiPortReceiverBuilder("otlp").
AddPortMapping(components.NewProtocolBuilder("grpc", 4317).
WithAppProtocol(&components.GrpcProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4317)).
AddPortMapping(components.NewProtocolBuilder("http", 4318).
WithAppProtocol(&components.HttpProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4318)).
MustBuild(),
components.NewMultiPortReceiverBuilder("skywalking").
Expand Down
Loading
Loading