From b5ec8f05f47edce0acbc7cdc02dee8ed514eb4e4 Mon Sep 17 00:00:00 2001 From: Benedikt Bongartz Date: Fri, 4 Oct 2024 19:12:09 +0200 Subject: [PATCH] operator: expose receivers by default Signed-off-by: Benedikt Bongartz --- apis/v1beta1/collector_webhook.go | 2 +- apis/v1beta1/collector_webhook_test.go | 33 +++++++++++ apis/v1beta1/config.go | 71 ++++++++++++++++++++++++ internal/components/builder.go | 46 +++++++++------ internal/components/component.go | 7 +++ internal/components/generic_parser.go | 29 ++++++++-- internal/components/multi_endpoint.go | 51 ++++++++++++++++- internal/components/receivers/helpers.go | 2 + internal/components/single_endpoint.go | 28 +++++++++- 9 files changed, 243 insertions(+), 26 deletions(-) diff --git a/apis/v1beta1/collector_webhook.go b/apis/v1beta1/collector_webhook.go index b1f8ddd91e..45744ef3a5 100644 --- a/apis/v1beta1/collector_webhook.go +++ b/apis/v1beta1/collector_webhook.go @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error { if len(otelcol.Spec.ManagementState) == 0 { otelcol.Spec.ManagementState = ManagementStateManaged } - return nil + return otelcol.Spec.Config.ApplyDefaults(c.logger) } func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { diff --git a/apis/v1beta1/collector_webhook_test.go b/apis/v1beta1/collector_webhook_test.go index 9ce9cd3c90..588a6cd3b8 100644 --- a/apis/v1beta1/collector_webhook_test.go +++ b/apis/v1beta1/collector_webhook_test.go @@ -144,6 +144,39 @@ func TestCollectorDefaultingWebhook(t *testing.T) { expected v1beta1.OpenTelemetryCollector shouldFailSar bool }{ + { + name: "update config defaults", + otelcol: v1beta1.OpenTelemetryCollector{ + Spec: v1beta1.OpenTelemetryCollectorSpec{ + Config: func() v1beta1.Config { + const input = `{"receivers":{"otlp":{"protocols":{"grpc":null,"http":null}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}` + var cfg v1beta1.Config + require.NoError(t, yaml.Unmarshal([]byte(input), &cfg)) + return cfg + }(), + }, + }, + expected: v1beta1.OpenTelemetryCollector{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{}, + }, + Spec: v1beta1.OpenTelemetryCollectorSpec{ + OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{ + Args: map[string]string{"feature-gates": "-component.UseLocalHostAsDefaultHost"}, + ManagementState: v1beta1.ManagementStateManaged, + Replicas: &one, + }, + Mode: v1beta1.ModeDeployment, + UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic, + Config: func() v1beta1.Config { + const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}` + var cfg v1beta1.Config + require.NoError(t, yaml.Unmarshal([]byte(input), &cfg)) + return cfg + }(), + }, + }, + }, { name: "all fields default", otelcol: v1beta1.OpenTelemetryCollector{}, diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 0eb9af57e9..d8ad760691 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "reflect" + "slices" "sort" "strconv" "strings" @@ -225,6 +226,72 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds .. return ports, nil } +// getPortsForComponentKinds gets the ports for the given ComponentKind(s). +func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error { + enabledComponents := c.GetEnabledComponents() + for _, componentKind := range componentKinds { + var retriever components.ParserRetriever + var cfg AnyConfig + switch componentKind { + case KindReceiver: + retriever = receivers.ReceiverFor + cfg = c.Receivers + case KindExporter: + continue + case KindProcessor: + continue + case KindExtension: + continue + } + for componentName := range enabledComponents[componentKind] { + parser := retriever(componentName) + if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil { + return err + } else { + // NOTE: The internally used parser returns components.SingleEndpointConfig + // as a map value. The following lines normalize this value.. + got, err := yaml.Marshal(newCfg) + if err != nil { + return err + } + out := make(map[string]interface{}, 0) + if err := yaml.Unmarshal(got, out); err != nil { + return err + } + // NOTE: The underlying struct compoents.SingleEndpointConfig adds this listenaddress + // field. It is marshaled due to internal use. To avoid adding invalid fields to the + // collector config, this temporary workaround removes this field. + // TODO: Try to get rid of it or move it into the parser.GetDefaultConfig method. + removeKeysRecursively(out, "listenaddress") + cfg.Object[componentName] = out + } + } + } + + return nil +} + +func removeKeysRecursively(m map[string]interface{}, keysToRemove ...string) { + for k, v := range m { + if slices.Contains(keysToRemove, k) { + delete(m, k) + continue + } + + if nm, ok := v.(map[string]interface{}); ok { + removeKeysRecursively(nm, keysToRemove...) + } + + if ns, ok := v.([]interface{}); ok { + for _, item := range ns { + if nestedMap, ok := item.(map[string]interface{}); ok { + removeKeysRecursively(nestedMap, keysToRemove...) + } + } + } + } +} + func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) { return c.getPortsForComponentKinds(logger, KindReceiver) } @@ -241,6 +308,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor) } +func (c *Config) ApplyDefaults(logger logr.Logger) error { + return c.applyDefaultForComponentKinds(logger, KindReceiver) +} + // GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled // that provides the hinting for the liveness probe. func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) { diff --git a/internal/components/builder.go b/internal/components/builder.go index be459cc513..7abc174703 100644 --- a/internal/components/builder.go +++ b/internal/components/builder.go @@ -26,16 +26,18 @@ import ( type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType]) type Settings[ComponentConfigType any] struct { - protocol corev1.Protocol - appProtocol *string - targetPort intstr.IntOrString - nodePort int32 - name string - port int32 - portParser PortParser[ComponentConfigType] - rbacGen RBACRuleGenerator[ComponentConfigType] - livenessGen ProbeGenerator[ComponentConfigType] - readinessGen ProbeGenerator[ComponentConfigType] + protocol corev1.Protocol + appProtocol *string + targetPort intstr.IntOrString + nodePort int32 + name string + port int32 + defaultRecAddr string + portParser PortParser[ComponentConfigType] + rbacGen RBACRuleGenerator[ComponentConfigType] + livenessGen ProbeGenerator[ComponentConfigType] + readinessGen ProbeGenerator[ComponentConfigType] + defaultsApplier Defaulter[ComponentConfigType] } func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] { @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build o.appProtocol = appProtocol }) } +func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.defaultRecAddr = defaultRecAddr + }) +} func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] { return append(b, func(o *Settings[ComponentConfigType]) { o.targetPort = intstr.FromInt32(targetPort) @@ -118,6 +125,12 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat }) } +func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.defaultsApplier = defaultsApplier + }) +} + func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) { o := NewEmptySettings[ComponentConfigType]() o.Apply(b...) @@ -125,12 +138,13 @@ func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigTyp return nil, fmt.Errorf("invalid settings struct, no name specified") } return &GenericParser[ComponentConfigType]{ - name: o.name, - portParser: o.portParser, - rbacGen: o.rbacGen, - livenessGen: o.livenessGen, - readinessGen: o.readinessGen, - settings: o, + name: o.name, + portParser: o.portParser, + rbacGen: o.rbacGen, + livenessGen: o.livenessGen, + readinessGen: o.readinessGen, + defaultsApplier: o.defaultsApplier, + settings: o, }, nil } diff --git a/internal/components/component.go b/internal/components/component.go index f97daba497..5b62a47407 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config // It's expected that type Config is the configuration used by a parser. type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error) +// Defaulter is a function that applies given defaults to the passed Config. +// It's expected that type Config is the configuration used by a parser. +type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, port int32, config ComponentConfigType) (ComponentConfigType, error) + // ComponentType returns the type for a given component name. // components have a name like: // - mycomponent/custom @@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) { type ParserRetriever func(string) Parser type Parser interface { + // GetDefaultConfig .. TODO + GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) + // Ports returns the service ports parsed based on the component's configuration where name is the component's name // of the form "name" or "type/name" Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) diff --git a/internal/components/generic_parser.go b/internal/components/generic_parser.go index 6bead9442c..58a5c5f9a5 100644 --- a/internal/components/generic_parser.go +++ b/internal/components/generic_parser.go @@ -30,12 +30,29 @@ var ( // GenericParser serves as scaffolding for custom parsing logic by isolating // functionality to idempotent functions. type GenericParser[T any] struct { - name string - settings *Settings[T] - portParser PortParser[T] - rbacGen RBACRuleGenerator[T] - livenessGen ProbeGenerator[T] - readinessGen ProbeGenerator[T] + name string + settings *Settings[T] + portParser PortParser[T] + rbacGen RBACRuleGenerator[T] + livenessGen ProbeGenerator[T] + readinessGen ProbeGenerator[T] + defaultsApplier Defaulter[T] +} + +func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) { + if g.settings == nil || g.defaultsApplier == nil { + return config, nil + } + + if g.settings.defaultRecAddr == "" { + return config, nil + } + + var parsed T + if err := mapstructure.Decode(config, &parsed); err != nil { + return nil, err + } + return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.port, parsed) } func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index 261700bb17..a62c09bcd9 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -40,6 +40,7 @@ type MultiPortOption func(parser *MultiPortReceiver) type MultiPortReceiver struct { name string + addrMappings map[string]string portMappings map[string]*corev1.ServicePort } @@ -72,6 +73,37 @@ func (m *MultiPortReceiver) ParserName() string { return fmt.Sprintf("__%s", m.name) } +func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) { + multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} + if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { + return nil, err + } + tmp := make(map[string]*SingleEndpointConfig, len(multiProtoEndpointCfg.Protocols)) + for protocol, ec := range multiProtoEndpointCfg.Protocols { + var port int32 + if defaultSvc, ok := m.portMappings[protocol]; ok { + port = defaultSvc.Port + if ec != nil { + port = ec.GetPortNumOrDefault(logger, port) + } + } + var addr string + if defaultAddr, ok := m.addrMappings[protocol]; ok { + addr = defaultAddr + } + res, err := AddressDefaulter(logger, addr, port, ec) + if err != nil { + return nil, err + } + tmp[protocol] = res + } + + for protocol, ec := range tmp { + multiProtoEndpointCfg.Protocols[protocol] = ec + } + return config, mapstructure.Decode(multiProtoEndpointCfg, &config) + +} func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { return nil, nil } @@ -91,7 +123,7 @@ func NewMultiPortReceiverBuilder(name string) MultiPortBuilder[*MultiProtocolEnd } func NewProtocolBuilder(name string, port int32) Builder[*MultiProtocolEndpointConfig] { - return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port) + return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port).WithDefaultsApplier(MultiAddressDefaulter) } func (mp MultiPortBuilder[ComponentConfigType]) AddPortMapping(builder Builder[ComponentConfigType]) MultiPortBuilder[ComponentConfigType] { @@ -104,6 +136,7 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err } multiReceiver := &MultiPortReceiver{ name: mp[0].MustBuild().name, + addrMappings: map[string]string{}, portMappings: map[string]*corev1.ServicePort{}, } for _, bu := range mp[1:] { @@ -112,6 +145,9 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err return nil, err } multiReceiver.portMappings[built.name] = built.settings.GetServicePort() + if built.settings != nil { + multiReceiver.addrMappings[built.name] = built.settings.defaultRecAddr + } } return multiReceiver, nil } @@ -123,3 +159,16 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver { return p } } + +// MultiAddressDefaulter ... +func MultiAddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *MultiProtocolEndpointConfig) (*MultiProtocolEndpointConfig, error) { + for protocol, ec := range config.Protocols { + res, err := AddressDefaulter(logger, defaultRecAddr, port, ec) + if err != nil { + return nil, err + } + // TODO: can it be nil? + config.Protocols[protocol].Endpoint = res.Endpoint + } + return config, nil +} diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go index 89a3cb6fe7..8823101cae 100644 --- a/internal/components/receivers/helpers.go +++ b/internal/components/receivers/helpers.go @@ -52,9 +52,11 @@ var ( components.NewMultiPortReceiverBuilder("otlp"). AddPortMapping(components.NewProtocolBuilder("grpc", 4317). WithAppProtocol(&components.GrpcProtocol). + WithDefaultRecAddress("0.0.0.0"). WithTargetPort(4317)). AddPortMapping(components.NewProtocolBuilder("http", 4318). WithAppProtocol(&components.HttpProtocol). + WithDefaultRecAddress("0.0.0.0"). WithTargetPort(4318)). MustBuild(), components.NewMultiPortReceiverBuilder("skywalking"). diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index 914136b568..8b5ee46cb5 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -15,6 +15,9 @@ package components import ( + "fmt" + "strings" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -80,9 +83,30 @@ func internalParseSingleEndpoint(logger logr.Logger, name string, failSilently b } func NewSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] { - return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint) + return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint).WithDefaultsApplier(AddressDefaulter) } func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] { - return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent) + return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter) +} + +// AddressDefaulter ... +func AddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *SingleEndpointConfig) (*SingleEndpointConfig, error) { + if config == nil { + config = &SingleEndpointConfig{} + } + if config.Endpoint == "" { + config.Endpoint = fmt.Sprintf("%s:%d", defaultRecAddr, port) + return config, nil + } + + v := strings.Split(config.Endpoint, ":") + if len(v) < 2 { + return config, nil + } + + if v[0] == "" { + config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1]) + } + return config, nil }