Skip to content

Commit

Permalink
otelcolconvert: support converting k8sattributes processor (#6492)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis authored Feb 23, 2024
1 parent 9900801 commit e41c1f4
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 0 deletions.
149 changes: 149 additions & 0 deletions converter/internal/otelcolconvert/converter_k8sattributesprocessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor/k8sattributes"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, k8sAttributesProcessorConverter{})
}

type k8sAttributesProcessorConverter struct{}

func (k8sAttributesProcessorConverter) Factory() component.Factory {
return k8sattributesprocessor.NewFactory()
}

func (k8sAttributesProcessorConverter) InputComponentName() string {
return "otelcol.processor.k8sattributes"
}

func (k8sAttributesProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

args := toK8SAttributesProcessor(state, id, cfg.(*k8sattributesprocessor.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "processor", "k8sattributes"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toK8SAttributesProcessor(state *state, id component.InstanceID, cfg *k8sattributesprocessor.Config) *k8sattributes.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
nextTraces = state.Next(id, component.DataTypeTraces)
)

return &k8sattributes.Arguments{
AuthType: string(cfg.AuthType),
Passthrough: cfg.Passthrough,
ExtractConfig: k8sattributes.ExtractConfig{
Metadata: cfg.Extract.Metadata,
Annotations: toFilterExtract(cfg.Extract.Annotations),
Labels: toFilterExtract(cfg.Extract.Labels),
},
Filter: k8sattributes.FilterConfig{
Node: cfg.Filter.Node,
Namespace: cfg.Filter.Namespace,
Fields: toFilterFields(cfg.Filter.Fields),
Labels: toFilterFields(cfg.Filter.Labels),
},
PodAssociations: toPodAssociations(cfg.Association),
Exclude: toExclude(cfg.Exclude),

Output: &otelcol.ConsumerArguments{
Metrics: toTokenizedConsumers(nextMetrics),
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces),
},
}
}

func toExclude(cfg k8sattributesprocessor.ExcludeConfig) k8sattributes.ExcludeConfig {
res := k8sattributes.ExcludeConfig{
Pods: []k8sattributes.ExcludePodConfig{},
}

for _, c := range cfg.Pods {
res.Pods = append(res.Pods, k8sattributes.ExcludePodConfig{
Name: c.Name,
})
}

return res
}

func toPodAssociations(cfg []k8sattributesprocessor.PodAssociationConfig) []k8sattributes.PodAssociation {
if len(cfg) == 0 {
return nil
}

res := make([]k8sattributes.PodAssociation, 0, len(cfg))

for i, c := range cfg {
res = append(res, k8sattributes.PodAssociation{
Sources: []k8sattributes.PodAssociationSource{},
})

for _, c2 := range c.Sources {
res[i].Sources = append(res[i].Sources, k8sattributes.PodAssociationSource{
From: c2.From,
Name: c2.Name,
})
}
}

return res
}
func toFilterExtract(cfg []k8sattributesprocessor.FieldExtractConfig) []k8sattributes.FieldExtractConfig {
if len(cfg) == 0 {
return nil
}

res := make([]k8sattributes.FieldExtractConfig, 0, len(cfg))

for _, c := range cfg {
res = append(res, k8sattributes.FieldExtractConfig{
TagName: c.TagName,
Key: c.Key,
KeyRegex: c.KeyRegex,
Regex: c.Regex,
From: c.From,
})
}

return res
}

func toFilterFields(cfg []k8sattributesprocessor.FieldFilterConfig) []k8sattributes.FieldFilterConfig {
if len(cfg) == 0 {
return nil
}

res := make([]k8sattributes.FieldFilterConfig, 0, len(cfg))

for _, c := range cfg {
res = append(res, k8sattributes.FieldFilterConfig{
Key: c.Key,
Value: c.Value,
Op: c.Op,
})
}

return res
}
41 changes: 41 additions & 0 deletions converter/internal/otelcolconvert/testdata/k8sattributes.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
otelcol.receiver.otlp "default" {
grpc { }

http { }

output {
metrics = [otelcol.processor.k8sattributes.default.input]
logs = [otelcol.processor.k8sattributes.default.input]
traces = [otelcol.processor.k8sattributes.default.input]
}
}

otelcol.processor.k8sattributes "default" {
auth_type = "serviceAccount"

extract {
metadata = ["container.image.name", "container.image.tag", "k8s.deployment.name", "k8s.namespace.name", "k8s.node.name", "k8s.pod.name", "k8s.pod.start_time", "k8s.pod.uid"]
}

exclude {
pod {
name = "jaeger-agent"
}

pod {
name = "jaeger-collector"
}
}

output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}

otelcol.exporter.otlp "default" {
client {
endpoint = "database:4317"
}
}
33 changes: 33 additions & 0 deletions converter/internal/otelcolconvert/testdata/k8sattributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
receivers:
otlp:
protocols:
grpc:
http:

exporters:
otlp:
# Our defaults have drifted from upstream, so we explicitly set our
# defaults below (balancer_name and queue_size).
endpoint: database:4317
balancer_name: pick_first
sending_queue:
queue_size: 5000

processors:
k8sattributes:

service:
pipelines:
metrics:
receivers: [otlp]
processors: [k8sattributes]
exporters: [otlp]
logs:
receivers: [otlp]
processors: [k8sattributes]
exporters: [otlp]
traces:
receivers: [otlp]
processors: [k8sattributes]
exporters: [otlp]

0 comments on commit e41c1f4

Please sign in to comment.