From 02cb8827efbb22bd05bb7d04a7220ecf27581b46 Mon Sep 17 00:00:00 2001 From: Benedikt Bongartz Date: Fri, 4 Oct 2024 19:12:09 +0200 Subject: [PATCH] wip Signed-off-by: Benedikt Bongartz --- apis/v1beta1/collector_webhook.go | 2 +- apis/v1beta1/config.go | 38 ++++++++++++++++++++ internal/components/builder.go | 46 +++++++++++++++--------- internal/components/component.go | 7 ++++ internal/components/generic_parser.go | 46 ++++++++++++++++++++---- internal/components/multi_endpoint.go | 17 +++++++++ internal/components/receivers/helpers.go | 2 ++ 7 files changed, 135 insertions(+), 23 deletions(-) diff --git a/apis/v1beta1/collector_webhook.go b/apis/v1beta1/collector_webhook.go index b1f8ddd91e..45744ef3a5 100644 --- a/apis/v1beta1/collector_webhook.go +++ b/apis/v1beta1/collector_webhook.go @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error { if len(otelcol.Spec.ManagementState) == 0 { otelcol.Spec.ManagementState = ManagementStateManaged } - return nil + return otelcol.Spec.Config.ApplyDefaults(c.logger) } func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 0eb9af57e9..695ffb19f9 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -225,6 +225,40 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds .. return ports, nil } +// getPortsForComponentKinds gets the ports for the given ComponentKind(s). +func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error { + enabledComponents := c.GetEnabledComponents() + for _, componentKind := range componentKinds { + var retriever components.ParserRetriever + var cfg AnyConfig + switch componentKind { + case KindReceiver: + retriever = receivers.ReceiverFor + cfg = c.Receivers + case KindExporter: + continue + case KindProcessor: + continue + case KindExtension: + continue + } + for componentName := range enabledComponents[componentKind] { + parser := retriever(componentName) + if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil { + return err + } else { + cc, ok := newCfg.(map[string]interface{}) + if !ok { + return fmt.Errorf("could not apply defaults to receiver: %s", componentName) + } + cfg.Object = cc + } + } + } + + return nil +} + func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) { return c.getPortsForComponentKinds(logger, KindReceiver) } @@ -241,6 +275,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor) } +func (c *Config) ApplyDefaults(logger logr.Logger) error { + return c.applyDefaultForComponentKinds(logger, KindReceiver) +} + // GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled // that provides the hinting for the liveness probe. func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) { diff --git a/internal/components/builder.go b/internal/components/builder.go index be459cc513..7abc174703 100644 --- a/internal/components/builder.go +++ b/internal/components/builder.go @@ -26,16 +26,18 @@ import ( type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType]) type Settings[ComponentConfigType any] struct { - protocol corev1.Protocol - appProtocol *string - targetPort intstr.IntOrString - nodePort int32 - name string - port int32 - portParser PortParser[ComponentConfigType] - rbacGen RBACRuleGenerator[ComponentConfigType] - livenessGen ProbeGenerator[ComponentConfigType] - readinessGen ProbeGenerator[ComponentConfigType] + protocol corev1.Protocol + appProtocol *string + targetPort intstr.IntOrString + nodePort int32 + name string + port int32 + defaultRecAddr string + portParser PortParser[ComponentConfigType] + rbacGen RBACRuleGenerator[ComponentConfigType] + livenessGen ProbeGenerator[ComponentConfigType] + readinessGen ProbeGenerator[ComponentConfigType] + defaultsApplier Defaulter[ComponentConfigType] } func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] { @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build o.appProtocol = appProtocol }) } +func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.defaultRecAddr = defaultRecAddr + }) +} func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] { return append(b, func(o *Settings[ComponentConfigType]) { o.targetPort = intstr.FromInt32(targetPort) @@ -118,6 +125,12 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat }) } +func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.defaultsApplier = defaultsApplier + }) +} + func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) { o := NewEmptySettings[ComponentConfigType]() o.Apply(b...) @@ -125,12 +138,13 @@ func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigTyp return nil, fmt.Errorf("invalid settings struct, no name specified") } return &GenericParser[ComponentConfigType]{ - name: o.name, - portParser: o.portParser, - rbacGen: o.rbacGen, - livenessGen: o.livenessGen, - readinessGen: o.readinessGen, - settings: o, + name: o.name, + portParser: o.portParser, + rbacGen: o.rbacGen, + livenessGen: o.livenessGen, + readinessGen: o.readinessGen, + defaultsApplier: o.defaultsApplier, + settings: o, }, nil } diff --git a/internal/components/component.go b/internal/components/component.go index f97daba497..6cc59cd9c7 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config // It's expected that type Config is the configuration used by a parser. type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error) +// Defaulter is a function that applies given defaults to the passed Config. +// It's expected that type Config is the configuration used by a parser. +type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, config ComponentConfigType) (ComponentConfigType, error) + // ComponentType returns the type for a given component name. // components have a name like: // - mycomponent/custom @@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) { type ParserRetriever func(string) Parser type Parser interface { + // GetDefaultConfig .. TODO + GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) + // Ports returns the service ports parsed based on the component's configuration where name is the component's name // of the form "name" or "type/name" Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) diff --git a/internal/components/generic_parser.go b/internal/components/generic_parser.go index 6bead9442c..c9b51f4e43 100644 --- a/internal/components/generic_parser.go +++ b/internal/components/generic_parser.go @@ -16,6 +16,7 @@ package components import ( "fmt" + "strings" "github.com/go-logr/logr" "github.com/mitchellh/mapstructure" @@ -27,15 +28,48 @@ var ( _ Parser = &GenericParser[SingleEndpointConfig]{} ) +// AddressDefaulter ... +func AddressDefaulter(logger logr.Logger, defaultRecAddr string, config *SingleEndpointConfig) (*SingleEndpointConfig, error) { + if config.Endpoint == "" { + return config, nil + } + + v := strings.Split(config.Endpoint, ":") + if len(v) < 1 { + return config, nil + } + if v[0] == "" { + config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1]) + } + return config, nil +} + // GenericParser serves as scaffolding for custom parsing logic by isolating // functionality to idempotent functions. type GenericParser[T any] struct { - name string - settings *Settings[T] - portParser PortParser[T] - rbacGen RBACRuleGenerator[T] - livenessGen ProbeGenerator[T] - readinessGen ProbeGenerator[T] + name string + settings *Settings[T] + portParser PortParser[T] + rbacGen RBACRuleGenerator[T] + livenessGen ProbeGenerator[T] + readinessGen ProbeGenerator[T] + defaultsApplier Defaulter[T] +} + +func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) { + if g.settings == nil { + return config, nil + } + + if g.settings.defaultRecAddr == "" { + return config, nil + } + + var parsed T + if err := mapstructure.Decode(config, &parsed); err != nil { + return nil, err + } + return g.defaultsApplier(logger, g.settings.defaultRecAddr, parsed) } func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index 261700bb17..12a53ee1fe 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -72,6 +72,23 @@ func (m *MultiPortReceiver) ParserName() string { return fmt.Sprintf("__%s", m.name) } +func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) { + multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} + if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { + return nil, err + } + for protocol, ec := range multiProtoEndpointCfg.Protocols { + // TODO: Should we take default receiver address from settngs? + res, err := AddressDefaulter(logger, "0.0.0.0", ec) + if err != nil { + return nil, err + } + multiProtoEndpointCfg.Protocols[protocol].Endpoint = res.Endpoint + } + // Encode and return. + return config, mapstructure.Decode(multiProtoEndpointCfg, config) + +} func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) { return nil, nil } diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go index 89a3cb6fe7..8823101cae 100644 --- a/internal/components/receivers/helpers.go +++ b/internal/components/receivers/helpers.go @@ -52,9 +52,11 @@ var ( components.NewMultiPortReceiverBuilder("otlp"). AddPortMapping(components.NewProtocolBuilder("grpc", 4317). WithAppProtocol(&components.GrpcProtocol). + WithDefaultRecAddress("0.0.0.0"). WithTargetPort(4317)). AddPortMapping(components.NewProtocolBuilder("http", 4318). WithAppProtocol(&components.HttpProtocol). + WithDefaultRecAddress("0.0.0.0"). WithTargetPort(4318)). MustBuild(), components.NewMultiPortReceiverBuilder("skywalking").