From d6964bc9c786400c6403abddac3da533967827e1 Mon Sep 17 00:00:00 2001 From: Alexander Dejanovski Date: Thu, 12 Oct 2023 17:36:14 +0200 Subject: [PATCH] Namespace metrics label (#1086) --- CHANGELOG/CHANGELOG-1.10.md | 1 + .../cluster-scoped/kustomization.yaml | 8 ++- .../ns-scoped/kustomization.yaml | 10 +++- .../content/en/tasks/monitor/vector/_index.md | 56 ++++++++++--------- pkg/telemetry/vector.go | 22 +++++++- pkg/telemetry/vector_test.go | 21 +++++-- 6 files changed, 83 insertions(+), 35 deletions(-) diff --git a/CHANGELOG/CHANGELOG-1.10.md b/CHANGELOG/CHANGELOG-1.10.md index 7a05a8fd8..d9b51b977 100644 --- a/CHANGELOG/CHANGELOG-1.10.md +++ b/CHANGELOG/CHANGELOG-1.10.md @@ -15,6 +15,7 @@ When cutting a new release, update the `unreleased` heading to the tag being gen ## unreleased +* [ENHANCEMENT] [#1073](https://github.com/k8ssandra/k8ssandra-operator/issues/1073) Add a namespace label to the Cassandra metrics * [BUGFIX] [#1060](https://github.com/k8ssandra/k8ssandra-operator/issues/1060) Fix restore mapping shuffling nodes when restoring in place * [BUGFIX] [#1061](https://github.com/k8ssandra/k8ssandra-operator/issues/1061) Point to cass-config-builder 1.0.7 for arm64 compatibility * [ENHANCEMENT] [#956](https://github.com/k8ssandra/k8ssandra-operator/issues/956) Enable linting in the project diff --git a/config/cass-operator/cluster-scoped/kustomization.yaml b/config/cass-operator/cluster-scoped/kustomization.yaml index a10a3347d..e1be4ca2d 100644 --- a/config/cass-operator/cluster-scoped/kustomization.yaml +++ b/config/cass-operator/cluster-scoped/kustomization.yaml @@ -2,7 +2,13 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: -- github.com/k8ssandra/cass-operator/config/deployments/cluster?ref=v1.17.2 +- github.com/k8ssandra/cass-operator/config/deployments/cluster?ref=master components: - ../../components/cass-operator-image-config + +# TODO: remove these image changes before release so that they don't pull a dev release 
of cass-operator. +# This is required when a new feature of cass-operator is needed that is not yet released. +images: +- name: k8ssandra/cass-operator + newTag: v1.18.0-dev.34e2ae6-20231002 \ No newline at end of file diff --git a/config/cass-operator/ns-scoped/kustomization.yaml b/config/cass-operator/ns-scoped/kustomization.yaml index 517219b62..137baddbc 100644 --- a/config/cass-operator/ns-scoped/kustomization.yaml +++ b/config/cass-operator/ns-scoped/kustomization.yaml @@ -2,7 +2,13 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: -- github.com/k8ssandra/cass-operator/config/deployments/default?ref=v1.17.2 +- github.com/k8ssandra/cass-operator/config/deployments/default?ref=master components: - - ../../components/cass-operator-image-config \ No newline at end of file + - ../../components/cass-operator-image-config + +# TODO: remove these image changes before release so that they don't pull a dev release of cass-operator. +# This is required when a new feature of cass-operator is needed that is not yet released. 
+images: +- name: k8ssandra/cass-operator + newTag: v1.18.0-dev.34e2ae6-20231002 \ No newline at end of file diff --git a/docs/content/en/tasks/monitor/vector/_index.md b/docs/content/en/tasks/monitor/vector/_index.md index a4c4d3d51..c93713aab 100644 --- a/docs/content/en/tasks/monitor/vector/_index.md +++ b/docs/content/en/tasks/monitor/vector/_index.md @@ -42,18 +42,13 @@ The following content will be added automatically to the vector.toml file: type = "file" include = [ "/var/log/cassandra/system.log" ] read_from = "beginning" -fingerprint.strategy = "device_and_inode +fingerprint.strategy = "device_and_inode" [sources.systemlog.multiline] start_pattern = "^(INFO|WARN|ERROR|DEBUG|TRACE|FATAL)" condition_pattern = "^(INFO|WARN|ERROR|DEBUG|TRACE|FATAL)" mode = "halt_before" timeout_ms = 10000 -[sources.cassandra_metrics] -type = "prometheus_scrape" -endpoints = [ "http://localhost:{{ .ScrapePort }}" ] -scrape_interval_secs = {{ .ScrapeInterval }} - [transforms.parse_cassandra_log] type = "remap" inputs = [ "systemlog" ] @@ -61,38 +56,44 @@ source = ''' del(.source_type) . 
|= parse_groks!(.message, patterns: [ "%{LOGLEVEL:loglevel}\\s+\\[(?<thread>((.+)))\\]\\s+%{TIMESTAMP_ISO8601:timestamp}\\s+%{JAVACLASS:class}:%{NUMBER:line}\\s+-\\s+(?<message>(.+\\n?)+)", - ] -) -pod_name, err = get_env_var("POD_NAME") -if err == null { - .pod_name = pod_name -node_name, err = get_env_var("NODE_NAME") -if err == null { - .node_name = node_name - -cluster, err = get_env_var("CLUSTER_NAME") -if err == null { - .cluster = cluster - -datacenter, err = get_env_var("DATACENTER_NAME") -if err == null { - .datacenter = datacenter +[sources.cassandra_metrics_raw] +type = "prometheus_scrape" +endpoints = [ "http://localhost:{{ .ScrapePort }}" ] +scrape_interval_secs = {{ .ScrapeInterval }} -rack, err = get_env_var("RACK_NAME") +[transforms.cassandra_metrics] +type = "remap" +inputs = ["cassandra_metrics_raw"] +source = ''' +namespace, err = get_env_var("NAMESPACE") if err == null { - .rack = rack + .namespace = namespace } ''' -[sinks.console] + +[sinks.console_output] +type = "console" +inputs = ["cassandra_metrics"] +target = "stdout" +[sinks.console_output.encoding] +codec = "json" + + +[sinks.prometheus] +type = "prometheus_exporter" +inputs = ["cassandra_metrics"] + +[sinks.console_log] type = "console" inputs = ["systemlog"] target = "stdout" encoding.codec = "text" ``` -The default options are always added to the configuration, but one may override them and if not used, they're automatically cleaned up (see next section). +The default options are always added to the configuration, but one may override them and if not used, they're automatically cleaned up (see next section). +The `cassandra_metrics` transform adds the namespace of the datacenter to the exposed metrics and should be used as the input for any transform or sink that would modify or route the metrics to a remote system. ## Automated cleanup of unused sources @@ -105,6 +106,9 @@ They can be used as input in custom components added through configuration. 
`systemlog` input is defined as the default source for Cassandra logs. +We provide the `parse_cassandra_log` transform out of the box because it's likely to be a common need for users who ship the logs to a remote system such as Grafana Loki; however, by default we don't use it, and it will be filtered out unless it's referenced by a custom transform/sink. +This transform will parse the Cassandra logs and extract the log level, thread, timestamp, class, line and message fields. It will also remove the `source_type` field which is added by the `systemlog` source. + ## Custom Vector configuration To customize the Vector configuration, you can add [sources](https://vector.dev/docs/reference/configuration/sources/), [transforms](https://vector.dev/docs/reference/configuration/transforms/) and [sinks](https://vector.dev/docs/reference/configuration/sinks/) in a semi-structured way under `.spec.cassandra.telemetry.vector.components`, `.spec.reaper.telemetry.vector.components` and `.spec.stargate.telemetry.vector.components`: diff --git a/pkg/telemetry/vector.go b/pkg/telemetry/vector.go index 8cfd30bf7..b797da298 100644 --- a/pkg/telemetry/vector.go +++ b/pkg/telemetry/vector.go @@ -117,7 +117,7 @@ timeout_ms = 10000 } metricsInput := telemetry.VectorSourceSpec{ - Name: "cassandra_metrics", + Name: "cassandra_metrics_raw", Type: "prometheus_scrape", Config: fmt.Sprintf("endpoints = [ \"http://localhost:%v%s\" ]\nscrape_interval_secs = %v", config.ScrapePort, config.ScrapeEndpoint, config.ScrapeInterval), } @@ -157,12 +157,32 @@ rack, err = get_env_var("RACK_NAME") if err == null { .rack = rack } +namespace, err = get_env_var("NAMESPACE") +if err == null { + .namespace = namespace +} ''' `, } transformers = append(transformers, systemLogParser) + // Add the namespace label to the Cassandra metrics + metricsParser := telemetry.VectorTransformSpec{ + Name: "cassandra_metrics", + Type: "remap", + Inputs: []string{"cassandra_metrics_raw"}, + Config: `source = ''' +namespace, err = 
get_env_var("NAMESPACE") +if err == null { + .namespace = namespace +} +''' +`, + } + + transformers = append(transformers, metricsParser) + systemLogSink := telemetry.VectorSinkSpec{ Name: "console_log", Type: "console", diff --git a/pkg/telemetry/vector_test.go b/pkg/telemetry/vector_test.go index 0031162ea..9bef3b06a 100644 --- a/pkg/telemetry/vector_test.go +++ b/pkg/telemetry/vector_test.go @@ -220,11 +220,22 @@ mode = "halt_before" timeout_ms = 10000 -[sources.cassandra_metrics] +[sources.cassandra_metrics_raw] type = "prometheus_scrape" endpoints = [ "http://localhost:9000/metrics" ] scrape_interval_secs = 30 +[transforms.cassandra_metrics] +type = "remap" +inputs = ["cassandra_metrics_raw"] +source = ''' +namespace, err = get_env_var("NAMESPACE") +if err == null { + .namespace = namespace +} +''' + + [sinks.console] type = "console" inputs = ["cassandra_metrics"] @@ -246,7 +257,7 @@ func TestDefaultRemoveUnusedSources(t *testing.T) { assert := assert.New(t) sources, transformers, sinks := BuildDefaultVectorComponents(vector.VectorConfig{}) assert.Equal(2, len(sources)) - assert.Equal(1, len(transformers)) + assert.Equal(2, len(transformers)) assert.Equal(1, len(sinks)) sources, transformers, sinks = FilterUnusedPipelines(sources, transformers, sinks) @@ -260,7 +271,7 @@ func TestRemoveUnusedSourcesModified(t *testing.T) { assert := assert.New(t) sources, transformers, sinks := BuildDefaultVectorComponents(vector.VectorConfig{}) assert.Equal(2, len(sources)) - assert.Equal(1, len(transformers)) + assert.Equal(2, len(transformers)) assert.Equal(1, len(sinks)) sinks = append(sinks, telemetry.VectorSinkSpec{Name: "a", Inputs: []string{"cassandra_metrics"}}) @@ -268,7 +279,7 @@ func TestRemoveUnusedSourcesModified(t *testing.T) { sources, transformers, sinks = FilterUnusedPipelines(sources, transformers, sinks) assert.Equal(2, len(sources)) - assert.Equal(0, len(transformers)) + assert.Equal(1, len(transformers)) assert.Equal(2, len(sinks)) } @@ -335,7 +346,7 @@ 
func TestOverrideSourcePossible(t *testing.T) { assert := assert.New(t) sources, transformers, sinks := BuildDefaultVectorComponents(vector.VectorConfig{}) assert.Equal(2, len(sources)) - assert.Equal(1, len(transformers)) + assert.Equal(2, len(transformers)) assert.Equal(1, len(sinks)) newSources := []telemetry.VectorSourceSpec{