Skip to content

Commit

Permalink
Namespace metrics label (#1086)
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski authored Oct 12, 2023
1 parent 948883f commit d6964bc
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-1.10.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ When cutting a new release, update the `unreleased` heading to the tag being gen

## unreleased

* [ENHANCEMENT] [#1073](https://github.com/k8ssandra/k8ssandra-operator/issues/1073) Add a namespace label to the Cassandra metrics
* [BUGFIX] [#1060](https://github.com/k8ssandra/k8ssandra-operator/issues/1060) Fix restore mapping shuffling nodes when restoring in place
* [BUGFIX] [#1061](https://github.com/k8ssandra/k8ssandra-operator/issues/1061) Point to cass-config-builder 1.0.7 for arm64 compatibility
* [ENHANCEMENT] [#956](https://github.com/k8ssandra/k8ssandra-operator/issues/956) Enable linting in the project
8 changes: 7 additions & 1 deletion config/cass-operator/cluster-scoped/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
- github.com/k8ssandra/cass-operator/config/deployments/cluster?ref=v1.17.2
- github.com/k8ssandra/cass-operator/config/deployments/cluster?ref=master

components:
- ../../components/cass-operator-image-config

# TODO: remove these image changes before release so that they don't pull a dev release of cass-operator.
# This is required when a new feature of cass-operator is needed that is not yet released.
images:
- name: k8ssandra/cass-operator
newTag: v1.18.0-dev.34e2ae6-20231002
10 changes: 8 additions & 2 deletions config/cass-operator/ns-scoped/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
- github.com/k8ssandra/cass-operator/config/deployments/default?ref=v1.17.2
- github.com/k8ssandra/cass-operator/config/deployments/default?ref=master

components:
- ../../components/cass-operator-image-config
- ../../components/cass-operator-image-config

# TODO: remove these image changes before release so that they don't pull a dev release of cass-operator.
# This is required when a new feature of cass-operator is needed that is not yet released.
images:
- name: k8ssandra/cass-operator
newTag: v1.18.0-dev.34e2ae6-20231002
56 changes: 30 additions & 26 deletions docs/content/en/tasks/monitor/vector/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,57 +42,58 @@ The following content will be added automatically to the vector.toml file:
type = "file"
include = [ "/var/log/cassandra/system.log" ]
read_from = "beginning"
fingerprint.strategy = "device_and_inode
fingerprint.strategy = "device_and_inode"
[sources.systemlog.multiline]
start_pattern = "^(INFO|WARN|ERROR|DEBUG|TRACE|FATAL)"
condition_pattern = "^(INFO|WARN|ERROR|DEBUG|TRACE|FATAL)"
mode = "halt_before"
timeout_ms = 10000

[sources.cassandra_metrics]
type = "prometheus_scrape"
endpoints = [ "http://localhost:{{ .ScrapePort }}" ]
scrape_interval_secs = {{ .ScrapeInterval }}

[transforms.parse_cassandra_log]
type = "remap"
inputs = [ "systemlog" ]
source = '''
del(.source_type)
. |= parse_groks!(.message, patterns: [
"%{LOGLEVEL:loglevel}\\s+\\[(?<thread>((.+)))\\]\\s+%{TIMESTAMP_ISO8601:timestamp}\\s+%{JAVACLASS:class}:%{NUMBER:line}\\s+-\\s+(?<message>(.+\\n?)+)",
]
)
pod_name, err = get_env_var("POD_NAME")
if err == null {
.pod_name = pod_name

node_name, err = get_env_var("NODE_NAME")
if err == null {
.node_name = node_name

cluster, err = get_env_var("CLUSTER_NAME")
if err == null {
.cluster = cluster

datacenter, err = get_env_var("DATACENTER_NAME")
if err == null {
.datacenter = datacenter
[sources.cassandra_metrics_raw]
type = "prometheus_scrape"
endpoints = [ "http://localhost:{{ .ScrapePort }}" ]
scrape_interval_secs = {{ .ScrapeInterval }}

rack, err = get_env_var("RACK_NAME")
[transforms.cassandra_metrics]
type = "remap"
inputs = ["cassandra_metrics_raw"]
source = '''
namespace, err = get_env_var("NAMESPACE")
if err == null {
.rack = rack
.namespace = namespace
}
'''
[sinks.console]
[sinks.console_output]
type = "console"
inputs = ["cassandra_metrics"]
target = "stdout"
[sinks.console_output.encoding]
codec = "json"
[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["cassandra_metrics"]
[sinks.console_log]
type = "console"
inputs = ["systemlog"]
target = "stdout"
encoding.codec = "text"
```
The default options are always added to the configuration, but one may override them and if not used, they're automatically cleaned up (see next section).
The default options are always added to the configuration, but they can be overridden, and any that end up unused are automatically cleaned up (see next section).
The `cassandra_metrics` transform adds the namespace of the datacenter to the exposed metrics and should be used as the input for any transform or sink that would modify or route the metrics to a remote system.

## Automated cleanup of unused sources

Expand All @@ -105,6 +106,9 @@ They can be used as input in custom components added through configuration.

`systemlog` input is defined as the default source for Cassandra logs.

We provide the `parse_cassandra_log` transform out of the box because it's likely to be a common need for users who ship the logs to a remote system such as Grafana Loki; however, it is not used by default and will be filtered out unless it's referenced by a custom transform/sink.
This transform will parse the Cassandra logs and extract the log level, thread, timestamp, class, line and message fields. It will also remove the `source_type` field which is added by the `systemlog` source.

## Custom Vector configuration

To customize the Vector configuration, you can add [sources](https://vector.dev/docs/reference/configuration/sources/), [transforms](https://vector.dev/docs/reference/configuration/transforms/) and [sinks](https://vector.dev/docs/reference/configuration/sinks/) in a semi-structured way under `.spec.cassandra.telemetry.vector.components`, `.spec.reaper.telemetry.vector.components` and `.spec.stargate.telemetry.vector.components`:
Expand Down
22 changes: 21 additions & 1 deletion pkg/telemetry/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ timeout_ms = 10000
}

metricsInput := telemetry.VectorSourceSpec{
Name: "cassandra_metrics",
Name: "cassandra_metrics_raw",
Type: "prometheus_scrape",
Config: fmt.Sprintf("endpoints = [ \"http://localhost:%v%s\" ]\nscrape_interval_secs = %v", config.ScrapePort, config.ScrapeEndpoint, config.ScrapeInterval),
}
Expand Down Expand Up @@ -157,12 +157,32 @@ rack, err = get_env_var("RACK_NAME")
if err == null {
.rack = rack
}
namespace, err = get_env_var("NAMESPACE")
if err == null {
.namespace = namespace
}
'''
`,
}

transformers = append(transformers, systemLogParser)

// Add the namespace label to the Cassandra metrics
metricsParser := telemetry.VectorTransformSpec{
Name: "cassandra_metrics",
Type: "remap",
Inputs: []string{"cassandra_metrics_raw"},
Config: `source = '''
namespace, err = get_env_var("NAMESPACE")
if err == null {
.namespace = namespace
}
'''
`,
}

transformers = append(transformers, metricsParser)

systemLogSink := telemetry.VectorSinkSpec{
Name: "console_log",
Type: "console",
Expand Down
21 changes: 16 additions & 5 deletions pkg/telemetry/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,22 @@ mode = "halt_before"
timeout_ms = 10000
[sources.cassandra_metrics]
[sources.cassandra_metrics_raw]
type = "prometheus_scrape"
endpoints = [ "http://localhost:9000/metrics" ]
scrape_interval_secs = 30
[transforms.cassandra_metrics]
type = "remap"
inputs = ["cassandra_metrics_raw"]
source = '''
namespace, err = get_env_var("NAMESPACE")
if err == null {
.namespace = namespace
}
'''
[sinks.console]
type = "console"
inputs = ["cassandra_metrics"]
Expand All @@ -246,7 +257,7 @@ func TestDefaultRemoveUnusedSources(t *testing.T) {
assert := assert.New(t)
sources, transformers, sinks := BuildDefaultVectorComponents(vector.VectorConfig{})
assert.Equal(2, len(sources))
assert.Equal(1, len(transformers))
assert.Equal(2, len(transformers))
assert.Equal(1, len(sinks))

sources, transformers, sinks = FilterUnusedPipelines(sources, transformers, sinks)
Expand All @@ -260,15 +271,15 @@ func TestRemoveUnusedSourcesModified(t *testing.T) {
assert := assert.New(t)
sources, transformers, sinks := BuildDefaultVectorComponents(vector.VectorConfig{})
assert.Equal(2, len(sources))
assert.Equal(1, len(transformers))
assert.Equal(2, len(transformers))
assert.Equal(1, len(sinks))

sinks = append(sinks, telemetry.VectorSinkSpec{Name: "a", Inputs: []string{"cassandra_metrics"}})

sources, transformers, sinks = FilterUnusedPipelines(sources, transformers, sinks)

assert.Equal(2, len(sources))
assert.Equal(0, len(transformers))
assert.Equal(1, len(transformers))
assert.Equal(2, len(sinks))
}

Expand Down Expand Up @@ -335,7 +346,7 @@ func TestOverrideSourcePossible(t *testing.T) {
assert := assert.New(t)
sources, transformers, sinks := BuildDefaultVectorComponents(vector.VectorConfig{})
assert.Equal(2, len(sources))
assert.Equal(1, len(transformers))
assert.Equal(2, len(transformers))
assert.Equal(1, len(sinks))

newSources := []telemetry.VectorSourceSpec{
Expand Down

0 comments on commit d6964bc

Please sign in to comment.