Skip to content

Commit

Permalink
[receiver/kubeletstatsreceiver] Emit network metrics without a direct…
Browse files Browse the repository at this point in the history
…ion attribute (#12588)

Update the kubeletstatsreceiver so that it can emit metrics without a direction attribute. The behavior of the direction attribute is currently controlled by the feature gates `receiver.kubeletstatsreceiver.emitMetricsWithoutDirectionAttribute` and `receiver.kubeletstatsreceiver.emitMetricsWithDirectionAttribute`. The feature gates will be available for a grace period to help users transition to metrics without a direction attribute, which will eventually become the default.
  • Loading branch information
mwear authored Aug 12, 2022
1 parent 56bd8ab commit 64bc925
Show file tree
Hide file tree
Showing 11 changed files with 841 additions and 35 deletions.
24 changes: 24 additions & 0 deletions receiver/kubeletstatsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,29 @@ The following parameters can also be specified:
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).

## Metrics

Details about the metrics produced by this receiver can be found in [metadata.yaml](./metadata.yaml) with further documentation in [documentation.md](./documentation.md)

### Feature gate configurations

#### Transition from metrics with "direction" attribute

Some kubeletstats metrics reported are transitioning from being reported with a `direction` attribute to being reported with the
direction included in the metric name to adhere to the OpenTelemetry specification
(https://github.com/open-telemetry/opentelemetry-specification/pull/2617):

- `k8s.node.network.io` will become:
- `k8s.node.network.io.transmit`
- `k8s.node.network.io.receive`
- `k8s.node.network.errors` will become:
- `k8s.node.network.errors.transmit`
- `k8s.node.network.errors.receive`

The following feature gates control the transition process:

- **receiver.kubeletstatsreceiver.emitMetricsWithoutDirectionAttribute**: controls if the new metrics without `direction` attribute are emitted by the receiver.
- **receiver.kubeletstatsreceiver.emitMetricsWithDirectionAttribute**: controls if the deprecated metrics with `direction` attribute are emitted by the receiver.

[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
8 changes: 8 additions & 0 deletions receiver/kubeletstatsreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ These are the metrics available for this scraper.
| **k8s.node.memory.usage** | Node memory usage | By | Gauge(Int) | <ul> </ul> |
| **k8s.node.memory.working_set** | Node memory working_set | By | Gauge(Int) | <ul> </ul> |
| **k8s.node.network.errors** | Node network errors | 1 | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.node.network.errors.receive** | Node network receive errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.node.network.errors.transmit** | Node network transmission errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.node.network.io** | Node network IO | By | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.node.network.io.receive** | Node network IO received | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.node.network.io.transmit** | Node network IO transmitted | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.cpu.time** | Pod CPU time | s | Sum(Double) | <ul> </ul> |
| **k8s.pod.cpu.utilization** | Pod CPU utilization | 1 | Gauge(Double) | <ul> </ul> |
| **k8s.pod.filesystem.available** | Pod filesystem available | By | Gauge(Int) | <ul> </ul> |
Expand All @@ -44,7 +48,11 @@ These are the metrics available for this scraper.
| **k8s.pod.memory.usage** | Pod memory usage | By | Gauge(Int) | <ul> </ul> |
| **k8s.pod.memory.working_set** | Pod memory working_set | By | Gauge(Int) | <ul> </ul> |
| **k8s.pod.network.errors** | Pod network errors | 1 | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.pod.network.errors.receive** | Pod network receive errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.network.errors.transmit** | Pod network transmission errors | 1 | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.network.io** | Pod network IO | By | Sum(Int) | <ul> <li>interface</li> <li>direction</li> </ul> |
| **k8s.pod.network.io.receive** | Pod network IO received | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.pod.network.io.transmit** | Pod network IO transmitted | By | Sum(Int) | <ul> <li>interface</li> </ul> |
| **k8s.volume.available** | The number of available bytes in the volume. | By | Gauge(Int) | <ul> </ul> |
| **k8s.volume.capacity** | The total capacity in bytes of the volume. | By | Gauge(Int) | <ul> </ul> |
| **k8s.volume.inodes** | The total inodes in the filesystem. | 1 | Gauge(Int) | <ul> </ul> |
Expand Down
28 changes: 20 additions & 8 deletions receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ var ValidMetricGroups = map[MetricGroup]bool{
}

type metricDataAccumulator struct {
m []pmetric.Metrics
metadata Metadata
logger *zap.Logger
metricGroupsToCollect map[MetricGroup]bool
time time.Time
mbs *metadata.MetricsBuilders
m []pmetric.Metrics
metadata Metadata
logger *zap.Logger
metricGroupsToCollect map[MetricGroup]bool
time time.Time
mbs *metadata.MetricsBuilders
emitMetricsWithDirectionAttribute bool
emitMetricsWithoutDirectionAttribute bool
}

func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
Expand All @@ -61,7 +63,12 @@ func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
addCPUMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeCPUMetrics, s.CPU, currentTime)
addMemoryMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeFilesystemMetrics, s.Fs, currentTime)
addNetworkMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetrics, s.Network, currentTime)
if a.emitMetricsWithDirectionAttribute {
addNetworkMetricsWithDirection(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetricsWithDirection, s.Network, currentTime)
}
if a.emitMetricsWithoutDirectionAttribute {
addNetworkMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetrics, s.Network, currentTime)
}
// todo s.Runtime.ImageFs

a.m = append(a.m, a.mbs.NodeMetricsBuilder.Emit(
Expand All @@ -79,7 +86,12 @@ func (a *metricDataAccumulator) podStats(s stats.PodStats) {
addCPUMetrics(a.mbs.PodMetricsBuilder, metadata.PodCPUMetrics, s.CPU, currentTime)
addMemoryMetrics(a.mbs.PodMetricsBuilder, metadata.PodMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(a.mbs.PodMetricsBuilder, metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)
if a.emitMetricsWithDirectionAttribute {
addNetworkMetricsWithDirection(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetricsWithDirection, s.Network, currentTime)
}
if a.emitMetricsWithoutDirectionAttribute {
addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)
}

a.m = append(a.m, a.mbs.PodMetricsBuilder.Emit(
metadata.WithStartTimeOverride(pcommon.NewTimestampFromTime(s.StartTime.Time)),
Expand Down
16 changes: 10 additions & 6 deletions receiver/kubeletstatsreceiver/internal/kubelet/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ func MetricsData(
logger *zap.Logger, summary *stats.Summary,
metadata Metadata,
metricGroupsToCollect map[MetricGroup]bool,
mbs *metadata.MetricsBuilders) []pmetric.Metrics {
mbs *metadata.MetricsBuilders,
emitMetricsWithDirectionAttribute,
emitMetricsWithoutDirectionAttribute bool) []pmetric.Metrics {
acc := &metricDataAccumulator{
metadata: metadata,
logger: logger,
metricGroupsToCollect: metricGroupsToCollect,
time: time.Now(),
mbs: mbs,
metadata: metadata,
logger: logger,
metricGroupsToCollect: metricGroupsToCollect,
time: time.Now(),
mbs: mbs,
emitMetricsWithDirectionAttribute: emitMetricsWithDirectionAttribute,
emitMetricsWithoutDirectionAttribute: emitMetricsWithoutDirectionAttribute,
}
acc.nodeStats(summary.Node)
for _, podStats := range summary.Pods {
Expand Down
60 changes: 51 additions & 9 deletions receiver/kubeletstatsreceiver/internal/kubelet/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func TestMetricAccumulator(t *testing.T) {
ContainerMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
OtherMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
}
requireMetricsOk(t, MetricsData(zap.NewNop(), summary, k8sMetadata, ValidMetricGroups, mbs))
requireMetricsOk(t, MetricsData(zap.NewNop(), summary, k8sMetadata, ValidMetricGroups, mbs, true, false))
// Disable all groups
mbs.NodeMetricsBuilder.Reset()
mbs.PodMetricsBuilder.Reset()
mbs.OtherMetricsBuilder.Reset()
require.Equal(t, 0, len(MetricsData(zap.NewNop(), summary, k8sMetadata, map[MetricGroup]bool{}, mbs)))
require.Equal(t, 0, len(MetricsData(zap.NewNop(), summary, k8sMetadata, map[MetricGroup]bool{}, mbs, true, false)))
}

func requireMetricsOk(t *testing.T, mds []pmetric.Metrics) {
Expand Down Expand Up @@ -112,7 +112,7 @@ func requireResourceOk(t *testing.T, resource pcommon.Resource) {
}

func TestWorkingSetMem(t *testing.T) {
metrics := indexedFakeMetrics()
metrics := indexedFakeMetrics(true, false)
requireContains(t, metrics, "k8s.pod.memory.working_set")
requireContains(t, metrics, "container.memory.working_set")

Expand All @@ -122,7 +122,7 @@ func TestWorkingSetMem(t *testing.T) {
}

func TestPageFaults(t *testing.T) {
metrics := indexedFakeMetrics()
metrics := indexedFakeMetrics(true, false)
requireContains(t, metrics, "k8s.pod.memory.page_faults")
requireContains(t, metrics, "container.memory.page_faults")

Expand All @@ -132,7 +132,7 @@ func TestPageFaults(t *testing.T) {
}

func TestMajorPageFaults(t *testing.T) {
metrics := indexedFakeMetrics()
metrics := indexedFakeMetrics(true, false)
requireContains(t, metrics, "k8s.pod.memory.major_page_faults")
requireContains(t, metrics, "container.memory.major_page_faults")

Expand All @@ -141,13 +141,55 @@ func TestMajorPageFaults(t *testing.T) {
require.Equal(t, int64(12), value)
}

func TestEmitMetricsWithDirectionAttribute(t *testing.T) {
metrics := indexedFakeMetrics(true, false)
metricNamesWithDirectionAttr := []string{
"k8s.node.network.io",
"k8s.node.network.errors",
"k8s.pod.network.io",
"k8s.pod.network.errors",
}
for _, name := range metricNamesWithDirectionAttr {
requireContains(t, metrics, name)
metric := metrics[name][0]
for i := 0; i < metric.Sum().DataPoints().Len(); i++ {
dp := metric.Sum().DataPoints().At(i)
_, found := dp.Attributes().Get("direction")
require.True(t, found, "expected direction attribute")
}
}
}

func TestEmitMetricsWithoutDirectionAttribute(t *testing.T) {
metrics := indexedFakeMetrics(false, true)
metricNamesWithoutDirectionAttr := []string{
"k8s.node.network.io.receive",
"k8s.node.network.io.transmit",
"k8s.node.network.errors.receive",
"k8s.node.network.errors.transmit",
"k8s.pod.network.io.receive",
"k8s.pod.network.io.transmit",
"k8s.pod.network.errors.receive",
"k8s.pod.network.errors.transmit",
}
for _, name := range metricNamesWithoutDirectionAttr {
requireContains(t, metrics, name)
metric := metrics[name][0]
for i := 0; i < metric.Sum().DataPoints().Len(); i++ {
dp := metric.Sum().DataPoints().At(i)
_, found := dp.Attributes().Get("direction")
require.False(t, found, "unexpected direction attribute")
}
}
}

func requireContains(t *testing.T, metrics map[string][]pmetric.Metric, metricName string) {
_, found := metrics[metricName]
require.True(t, found)
}

func indexedFakeMetrics() map[string][]pmetric.Metric {
mds := fakeMetrics()
func indexedFakeMetrics(emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute bool) map[string][]pmetric.Metric {
mds := fakeMetrics(emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute)
metrics := make(map[string][]pmetric.Metric)
for _, md := range mds {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
Expand All @@ -167,7 +209,7 @@ func indexedFakeMetrics() map[string][]pmetric.Metric {
return metrics
}

func fakeMetrics() []pmetric.Metrics {
func fakeMetrics(emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute bool) []pmetric.Metrics {
rc := &fakeRestClient{}
statsProvider := NewStatsProvider(rc)
summary, _ := statsProvider.StatsSummary()
Expand All @@ -182,5 +224,5 @@ func fakeMetrics() []pmetric.Metrics {
ContainerMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
OtherMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings(), componenttest.NewNopReceiverCreateSettings().BuildInfo),
}
return MetricsData(zap.NewNop(), summary, Metadata{}, mgs, mbs)
return MetricsData(zap.NewNop(), summary, Metadata{}, mgs, mbs, emitMetricsWithDirectionAttribute, emitMetricsWithoutDirectionAttribute)
}
20 changes: 19 additions & 1 deletion receiver/kubeletstatsreceiver/internal/kubelet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,25 @@ func addNetworkMetrics(mb *metadata.MetricsBuilder, networkMetrics metadata.Netw
recordNetworkDataPoint(mb, networkMetrics.Errors, s, currentTime)
}

func recordNetworkDataPoint(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordIntDataPointWithDirectionFunc, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
func recordNetworkDataPoint(mb *metadata.MetricsBuilder, r metadata.NetworkMetricsRecorder, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
if s.RxBytes == nil && s.TxBytes == nil {
return
}

r.RecordReceiveDataPoint(mb, currentTime, int64(*s.RxBytes), s.Name)
r.RecordTransmitDataPoint(mb, currentTime, int64(*s.TxBytes), s.Name)
}

func addNetworkMetricsWithDirection(mb *metadata.MetricsBuilder, networkMetrics metadata.NetworkMetricsWithDirection, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
if s == nil {
return
}

recordNetworkDataPointWithDirection(mb, networkMetrics.IO, s, currentTime)
recordNetworkDataPointWithDirection(mb, networkMetrics.Errors, s, currentTime)
}

func recordNetworkDataPointWithDirection(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordIntDataPointWithDirectionFunc, s *stats.NetworkStats, currentTime pcommon.Timestamp) {
if s.RxBytes == nil && s.TxBytes == nil {
return
}
Expand Down
Loading

0 comments on commit 64bc925

Please sign in to comment.