From e2a2a43b136e2e2cd405eb29ebfe875c45ec0ea3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Smolarek?= <34063647+Razz4780@users.noreply.github.com> Date: Fri, 11 Oct 2024 17:03:37 +0200 Subject: [PATCH] Added kafka splitting setup (#117) --- mirrord-operator/Chart.yaml | 2 +- mirrord-operator/templates/cluster-role.yaml | 32 ++- mirrord-operator/templates/crd.yaml | 273 +++++++++++++++++++ mirrord-operator/templates/deployment.yaml | 2 + mirrord-operator/values.yaml | 2 + test_values/operator_kafka.yaml | 8 + 6 files changed, 316 insertions(+), 3 deletions(-) create mode 100644 test_values/operator_kafka.yaml diff --git a/mirrord-operator/Chart.yaml b/mirrord-operator/Chart.yaml index 09f1a30..e7d5294 100644 --- a/mirrord-operator/Chart.yaml +++ b/mirrord-operator/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.9.2 +version: 1.10.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/mirrord-operator/templates/cluster-role.yaml b/mirrord-operator/templates/cluster-role.yaml index 1f2f598..7ddc264 100644 --- a/mirrord-operator/templates/cluster-role.yaml +++ b/mirrord-operator/templates/cluster-role.yaml @@ -27,8 +27,8 @@ rules: - get - list - watch -{{- if .Values.operator.sqsSplitting }} -# For patching target workloads to use different queue. +{{- if or .Values.operator.sqsSplitting .Values.operator.kafkaSplitting }} +# For patching target workloads to use different queue/topic. 
- apiGroups: - apps resources: @@ -111,6 +111,34 @@ rules: verbs: - update {{- end }} +{{- if .Values.operator.kafkaSplitting }} +- apiGroups: + - queues.mirrord.metalbear.co + resources: + - mirrordkafkaephemeraltopics + verbs: + - get + - list + - watch + - create + - delete +- apiGroups: + - queues.mirrord.metalbear.co + resources: + - mirrordkafkaclientconfigs + verbs: + - get + - list + - watch +- apiGroups: + - queues.mirrord.metalbear.co + resources: + - mirrordkafkatopicsconsumers + verbs: + - get + - list + - watch +{{- end }} --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/mirrord-operator/templates/crd.yaml b/mirrord-operator/templates/crd.yaml index 67e183d..cd22576 100644 --- a/mirrord-operator/templates/crd.yaml +++ b/mirrord-operator/templates/crd.yaml @@ -438,3 +438,276 @@ spec: subresources: status: {} {{ end }} +{{ if .Values.operator.kafkaSplitting}} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: mirrordkafkaclientconfigs.queues.mirrord.metalbear.co +spec: + group: queues.mirrord.metalbear.co + names: + categories: [] + kind: MirrordKafkaClientConfig + plural: mirrordkafkaclientconfigs + shortNames: [] + singular: mirrordkafkaclientconfig + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: Name of parent configuration. + jsonPath: .spec.parent + name: PARENT + type: string + name: v1alpha + schema: + openAPIV3Schema: + description: Auto-generated derived type for MirrordKafkaClientConfigSpec via `CustomResource` + properties: + spec: + description: Configuration to use when creating operator's Kafka client. Resources of this kind should live in the operator's namespace. + properties: + parent: + description: Name of parent resource to use as base when resolving final configuration. + nullable: true + type: string + properties: + description: |- + Properties to set. 
+ + When performing Kafka splitting, the operator will override `group.id` property. + + The list of all available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). + items: + description: Property to use when creating operator's Kafka client. + properties: + name: + description: Name of the property, e.g `bootstrap.servers`. + type: string + value: + description: Value for the property, e.g `kafka.default.svc.cluster.local:9092`. `null` clears the property from parent resource when resolving the final configuration. + nullable: true + type: string + required: + - name + type: object + type: array + required: + - properties + type: object + required: + - spec + title: MirrordKafkaClientConfig + type: object + served: true + storage: true + subresources: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: mirrordkafkaephemeraltopics.queues.mirrord.metalbear.co +spec: + group: queues.mirrord.metalbear.co + names: + categories: [] + kind: MirrordKafkaEphemeralTopic + plural: mirrordkafkaephemeraltopics + shortNames: [] + singular: mirrordkafkaephemeraltopic + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: Name of the topic. + jsonPath: .spec.name + name: NAME + type: string + - description: Name of MirrordKafkaClientConfig to use when creating Kafka client. + jsonPath: .spec.clientConfig + name: CLIENT-CONFIG + type: string + name: v1alpha + schema: + openAPIV3Schema: + description: Auto-generated derived type for MirrordKafkaEphemeralTopicSpec via `CustomResource` + properties: + spec: + description: |- + Ephemeral topic created in your Kafka cluster for the purpose of running a Kafka splitting session. + + Resources of this kind should live in the operator's namespace. They will be used to clean up topics that are no longer used.
+ properties: + clientConfig: + description: Links to [`MirrordKafkaClientConfigSpec`] resource living in the same namespace. + type: string + name: + description: Name of the topic. + type: string + required: + - clientConfig + - name + type: object + required: + - spec + title: MirrordKafkaEphemeralTopic + type: object + served: true + storage: true + subresources: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: mirrordkafkatopicsconsumers.queues.mirrord.metalbear.co +spec: + group: queues.mirrord.metalbear.co + names: + categories: [] + kind: MirrordKafkaTopicsConsumer + plural: mirrordkafkatopicsconsumers + shortNames: [] + singular: mirrordkafkatopicsconsumer + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: Name of the topic consumer workload. + jsonPath: .spec.consumerName + name: CONSUMER-NAME + type: string + - description: Kind of the topic consumer workload. + jsonPath: .spec.consumerKind + name: CONSUMER-KIND + type: string + - description: Api version of the topic consumer workload. + jsonPath: .spec.consumerApiVersion + name: CONSUMER-API-VERSION + type: string + - description: Timeout for consumer workload restart. + jsonPath: .spec.consumerRestartTimeout + name: CONSUMER-RESTART-TIMEOUT + type: string + name: v1alpha + schema: + openAPIV3Schema: + description: Auto-generated derived type for MirrordKafkaTopicsConsumerSpec via `CustomResource` + properties: + spec: + description: |- + Defines splittable Kafka topics consumed by some workload living in the same namespace. + + # Concurrent splitting + + Concurrent Kafka splitting sessions are allowed, as long as they use the same topic id or their topics' `nameSources` do not overlap. 
+ + # Example + + ```yaml apiVersion: queues.mirrord.metalbear.co/v1alpha kind: MirrordKafkaTopicsConsumer metadata: name: example namespace: default spec: consumerName: example-deployment consumerApiVersion: apps/v1 consumerKind: Deployment topics: - id: example-topic nameSources: - directEnvVar: container: example-container variable: KAFKA_TOPIC_NAME groupIdSources: - directEnvVar: container: example-container variable: KAFKA_GROUP_ID clientConfig: example-config ``` + + 1. Creating the resource above will enable Kafka splitting on a deployment `example-deployment` living in namespace `default`. Id `example-topic` can then be used in the mirrord config to split the topic for the duration of the mirrord session. + + 2. Topic name will be resolved based on `example-deployment`'s pod template by extracting value of variable `KAFKA_TOPIC_NAME` defined directly in `example-container`. + + 3. Consumer group id used by the mirrord operator will be resolved based on `example-deployment`'s pod template by extracting value of variable `KAFKA_GROUP_ID` defined directly in `example-container`. + + 4. For the duration of the session, `example-deployment` will be patched - the mirrord operator will substitute topic name in `KAFKA_TOPIC_NAME` variable with a name of an ephemeral Kafka topic. + + 5. Local application will see a different value of the `KAFKA_TOPIC_NAME` - it will be a name of another ephemeral Kafka topic. + + 6. `MirrordKafkaClientConfig` named `example-config` living in mirrord operator's namespace will be used to manage ephemeral Kafka topics and consume/produce messages. + properties: + consumerApiVersion: + description: Workload api version, for example `apps/v1`. + type: string + consumerKind: + description: Workload kind, for example `Deployment`. + type: string + consumerName: + description: Workload name, for example `my-deployment`.
+ type: string + consumerRestartTimeout: + description: |- + Timeout for waiting until workload patch takes effect, that is at least one pod reads from the ephemeral topic. + + Specified in seconds. Defaults to 60s. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + topics: + description: List of consumed splittable topics. + items: + description: Splittable Kafka topic consumed by some remote target. + properties: + clientConfig: + description: Links to [`MirrordKafkaClientConfig`] in the operator's namespace. This config will be used to manage ephemeral Kafka topics and consume/produce messages. + type: string + groupIdSources: + description: All occurrences of this topic's group id in the workload's pod template. + items: + description: Source of some topic property required for Kafka splitting. + oneOf: + - required: + - directEnvVar + properties: + directEnvVar: + description: Environment variable with value defined directly in the pod template. + properties: + container: + description: Name of the container. + type: string + variable: + description: Name of the variable. + type: string + required: + - container + - variable + type: object + type: object + type: array + id: + description: Id of this topic. Can be used in mirrord config to identify this topic. + type: string + nameSources: + description: All occurrences of this topic's name in the workload's pod template. + items: + description: Source of some topic property required for Kafka splitting. + oneOf: + - required: + - directEnvVar + properties: + directEnvVar: + description: Environment variable with value defined directly in the pod template. + properties: + container: + description: Name of the container. + type: string + variable: + description: Name of the variable. 
+ type: string + required: + - container + - variable + type: object + type: object + type: array + required: + - clientConfig + - groupIdSources + - id + - nameSources + type: object + type: array + required: + - consumerApiVersion + - consumerKind + - consumerName + - topics + type: object + required: + - spec + title: MirrordKafkaTopicsConsumer + type: object + served: true + storage: true + subresources: {} +{{ end }} diff --git a/mirrord-operator/templates/deployment.yaml b/mirrord-operator/templates/deployment.yaml index 5b067ae..d615320 100644 --- a/mirrord-operator/templates/deployment.yaml +++ b/mirrord-operator/templates/deployment.yaml @@ -70,6 +70,8 @@ spec: {{- end }} - name: OPERATOR_SQS_SPLITTING value: {{ .Values.operator.sqsSplitting | ternary "true" "false" | quote }} + - name: OPERATOR_KAFKA_SPLITTING + value: {{ .Values.operator.kafkaSplitting | ternary "true" "false" | quote }} - name: OPERATOR_JSON_LOG value: {{ .Values.operator.jsonLog | ternary "true" "false" | quote }} - name: OPERATOR_AGENT_CONFIG diff --git a/mirrord-operator/values.yaml b/mirrord-operator/values.yaml index e555e21..00c0152 100644 --- a/mirrord-operator/values.yaml +++ b/mirrord-operator/values.yaml @@ -20,6 +20,8 @@ operator: jsonLog: false # Has to be set to `true` in order to use the SQS queue splitting feature. sqsSplitting: false + # Has to be set to `true` in order to use the Kafka queue splitting feature. + kafkaSplitting: false # imagePullSecrets: # - name: value diff --git a/test_values/operator_kafka.yaml b/test_values/operator_kafka.yaml new file mode 100644 index 0000000..53fd10d --- /dev/null +++ b/test_values/operator_kafka.yaml @@ -0,0 +1,8 @@ +operator: + kafkaSplitting: true + +license: + file: + secret: mirrord-operator-license + data: + license.pem: "DOESN'TNEEDTOBOOTSOITCANBEINVALID"