Skip to content

Commit

Permalink
[processor/transform] Remove functions option from config (open-telem…
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth authored Aug 15, 2022
1 parent f7afd4c commit a5b841e
Show file tree
Hide file tree
Showing 15 changed files with 27 additions and 54 deletions.
13 changes: 7 additions & 6 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/telemetryquerylanguage/contexts/tqllogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/telemetryquerylanguage/contexts/tqlmetrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/telemetryquerylanguage/contexts/tqltraces"
Expand All @@ -26,9 +30,6 @@ import (

type SignalConfig struct {
Queries []string `mapstructure:"queries"`

// The functions that have been registered in the extension for processing.
functions map[string]interface{} `mapstructure:"-"`
}

type Config struct {
Expand All @@ -43,15 +44,15 @@ var _ config.Processor = (*Config)(nil)

func (c *Config) Validate() error {
var errors error
_, err := tql.ParseQueries(c.Traces.Queries, c.Traces.functions, tqltraces.ParsePath, tqltraces.ParseEnum)
_, err := tql.ParseQueries(c.Traces.Queries, traces.Functions(), tqltraces.ParsePath, tqltraces.ParseEnum)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = tql.ParseQueries(c.Metrics.Queries, c.Metrics.functions, tqlmetrics.ParsePath, tqlmetrics.ParseEnum)
_, err = tql.ParseQueries(c.Metrics.Queries, metrics.Functions(), tqlmetrics.ParsePath, tqlmetrics.ParseEnum)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = tql.ParseQueries(c.Logs.Queries, c.Logs.functions, tqllogs.ParsePath, tqllogs.ParseEnum)
_, err = tql.ParseQueries(c.Logs.Queries, logs.Functions(), tqllogs.ParsePath, tqllogs.ParseEnum)
if err != nil {
errors = multierr.Append(errors, err)
}
Expand Down
11 changes: 0 additions & 11 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
)

func TestLoadingConfig(t *testing.T) {
Expand All @@ -48,24 +43,18 @@ func TestLoadingConfig(t *testing.T) {
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: traces.DefaultFunctions(),
},
Metrics: SignalConfig{
Queries: []string{
`set(metric.name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: metrics.DefaultFunctions(),
},
Logs: SignalConfig{
Queries: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: logs.DefaultFunctions(),
},
})
}
Expand Down
12 changes: 3 additions & 9 deletions processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,12 @@ func createDefaultConfig() config.Processor {
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: SignalConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
Traces: SignalConfig{
Queries: []string{},

functions: traces.DefaultFunctions(),
},
Metrics: SignalConfig{
Queries: []string{},

functions: metrics.DefaultFunctions(),
},
}
}
Expand All @@ -74,7 +68,7 @@ func createLogsProcessor(
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := logs.NewProcessor(oCfg.Logs.Queries, oCfg.Logs.functions, set)
proc, err := logs.NewProcessor(oCfg.Logs.Queries, logs.Functions(), set)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand All @@ -95,7 +89,7 @@ func createTracesProcessor(
) (component.TracesProcessor, error) {
oCfg := cfg.(*Config)

proc, err := traces.NewProcessor(oCfg.Traces.Queries, oCfg.Traces.functions, set)
proc, err := traces.NewProcessor(oCfg.Traces.Queries, traces.Functions(), set)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand All @@ -116,7 +110,7 @@ func createMetricsProcessor(
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := metrics.NewProcessor(oCfg.Metrics.Queries, oCfg.Metrics.functions, set)
proc, err := metrics.NewProcessor(oCfg.Metrics.Queries, metrics.Functions(), set)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand Down
11 changes: 0 additions & 11 deletions processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
)

func TestFactory_Type(t *testing.T) {
Expand All @@ -45,18 +40,12 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Traces: SignalConfig{
Queries: []string{},

functions: traces.DefaultFunctions(),
},
Metrics: SignalConfig{
Queries: []string{},

functions: metrics.DefaultFunctions(),
},
Logs: SignalConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
})
assert.NoError(t, configtest.CheckConfigStruct(cfg))
Expand Down
2 changes: 1 addition & 1 deletion processor/transformprocessor/internal/common/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ var registry = map[string]interface{}{
"delete_matching_keys": tqlotel.DeleteMatchingKeys,
}

func DefaultFunctions() map[string]interface{} {
func Functions() map[string]interface{} {
return registry
}
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/logs/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func DefaultFunctions() map[string]interface{} {
func Functions() map[string]interface{} {
// No logs-only functions yet.
return common.DefaultFunctions()
return common.Functions()
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ import (
)

func Test_DefaultFunctions(t *testing.T) {
assert.Equal(t, common.DefaultFunctions(), DefaultFunctions())
assert.Equal(t, common.Functions(), Functions())
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestProcess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.query, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor([]string{tt.query}, DefaultFunctions(), component.ProcessorCreateSettings{})
processor, err := NewProcessor([]string{tt.query}, Functions(), component.ProcessorCreateSettings{})
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func summaryTest(tests []summaryTestCase, t *testing.T) {
actualMetrics := pmetric.NewMetricSlice()
tt.input.CopyTo(actualMetrics.AppendEmpty())

evaluate, err := tql.NewFunctionCall(tt.inv, DefaultFunctions(), tqlmetrics.ParsePath, tqlmetrics.ParseEnum)
evaluate, err := tql.NewFunctionCall(tt.inv, Functions(), tqlmetrics.ParsePath, tqlmetrics.ParseEnum)
assert.NoError(t, err)
evaluate(tqlmetrics.MetricTransformContext{
InstrumentationScope: pcommon.NewInstrumentationScope(),
Expand Down
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ var registry = map[string]interface{}{

func init() {
// Init metrics registry with default functions common to all signals
for k, v := range common.DefaultFunctions() {
for k, v := range common.Functions() {
registry[k] = v
}
}

func DefaultFunctions() map[string]interface{} {
func Functions() map[string]interface{} {
return registry
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
)

func Test_DefaultFunctions(t *testing.T) {
expectedFunctions := common.DefaultFunctions()
expectedFunctions := common.Functions()
expectedFunctions["convert_sum_to_gauge"] = convertSumToGauge
expectedFunctions["convert_gauge_to_sum"] = convertGaugeToSum
expectedFunctions["convert_summary_sum_val_to_sum"] = convertSummarySumValToSum
expectedFunctions["convert_summary_count_val_to_sum"] = convertSummaryCountValToSum

actual := DefaultFunctions()
actual := Functions()

assert.NotNil(t, actual)
assert.Equal(t, len(expectedFunctions), len(actual))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestProcess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.query[0], func(t *testing.T) {
td := constructMetrics()
processor, err := NewProcessor(tt.query, DefaultFunctions(), component.ProcessorCreateSettings{})
processor, err := NewProcessor(tt.query, Functions(), component.ProcessorCreateSettings{})
assert.NoError(t, err)

_, err = processor.ProcessMetrics(context.Background(), td)
Expand Down
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/traces/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func DefaultFunctions() map[string]interface{} {
func Functions() map[string]interface{} {
// No trace-only functions yet.
return common.DefaultFunctions()
return common.Functions()
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ import (
)

func Test_DefaultFunctions(t *testing.T) {
assert.Equal(t, common.DefaultFunctions(), DefaultFunctions())
assert.Equal(t, common.Functions(), Functions())
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestProcess(t *testing.T) {
for _, tt := range tests {
t.Run(tt.query, func(t *testing.T) {
td := constructTraces()
processor, err := NewProcessor([]string{tt.query}, DefaultFunctions(), component.ProcessorCreateSettings{})
processor, err := NewProcessor([]string{tt.query}, Functions(), component.ProcessorCreateSettings{})
assert.NoError(t, err)

_, err = processor.ProcessTraces(context.Background(), td)
Expand Down Expand Up @@ -220,7 +220,7 @@ func BenchmarkTwoSpans(b *testing.B) {

for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
processor, err := NewProcessor(tt.queries, DefaultFunctions(), component.ProcessorCreateSettings{})
processor, err := NewProcessor(tt.queries, Functions(), component.ProcessorCreateSettings{})
assert.NoError(b, err)
b.ResetTimer()
for n := 0; n < b.N; n++ {
Expand Down Expand Up @@ -262,7 +262,7 @@ func BenchmarkHundredSpans(b *testing.B) {
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
processor, err := NewProcessor(tt.queries, DefaultFunctions(), component.ProcessorCreateSettings{})
processor, err := NewProcessor(tt.queries, Functions(), component.ProcessorCreateSettings{})
assert.NoError(b, err)
b.ResetTimer()
for n := 0; n < b.N; n++ {
Expand Down

0 comments on commit a5b841e

Please sign in to comment.