diff --git a/src/main/java/io/tealc/Constants.java b/src/main/java/io/tealc/Constants.java
new file mode 100644
index 0000000..59a3fc5
--- /dev/null
+++ b/src/main/java/io/tealc/Constants.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright Tealc authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.tealc;
+
+/**
+ * Interface for keeping global constants used across the system tests.
+ */
+public interface Constants {
+
+ /**
+ * Basic paths to examples
+ */
+ String PATH_TO_EXAMPLES = Utils.USER_PATH + "/src/main/resources";
+ String PATH_TO_PACKAGING_INSTALL_FILES = Utils.USER_PATH + "/../packaging/install";
+
+ /**
+ * File paths for metrics YAMLs
+ */
+ String PATH_TO_KAFKA_METRICS_CONFIG = PATH_TO_EXAMPLES + "/kafka/kafka.yaml";
+ String PATH_TO_METRICS_CM = PATH_TO_EXAMPLES + "/kafka/metrics.yaml";
+
+ String KAFKA_METRICS_CONFIG_MAP_SUFFIX = "-kafka-metrics";
+
+ /**
+ * Strimzi domain used for the Strimzi labels
+ */
+ String STRIMZI_DOMAIN = "strimzi.io/";
+
+ /**
+ * Kubernetes domain used for Kubernetes labels
+ */
+ String KUBERNETES_DOMAIN = "app.kubernetes.io/";
+
+ /**
+ * The kind of a Kubernetes / OpenShift Resource. It contains the same value as the Kind of the corresponding
+ * Custom Resource. It should have one of the following values:
+ *
+ * <ul>
+ *     <li>Kafka</li>
+ *     <li>KafkaConnect</li>
+ *     <li>KafkaMirrorMaker</li>
+ *     <li>KafkaBridge</li>
+ *     <li>KafkaUser</li>
+ *     <li>KafkaTopic</li>
+ * </ul>
+ */
+ String STRIMZI_KIND_LABEL = STRIMZI_DOMAIN + "kind";
+
+ /**
+ * The Strimzi cluster the resource is part of. This is typically the name of the custom resource.
+ */
+ String STRIMZI_CLUSTER_LABEL = STRIMZI_DOMAIN + "cluster";
+ /**
+ * Annotation for enabling or disabling the Node Pools. This annotation is used on the Kafka CR.
+ */
+ String ANNO_STRIMZI_IO_NODE_POOLS = STRIMZI_DOMAIN + "node-pools";
+}
diff --git a/src/main/java/io/tealc/Environment.java b/src/main/java/io/tealc/Environment.java
index 76f370c..7bca21a 100644
--- a/src/main/java/io/tealc/Environment.java
+++ b/src/main/java/io/tealc/Environment.java
@@ -8,6 +8,7 @@
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
@@ -28,6 +29,7 @@ public class Environment {
public static final String WORKER_03_USERNAME_ENV = "WORKER_03_USERNAME";
public static final String WORKER_03_PASSWORD_ENV = "WORKER_03_PASSWORD";
public static final String WORKER_03_URL_ENV = "WORKER_03_URL";
+ public static final String KAFKA_VERSION_ENV = "KAFKA_VERSION";
/**
* Set values
@@ -41,6 +43,7 @@ public class Environment {
public static final String WORKER_03_USERNAME = getOrDefault(WORKER_03_USERNAME_ENV, null);
public static final String WORKER_03_PASSWORD = getOrDefault(WORKER_03_PASSWORD_ENV, null);
public static final String WORKER_03_URL = getOrDefault(WORKER_03_URL_ENV, null);
+ public static final String KAFKA_VERSION = getOrDefault(KAFKA_VERSION_ENV, "3.5.1");
private Environment() { }
@@ -49,7 +52,13 @@ private Environment() { }
LOGGER.info("Used environment variables:");
VALUES.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
- .forEach(entry -> LOGGER.info(debugFormat, entry.getKey(), entry.getValue()));
+ .forEach(entry -> {
+ if (!entry.getKey().toLowerCase(Locale.ROOT).contains("pass")) {
+ LOGGER.info(debugFormat, entry.getKey(), entry.getValue());
+ } else {
+ LOGGER.info(debugFormat, entry.getKey(), "*****");
+ }
+ });
}
private static String getOrDefault(String varName, String defaultValue) {
diff --git a/src/main/java/io/tealc/KubeClient.java b/src/main/java/io/tealc/KubeClient.java
index f55606d..4d6531d 100644
--- a/src/main/java/io/tealc/KubeClient.java
+++ b/src/main/java/io/tealc/KubeClient.java
@@ -26,9 +26,11 @@
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.KafkaList;
import io.strimzi.api.kafka.KafkaMirrorMaker2List;
+import io.strimzi.api.kafka.KafkaNodePoolList;
import io.strimzi.api.kafka.KafkaRebalanceList;
import io.strimzi.api.kafka.KafkaTopicList;
import io.strimzi.api.kafka.KafkaUserList;
+import io.strimzi.api.kafka.StrimziPodSetList;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaConnector;
@@ -36,6 +38,8 @@
import io.strimzi.api.kafka.model.KafkaRebalance;
import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.KafkaUser;
+import io.strimzi.api.kafka.model.StrimziPodSet;
+import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -309,4 +313,12 @@ public MixedOperation<KafkaRebalance, KafkaRebalanceList, Resource<KafkaRebalance>> kafkaRebalanceClient() {
return Crds.kafkaRebalanceOperation(client);
}
+
+ public MixedOperation<StrimziPodSet, StrimziPodSetList, Resource<StrimziPodSet>> strimziPodSetClient() {
+ return Crds.strimziPodSetOperation(client);
+ }
+
+ public MixedOperation<KafkaNodePool, KafkaNodePoolList, Resource<KafkaNodePool>> kafkaNodePoolClient() {
+ return Crds.kafkaNodePoolOperation(client);
+ }
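+
+ // Illustrative usage of the new typed clients (receiver chain taken from the system tests):
+ //   ClusterManager.getInstance().getClient(EClusters.WORKER_01)
+ //       .kafkaNodePoolClient().inNamespace("my-namespace").resource(pool).create();
+ //   ClusterManager.getInstance().getClient(EClusters.WORKER_01)
+ //       .strimziPodSetClient().inNamespace("my-namespace").list().getItems();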
}
diff --git a/src/main/java/io/tealc/Utils.java b/src/main/java/io/tealc/Utils.java
new file mode 100644
index 0000000..0a60c56
--- /dev/null
+++ b/src/main/java/io/tealc/Utils.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright Tealc authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.tealc;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.InvalidFormatException;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BooleanSupplier;
+
+public class Utils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+ public static final String USER_PATH = System.getProperty("user.dir");
+ public static <T> T configFromYaml(String yamlPath, Class<T> c) {
+ return configFromYaml(new File(yamlPath), c);
+ }
+
+ public static ConfigMap configMapFromYaml(String yamlPath, String name) {
+ try {
+ YAMLFactory yaml = new YAMLFactory();
+ ObjectMapper mapper = new ObjectMapper(yaml);
+ YAMLParser yamlParser = yaml.createParser(new File(yamlPath));
+ List<ConfigMap> list = mapper.readValues(yamlParser, new TypeReference<ConfigMap>() { }).readAll();
+ Optional<ConfigMap> cmOpt = list.stream().filter(cm -> "ConfigMap".equals(cm.getKind()) && name.equals(cm.getMetadata().getName())).findFirst();
+ if (cmOpt.isPresent()) {
+ return cmOpt.get();
+ } else {
+ LOGGER.warn("ConfigMap {} not found in file {}", name, yamlPath);
+ return null;
+ }
+ } catch (InvalidFormatException e) {
+ throw new IllegalArgumentException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public static <T> T configFromYaml(File yamlFile, Class<T> c) {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ try {
+ return mapper.readValue(yamlFile, c);
+ } catch (InvalidFormatException e) {
+ throw new IllegalArgumentException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
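+
+ // Illustrative usage against the bundled example resources:
+ //   Kafka kafka = Utils.configFromYaml(Constants.PATH_TO_KAFKA_METRICS_CONFIG, Kafka.class);
+ //   ConfigMap cm = Utils.configMapFromYaml(Constants.PATH_TO_METRICS_CM, "kafka-metrics");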
+
+ /**
+ * Poll the given {@code ready} function every {@code pollIntervalMs} milliseconds until it returns true,
+ * or throw a WaitException if it doesn't return true within {@code timeoutMs} milliseconds.
+ * @return The remaining time left until the timeout occurs
+ * (helpful if you have several calls which need to share a common timeout).
+ */
+ public static long waitFor(String description, long pollIntervalMs, long timeoutMs, BooleanSupplier ready) {
+ return waitFor(description, pollIntervalMs, timeoutMs, ready, () -> { });
+ }
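+
+ // Illustrative example (the readiness check is an assumed lambda): poll every second,
+ // give up after five minutes, and log on timeout:
+ //   Utils.waitFor("Kafka readiness", 1_000, 300_000,
+ //       () -> isKafkaReady(),                          // hypothetical readiness probe
+ //       () -> LOGGER.warn("Kafka never became ready"));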
+
+ public static long waitFor(String description, long pollIntervalMs, long timeoutMs, BooleanSupplier ready, Runnable onTimeout) {
+ LOGGER.debug("Waiting for {}", description);
+ long deadline = System.currentTimeMillis() + timeoutMs;
+
+ String exceptionMessage = null;
+ String previousExceptionMessage = null;
+
+ // When polling every second or so, log the exception only after several tries, not on the first one.
+ // For poll intervals of a minute or more, two appearances are enough.
+ int exceptionAppearanceCount = Duration.ofMillis(pollIntervalMs).toMinutes() > 0 ? 2 : Math.max((int) (timeoutMs / pollIntervalMs) / 4, 2);
+ int exceptionCount = 0;
+ int newExceptionAppearance = 0;
+
+ StringWriter stackTraceError = new StringWriter();
+
+ while (true) {
+ boolean result;
+ try {
+ result = ready.getAsBoolean();
+ } catch (Exception e) {
+ exceptionMessage = e.getMessage();
+
+ if (++exceptionCount == exceptionAppearanceCount && exceptionMessage != null && exceptionMessage.equals(previousExceptionMessage)) {
+ LOGGER.error("While waiting for {} exception occurred: {}", description, exceptionMessage);
+ // log the stacktrace
+ e.printStackTrace(new PrintWriter(stackTraceError));
+ } else if (exceptionMessage != null && !exceptionMessage.equals(previousExceptionMessage) && ++newExceptionAppearance == 2) {
+ previousExceptionMessage = exceptionMessage;
+ }
+
+ result = false;
+ }
+ long timeLeft = deadline - System.currentTimeMillis();
+ if (result) {
+ return timeLeft;
+ }
+ if (timeLeft <= 0) {
+ if (exceptionCount > 1) {
+ LOGGER.error("Exception waiting for {}, {}", description, exceptionMessage);
+
+ if (!stackTraceError.toString().isEmpty()) {
+ // printing handled stacktrace
+ LOGGER.error(stackTraceError.toString());
+ }
+ }
+ onTimeout.run();
+ WaitException waitException = new WaitException("Timeout after " + timeoutMs + " ms waiting for " + description);
+ waitException.printStackTrace();
+ throw waitException;
+ }
+ long sleepTime = Math.min(pollIntervalMs, timeLeft);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("{} not ready, will try again in {} ms ({}ms till timeout)", description, sleepTime, timeLeft);
+ }
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ return deadline - System.currentTimeMillis();
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/io/tealc/WaitException.java b/src/main/java/io/tealc/WaitException.java
new file mode 100644
index 0000000..00f9bda
--- /dev/null
+++ b/src/main/java/io/tealc/WaitException.java
@@ -0,0 +1,15 @@
+/*
+ * Copyright Tealc authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.tealc;
+
+public class WaitException extends RuntimeException {
+ public WaitException(String message) {
+ super(message);
+ }
+
+ public WaitException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/src/main/java/io/tealc/templates/ConfigMapTemplates.java b/src/main/java/io/tealc/templates/ConfigMapTemplates.java
new file mode 100644
index 0000000..8ff0598
--- /dev/null
+++ b/src/main/java/io/tealc/templates/ConfigMapTemplates.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright Tealc authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.tealc.templates;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.tealc.Constants;
+import io.tealc.Utils;
+
+public class ConfigMapTemplates {
+ private ConfigMapTemplates() {}
+
+ public static ConfigMapBuilder kafkaConfigMap(String namespace, String clusterName) {
+ ConfigMap configMap = getConfigMapFromYaml(Constants.PATH_TO_METRICS_CM);
+ return new ConfigMapBuilder(configMap)
+ .editMetadata()
+ .withName(clusterName + Constants.KAFKA_METRICS_CONFIG_MAP_SUFFIX)
+ .withNamespace(namespace)
+ .endMetadata();
+ }
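+
+ // Illustrative usage: renders the bundled metrics ConfigMap, renamed to
+ // "<clusterName>-kafka-metrics" in the target namespace:
+ //   ConfigMap cm = ConfigMapTemplates.kafkaConfigMap("my-namespace", "my-cluster").build();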
+
+ private static ConfigMap getConfigMapFromYaml(String yamlPath) {
+ return Utils.configFromYaml(yamlPath, ConfigMap.class);
+ }
+}
diff --git a/src/main/java/io/tealc/templates/KafkaNodePoolTemplates.java b/src/main/java/io/tealc/templates/KafkaNodePoolTemplates.java
new file mode 100644
index 0000000..3600fc5
--- /dev/null
+++ b/src/main/java/io/tealc/templates/KafkaNodePoolTemplates.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright Tealc authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.tealc.templates;
+
+import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
+import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
+import io.tealc.Constants;
+
+import java.util.Map;
+
+public class KafkaNodePoolTemplates {
+
+ private KafkaNodePoolTemplates() {}
+
+ public static KafkaNodePoolBuilder defaultKafkaNodePool(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
+ return new KafkaNodePoolBuilder()
+ .withNewMetadata()
+ .withNamespace(namespaceName)
+ .withName(nodePoolName)
+ .withLabels(Map.of(Constants.STRIMZI_CLUSTER_LABEL, kafkaClusterName))
+ .endMetadata()
+ .withNewSpec()
+ .withReplicas(kafkaReplicas)
+ .endSpec();
+ }
+
+ public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
+ return defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas)
+ .editOrNewSpec()
+ .addToRoles(ProcessRoles.BROKER)
+ .endSpec();
+ }
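+
+ // Illustrative usage, mirroring the system tests: a 3-replica broker pool with
+ // persistent storage layered on top of the template defaults:
+ //   KafkaNodePool pool = KafkaNodePoolTemplates
+ //       .kafkaNodePoolWithBrokerRole("my-namespace", "broker", "my-cluster", 3)
+ //       .editSpec()
+ //           .withNewPersistentClaimStorage()
+ //               .withSize("10Gi")
+ //               .withDeleteClaim(true)
+ //           .endPersistentClaimStorage()
+ //       .endSpec()
+ //       .build();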
+}
diff --git a/src/main/java/io/tealc/templates/KafkaTemplates.java b/src/main/java/io/tealc/templates/KafkaTemplates.java
new file mode 100644
index 0000000..3e11d45
--- /dev/null
+++ b/src/main/java/io/tealc/templates/KafkaTemplates.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright Tealc authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.tealc.templates;
+
+import io.strimzi.api.kafka.model.Kafka;
+import io.strimzi.api.kafka.model.KafkaBuilder;
+import io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListenerBuilder;
+import io.strimzi.api.kafka.model.listener.arraylistener.KafkaListenerType;
+import io.tealc.Constants;
+import io.tealc.Environment;
+import io.tealc.Utils;
+
+public class KafkaTemplates {
+
+ private KafkaTemplates() {}
+
+ private static final String KAFKA_METRICS_CONFIG_REF_KEY = "kafka-metrics-config.yml";
+ private static final String ZOOKEEPER_METRICS_CONFIG_REF_KEY = "zookeeper-metrics-config.yml";
+ private static final String CRUISE_CONTROL_METRICS_CONFIG_REF_KEY = "cruise-control-metrics-config.yml";
+
+ public static KafkaBuilder kafka(String namespace, String name, int kafkaReplicas, int zookeeperReplicas) {
+ Kafka kafka = getKafkaFromYaml(Constants.PATH_TO_KAFKA_METRICS_CONFIG);
+ String metricsConfigMapName = name + Constants.KAFKA_METRICS_CONFIG_MAP_SUFFIX;
+ return defaultKafka(kafka, namespace, name, kafkaReplicas, zookeeperReplicas)
+ .editOrNewMetadata()
+ .withNamespace(namespace)
+ .endMetadata()
+ .editSpec()
+ .withNewKafkaExporter()
+ .endKafkaExporter()
+ .editKafka()
+ .withNewJmxPrometheusExporterMetricsConfig()
+ .withNewValueFrom()
+ .withNewConfigMapKeyRef(KAFKA_METRICS_CONFIG_REF_KEY, metricsConfigMapName, false)
+ .endValueFrom()
+ .endJmxPrometheusExporterMetricsConfig()
+ .endKafka()
+ .editZookeeper()
+ .withNewJmxPrometheusExporterMetricsConfig()
+ .withNewValueFrom()
+ .withNewConfigMapKeyRef(ZOOKEEPER_METRICS_CONFIG_REF_KEY, metricsConfigMapName, false)
+ .endValueFrom()
+ .endJmxPrometheusExporterMetricsConfig()
+ .endZookeeper()
+ .editCruiseControl()
+ // Extend the number of allowed active user tasks, as we run more Cruise Control user tasks than the default permits
+ .addToConfig("max.active.user.tasks", 10)
+ .withNewJmxPrometheusExporterMetricsConfig()
+ .withNewValueFrom()
+ .withNewConfigMapKeyRef(CRUISE_CONTROL_METRICS_CONFIG_REF_KEY, metricsConfigMapName, false)
+ .endValueFrom()
+ .endJmxPrometheusExporterMetricsConfig()
+ .endCruiseControl()
+ .endSpec();
+ }
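+
+ // Illustrative usage: the metrics ConfigMap must exist first, since all three
+ // JMX exporter configs reference the "<name>-kafka-metrics" ConfigMap:
+ //   Kafka kafka = KafkaTemplates.kafka("my-namespace", "my-cluster", 3, 3).build();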
+
+ private static KafkaBuilder defaultKafka(Kafka kafka, String namespace, String name, int kafkaReplicas, int zookeeperReplicas) {
+ KafkaBuilder kb = new KafkaBuilder(kafka)
+ .withNewMetadata()
+ .withName(name)
+ .withNamespace(namespace)
+ .addToAnnotations(Constants.ANNO_STRIMZI_IO_NODE_POOLS, "enabled")
+ .addToLabels("mode", "kraft")
+ .endMetadata()
+ .editSpec()
+ .editKafka()
+ .withVersion(Environment.KAFKA_VERSION)
+ .withReplicas(kafkaReplicas)
+ .addToConfig("log.message.format.version", Environment.KAFKA_VERSION)
+ .addToConfig("inter.broker.protocol.version", Environment.KAFKA_VERSION)
+ .addToConfig("offsets.topic.replication.factor", Math.min(kafkaReplicas, 3))
+ .addToConfig("transaction.state.log.min.isr", Math.min(kafkaReplicas, 2))
+ .addToConfig("transaction.state.log.replication.factor", Math.min(kafkaReplicas, 3))
+ .addToConfig("default.replication.factor", Math.min(kafkaReplicas, 3))
+ .addToConfig("min.insync.replicas", Math.min(Math.max(kafkaReplicas - 1, 1), 2))
+ .withListeners(new GenericKafkaListenerBuilder()
+ .withName("plain")
+ .withPort(9092)
+ .withType(KafkaListenerType.INTERNAL)
+ .withTls(false)
+ .build(),
+ new GenericKafkaListenerBuilder()
+ .withName("tls")
+ .withPort(9093)
+ .withType(KafkaListenerType.INTERNAL)
+ .withTls(true)
+ .build())
+ .withNewInlineLogging()
+ .addToLoggers("kafka.root.logger.level", "DEBUG")
+ .endInlineLogging()
+ .endKafka()
+ .editZookeeper()
+ .withReplicas(zookeeperReplicas)
+ .withNewInlineLogging()
+ .addToLoggers("zookeeper.root.logger", "DEBUG")
+ .endInlineLogging()
+ .endZookeeper()
+ .editEntityOperator()
+ .editUserOperator()
+ .withNewInlineLogging()
+ .addToLoggers("rootLogger.level", "DEBUG")
+ .endInlineLogging()
+ .endUserOperator()
+ .editTopicOperator()
+ .withNewInlineLogging()
+ .addToLoggers("rootLogger.level", "DEBUG")
+ .endInlineLogging()
+ .endTopicOperator()
+ .endEntityOperator()
+ .endSpec();
+
+ return kb;
+ }
+
+ private static Kafka getKafkaFromYaml(String yamlPath) {
+ return Utils.configFromYaml(yamlPath, Kafka.class);
+ }
+}
diff --git a/src/main/resources/kafka/kafka.yaml b/src/main/resources/kafka/kafka.yaml
new file mode 100644
index 0000000..a27f251
--- /dev/null
+++ b/src/main/resources/kafka/kafka.yaml
@@ -0,0 +1,48 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: Kafka
+metadata:
+ name: my-cluster
+ annotations:
+ strimzi.io/node-pools: enabled
+spec:
+ kafka:
+ version: 3.5.1
+ # The replicas field is required by the Kafka CRD schema while the KafkaNodePools feature gate is in alpha phase.
+ # But it will be ignored when Kafka Node Pools are used
+ replicas: 3
+ listeners:
+ - name: plain
+ port: 9092
+ type: internal
+ tls: false
+ - name: tls
+ port: 9093
+ type: internal
+ tls: true
+ config:
+ offsets.topic.replication.factor: 3
+ transaction.state.log.replication.factor: 3
+ transaction.state.log.min.isr: 2
+ default.replication.factor: 3
+ min.insync.replicas: 2
+ inter.broker.protocol.version: "3.5"
+ # The storage field is required by the Kafka CRD schema while the KafkaNodePools feature gate is in alpha phase.
+ # But it will be ignored when Kafka Node Pools are used
+ storage:
+ type: jbod
+ volumes:
+ - id: 0
+ type: persistent-claim
+ size: 100Gi
+ deleteClaim: false
+ zookeeper:
+ replicas: 3
+ storage:
+ type: persistent-claim
+ size: 100Gi
+ deleteClaim: false
+ entityOperator:
+ topicOperator: {}
+ userOperator: {}
+ kafkaExporter: {}
+ cruiseControl: {}
diff --git a/src/main/resources/kafka/metrics.yaml b/src/main/resources/kafka/metrics.yaml
new file mode 100644
index 0000000..490ad30
--- /dev/null
+++ b/src/main/resources/kafka/metrics.yaml
@@ -0,0 +1,190 @@
+kind: ConfigMap
+apiVersion: v1
+metadata:
+ name: kafka-metrics
+ labels:
+ app: strimzi
+data:
+ kafka-metrics-config.yml: |
+ # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
+ lowercaseOutputName: true
+ rules:
+ # Special cases and very specific rules
+ - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
+ name: kafka_server_$1_$2
+ type: GAUGE
+ labels:
+ clientId: "$3"
+ topic: "$4"
+ partition: "$5"
+ - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
+ name: kafka_server_$1_$2
+ type: GAUGE
+ labels:
+ clientId: "$3"
+ broker: "$4:$5"
+ - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
+ name: kafka_server_$1_connections_tls_info
+ type: GAUGE
+ labels:
+ cipher: "$2"
+ protocol: "$3"
+ listener: "$4"
+ networkProcessor: "$5"
+ - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
+ name: kafka_server_$1_connections_software
+ type: GAUGE
+ labels:
+ clientSoftwareName: "$2"
+ clientSoftwareVersion: "$3"
+ listener: "$4"
+ networkProcessor: "$5"
+ - pattern: "kafka.server<>(.+):"
+ name: kafka_server_$1_$4
+ type: GAUGE
+ labels:
+ listener: "$2"
+ networkProcessor: "$3"
+ - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
+ name: kafka_server_$1_$4
+ type: GAUGE
+ labels:
+ listener: "$2"
+ networkProcessor: "$3"
+ # Some percent metrics use MeanRate attribute
+ # Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
+ name: kafka_$1_$2_$3_percent
+ type: GAUGE
+ # Generic gauges for percents
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
+ name: kafka_$1_$2_$3_percent
+ type: GAUGE
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
+ name: kafka_$1_$2_$3_percent
+ type: GAUGE
+ labels:
+ "$4": "$5"
+ # Generic per-second counters with 0-2 key/value pairs
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
+ name: kafka_$1_$2_$3_total
+ type: COUNTER
+ labels:
+ "$4": "$5"
+ "$6": "$7"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
+ name: kafka_$1_$2_$3_total
+ type: COUNTER
+ labels:
+ "$4": "$5"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
+ name: kafka_$1_$2_$3_total
+ type: COUNTER
+ # Generic gauges with 0-2 key/value pairs
+ - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
+ name: kafka_$1_$2_$3
+ type: GAUGE
+ labels:
+ "$4": "$5"
+ "$6": "$7"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
+ name: kafka_$1_$2_$3
+ type: GAUGE
+ labels:
+ "$4": "$5"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
+ name: kafka_$1_$2_$3
+ type: GAUGE
+ # Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
+ # Note that these are missing the '_sum' metric!
+ - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
+ name: kafka_$1_$2_$3_count
+ type: COUNTER
+ labels:
+ "$4": "$5"
+ "$6": "$7"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
+ name: kafka_$1_$2_$3
+ type: GAUGE
+ labels:
+ "$4": "$5"
+ "$6": "$7"
+ quantile: "0.$8"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
+ name: kafka_$1_$2_$3_count
+ type: COUNTER
+ labels:
+ "$4": "$5"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
+ name: kafka_$1_$2_$3
+ type: GAUGE
+ labels:
+ "$4": "$5"
+ quantile: "0.$6"
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
+ name: kafka_$1_$2_$3_count
+ type: COUNTER
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
+ name: kafka_$1_$2_$3
+ type: GAUGE
+ labels:
+ quantile: "0.$4"
+ # KRaft mode: uncomment the following lines to export KRaft related metrics
+ # KRaft overall related metrics
+ # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
+ #- pattern: "kafka.server<>(.+-total|.+-max):"
+ # name: kafka_server_raftmetrics_$1
+ # type: COUNTER
+ #- pattern: "kafka.server<>(.+):"
+ # name: kafka_server_raftmetrics_$1
+ # type: GAUGE
+ # KRaft "low level" channels related metrics
+ # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
+ #- pattern: "kafka.server<>(.+-total|.+-max):"
+ # name: kafka_server_raftchannelmetrics_$1
+ # type: COUNTER
+ #- pattern: "kafka.server<>(.+):"
+ # name: kafka_server_raftchannelmetrics_$1
+ # type: GAUGE
+ # Broker metrics related to fetching metadata topic records in KRaft mode
+ #- pattern: "kafka.server<>(.+):"
+ # name: kafka_server_brokermetadatametrics_$1
+ # type: GAUGE
+ zookeeper-metrics-config.yml: |
+ # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
+ lowercaseOutputName: true
+ rules:
+ # replicated Zookeeper
+ - pattern: "org.apache.ZooKeeperService<>(\\w+)"
+ name: "zookeeper_$2"
+ type: GAUGE
+ - pattern: "org.apache.ZooKeeperService<>(\\w+)"
+ name: "zookeeper_$3"
+ type: GAUGE
+ labels:
+ replicaId: "$2"
+ - pattern: "org.apache.ZooKeeperService<>(Packets\\w+)"
+ name: "zookeeper_$4"
+ type: COUNTER
+ labels:
+ replicaId: "$2"
+ memberType: "$3"
+ - pattern: "org.apache.ZooKeeperService<>(\\w+)"
+ name: "zookeeper_$4"
+ type: GAUGE
+ labels:
+ replicaId: "$2"
+ memberType: "$3"
+ - pattern: "org.apache.ZooKeeperService<>(\\w+)"
+ name: "zookeeper_$4_$5"
+ type: GAUGE
+ labels:
+ replicaId: "$2"
+ memberType: "$3"
+ cruise-control-metrics-config.yml: |
+ # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
+ lowercaseOutputName: true
+ rules:
+ - pattern: kafka.cruisecontrol<name=(.+)><>(\w+)
+ name: kafka_cruisecontrol_$1_$2
+ type: GAUGE
diff --git a/src/main/resources/kafka/nodepool.yaml b/src/main/resources/kafka/nodepool.yaml
new file mode 100644
index 0000000..1169334
--- /dev/null
+++ b/src/main/resources/kafka/nodepool.yaml
@@ -0,0 +1,17 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaNodePool
+metadata:
+ name: pool-a
+ labels:
+ strimzi.io/cluster: my-cluster
+spec:
+ replicas: 3
+ roles:
+ - broker
+ storage:
+ type: jbod
+ volumes:
+ - id: 0
+ type: persistent-claim
+ size: 100Gi
+ deleteClaim: false
diff --git a/src/test/java/io/tealc/worker01/kafka/KafkaMaintenance.java b/src/test/java/io/tealc/worker01/kafka/KafkaMaintenance.java
new file mode 100644
index 0000000..bdeb980
--- /dev/null
+++ b/src/test/java/io/tealc/worker01/kafka/KafkaMaintenance.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright Tealc authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.tealc.worker01.kafka;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.api.model.NamespaceBuilder;
+import io.strimzi.api.kafka.model.Kafka;
+import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
+import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
+import io.strimzi.api.kafka.model.status.KafkaStatus;
+import io.tealc.Abstract;
+import io.tealc.ClusterManager;
+import io.tealc.EClusters;
+import io.tealc.Utils;
+import io.tealc.templates.ConfigMapTemplates;
+import io.tealc.templates.KafkaNodePoolTemplates;
+import io.tealc.templates.KafkaTemplates;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class KafkaMaintenance extends Abstract {
+
+ public static final String NAMESPACE = "test-strimzi-kafka";
+
+ @Test
+ void deployNodepoolKafka() {
+ String kafkaClusterName = "eredin";
+ String pool1Name = "dual";
+ String pool2Name = "controller";
+ String pool3Name = "broker";
+
+ ConfigMap configMap = ConfigMapTemplates.kafkaConfigMap(NAMESPACE, kafkaClusterName).build();
+ ClusterManager.getInstance().getClient(EClusters.WORKER_01).getClient().configMaps().inNamespace(NAMESPACE).resource(configMap).create();
+
+ // Create nodepools
+ KafkaNodePool kafkaNodePool1 = KafkaNodePoolTemplates.defaultKafkaNodePool(NAMESPACE, pool1Name, kafkaClusterName, 5)
+ .editSpec()
+ .addToRoles(ProcessRoles.BROKER)
+ .addToRoles(ProcessRoles.CONTROLLER)
+ .withNewPersistentClaimStorage()
+ .withDeleteClaim(true)
+ .withSize("7Gi")
+ .endPersistentClaimStorage()
+ .endSpec()
+ .build();
+ ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaNodePoolClient().inNamespace(NAMESPACE).resource(kafkaNodePool1).create();
+
+ KafkaNodePool kafkaNodePool2 = KafkaNodePoolTemplates.defaultKafkaNodePool(NAMESPACE, pool2Name, kafkaClusterName, 3)
+ .editSpec()
+ .addToRoles(ProcessRoles.CONTROLLER)
+ .withNewPersistentClaimStorage()
+ .withDeleteClaim(true)
+ .withSize("5Gi")
+ .endPersistentClaimStorage()
+ .endSpec()
+ .build();
+ ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaNodePoolClient().inNamespace(NAMESPACE).resource(kafkaNodePool2).create();
+
+ KafkaNodePool kafkaNodePool3 = KafkaNodePoolTemplates.defaultKafkaNodePool(NAMESPACE, pool3Name, kafkaClusterName, 7)
+ .editSpec()
+ .addToRoles(ProcessRoles.BROKER)
+ .withNewPersistentClaimStorage()
+ .withDeleteClaim(true)
+ .withSize("10Gi")
+ .endPersistentClaimStorage()
+ .endSpec()
+ .build();
+ ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaNodePoolClient().inNamespace(NAMESPACE).resource(kafkaNodePool3).create();
+ // Create kafka
+ Kafka kafka = KafkaTemplates.kafka(NAMESPACE, kafkaClusterName, 3, 3).build();
+ ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaClient().inNamespace(NAMESPACE).resource(kafka).create();
+
+ Utils.waitFor("Kafka readiness", 1000, 300000,
+ () -> {
+ KafkaStatus status = ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaClient().inNamespace(NAMESPACE).withName(kafkaClusterName).get().getStatus();
+ LOGGER.info("Kafka {} status is {}", kafkaClusterName, status);
+
+ return status.getConditions().stream().anyMatch(condition -> condition.getType().equals("Ready") && condition.getStatus().equals("True"));
+ });
+ }
+
+ @BeforeAll
+ void setupEnvironment() {
+ // Create namespace
+ Namespace ns = new NamespaceBuilder().withNewMetadata().withName(NAMESPACE).endMetadata().build();
+ LOGGER.info("Creating namespace: {}", ns);
+ ClusterManager.getInstance().getClient(EClusters.WORKER_01).getClient().namespaces().resource(ns).create();
+ }
+
+ @AfterAll
+ void teardownEnvironment() {
+ // Delete namespace
+ LOGGER.info("Deleting namespace: {}", NAMESPACE);
+ ClusterManager.getInstance().getClient(EClusters.WORKER_01).getClient().namespaces().withName(NAMESPACE).delete();
+ }
+}
+
diff --git a/src/test/java/io/tealc/worker01/status/ComponentsStatusTest.java b/src/test/java/io/tealc/worker01/status/ComponentsStatusTest.java
index 73009ef..6896159 100644
--- a/src/test/java/io/tealc/worker01/status/ComponentsStatusTest.java
+++ b/src/test/java/io/tealc/worker01/status/ComponentsStatusTest.java
@@ -7,6 +7,9 @@
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaMirrorMaker2;
+import io.strimzi.api.kafka.model.KafkaRebalance;
+import io.strimzi.api.kafka.model.StrimziPodSet;
+import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.tealc.Abstract;
import io.tealc.ClusterManager;
import io.tealc.EClusters;
@@ -20,7 +23,9 @@
import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
public class ComponentsStatusTest extends Abstract {
@@ -50,7 +55,37 @@ void testKafkaMirrorMaker2IsReady(String namespace, String name) {
KafkaMirrorMaker2 kafkaMirrorMaker2 = ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaMirrorMaker2Client().inNamespace(namespace).withName(name).get();
String status = kafkaMirrorMaker2.getStatus().getConditions().stream().filter(item -> item.getType().equals("Ready")).toList().get(0).getStatus();
LOGGER.debug("KafkaMirrorMaker2: {}", kafkaMirrorMaker2);
- assertThat(String.format("KafkaConnect %s is not ready in namespace %s", name, namespace), status, is("True"));
+ assertThat(String.format("KafkaMirrorMaker2 %s is not ready in namespace %s", name, namespace), status, is("True"));
+ }
+
+ @ParameterizedTest(name = "{1}-in-{0}-higher-than-{2}")
+ @MethodSource("kafkaRebalanceInstances")
+ void testKafkaRebalance2IsReady(String namespace, String name) {
+ KafkaRebalance kafkaRebalance = ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaRebalanceClient().inNamespace(namespace).withName(name).get();
+ String status = kafkaRebalance.getStatus().getConditions().stream().filter(item -> item.getType().equals("Ready")).toList().get(0).getStatus();
+ LOGGER.debug("KafkaRebalance: {}", kafkaRebalance);
+ assertThat(String.format("KafkaRebalance %s is not ready in namespace %s", name, namespace), status, is("True"));
+ }
+
+ @ParameterizedTest(name = "{1}-in-{0}-higher-than-{2}")
+ @MethodSource("strimziPodSetInstances")
+ void testStrimziPodSetIsReady(String namespace, String name) {
+ StrimziPodSet strimziPodSet = ClusterManager.getInstance().getClient(EClusters.WORKER_01).strimziPodSetClient().inNamespace(namespace).withName(name).get();
+ LOGGER.debug("StrimziPodSet: {}", strimziPodSet);
+ assertThat(String.format("StrimziPodSet %s pods status has 0 value!", name, namespace), strimziPodSet.getStatus().getPods(), is(greaterThan(0)));
+ assertThat(String.format("StrimziPodSet %s readyPods status has 0 value!", name, namespace), strimziPodSet.getStatus().getReadyPods(), is(greaterThan(0)));
+ assertThat(String.format("StrimziPodSet %s currentPods status has 0 value!", name, namespace), strimziPodSet.getStatus().getCurrentPods(), is(greaterThan(0)));
+ assertThat(String.format("StrimziPodSet %s observerGeneration status has 0 value!", name, namespace), strimziPodSet.getStatus().getObservedGeneration(), is(greaterThan(0L)));
+ }
+
+ @ParameterizedTest(name = "{1}-in-{0}-higher-than-{2}")
+ @MethodSource("kafkaNodePoolsInstances")
+ void testKafkaNodePoolIsReady(String namespace, String name) {
+ KafkaNodePool kafkaNodePool = ClusterManager.getInstance().getClient(EClusters.WORKER_01).kafkaNodePoolClient().inNamespace(namespace).withName(name).get();
+ LOGGER.debug("KafkaNodePool: {}", kafkaNodePool);
+ assertThat(String.format("KafkaNodePool %s replicas status has 0 value!", name, namespace), kafkaNodePool.getStatus().getReplicas(), is(greaterThan(0)));
+ assertThat(String.format("KafkaNodePool %s clusterId status has 0 value!", name, namespace), kafkaNodePool.getStatus().getClusterId(), is(notNullValue()));
+ assertThat(String.format("KafkaNodePool %s NodeIDs status has 0 value!", name, namespace), kafkaNodePool.getStatus().getNodeIds(), is(notNullValue()));
}
private static Stream<Arguments> kafkaInstances() {
@@ -73,4 +108,34 @@ private static Stream<Arguments> kafkaMirrorMakerInstances() {
Arguments.of(ENamespaces.MIRROR_MAKER_KRAFT.name, "replicator-set")
);
}
+
+ private static Stream<Arguments> kafkaRebalanceInstances() {
+ return Stream.of(
+ Arguments.of(ENamespaces.KAFKA.name, "anubis-rebalance"),
+ Arguments.of(ENamespaces.KAFKA_KRAFT.name, "horus-rebalance")
+ );
+ }
+
+ private static Stream<Arguments> strimziPodSetInstances() {
+ return Stream.of(
+ Arguments.of(ENamespaces.KAFKA.name, "anubis-pool-big"),
+ Arguments.of(ENamespaces.KAFKA.name, "anubis-pool-small"),
+ Arguments.of(ENamespaces.KAFKA.name, "anubis-zookeeper"),
+ Arguments.of(ENamespaces.KAFKA_KRAFT.name, "horus-broker"),
+ Arguments.of(ENamespaces.KAFKA_KRAFT.name, "horus-controller"),
+ Arguments.of(ENamespaces.CONNECT.name, "hathor-connect"),
+ Arguments.of(ENamespaces.CONNECT.name, "imhotep-connect"),
+ Arguments.of(ENamespaces.MIRROR_MAKER.name, "replicator-carter-mirrormaker2"),
+ Arguments.of(ENamespaces.MIRROR_MAKER_KRAFT.name, "replicator-set-mirrormaker2")
+ );
+ }
+
+ private static Stream<Arguments> kafkaNodePoolsInstances() {
+ return Stream.of(
+ Arguments.of(ENamespaces.KAFKA.name, "pool-big"),
+ Arguments.of(ENamespaces.KAFKA.name, "pool-small"),
+ Arguments.of(ENamespaces.KAFKA_KRAFT.name, "broker"),
+ Arguments.of(ENamespaces.KAFKA_KRAFT.name, "controller")
+ );
+ }
}