Skip to content

Commit

Permalink
Fix issue with metrics exporter in otelcol.exporter.loadbalancing (#5684
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ptodev authored Nov 15, 2023
1 parent b76dc33 commit a2817f5
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 22 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ Main (unreleased)
- Fixed an issue in the static config converter where exporter instance values
were not being mapped when translating to flow. (@erikbaranowski)

- Fix a bug which prevented Agent from running `otelcol.exporter.loadbalancing`
with a `routing_key` of `traceID`. (@ptodev)

v0.37.4 (2023-11-06)
-----------------

Expand All @@ -159,7 +162,7 @@ v0.37.4 (2023-11-06)
- Fix a bug where reloading the configuration of a `loki.write` component lead
to a panic. (@tpaschalis)

- Added Kubernetes service resolver to static node's loadbalancing exporter
- Added Kubernetes service resolver to static node's loadbalancing exporter
and to Flow's `otelcol.exporter.loadbalancing`. (@ptodev)

v0.37.3 (2023-10-26)
Expand Down
74 changes: 58 additions & 16 deletions component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ type Arguments interface {
DebugMetricsConfig() otelcol.DebugMetricsArguments
}

// TypeSignal is a bit field to indicate which telemetry signals the exporter supports.
type TypeSignal byte

const (
TypeLogs TypeSignal = 1 << iota // 1
TypeMetrics // 2
TypeTraces // 4
)

// TypeAll indicates that the exporter supports all telemetry signals.
const TypeAll = TypeLogs | TypeMetrics | TypeTraces

// SupportsLogs returns true if the exporter supports logs.
func (s TypeSignal) SupportsLogs() bool {
return s&TypeLogs != 0
}

// SupportsMetrics returns true if the exporter supports metrics.
func (s TypeSignal) SupportsMetrics() bool {
return s&TypeMetrics != 0
}

// SupportsTraces returns true if the exporter supports traces.
func (s TypeSignal) SupportsTraces() bool {
return s&TypeTraces != 0
}

// Exporter is a Flow component shim which manages an OpenTelemetry Collector
// exporter component.
type Exporter struct {
Expand All @@ -56,6 +83,10 @@ type Exporter struct {

sched *scheduler.Scheduler
collector *lazycollector.Collector

// Signals which the exporter is able to export.
// Can be logs, metrics, traces or any combination of them.
supportedSignals TypeSignal
}

var (
Expand All @@ -69,7 +100,7 @@ var (
//
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelexporter.Factory, args Arguments) (*Exporter, error) {
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)
Expand All @@ -96,6 +127,8 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments) (*Expor

sched: scheduler.New(opts.Logger),
collector: collector,

supportedSignals: supportedSignals,
}
if err := e.Update(args); err != nil {
return nil, err
Expand Down Expand Up @@ -162,25 +195,34 @@ func (e *Exporter) Update(args component.Arguments) error {
// supported telemetry signals.
var components []otelcomponent.Component

tracesExporter, err := e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesExporter != nil {
components = append(components, tracesExporter)
var tracesExporter otelexporter.Traces
if e.supportedSignals.SupportsTraces() {
tracesExporter, err = e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesExporter != nil {
components = append(components, tracesExporter)
}
}

metricsExporter, err := e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if metricsExporter != nil {
components = append(components, metricsExporter)
var metricsExporter otelexporter.Metrics
if e.supportedSignals.SupportsMetrics() {
metricsExporter, err = e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if metricsExporter != nil {
components = append(components, metricsExporter)
}
}

logsExporter, err := e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if logsExporter != nil {
components = append(components, logsExporter)
var logsExporter otelexporter.Logs
if e.supportedSignals.SupportsLogs() {
logsExporter, err = e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if logsExporter != nil {
components = append(components, logsExporter)
}
}

// Schedule the components to run once our component is running.
Expand Down
36 changes: 35 additions & 1 deletion component/otelcol/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment {
}, otelcomponent.StabilityLevelUndefined),
)

return exporter.New(opts, factory, args.(exporter.Arguments))
return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeAll)
},
}

Expand Down Expand Up @@ -198,3 +198,37 @@ func createTestTraces() ptrace.Traces {
}
return data
}

func TestExporterSignalType(t *testing.T) {
//
// Check if ExporterAll supports all signals
//
require.True(t, exporter.TypeAll.SupportsLogs())
require.True(t, exporter.TypeAll.SupportsMetrics())
require.True(t, exporter.TypeAll.SupportsTraces())

//
// Make sure each of the 3 signals supports itself
//
require.True(t, exporter.TypeLogs.SupportsLogs())
require.True(t, exporter.TypeMetrics.SupportsMetrics())
require.True(t, exporter.TypeTraces.SupportsTraces())

//
// Make sure Logs does not support Metrics and Traces.
//
require.False(t, exporter.TypeLogs.SupportsMetrics())
require.False(t, exporter.TypeLogs.SupportsTraces())

//
// Make sure Metrics does not support Logs and Traces.
//
require.False(t, exporter.TypeMetrics.SupportsLogs())
require.False(t, exporter.TypeMetrics.SupportsTraces())

//
// Make sure Traces does not support Logs and Metrics.
//
require.False(t, exporter.TypeTraces.SupportsLogs())
require.False(t, exporter.TypeTraces.SupportsMetrics())
}
5 changes: 4 additions & 1 deletion component/otelcol/exporter/loadbalancing/loadbalancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := loadbalancingexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments))
//TODO(ptodev): LB exporter cannot yet work with metrics due to a limitation in the Agent:
// https://github.com/grafana/agent/pull/5684
// Once the limitation is removed, we may be able to remove the need for exporter.TypeSignal altogether.
return exporter.New(opts, fact, args.(Arguments), exporter.TypeLogs|exporter.TypeTraces)
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion component/otelcol/exporter/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := loggingexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments))
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion component/otelcol/exporter/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := otlpexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments))
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion component/otelcol/exporter/otlphttp/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := otlphttpexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments))
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
},
})
}
Expand Down

0 comments on commit a2817f5

Please sign in to comment.