Skip to content

Commit

Permalink
[processor/cumulativetodelta] Add metric type filter (#34407)
Browse files Browse the repository at this point in the history
**Description:** Add metric type filter for cumulativetodelta processor

**Link to tracking Issue:** #33673

**Testing:** Added unit tests

**Documentation:** Extended the readme of this component to describe
this new filter

---------

Signed-off-by: Florian Bacher <[email protected]>
  • Loading branch information
bacherfl authored Jan 24, 2025
1 parent 27cab50 commit 4abd68a
Show file tree
Hide file tree
Showing 11 changed files with 455 additions and 73 deletions.
27 changes: 27 additions & 0 deletions .chloggen/cumulative-to-delta-processor-metric-type-filter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cumulativetodeltaprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add metric type filter for cumulativetodelta processor

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33673]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
46 changes: 44 additions & 2 deletions processor/cumulativetodeltaprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ Configuration is specified through a list of metrics. The processor uses metric

The following settings can be optionally configured:

- `include`: List of metrics names or patterns to convert to delta.
- `exclude`: List of metrics names or patterns to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.**
- `include`: List of metrics names (case-insensitive), patterns or metric types to convert to delta. Valid values are: `sum`, `histogram`.
- `exclude`: List of metrics names (case-insensitive), patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** Valid values are: `sum`, `histogram`.
- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
- `initial_value`: Handling of the first observed point for a given metric identity.
When the collector (re)starts, there's no record of how much of a given cumulative counter has already been converted to delta values.
Expand Down Expand Up @@ -56,6 +56,17 @@ processors:
match_type: strict
```
```yaml
processors:
# processor name: cumulativetodelta
cumulativetodelta:

# Convert all sum metrics
include:
metric_types:
- sum
```
```yaml
processors:
# processor name: cumulativetodelta
Expand All @@ -69,6 +80,21 @@ processors:
match_type: regexp
```
```yaml
processors:
# processor name: cumulativetodelta
cumulativetodelta:

# Convert cumulative sum metrics to delta
# if and only if 'metric' is in the name
include:
metrics:
- ".*metric.*"
match_type: regexp
metric_types:
- sum
```
```yaml
processors:
# processor name: cumulativetodelta
Expand All @@ -82,6 +108,22 @@ processors:
match_type: regexp
```
```yaml
processors:
# processor name: cumulativetodelta
cumulativetodelta:

# Convert cumulative sum metrics with 'metric' in their name,
# but exclude histogram metrics
include:
metrics:
- ".*metric.*"
match_type: regexp
exclude:
metric_types:
- histogram
```
```yaml
processors:
# processor name: cumulativetodelta
Expand Down
31 changes: 31 additions & 0 deletions processor/cumulativetodeltaprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,24 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele

import (
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
"golang.org/x/exp/maps"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking"
)

var validMetricTypes = map[string]bool{
strings.ToLower(pmetric.MetricTypeSum.String()): true,
strings.ToLower(pmetric.MetricTypeHistogram.String()): true,
}

var validMetricTypeList = maps.Keys(validMetricTypes)

// Config defines the configuration for the processor.
type Config struct {
// MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
Expand All @@ -37,6 +47,8 @@ type MatchMetrics struct {
filterset.Config `mapstructure:",squash"`

Metrics []string `mapstructure:"metrics"`

MetricTypes []string `mapstructure:"metric_types"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -52,5 +64,24 @@ func (config *Config) Validate() error {
(len(config.Exclude.MatchType) > 0 && len(config.Exclude.Metrics) == 0) {
return fmt.Errorf("metrics must be supplied if match_type is set")
}

for _, metricType := range config.Exclude.MetricTypes {
if valid := validMetricTypes[strings.ToLower(metricType)]; !valid {
return fmt.Errorf(
"found invalid metric type in exclude.metric_types: %s. Valid values are %s",
metricType,
validMetricTypeList,
)
}
}
for _, metricType := range config.Include.MetricTypes {
if valid := validMetricTypes[strings.ToLower(metricType)]; !valid {
return fmt.Errorf(
"found invalid metric type in include.metric_types: %s. Valid values are %s",
metricType,
validMetricTypeList,
)
}
}
return nil
}
40 changes: 40 additions & 0 deletions processor/cumulativetodeltaprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package cumulativetodeltaprocessor

import (
"fmt"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -82,6 +83,45 @@ func TestLoadConfig(t *testing.T) {
InitialValue: tracking.InitialValueAuto,
},
},
{
id: component.NewIDWithName(metadata.Type, "metric_type_filter"),
expected: &Config{
Include: MatchMetrics{
Metrics: []string{
"a*",
},
Config: filterset.Config{
MatchType: "regexp",
RegexpConfig: nil,
},
MetricTypes: []string{
"sum",
},
},
Exclude: MatchMetrics{
Metrics: []string{
"b*",
},
Config: filterset.Config{
MatchType: "regexp",
RegexpConfig: nil,
},
MetricTypes: []string{
"histogram",
},
},
MaxStaleness: 10 * time.Second,
InitialValue: tracking.InitialValueAuto,
},
},
{
id: component.NewIDWithName(metadata.Type, "invalid_include_metric_type_filter"),
errorMessage: fmt.Sprintf("found invalid metric type in include.metric_types: gauge. Valid values are %s", validMetricTypeList),
},
{
id: component.NewIDWithName(metadata.Type, "invalid_exclude_metric_type_filter"),
errorMessage: fmt.Sprintf("found invalid metric type in exclude.metric_types: Invalid. Valid values are %s", validMetricTypeList),
},
{
id: component.NewIDWithName(metadata.Type, "missing_match_type"),
errorMessage: "match_type must be set if metrics are supplied",
Expand Down
5 changes: 4 additions & 1 deletion processor/cumulativetodeltaprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func createMetricsProcessor(
return nil, fmt.Errorf("configuration parsing error")
}

metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, set.Logger)
metricsProcessor, err := newCumulativeToDeltaProcessor(processorConfig, set.Logger)
if err != nil {
return nil, err
}

return processorhelper.NewMetrics(
ctx,
Expand Down
7 changes: 7 additions & 0 deletions processor/cumulativetodeltaprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cumulativetodeltaprocessor
import (
"context"
"path/filepath"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -61,6 +62,12 @@ func TestCreateProcessors(t *testing.T) {
processortest.NewNopSettings(),
cfg,
consumertest.NewNop())

if strings.Contains(k, "invalid") {
assert.Error(t, mErr)
assert.Nil(t, mp)
return
}
assert.NotNil(t, mp)
assert.NoError(t, mErr)
assert.NoError(t, mp.Shutdown(context.Background()))
Expand Down
1 change: 1 addition & 0 deletions processor/cumulativetodeltaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
go.opentelemetry.io/collector/processor/processortest v0.118.1-0.20250121185328-fbefb22cc2b3
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
)

require (
Expand Down
2 changes: 2 additions & 0 deletions processor/cumulativetodeltaprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 54 additions & 14 deletions processor/cumulativetodeltaprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele

import (
"context"
"fmt"
"math"
"strings"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
Expand All @@ -15,35 +17,71 @@ import (
)

type cumulativeToDeltaProcessor struct {
includeFS filterset.FilterSet
excludeFS filterset.FilterSet
logger *zap.Logger
deltaCalculator *tracking.MetricTracker
cancelFunc context.CancelFunc
includeFS filterset.FilterSet
excludeFS filterset.FilterSet
includeMetricTypes map[pmetric.MetricType]bool
excludeMetricTypes map[pmetric.MetricType]bool
logger *zap.Logger
deltaCalculator *tracking.MetricTracker
cancelFunc context.CancelFunc
}

func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) (*cumulativeToDeltaProcessor, error) {
ctx, cancel := context.WithCancel(context.Background())

p := &cumulativeToDeltaProcessor{
logger: logger,
deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue),
cancelFunc: cancel,
logger: logger,
cancelFunc: cancel,
}
if len(config.Include.Metrics) > 0 {
p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config)
}
if len(config.Exclude.Metrics) > 0 {
p.excludeFS, _ = filterset.CreateFilterSet(config.Exclude.Metrics, &config.Exclude.Config)
}
return p

if len(config.Include.MetricTypes) > 0 {
includeMetricTypeFilter, err := getMetricTypeFilter(config.Include.MetricTypes)
if err != nil {
return nil, err
}
p.includeMetricTypes = includeMetricTypeFilter
}

if len(config.Exclude.MetricTypes) > 0 {
excludeMetricTypeFilter, err := getMetricTypeFilter(config.Exclude.MetricTypes)
if err != nil {
return nil, err
}
p.excludeMetricTypes = excludeMetricTypeFilter
}

p.deltaCalculator = tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue)

return p, nil
}

func getMetricTypeFilter(types []string) (map[pmetric.MetricType]bool, error) {
res := map[pmetric.MetricType]bool{}
for _, t := range types {
switch strings.ToLower(t) {
case strings.ToLower(pmetric.MetricTypeSum.String()):
res[pmetric.MetricTypeSum] = true
case strings.ToLower(pmetric.MetricTypeHistogram.String()):
res[pmetric.MetricTypeHistogram] = true
default:
return nil, fmt.Errorf("unsupported metric type filter: %s", t)
}
}
return res, nil
}

// processMetrics implements the ProcessMetricsFunc type.
func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool {
rm.ScopeMetrics().RemoveIf(func(ilm pmetric.ScopeMetrics) bool {
ilm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
if !ctdp.shouldConvertMetric(m.Name()) {
if !ctdp.shouldConvertMetric(m) {
return false
}
switch m.Type() {
Expand Down Expand Up @@ -111,9 +149,11 @@ func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error {
return nil
}

func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) bool {
return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metricName)) &&
(ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName))
func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metric pmetric.Metric) bool {
return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metric.Name())) &&
(len(ctdp.includeMetricTypes) == 0 || ctdp.includeMetricTypes[metric.Type()]) &&
(ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metric.Name())) &&
(len(ctdp.excludeMetricTypes) == 0 || !ctdp.excludeMetricTypes[metric.Type()])
}

func (ctdp *cumulativeToDeltaProcessor) convertNumberDataPoints(dps pmetric.NumberDataPointSlice, baseIdentity tracking.MetricIdentity) {
Expand Down
Loading

0 comments on commit 4abd68a

Please sign in to comment.