Skip to content

Commit

Permalink
Use 0.0.0.0 as otlp receiver default address (#3325)
Browse files Browse the repository at this point in the history
* operator: expose receivers by default

Signed-off-by: Benedikt Bongartz <[email protected]>

* singleEndpointConfig: use omitempty

Signed-off-by: Benedikt Bongartz <[email protected]>

* components: apply MultiAddressDefaulter

Signed-off-by: Benedikt Bongartz <[email protected]>

* simplify some logic, add tests

* config merge: handle nil values

Signed-off-by: Benedikt Bongartz <[email protected]>

---------

Signed-off-by: Benedikt Bongartz <[email protected]>
Co-authored-by: Jacob Aronoff <[email protected]>
  • Loading branch information
frzifus and jaronoff97 authored Oct 7, 2024
1 parent e84193d commit 8eb13f1
Show file tree
Hide file tree
Showing 22 changed files with 536 additions and 46 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add_receiver_defaults.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: operator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use 0.0.0.0 as otlp receiver default address

# One or more tracking issues related to the change
issues: [3126]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion apis/v1beta1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error {
if len(otelcol.Spec.ManagementState) == 0 {
otelcol.Spec.ManagementState = ManagementStateManaged
}
return nil
return otelcol.Spec.Config.ApplyDefaults(c.logger)
}

func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
Expand Down
66 changes: 66 additions & 0 deletions apis/v1beta1/collector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,72 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
expected v1beta1.OpenTelemetryCollector
shouldFailSar bool
}{
{
name: "update config defaults",
otelcol: v1beta1.OpenTelemetryCollector{
Spec: v1beta1.OpenTelemetryCollectorSpec{
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":null,"http":null}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
expected: v1beta1.OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{},
},
Spec: v1beta1.OpenTelemetryCollectorSpec{
OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{
Args: map[string]string{"feature-gates": "-component.UseLocalHostAsDefaultHost"},
ManagementState: v1beta1.ManagementStateManaged,
Replicas: &one,
},
Mode: v1beta1.ModeDeployment,
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
},
{
name: "update config defaults, leave other fields alone",
otelcol: v1beta1.OpenTelemetryCollector{
Spec: v1beta1.OpenTelemetryCollectorSpec{
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
expected: v1beta1.OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{},
},
Spec: v1beta1.OpenTelemetryCollectorSpec{
OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{
Args: map[string]string{"feature-gates": "-component.UseLocalHostAsDefaultHost"},
ManagementState: v1beta1.ManagementStateManaged,
Replicas: &one,
},
Mode: v1beta1.ModeDeployment,
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317","headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
},
{
name: "all fields default",
otelcol: v1beta1.OpenTelemetryCollector{},
Expand Down
50 changes: 50 additions & 0 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strconv"
"strings"

"dario.cat/mergo"
"github.com/go-logr/logr"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -225,6 +226,51 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
return ports, nil
}

// applyDefaultForComponentKinds applies defaults to the endpoints for the given ComponentKind(s).
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
enabledComponents := c.GetEnabledComponents()
for _, componentKind := range componentKinds {
var retriever components.ParserRetriever
var cfg AnyConfig
switch componentKind {
case KindReceiver:
retriever = receivers.ReceiverFor
cfg = c.Receivers
case KindExporter:
continue
case KindProcessor:
continue
case KindExtension:
continue
}
for componentName := range enabledComponents[componentKind] {
parser := retriever(componentName)
componentConf := cfg.Object[componentName]
newCfg, err := parser.GetDefaultConfig(logger, componentConf)
if err != nil {
return err
}

// We need to ensure we don't remove any fields in defaulting.
mappedCfg, ok := newCfg.(map[string]interface{})
if !ok || mappedCfg == nil {
logger.V(1).Info("returned default configuration invalid",
"warn", "could not apply component defaults",
"component", componentName,
)
continue
}

if err := mergo.Merge(&mappedCfg, componentConf); err != nil {
return err
}
cfg.Object[componentName] = mappedCfg
}
}

return nil
}

func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentKinds(logger, KindReceiver)
}
Expand All @@ -241,6 +287,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error
return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor)
}

func (c *Config) ApplyDefaults(logger logr.Logger) error {
return c.applyDefaultForComponentKinds(logger, KindReceiver)
}

// GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled
// that provides the hinting for the liveness probe.
func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) {
Expand Down
46 changes: 30 additions & 16 deletions internal/components/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType])

type Settings[ComponentConfigType any] struct {
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
defaultRecAddr string
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
defaultsApplier Defaulter[ComponentConfigType]
}

func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] {
Expand Down Expand Up @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build
o.appProtocol = appProtocol
})
}
func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultRecAddr = defaultRecAddr
})
}
func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.targetPort = intstr.FromInt32(targetPort)
Expand Down Expand Up @@ -118,19 +125,26 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat
})
}

func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultsApplier = defaultsApplier
})
}

func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) {
o := NewEmptySettings[ComponentConfigType]()
o.Apply(b...)
if len(o.name) == 0 {
return nil, fmt.Errorf("invalid settings struct, no name specified")
}
return &GenericParser[ComponentConfigType]{
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
settings: o,
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
defaultsApplier: o.defaultsApplier,
settings: o,
}, nil
}

Expand Down
10 changes: 10 additions & 0 deletions internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type PortRetriever interface {
GetPortNumOrDefault(logr.Logger, int32) int32
}

type AddressProvider = func(name string) (address string, port int32)

// PortParser is a function that returns a list of servicePorts given a config of type Config.
type PortParser[ComponentConfigType any] func(logger logr.Logger, name string, defaultPort *corev1.ServicePort, config ComponentConfigType) ([]corev1.ServicePort, error)

Expand All @@ -49,6 +51,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config
// It's expected that type Config is the configuration used by a parser.
type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error)

// Defaulter is a function that applies given defaults to the passed Config.
// It's expected that type Config is the configuration used by a parser.
type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultAddr string, defaultPort int32, config ComponentConfigType) (map[string]interface{}, error)

// ComponentType returns the type for a given component name.
// components have a name like:
// - mycomponent/custom
Expand Down Expand Up @@ -87,6 +93,10 @@ func PortFromEndpoint(endpoint string) (int32, error) {
type ParserRetriever func(string) Parser

type Parser interface {
// GetDefaultConfig returns a config with set default values.
// NOTE: Config merging must be done by the caller if desired.
GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error)

// Ports returns the service ports parsed based on the component's configuration where name is the component's name
// of the form "name" or "type/name"
Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error)
Expand Down
29 changes: 23 additions & 6 deletions internal/components/generic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,29 @@ var (
// GenericParser serves as scaffolding for custom parsing logic by isolating
// functionality to idempotent functions.
type GenericParser[T any] struct {
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
defaultsApplier Defaulter[T]
}

func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
if g.settings == nil || g.defaultsApplier == nil {
return config, nil
}

if g.settings.defaultRecAddr == "" {
return config, nil
}

var parsed T
if err := mapstructure.Decode(config, &parsed); err != nil {
return nil, err
}
return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.GetServicePort().Port, parsed)
}

func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
Expand Down
Loading

0 comments on commit 8eb13f1

Please sign in to comment.