Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelcolconvert: support converting loki exporter #6505

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions converter/internal/otelcolconvert/converter_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/river/token"
"github.com/grafana/river/token/builder"
Expand Down Expand Up @@ -53,3 +54,30 @@ func encodeMapstruct(v any) map[string]any {
}
return res
}

type tokenizedLogsReceiver struct {
loki.LogsReceiver

Expr string // Expr is the string to return during tokenization.
}

func (tl tokenizedLogsReceiver) RiverCapsule() {}

func (tl tokenizedLogsReceiver) RiverTokenize() []builder.Token {
return []builder.Token{{
Tok: token.STRING,
Lit: tl.Expr,
}}
}

func toTokenizedLogsReceivers(components []componentID) []loki.LogsReceiver {
res := make([]loki.LogsReceiver, 0, len(components))

for _, component := range components {
res = append(res, tokenizedLogsReceiver{
Expr: fmt.Sprintf("%s.%s.receiver", strings.Join(component.Name, "."), component.Label),
})
}

return res
}
126 changes: 126 additions & 0 deletions converter/internal/otelcolconvert/converter_lokiexporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package otelcolconvert

import (
"fmt"
"reflect"
"strings"

"github.com/grafana/agent/component/common/config"
"github.com/grafana/agent/component/loki/write"
"github.com/grafana/agent/component/otelcol/exporter/loki"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
"github.com/grafana/river/rivertypes"
pconfig "github.com/prometheus/common/config"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtls"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter"
)

func init() {
converters = append(converters, lokiExporterConverter{})
}

type lokiExporterConverter struct{}

func (lokiExporterConverter) Factory() component.Factory {
return lokiexporter.NewFactory()
}

func (lokiExporterConverter) InputComponentName() string { return "otelcol.exporter.loki" }

func (lokiExporterConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

lokiWriteComponentID := []componentID{{
Name: strings.Split("loki.write", "."),
Label: label,
}}

args1 := toOtelcolExporterLoki(lokiWriteComponentID)
block1 := common.NewBlockWithOverride([]string{"otelcol", "exporter", "loki"}, label, args1)
Comment on lines +43 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming nit: can we give thiese more descriptive names than args1/block1/args2/block2 please? 😛


diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block1)),
)

args2, err := toLokiWrite(label, cfg.(*lokiexporter.Config))
if err != nil {
diags.Add(
diag.SeverityLevelError,
fmt.Sprintf("could not build loki.write block: %s", err),
)
}
block2 := common.NewBlockWithOverride([]string{"loki", "write"}, label, args2)
Comment on lines +52 to +58
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure common.NewBlockWithOverride will panic if err != nil here, since args2 == nil if err != nil.


diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block2)),
)
diags.Add(
diag.SeverityLevelInfo,
"Created a loki.write block with a best-effort conversion of the lokiexporter's confighttp, retry and queue configuration settings. You may want to double check the converted configuration as most fields do not have a 1:1 match",
)

state.Body().AppendBlock(block1)
state.Body().AppendBlock(block2)
return diags
}

func toOtelcolExporterLoki(ids []componentID) *loki.Arguments {
return &loki.Arguments{
ForwardTo: toTokenizedLogsReceivers(ids),
}
}

func toLokiWrite(name string, cfg *lokiexporter.Config) (*write.Arguments, error) {
// Defaults for MaxStreams and WAL should be handled on the Flow side.
res := &write.Arguments{}

if cfg.Endpoint != "" {
// TODO(@tpaschalis) Wire in auth from auth extension.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Just a note)

😬 Handling auth here will be tricky, because we can't pass otelcol.auth.basic to loki.write; we'll have to actually probe the auth extension that lokiexporter was using and copy settings from the definition of the auth extension into loki.write.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's a great way to do this yet.

e := write.GetDefaultEndpointOptions()
e.Name = name
e.URL = cfg.Endpoint

e.RemoteTimeout = cfg.Timeout
e.TenantID = string(cfg.Headers["X-Scope-OrgID"])
if !reflect.DeepEqual(cfg.TLSSetting, configtls.TLSClientSetting{}) {
minv, ok := pconfig.TLSVersions[cfg.TLSSetting.MinVersion]
if !ok {
return nil, fmt.Errorf("invalid min tls version provided: %s", cfg.TLSSetting.MinVersion)
}
e.HTTPClientConfig.TLSConfig.CA = string(cfg.TLSSetting.CAPem)
e.HTTPClientConfig.TLSConfig.CAFile = cfg.TLSSetting.CAFile
e.HTTPClientConfig.TLSConfig.Cert = string(cfg.TLSSetting.CertPem)
e.HTTPClientConfig.TLSConfig.CertFile = cfg.TLSSetting.CertFile
e.HTTPClientConfig.TLSConfig.Key = rivertypes.Secret(cfg.TLSSetting.KeyPem)
e.HTTPClientConfig.TLSConfig.KeyFile = cfg.TLSSetting.KeyFile
e.HTTPClientConfig.TLSConfig.ServerName = cfg.TLSSetting.ServerName
e.HTTPClientConfig.TLSConfig.InsecureSkipVerify = cfg.TLSSetting.InsecureSkipVerify
e.HTTPClientConfig.TLSConfig.MinVersion = config.TLSVersion(minv)
}

e.MaxBackoff = cfg.RetrySettings.MaxInterval
e.MinBackoff = cfg.RetrySettings.InitialInterval

headers := toHeadersMap(cfg.Headers)
if len(headers) > 0 {
e.Headers = headers
}
tenant, ok := headers["X-Scope-OrgID"]
if ok {
e.TenantID = tenant
}

// After trying to translate all the OTel HTTP Client options onto the
// loki.write component, append it as an endpoint.
res.Endpoints = append(res.Endpoints, e)
}

return res, nil
}
28 changes: 28 additions & 0 deletions converter/internal/otelcolconvert/testdata/lokiexporter.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
otelcol.receiver.otlp "default" {
grpc { }

http { }

output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.exporter.loki.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}

otelcol.exporter.otlp "default" {
client {
endpoint = "database:4317"
}
}

otelcol.exporter.loki "default" {
forward_to = [loki.write.default.receiver]
}

loki.write "default" {
endpoint {
name = "default"
url = "https://loki.example.com:3100/loki/api/v1/push"
}
}
38 changes: 38 additions & 0 deletions converter/internal/otelcolconvert/testdata/lokiexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
receivers:
otlp:
protocols:
grpc:
http:

exporters:
loki:
endpoint: https://loki.example.com:3100/loki/api/v1/push
# Our loki.write component uses some different defaults
timeout: "10s"
retry_on_failure:
initial_interval: "500ms"
max_interval: "5m"

otlp:
# Our defaults have drifted from upstream, so we explicitly set our
# defaults below (balancer_name and queue_size).
endpoint: database:4317
balancer_name: pick_first
sending_queue:
queue_size: 5000

service:
pipelines:
traces:
receivers: [otlp]
processors: []
exporters: [otlp]
metrics:
receivers: [otlp]
processors: []
exporters: [otlp]
logs:
receivers: [otlp]
processors: []
exporters: [loki]

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.87.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.87.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter v0.87.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter v0.87.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.87.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.87.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.87.0
Expand Down Expand Up @@ -320,7 +321,7 @@ require (
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dennwc/btrfs v0.0.0-20230312211831-a1f570bd01a1 // indirect
github.com/dennwc/ioctl v1.0.0 // indirect
github.com/dennwc/varint v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.87.0/go.mod h1:WKjmyVi+Xhhvuvj2J+1Z0fXvY38MKRbREe2aR5UPOIw=
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter v0.87.0 h1:+apdZt5DPPIxjBrayu1muKbvUK3zqsfgb+3fMh6Hnyo=
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter v0.87.0/go.mod h1:JXVmcuySy3xyo3JjoU+CrNWy/C12Fw6JB1HWXf26HwQ=
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter v0.87.0 h1:Ftwnj5cMgoz1gJZZA02mIdCieDVNTJ2IG7stygy86X8=
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter v0.87.0/go.mod h1:VApNeWq4DLUuVSotuVV8NCSpGdyVnNRDfVIGN2Le/X8=
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.87.0 h1:5LmBAlLycadwA3AHI2rqPuDjx1HFb/PSn3946Eyp3Jw=
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.87.0/go.mod h1:lgOFfu/GLf6LbvZwlumkUv3iBLqRdtBentZKcrrqb3Y=
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.87.0 h1:52+RVfmzj+JePVJuD07gfppdzF9fsKASIRGzTC05QIg=
Expand Down
Loading