Skip to content

Commit

Permalink
[connector/routing] Disconnect 'match_once' parameter (#37095)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Jan 9, 2025
1 parent 70ab6e9 commit dcf42df
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 351 deletions.
29 changes: 29 additions & 0 deletions .chloggen/routing-disconnect-matchonce-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change `match_once` parameter from `bool` to `*bool`.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29882]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Boolean values should still unmarshal successfully, but direct instantiation in code will fail.
The change allows us to check for usage and warn of the upcoming removal in v0.120.0.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
28 changes: 28 additions & 0 deletions .chloggen/routing-disconnect-matchonce.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Disconnect `match_once` parameter from functionality.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29882]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The parameter will be ignored, except to trigger a warning log about its upcoming removal in v0.120.0.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
8 changes: 2 additions & 6 deletions connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type Config struct {
Table []RoutingTableItem `mapstructure:"table"`

// MatchOnce determines whether the connector matches multiple statements.
// Optional.
MatchOnce bool `mapstructure:"match_once"`
// Unused. Deprecated in v0.116.0. Will be removed in v0.120.0.
MatchOnce *bool `mapstructure:"match_once"`
}

// Validate checks if the processor configuration is valid.
Expand Down Expand Up @@ -77,10 +77,6 @@ func (c *Config) Validate() error {
return err
}
fallthrough
case "span", "metric", "datapoint", "log": // ok
if !c.MatchOnce {
return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context)
}
default:
return errors.New("invalid context: " + item.Context)
}
Expand Down
68 changes: 0 additions & 68 deletions connector/routingconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func TestLoadConfig(t *testing.T) {
configPath: filepath.Join("testdata", "config", "traces.yaml"),
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
MatchOnce: true,
DefaultPipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp-all"),
},
Expand All @@ -53,7 +52,6 @@ func TestLoadConfig(t *testing.T) {
configPath: filepath.Join("testdata", "config", "metrics.yaml"),
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
MatchOnce: true,
DefaultPipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalMetrics, "otlp-all"),
},
Expand All @@ -79,7 +77,6 @@ func TestLoadConfig(t *testing.T) {
configPath: filepath.Join("testdata", "config", "logs.yaml"),
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
MatchOnce: true,
DefaultPipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalLogs, "otlp-all"),
},
Expand Down Expand Up @@ -221,70 +218,6 @@ func TestValidateConfig(t *testing.T) {
},
error: "invalid context: invalid",
},
{
name: "span context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "span",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: `"span" context is not supported with "match_once: false"`,
},
{
name: "metric context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "metric",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: `"metric" context is not supported with "match_once: false"`,
},
{
name: "datapoint context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "datapoint",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: `"datapoint" context is not supported with "match_once: false"`,
},
{
name: "log context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "log",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: `"log" context is not supported with "match_once: false"`,
},
{
name: "request context with statement",
config: &Config{
Expand Down Expand Up @@ -349,7 +282,6 @@ func withDefault(pipelines ...pipeline.ID) testConfigOption {

func testConfig(opts ...testConfigOption) *Config {
cfg := createDefaultConfig().(*Config)
cfg.MatchOnce = true
for _, opt := range opts {
opt(cfg)
}
Expand Down
1 change: 0 additions & 1 deletion connector/routingconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func NewFactory() connector.Factory {
func createDefaultConfig() component.Config {
return &Config{
ErrorMode: ottl.PropagateError,
MatchOnce: true,
}
}

Expand Down
49 changes: 2 additions & 47 deletions connector/routingconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func newLogsConnector(
) (*logsConnector, error) {
cfg := config.(*Config)

if !cfg.MatchOnce {
set.Logger.Error("The 'match_once' field has been deprecated and will be removed in v0.120.0. Remove usage of the parameter to suppress this warning.")
if cfg.MatchOnce != nil {
set.Logger.Error("The 'match_once' field has been deprecated and no longer has any effect. It will be removed in v0.120.0.")
}

lr, ok := logs.(connector.LogsRouterAndConsumer)
Expand Down Expand Up @@ -65,15 +65,6 @@ func (c *logsConnector) Capabilities() consumer.Capabilities {
}

func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if c.config.MatchOnce {
return c.switchLogs(ctx, ld)
}
return c.matchAllLogs(ctx, ld)
}

// switchLogs removes items from the original plog.Logs as they are matched,
// and sends them to the appropriate consumer.
func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error {
groups := make(map[consumer.Logs]plog.Logs)
var errs error
for i := 0; i < len(c.router.routeSlice) && ld.ResourceLogs().Len() > 0; i++ {
Expand Down Expand Up @@ -120,42 +111,6 @@ func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error {
return errs
}

func (c *logsConnector) matchAllLogs(ctx context.Context, ld plog.Logs) error {
// routingEntry is used to group plog.ResourceLogs that are routed to
// the same set of exporters.
// This way we're not ending up with all the logs split up which would cause
// higher CPU usage.
groups := make(map[consumer.Logs]plog.Logs)
var errs error
for i := 0; i < ld.ResourceLogs().Len(); i++ {
rlogs := ld.ResourceLogs().At(i)
rtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs)
noRoutesMatch := true
for _, route := range c.router.routeSlice {
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
return err
}
groupLogs(groups, c.router.defaultConsumer, rlogs)
continue
}
if isMatch {
noRoutesMatch = false
groupLogs(groups, route.consumer, rlogs)
}
}
if noRoutesMatch {
// no route conditions are matched, add resource logs to default exporters group
groupLogs(groups, c.router.defaultConsumer, rlogs)
}
}
for consumer, group := range groups {
errs = errors.Join(errs, consumer.ConsumeLogs(ctx, group))
}
return errs
}

func groupAllLogs(
groups map[consumer.Logs]plog.Logs,
cons consumer.Logs,
Expand Down
52 changes: 0 additions & 52 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,57 +160,6 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
assert.Empty(t, sink1.AllLogs())
})

t.Run("logs matched by two expressions", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "x_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Empty(t, defaultSink.AllLogs())
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 1)

assert.Equal(t, 2, sink0.AllLogs()[0].LogRecordCount())
assert.Equal(t, 2, sink1.AllLogs()[0].LogRecordCount())
assert.Equal(t, sink0.AllLogs(), sink1.AllLogs())
})

t.Run("one log matched by multiple expressions, other matched none", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 1)

assert.Equal(t, sink0.AllLogs(), sink1.AllLogs())

rlog := defaultSink.AllLogs()[0].ResourceLogs().At(0)
attr, ok := rlog.Resource().Attributes().Get("X-Tenant")
assert.True(t, ok, "routing attribute must exists")
assert.Equal(t, "something-else", attr.AsString())
})

t.Run("logs matched by one expression, multiple pipelines", func(t *testing.T) {
resetSinks()

Expand Down Expand Up @@ -253,7 +202,6 @@ func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
Pipelines: []pipeline.ID{logsDefault, logs0},
},
},
MatchOnce: true,
}

var defaultSink, sink0, sink1 consumertest.LogsSink
Expand Down
48 changes: 2 additions & 46 deletions connector/routingconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func newMetricsConnector(
) (*metricsConnector, error) {
cfg := config.(*Config)

if !cfg.MatchOnce {
set.Logger.Error("The 'match_once' field has been deprecated and will be removed in v0.120.0. Remove usage of the parameter to suppress this warning.")
if cfg.MatchOnce != nil {
set.Logger.Error("The 'match_once' field has been deprecated and no longer has any effect. It will be removed in v0.120.0.")
}

mr, ok := metrics.(connector.MetricsRouterAndConsumer)
Expand Down Expand Up @@ -66,13 +66,6 @@ func (c *metricsConnector) Capabilities() consumer.Capabilities {
}

func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
if c.config.MatchOnce {
return c.switchMetrics(ctx, md)
}
return c.matchAllMetrics(ctx, md)
}

func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics) error {
groups := make(map[consumer.Metrics]pmetric.Metrics)
var errs error
for i := 0; i < len(c.router.routeSlice) && md.ResourceMetrics().Len() > 0; i++ {
Expand Down Expand Up @@ -128,43 +121,6 @@ func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics
return errs
}

func (c *metricsConnector) matchAllMetrics(ctx context.Context, md pmetric.Metrics) error {
// groups is used to group pmetric.ResourceMetrics that are routed to
// the same set of exporters. This way we're not ending up with all the
// metrics split up which would cause higher CPU usage.
groups := make(map[consumer.Metrics]pmetric.Metrics)

var errs error
for i := 0; i < md.ResourceMetrics().Len(); i++ {
rmetrics := md.ResourceMetrics().At(i)
rtx := ottlresource.NewTransformContext(rmetrics.Resource(), rmetrics)

noRoutesMatch := true
for _, route := range c.router.routeSlice {
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
return err
}
groupMetrics(groups, c.router.defaultConsumer, rmetrics)
continue
}
if isMatch {
noRoutesMatch = false
groupMetrics(groups, route.consumer, rmetrics)
}
}
if noRoutesMatch {
// no route conditions are matched, add resource metrics to default exporters group
groupMetrics(groups, c.router.defaultConsumer, rmetrics)
}
}
for consumer, group := range groups {
errs = errors.Join(errs, consumer.ConsumeMetrics(ctx, group))
}
return errs
}

func groupAllMetrics(
groups map[consumer.Metrics]pmetric.Metrics,
cons consumer.Metrics,
Expand Down
Loading

0 comments on commit dcf42df

Please sign in to comment.