Skip to content

Commit

Permalink
otelcolconvert: support converting loadbalancingexporter (#6487)
Browse files Browse the repository at this point in the history
Closes #6429.
  • Loading branch information
rfratto authored Feb 22, 2024
1 parent 9868233 commit d2256f1
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 0 deletions.
124 changes: 124 additions & 0 deletions converter/internal/otelcolconvert/converter_loadbalancingexporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package otelcolconvert

import (
"fmt"

"github.com/alecthomas/units"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter/loadbalancing"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, loadbalancingExporterConverter{})
}

type loadbalancingExporterConverter struct{}

func (loadbalancingExporterConverter) Factory() component.Factory {
return loadbalancingexporter.NewFactory()
}

func (loadbalancingExporterConverter) InputComponentName() string {
return "otelcol.exporter.loadbalancing"
}

func (loadbalancingExporterConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

args := toLoadbalancingExporter(cfg.(*loadbalancingexporter.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "loadbalancing"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toLoadbalancingExporter(cfg *loadbalancingexporter.Config) *loadbalancing.Arguments {
return &loadbalancing.Arguments{
Protocol: toProtocol(cfg.Protocol),
Resolver: toResolver(cfg.Resolver),
RoutingKey: cfg.RoutingKey,

DebugMetrics: common.DefaultValue[loadbalancing.Arguments]().DebugMetrics,
}
}

func toProtocol(cfg loadbalancingexporter.Protocol) loadbalancing.Protocol {
return loadbalancing.Protocol{
// NOTE(rfratto): this has a lot of overlap with converting the
// otlpexporter, but otelcol.exporter.loadbalancing uses custom types to
// remove unwanted fields.
OTLP: loadbalancing.OtlpConfig{
Timeout: cfg.OTLP.Timeout,
Queue: toQueueArguments(cfg.OTLP.QueueSettings),
Retry: toRetryArguments(cfg.OTLP.RetrySettings),
Client: loadbalancing.GRPCClientArguments{
Compression: otelcol.CompressionType(cfg.OTLP.Compression),

TLS: toTLSClientArguments(cfg.OTLP.TLSSetting),
Keepalive: toKeepaliveClientArguments(cfg.OTLP.Keepalive),

ReadBufferSize: units.Base2Bytes(cfg.OTLP.ReadBufferSize),
WriteBufferSize: units.Base2Bytes(cfg.OTLP.WriteBufferSize),
WaitForReady: cfg.OTLP.WaitForReady,
Headers: toHeadersMap(cfg.OTLP.Headers),
BalancerName: cfg.OTLP.BalancerName,
Authority: cfg.OTLP.Authority,

// TODO(rfratto): handle auth
},
},
}
}

func toResolver(cfg loadbalancingexporter.ResolverSettings) loadbalancing.ResolverSettings {
return loadbalancing.ResolverSettings{
Static: toStaticResolver(cfg.Static),
DNS: toDNSResolver(cfg.DNS),
Kubernetes: toKubernetesResolver(cfg.K8sSvc),
}
}

func toStaticResolver(cfg *loadbalancingexporter.StaticResolver) *loadbalancing.StaticResolver {
if cfg == nil {
return nil
}

return &loadbalancing.StaticResolver{
Hostnames: cfg.Hostnames,
}
}

func toDNSResolver(cfg *loadbalancingexporter.DNSResolver) *loadbalancing.DNSResolver {
if cfg == nil {
return nil
}

return &loadbalancing.DNSResolver{
Hostname: cfg.Hostname,
Port: cfg.Port,
Interval: cfg.Interval,
Timeout: cfg.Timeout,
}
}

func toKubernetesResolver(cfg *loadbalancingexporter.K8sSvcResolver) *loadbalancing.KubernetesResolver {
if cfg == nil {
return nil
}

return &loadbalancing.KubernetesResolver{
Service: cfg.Service,
Ports: cfg.Ports,
}
}
24 changes: 24 additions & 0 deletions converter/internal/otelcolconvert/testdata/loadbalancing.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
otelcol.receiver.otlp "default" {
grpc { }

output {
metrics = [otelcol.exporter.loadbalancing.default.input]
logs = [otelcol.exporter.loadbalancing.default.input]
traces = [otelcol.exporter.loadbalancing.default.input]
}
}

otelcol.exporter.loadbalancing "default" {
protocol {
otlp {
client { }
}
}

resolver {
static {
hostnames = ["backend-1:4317", "backend-2:4317", "backend-3:4317"]
}
}
routing_key = "service"
}
34 changes: 34 additions & 0 deletions converter/internal/otelcolconvert/testdata/loadbalancing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
receivers:
otlp:
protocols:
grpc:

exporters:
loadbalancing:
routing_key: "service"
protocol:
otlp:
balancer_name: pick_first
sending_queue:
queue_size: 5000
resolver:
static:
hostnames:
- backend-1:4317
- backend-2:4317
- backend-3:4317

service:
pipelines:
metrics:
receivers: [otlp]
processors: []
exporters: [loadbalancing]
logs:
receivers: [otlp]
processors: []
exporters: [loadbalancing]
traces:
receivers: [otlp]
processors: []
exporters: [loadbalancing]

0 comments on commit d2256f1

Please sign in to comment.