From e41c1f47fd31eb4d60bfe768b3cd5eea72d9bd1f Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Fri, 23 Feb 2024 16:17:36 +0200 Subject: [PATCH] otelcolconvert: support converting k8sattributes processor (#6492) Signed-off-by: Paschalis Tsilias --- .../converter_k8sattributesprocessor.go | 149 ++++++++++++++++++ .../testdata/k8sattributes.river | 41 +++++ .../testdata/k8sattributes.yaml | 33 ++++ 3 files changed, 223 insertions(+) create mode 100644 converter/internal/otelcolconvert/converter_k8sattributesprocessor.go create mode 100644 converter/internal/otelcolconvert/testdata/k8sattributes.river create mode 100644 converter/internal/otelcolconvert/testdata/k8sattributes.yaml diff --git a/converter/internal/otelcolconvert/converter_k8sattributesprocessor.go b/converter/internal/otelcolconvert/converter_k8sattributesprocessor.go new file mode 100644 index 000000000000..4cf3ffba6d0e --- /dev/null +++ b/converter/internal/otelcolconvert/converter_k8sattributesprocessor.go @@ -0,0 +1,149 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/processor/k8sattributes" + "github.com/grafana/agent/converter/diag" + "github.com/grafana/agent/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor" + "go.opentelemetry.io/collector/component" +) + +func init() { + converters = append(converters, k8sAttributesProcessorConverter{}) +} + +type k8sAttributesProcessorConverter struct{} + +func (k8sAttributesProcessorConverter) Factory() component.Factory { + return k8sattributesprocessor.NewFactory() +} + +func (k8sAttributesProcessorConverter) InputComponentName() string { + return "otelcol.processor.k8sattributes" +} + +func (k8sAttributesProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.FlowComponentLabel() + + args := toK8SAttributesProcessor(state, id, cfg.(*k8sattributesprocessor.Config)) + block := common.NewBlockWithOverride([]string{"otelcol", "processor", "k8sattributes"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toK8SAttributesProcessor(state *state, id component.InstanceID, cfg *k8sattributesprocessor.Config) *k8sattributes.Arguments { + var ( + nextMetrics = state.Next(id, component.DataTypeMetrics) + nextLogs = state.Next(id, component.DataTypeLogs) + nextTraces = state.Next(id, component.DataTypeTraces) + ) + + return &k8sattributes.Arguments{ + AuthType: string(cfg.AuthType), + Passthrough: cfg.Passthrough, + ExtractConfig: k8sattributes.ExtractConfig{ + Metadata: cfg.Extract.Metadata, + Annotations: toFilterExtract(cfg.Extract.Annotations), + Labels: toFilterExtract(cfg.Extract.Labels), + }, + Filter: k8sattributes.FilterConfig{ + Node: cfg.Filter.Node, + Namespace: cfg.Filter.Namespace, + Fields: toFilterFields(cfg.Filter.Fields), + Labels: toFilterFields(cfg.Filter.Labels), + }, + PodAssociations: toPodAssociations(cfg.Association), + Exclude: toExclude(cfg.Exclude), + + Output: &otelcol.ConsumerArguments{ + Metrics: toTokenizedConsumers(nextMetrics), + Logs: toTokenizedConsumers(nextLogs), + Traces: toTokenizedConsumers(nextTraces), + }, + } +} + +func toExclude(cfg k8sattributesprocessor.ExcludeConfig) k8sattributes.ExcludeConfig { + res := k8sattributes.ExcludeConfig{ + Pods: []k8sattributes.ExcludePodConfig{}, + } + + for _, c := range cfg.Pods { + res.Pods = append(res.Pods, k8sattributes.ExcludePodConfig{ + Name: c.Name, + }) + } + + return res +} + +func toPodAssociations(cfg []k8sattributesprocessor.PodAssociationConfig) []k8sattributes.PodAssociation { + if len(cfg) == 0 { + return nil + } + + res := make([]k8sattributes.PodAssociation, 0, len(cfg)) + + for i, c := range cfg { + res = append(res, k8sattributes.PodAssociation{ + Sources: []k8sattributes.PodAssociationSource{}, + }) + + for _, c2 := range c.Sources { + res[i].Sources = append(res[i].Sources, k8sattributes.PodAssociationSource{ + From: c2.From, + Name: c2.Name, + }) + } + } + + return res +} +func toFilterExtract(cfg []k8sattributesprocessor.FieldExtractConfig) []k8sattributes.FieldExtractConfig { + if len(cfg) == 0 { + return nil + } + + res := make([]k8sattributes.FieldExtractConfig, 0, len(cfg)) + + for _, c := range cfg { + res = append(res, k8sattributes.FieldExtractConfig{ + TagName: c.TagName, + Key: c.Key, + KeyRegex: c.KeyRegex, + Regex: c.Regex, + From: c.From, + }) + } + + return res +} + +func toFilterFields(cfg []k8sattributesprocessor.FieldFilterConfig) []k8sattributes.FieldFilterConfig { + if len(cfg) == 0 { + return nil + } + + res := make([]k8sattributes.FieldFilterConfig, 0, len(cfg)) + + for _, c := range cfg { + res = append(res, k8sattributes.FieldFilterConfig{ + Key: c.Key, + Value: c.Value, + Op: c.Op, + }) + } + + return res +} diff --git a/converter/internal/otelcolconvert/testdata/k8sattributes.river b/converter/internal/otelcolconvert/testdata/k8sattributes.river new file mode 100644 index 000000000000..f2819753ce36 --- /dev/null +++ b/converter/internal/otelcolconvert/testdata/k8sattributes.river @@ -0,0 +1,41 @@ +otelcol.receiver.otlp "default" { + grpc { } + + http { } + + output { + metrics = [otelcol.processor.k8sattributes.default.input] + logs = [otelcol.processor.k8sattributes.default.input] + traces = [otelcol.processor.k8sattributes.default.input] + } +} + +otelcol.processor.k8sattributes "default" { + auth_type = "serviceAccount" + + extract { + metadata = ["container.image.name", "container.image.tag", "k8s.deployment.name", "k8s.namespace.name", "k8s.node.name", "k8s.pod.name", "k8s.pod.start_time", "k8s.pod.uid"] + } + + exclude { + pod { + name = "jaeger-agent" + } + + pod { + name = "jaeger-collector" + } + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = "database:4317" + } +} diff --git a/converter/internal/otelcolconvert/testdata/k8sattributes.yaml b/converter/internal/otelcolconvert/testdata/k8sattributes.yaml new file mode 100644 index 000000000000..dfeee2cebcdd --- /dev/null +++ b/converter/internal/otelcolconvert/testdata/k8sattributes.yaml @@ -0,0 +1,33 @@ +receivers: + otlp: + protocols: + grpc: + http: + +exporters: + otlp: + # Our defaults have drifted from upstream, so we explicitly set our + # defaults below (balancer_name and queue_size). + endpoint: database:4317 + balancer_name: pick_first + sending_queue: + queue_size: 5000 + +processors: + k8sattributes: + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [k8sattributes] + exporters: [otlp] + logs: + receivers: [otlp] + processors: [k8sattributes] + exporters: [otlp] + traces: + receivers: [otlp] + processors: [k8sattributes] + exporters: [otlp] +