From ed7de12112a74912f18da7663ad254a797ec1061 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Tue, 6 Sep 2022 15:34:43 +0200 Subject: [PATCH 01/17] fix integration tests --- tests/integration/test_kafka_provider.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/test_kafka_provider.py b/tests/integration/test_kafka_provider.py index 7c15b87c..71858a03 100644 --- a/tests/integration/test_kafka_provider.py +++ b/tests/integration/test_kafka_provider.py @@ -36,7 +36,11 @@ async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): "zookeeper-k8s", channel="edge", application_name=ZK_NAME, num_units=3 ), ops_test.model.deploy( +<<<<<<< HEAD kafka_charm, +======= + zk_charm, +>>>>>>> 9026d42 (fix integration tests) application_name=APP_NAME, num_units=1, resources={"kafka-image": KAFKA_CONTAINER}, From 14d48fff94aff9d8d0c55dfaeedeb80e1646247a Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Tue, 6 Sep 2022 19:35:06 +0100 Subject: [PATCH 02/17] fix: fix failing int tests --- tests/integration/test_charm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 44d8eaa6..b59f6f7e 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -4,6 +4,7 @@ import asyncio import logging +import time import pytest from helpers import APP_NAME, KAFKA_CONTAINER, ZK_NAME, check_application_status From 39db21c5d55752021131a56b69b138479f0e9cfa Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 5 Sep 2022 14:06:13 +0100 Subject: [PATCH 03/17] cicd: fix ci rebase --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 40b9cbee..89055769 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -46,4 +46,4 @@ jobs: # This is needed until https://bugs.launchpad.net/juju/+bug/1977582 is fixed bootstrap-options: "--agent-version 2.9.29" - name: Run integration tests - run: tox -e integration \ No newline at end of file + run: tox -e integration From 9d40d92f5070a1f43ef644070fd4f99727ffa5c2 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 15 Aug 2022 17:04:37 +0100 Subject: [PATCH 04/17] fix: remove commented config --- config.yaml | 84 ----------------------------------------------------- 1 file changed, 84 deletions(-) diff --git a/config.yaml b/config.yaml index 2b79250a..f76b8bc6 100644 --- a/config.yaml +++ b/config.yaml @@ -22,87 +22,3 @@ options: description: enables auto creation of topic on the server type: boolean default: false - - # log.dirs=/var/snap/kafka/common/log - # - # # networking - # clientPort=2181 - # listeners=SASL_PLAINTEXT://:9092 - # - # # offsets - # offsets.topic.num.partitions=50 - # offsets.commit.required.acks=-1 - # offsets.retention.minutes=10080 - # - # # topic - # auto.leader.rebalance.enable=true - # # to be changed when necessary - # delete.topic.enable=true - # unclean.leader.election.enable=false - # auto.create.topics.enable=false - # # helpful - # group.initial.rebalance.delay.ms=3000 - # - # # auth - # sasl.enabled.mechanisms=SCRAM-SHA-512 - # sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 - # security.inter.broker.protocol=SASL_PLAINTEXT - # authorizer.class.name=kafka.security.authorizer.AclAuthorizer - # allow.everyone.if.no.acl.found=false - # super.users=User:sync - # listener.name.sasl_plaintext.sasl.enabled.mechanisms=SCRAM-SHA-512 - # # zookeeper.set.acl=true - - - - ## Backup - # background.threads=10 - # compression.type=producer - # leader.imbalance.check.interval.seconds=300 - # leader.imbalance.per.broker.percentage=10 - # log.retention.bytes=-1 - # log.roll.hours=168 - # log.roll.jitter.hours=0 - # log.segment.bytes=1073741824 - # log.segment.delete.delay.ms=60000 - # message.max.bytes=1000012 - # num.io.threads=8 - # num.network.threads=3 - # num.recovery.threads.per.data.dir=1 - # num.replica.fetchers=1 - # offset.metadata.max.bytes=4096 - # offsets.commit.timeout.ms=5000 - # offsets.load.buffer.size=5242880 - # offsets.retention.check.interval.ms=600000 - # offsets.topic.compression.codec=0 - # offsets.topic.segment.bytes=104857600 - # queued.max.requests=500 - # quota.consumer.default=9223372036854775807 - # quota.producer.default=9223372036854775807 - # replica.fetch.min.bytes=1 - # replica.fetch.wait.max.ms=500 - # replica.high.watermark.checkpoint.interval.ms=5000 - # replica.lag.time.max.ms=10000 - # replica.socket.receive.buffer.bytes=65536 - # replica.socket.timeout.ms=30000 - # request.timeout.ms=30000 - # socket.receive.buffer.bytes=102400 - # socket.request.max.bytes=104857600 - # socket.send.buffer.bytes=102400 - # zookeeper.session.timeout.ms=6000 - # connections.max.idle.ms=600000 - # controlled.shutdown.enable=true - # controlled.shutdown.max.retries=3 - # controlled.shutdown.retry.backoff.ms=5000 - # controller.socket.timeout.ms=30000 - # fetch.purgatory.purge.interval.requests=1000 - # group.max.session.timeout.ms=300000 - # group.min.session.timeout.ms=600 - # producer.purgatory.purge.interval.requests=1000 - # replica.fetch.backoff.ms=1000 - # replica.fetch.max.bytes=1048576 - # replica.fetch.response.max.bytes=10485760 - # reserved.broker.max.id=1000 - # num.partitions=1 - # group.initial.rebalance.delay.ms=0 - # zookeeper.connection.timeout.ms=18000 From 0d5c58b03c0e2c2f70791bd7fcb4e2b88da32b69 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 15 Aug 2022 17:05:11 +0100 Subject: [PATCH 05/17] feat: add rollingops lib --- lib/charms/rolling_ops/v0/rollingops.py | 390 ++++++++++++++++++++++++ 1 file changed, 390 insertions(+) create mode 100644 lib/charms/rolling_ops/v0/rollingops.py diff --git a/lib/charms/rolling_ops/v0/rollingops.py b/lib/charms/rolling_ops/v0/rollingops.py new file mode 100644 index 00000000..7f4bd1b8 --- /dev/null +++ b/lib/charms/rolling_ops/v0/rollingops.py @@ -0,0 +1,390 @@ +# Copyright 2022 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This library enables "rolling" operations across units of a charmed Application. + +For example, a charm author might use this library to implement a "rolling restart", in +which all units in an application restart their workload, but no two units execute the +restart at the same time. + +To implement the rolling restart, a charm author would do the following: + +1. Add a peer relation called 'restart' to a charm's `metadata.yaml`: +```yaml +peers: + restart: + interface: rolling_op +``` + +Import this library into src/charm.py, and initialize a RollingOpsManager in the Charm's +`__init__`. The Charm should also define a callback routine, which will be executed when +a unit holds the distributed lock: + +src/charm.py +```python +# ... +from charms.rolling_ops.v0.rollingops import RollingOpsManager +# ... +class SomeCharm(...): + def __init__(...) + # ... + self.restart_manager = RollingOpsManager( + charm=self, relation="restart", callback=self._restart + ) + # ... + def _restart(self, event): + systemd.service_restart('foo') +``` + +To kick off the rolling restart, emit this library's AcquireLock event. The simplest way +to do so would be with an action, though it might make sense to acquire the lock in +response to another event. + +```python + def _on_trigger_restart(self, event): + self.charm.on[self.restart_manager.name].acquire_lock.emit() +``` + +In order to trigger the restart, a human operator would execute the following command on +the CLI: + +``` +juju run-action some-charm/0 some-charm/1 <... some-charm/n> restart +``` + +Note that all units that plan to restart must receive the action and emit the aquire +event. Any units that do not run their acquire handler will be left out of the rolling +restart. (An operator might take advantage of this fact to recover from a failed rolling +operation without restarting workloads that were able to successfully restart -- simply +omit the successful units from a subsequent run-action call.) + +""" +import logging +from enum import Enum +from typing import AnyStr, Callable + +from ops.charm import ActionEvent, CharmBase, RelationChangedEvent +from ops.framework import EventBase, Object +from ops.model import ActiveStatus, MaintenanceStatus, WaitingStatus + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "20b7777f58fe421e9a223aefc2b4d3a4" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 2 + + +class LockNoRelationError(Exception): + """Raised if we are trying to process a lock, but do not appear to have a relation yet.""" + + pass + + +class LockState(Enum): + """Possible states for our Distributed lock. + + Note that there are two states set on the unit, and two on the application. + + """ + + ACQUIRE = "acquire" + RELEASE = "release" + GRANTED = "granted" + IDLE = "idle" + + +class Lock: + """A class that keeps track of a single asynchronous lock. + + Warning: a Lock has permission to update relation data, which means that there are + side effects to invoking the .acquire, .release and .grant methods. Running any one of + them will trigger a RelationChanged event, once per transition from one internal + status to another. + + This class tracks state across the cloud by implementing a peer relation + interface. There are two parts to the interface: + + 1) The data on a unit's peer relation (defined in metadata.yaml.) Each unit can update + this data. The only meaningful values are "acquire", and "release", which represent + a request to acquire the lock, and a request to release the lock, respectively. + + 2) The application data in the relation. This tracks whether the lock has been + "granted", Or has been released (and reverted to idle). There are two valid states: + "granted" or None. If a lock is in the "granted" state, a unit should emit a + RunWithLocks event and then release the lock. + + If a lock is in "None", this means that a unit has not yet requested the lock, or + that the request has been completed. + + In more detail, here is the relation structure: + + relation.data: + : + status: 'acquire|release' + : + : 'granted|None' + + Note that this class makes no attempts to timestamp the locks and thus handle multiple + requests in a row. If a unit re-requests a lock before being granted the lock, the + lock will simply stay in the "acquire" state. If a unit wishes to clear its lock, it + simply needs to call lock.release(). + + """ + + def __init__(self, manager, unit=None): + + self.relation = manager.model.relations[manager.name][0] + if not self.relation: + # TODO: defer caller in this case (probably just fired too soon). + raise LockNoRelationError() + + self.unit = unit or manager.model.unit + self.app = manager.model.app + + @property + def _state(self) -> LockState: + """Return an appropriate state. + + Note that the state exists in the unit's relation data, and the application + relation data, so we have to be careful about what our states mean. + + Unit state can only be in "acquire", "release", "None" (None means unset) + Application state can only be in "granted" or "None" (None means unset or released) + + """ + unit_state = LockState(self.relation.data[self.unit].get("state", LockState.IDLE.value)) + app_state = LockState( + self.relation.data[self.app].get(str(self.unit), LockState.IDLE.value) + ) + + if app_state == LockState.GRANTED and unit_state == LockState.RELEASE: + # Active release request. + return LockState.RELEASE + + if app_state == LockState.IDLE and unit_state == LockState.ACQUIRE: + # Active acquire request. + return LockState.ACQUIRE + + return app_state # Granted or unset/released + + @_state.setter + def _state(self, state: LockState): + """Set the given state. + + Since we update the relation data, this may fire off a RelationChanged event. + """ + if state == LockState.ACQUIRE: + self.relation.data[self.unit].update({"state": state.value}) + + if state == LockState.RELEASE: + self.relation.data[self.unit].update({"state": state.value}) + + if state == LockState.GRANTED: + self.relation.data[self.app].update({str(self.unit): state.value}) + + if state is LockState.IDLE: + self.relation.data[self.app].update({str(self.unit): state.value}) + + def acquire(self): + """Request that a lock be acquired.""" + self._state = LockState.ACQUIRE + + def release(self): + """Request that a lock be released.""" + self._state = LockState.RELEASE + + def clear(self): + """Unset a lock.""" + self._state = LockState.IDLE + + def grant(self): + """Grant a lock to a unit.""" + self._state = LockState.GRANTED + + def is_held(self): + """This unit holds the lock.""" + return self._state == LockState.GRANTED + + def release_requested(self): + """A unit has reported that they are finished with the lock.""" + return self._state == LockState.RELEASE + + def is_pending(self): + """Is this unit waiting for a lock?""" + return self._state == LockState.ACQUIRE + + +class Locks: + """Generator that returns a list of locks.""" + + def __init__(self, manager): + self.manager = manager + + # Gather all the units. + relation = manager.model.relations[manager.name][0] + units = [unit for unit in relation.units] + + # Plus our unit ... + units.append(manager.model.unit) + + self.units = units + + def __iter__(self): + """Yields a lock for each unit we can find on the relation.""" + for unit in self.units: + yield Lock(self.manager, unit=unit) + + +class RunWithLock(EventBase): + """Event to signal that this unit should run the callback.""" + + pass + + +class AcquireLock(EventBase): + """Signals that this unit wants to acquire a lock.""" + + pass + + +class ProcessLocks(EventBase): + """Used to tell the leader to process all locks.""" + + pass + + +class RollingOpsManager(Object): + """Emitters and handlers for rolling ops.""" + + def __init__(self, charm: CharmBase, relation: AnyStr, callback: Callable): + """Register our custom events. + + params: + charm: the charm we are attaching this to. + relation: an identifier, by convention based on the name of the relation in the + metadata.yaml, which identifies this instance of RollingOperatorsFactory, + distinct from other instances that may be hanlding other events. + callback: a closure to run when we have a lock. (It must take a CharmBase object and + EventBase object as args.) + """ + # "Inherit" from the charm's class. This gives us access to the framework as + # self.framework, as well as the self.model shortcut. + super().__init__(charm, None) + + self.name = relation + self._callback = callback + self.charm = charm # Maintain a reference to charm, so we can emit events. + + charm.on.define_event("{}_run_with_lock".format(self.name), RunWithLock) + charm.on.define_event("{}_acquire_lock".format(self.name), AcquireLock) + charm.on.define_event("{}_process_locks".format(self.name), ProcessLocks) + + # Watch those events (plus the built in relation event). + self.framework.observe(charm.on[self.name].relation_changed, self._on_relation_changed) + self.framework.observe(charm.on[self.name].acquire_lock, self._on_acquire_lock) + self.framework.observe(charm.on[self.name].run_with_lock, self._on_run_with_lock) + self.framework.observe(charm.on[self.name].process_locks, self._on_process_locks) + + def _callback(self: CharmBase, event: EventBase) -> None: + """Placeholder for the function that actually runs our event. + + Usually overridden in the init. + """ + raise NotImplementedError + + def _on_relation_changed(self: CharmBase, event: RelationChangedEvent): + """Process relation changed. + + First, determine whether this unit has been granted a lock. If so, emit a RunWithLock + event. + + Then, if we are the leader, fire off a process locks event. + + """ + lock = Lock(self) + + if lock.is_pending(): + self.model.unit.status = WaitingStatus("Awaiting {} operation".format(self.name)) + + if lock.is_held(): + self.charm.on[self.name].run_with_lock.emit() + + if self.model.unit.is_leader(): + self.charm.on[self.name].process_locks.emit() + + def _on_process_locks(self: CharmBase, event: ProcessLocks): + """Process locks. + + Runs only on the leader. Updates the status of all locks. + + """ + if not self.model.unit.is_leader(): + return + + pending = [] + + for lock in Locks(self): + if lock.is_held(): + # One of our units has the lock -- return without further processing. + return + + if lock.release_requested(): + lock.clear() # Updates relation data + + if lock.is_pending(): + if lock.unit == self.model.unit: + # Always run on the leader last. + pending.insert(0, lock) + else: + pending.append(lock) + + # If we reach this point, and we have pending units, we want to grant a lock to + # one of them. + if pending: + self.model.app.status = MaintenanceStatus("Beginning rolling {}".format(self.name)) + lock = pending[-1] + lock.grant() + if lock.unit == self.model.unit: + # It's time for the leader to run with lock. + self.charm.on[self.name].run_with_lock.emit() + return + + self.model.app.status = ActiveStatus() + + def _on_acquire_lock(self: CharmBase, event: ActionEvent): + """Request a lock.""" + try: + Lock(self).acquire() # Updates relation data + # emit relation changed event in the edge case where aquire does not + relation = self.model.get_relation(self.name) + self.charm.on[self.name].relation_changed.emit(relation) + except LockNoRelationError: + logger.debug("No {} peer relation yet. Delaying rolling op.".format(self.name)) + event.defer() + + def _on_run_with_lock(self: CharmBase, event: RunWithLock): + lock = Lock(self) + self.model.unit.status = MaintenanceStatus("Executing {} operation".format(self.name)) + self._callback(event) + lock.release() # Updates relation data + if lock.unit == self.model.unit: + self.charm.on[self.name].process_locks.emit() + + self.model.unit.status = ActiveStatus() From 48f3335889d4b99aadcfecfe805fba97badd0a57 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 15 Aug 2022 17:05:39 +0100 Subject: [PATCH 06/17] feat: change provider relation name --- metadata.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata.yaml b/metadata.yaml index 564a3a74..501cd96b 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -29,5 +29,5 @@ requires: interface: zookeeper provides: - kafka: - interface: kafka + kafka-client: + interface: kafka_client From 1e53123f5d7209a784975c9534e2189edcab543b Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 15 Aug 2022 18:05:25 +0100 Subject: [PATCH 07/17] feat: add restart on config changes --- src/auth.py | 338 +++++++++++++++++++++++ src/charm.py | 148 +++++----- src/config.py | 120 ++++++-- src/connection_check.py | 67 ----- src/utils.py | 90 +++++- tests/integration/test_kafka_provider.py | 4 - 6 files changed, 604 insertions(+), 163 deletions(-) create mode 100644 src/auth.py delete mode 100644 src/connection_check.py diff --git a/src/auth.py b/src/auth.py new file mode 100644 index 00000000..c97a250b --- /dev/null +++ b/src/auth.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Supporting objects for Kafka user and ACL management.""" + +import logging +import re +from dataclasses import asdict, dataclass +from typing import List, Optional, Set + +from ops.model import Container + +from utils import run_bin_command + +logger = logging.getLogger(__name__) + + +@dataclass(unsafe_hash=True) +class Acl: + """Convenience object for representing a Kafka ACL.""" + + resource_name: str + resource_type: str + operation: str + username: str + + +class KafkaAuth: + """Object for updating Kafka users and ACLs.""" + + def __init__(self, opts: List[str], zookeeper: str, container: Container): + self.opts = " ".join(opts) + self.zookeeper = zookeeper + self.container = container + self.current_acls: Set[Acl] = set() + self.new_user_acls: Set[Acl] = set() + + def _get_acls_from_cluster(self) -> str: + """Loads the currently active ACLs from the Kafka cluster.""" + command = [ + f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--list", + ] + acls = run_bin_command( + container=self.container, + bin_keyword="acls", + bin_args=command, + extra_args=self.opts, + ) + + return acls + + @staticmethod + def _parse_acls(acls: str) -> Set[Acl]: + """Parses output from raw ACLs provided by the cluster.""" + current_acls = set() + resource_type, name, user, operation = None, None, None, None + for line in acls.splitlines(): + resource_search = re.search(r"resourceType=([^\,]+),", line) + if resource_search: + resource_type = resource_search[1] + + name_search = re.search(r"name=([^\,]+),", line) + if name_search: + name = name_search[1] + + user_search = re.search(r"principal=User\:([^\,]+),", line) + if user_search: + user = user_search[1] + + operation_search = re.search(r"operation=([^\,]+),", line) + if operation_search: + operation = operation_search[1] + else: + continue + + if resource_type and name and user and operation: + current_acls.add( + Acl( + resource_type=resource_type, + resource_name=name, + username=user, + operation=operation, + ) + ) + + return current_acls + + def load_current_acls(self) -> None: + """Sets the current cluster ACLs to the instance state. + + State is set to `KafkaAuth.current_acls`. + + Raises: + `subprocess.CalledProcessError`: if the error returned a non-zero exit code + """ + acls = self._get_acls_from_cluster() + + self.current_acls = self._parse_acls(acls=acls) + + @staticmethod + def _generate_producer_acls(topic: str, username: str, **_) -> Set[Acl]: + """Generates expected set of `Acl`s for a producer client application.""" + producer_acls = set() + for operation in ["CREATE", "WRITE", "DESCRIBE"]: + producer_acls.add( + Acl( + resource_type="TOPIC", + resource_name=topic, + username=username, + operation=operation, + ) + ) + + return producer_acls + + @staticmethod + def _generate_consumer_acls( + topic: str, username: str, group: Optional[str] = None + ) -> Set[Acl]: + """Generates expected set of `Acl`s for a consumer client application.""" + group = group or f"{username}-" # not needed, just for safety + + consumer_acls = set() + for operation in ["READ", "DESCRIBE"]: + consumer_acls.add( + Acl( + resource_type="TOPIC", + resource_name=topic, + username=username, + operation=operation, + ) + ) + consumer_acls.add( + Acl( + resource_type="GROUP", + resource_name=group, + username=username, + operation="READ", + ) + ) + + return consumer_acls + + def add_user(self, username: str, password: str) -> None: + """Adds new user credentials to ZooKeeper. + + Args: + username: the user name to add + password: the user password + + Raises: + `subprocess.CalledProcessError`: if the error returned a non-zero exit code + """ + command = [ + f"--zookeeper={self.zookeeper}", + "--alter", + "--entity-type=users", + f"--entity-name={username}", + f"--add-config=SCRAM-SHA-512=[password={password}]", + ] + run_bin_command( + container=self.container, bin_keyword="configs", bin_args=command, extra_args=self.opts + ) + + def delete_user(self, username: str) -> None: + """Deletes user credentials from ZooKeeper. + + Args: + username: the user name to delete + + Raises: + `subprocess.CalledProcessError`: if the error returned a non-zero exit code + """ + command = [ + f"--zookeeper={self.zookeeper}", + "--alter", + "--entity-type=users", + f"--entity-name={username}", + "--delete-config=SCRAM-SHA-512", + ] + run_bin_command( + container=self.container, bin_keyword="configs", bin_args=command, extra_args=self.opts + ) + + def add_acl( + self, username: str, operation: str, resource_type: str, resource_name: str + ) -> None: + """Adds new ACL rule for the cluster. + + Consumer Group READ permissions are granted to a prefixed group based on the + given `username`. e.g `-` + + Args: + username: the user name to add ACLs for + operation: the operation to grant + e.g `READ`, `WRITE`, `DESCRIBE` + resource_type: the resource type to grant ACLs for + e.g `GROUP`, `TOPIC` + resource_name: the name of the resource to grant ACLs for + + Raises: + `subprocess.CalledProcessError`: if the error returned a non-zero exit code + """ + if resource_type == "TOPIC": + command = [ + f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--add", + f"--allow-principal=User:{username}", + f"--operation={operation}", + f"--topic={resource_name}", + ] + run_bin_command( + container=self.container, + bin_keyword="configs", + bin_args=command, + extra_args=self.opts, + ) + + if resource_type == "GROUP": + command = [ + f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--add", + f"--allow-principal=User:{username}", + f"--operation={operation}", + f"--group={resource_name}", + "--resource-pattern-type=PREFIXED", + ] + run_bin_command( + container=self.container, + bin_keyword="configs", + bin_args=command, + extra_args=self.opts, + ) + + def remove_acl( + self, username: str, operation: str, resource_type: str, resource_name: str + ) -> None: + """Removes ACL rule for the cluster. + + Args: + username: the user name to remove ACLs for + operation: the operation to remove + e.g `READ`, `WRITE`, `DESCRIBE` + resource_type: the resource type to remove ACLs for + e.g `GROUP`, `TOPIC` + resource_name: the name of the resource to remove ACLs for + + Raises: + `subprocess.CalledProcessError`: if the error returned a non-zero exit code + """ + if resource_type == "TOPIC": + command = [ + f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--remove", + f"--allow-principal=User:{username}", + f"--operation={operation}", + f"--topic={resource_name}", + "--force", + ] + run_bin_command( + container=self.container, + bin_keyword="configs", + bin_args=command, + extra_args=self.opts, + ) + + if resource_type == "GROUP": + command = [ + f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--remove", + f"--allow-principal=User:{username}", + f"--operation={operation}", + f"--group={resource_name}", + "--resource-pattern-type=PREFIXED", + "--force", + ] + run_bin_command( + container=self.container, + bin_keyword="configs", + bin_args=command, + extra_args=self.opts, + ) + + def remove_all_user_acls(self, username: str) -> None: + """Removes all active ACLs for a given user. + + Args: + username: the user name to remove ACLs for + + Raises: + `subprocess.CalledProcessError`: if the error returned a non-zero exit code + """ + # getting subset of all cluster ACLs for only the provided user + current_user_acls = {acl for acl in self.current_acls if acl.username == username} + + for acl in current_user_acls: + self.remove_acl(**asdict(acl)) + + def update_user_acls( + self, username: str, topic: str, extra_user_roles: str, group: Optional[str], **_ + ) -> None: + """Compares data passed from the client relation, and updating cluster ACLs to match. + + `producer`s are granted READ, DESCRIBE and WRITE access for a given topic + `consumer`s are granted READ, DESCRIBE access for a given topic, and READ access for a + generated consumer group + + If new ACLs provided do not match existing ACLs set for the cluster, existing ACLs will + be revoked + + Args: + username: the user name to update ACLs for + topic: the topic to update ACLs for + extra_user_roles: the `extra-user-roles` for the user + group: the consumer group + + Raises: + `subprocess.CalledProcessError`: if the error returned a non-zero exit code + """ + if "producer" in extra_user_roles: + self.new_user_acls.update(self._generate_producer_acls(topic=topic, username=username)) + if "consumer" in extra_user_roles: + self.new_user_acls.update( + self._generate_consumer_acls(topic=topic, username=username, group=group) + ) + + # getting subset of all cluster ACLs for only the provided user + current_user_acls = {acl for acl in self.current_acls if acl.username == username} + + acls_to_add = self.new_user_acls - current_user_acls + for acl in acls_to_add: + self.add_acl(**asdict(acl)) + + acls_to_remove = current_user_acls - self.new_user_acls + for acl in acls_to_remove: + self.remove_acl(**asdict(acl)) diff --git a/src/charm.py b/src/charm.py index 33d7062a..ad3e3096 100755 --- a/src/charm.py +++ b/src/charm.py @@ -5,19 +5,25 @@ """Charmed Machine Operator for Apache Kafka.""" import logging -from typing import List -from ops.charm import ActionEvent, CharmBase, RelationEvent, RelationJoinedEvent +from charms.rolling_ops.v0.rollingops import RollingOpsManager +from ops.charm import ( + ActionEvent, + CharmBase, + ConfigChangedEvent, + RelationEvent, + RelationJoinedEvent, +) from ops.framework import EventBase from ops.main import main from ops.model import ActiveStatus, BlockedStatus, Container, Relation, WaitingStatus -from ops.pebble import ExecError, Layer +from ops.pebble import ExecError, Layer, PathError, ProtocolError +from auth import KafkaAuth from config import KafkaConfig -from connection_check import broker_active, zookeeper_connected from literals import CHARM_KEY, CHARM_USERS, PEER, ZOOKEEPER_REL_NAME from provider import KafkaProvider -from utils import generate_password +from utils import broker_active, generate_password logger = logging.getLogger(__name__) @@ -30,9 +36,13 @@ def __init__(self, *args): self.name = CHARM_KEY self.kafka_config = KafkaConfig(self) self.client_relations = KafkaProvider(self) + self.restart = RollingOpsManager(self, relation="restart", callback=self._restart) self.framework.observe(getattr(self.on, "kafka_pebble_ready"), self._on_kafka_pebble_ready) self.framework.observe(getattr(self.on, "leader_elected"), self._on_leader_elected) + self.framework.observe(getattr(self.on, "config_changed"), self._on_config_changed) + self.framework.observe(self.on[PEER].relation_changed, self._on_config_changed) + self.framework.observe( self.on[ZOOKEEPER_REL_NAME].relation_joined, self._on_zookeeper_joined ) @@ -43,7 +53,7 @@ def __init__(self, *args): self.on[ZOOKEEPER_REL_NAME].relation_broken, self._on_zookeeper_broken ) - self.framework.observe(self.on.set_password_action, self._set_password_action) + self.framework.observe(getattr(self.on, "set_password_action"), self._set_password_action) @property def container(self) -> Container: @@ -70,40 +80,16 @@ def _kafka_layer(self) -> Layer: @property def peer_relation(self) -> Relation: - """The Kafka peer relation.""" + """The Kafka cluster relation.""" return self.model.get_relation(PEER) - def run_bin_command(self, bin_keyword: str, bin_args: List[str], extra_args: str) -> str: - """Runs kafka bin command with desired args. - - Args: - bin_keyword: the kafka shell script to run - e.g `configs`, `topics` etc - bin_args: the shell command args - extra_args (optional): the desired `KAFKA_OPTS` env var values for the command - - Returns: - String of kafka bin command output - """ - environment = {"KAFKA_OPTS": extra_args} - command = [f"/opt/kafka/bin/kafka-{bin_keyword}.sh"] + bin_args - - try: - process = self.container.exec(command=command, environment=environment) - output, _ = process.wait_output() - logger.debug(f"{output=}") - return output - except (ExecError) as e: - logger.debug(f"cmd failed:\ncommand={e.command}\nstdout={e.stdout}\nstderr={e.stderr}") - raise e - def _on_kafka_pebble_ready(self, event: EventBase) -> None: """Handler for `kafka_pebble_ready` event.""" if not self.container.can_connect(): event.defer() return - if not zookeeper_connected(charm=self): + if not self.kafka_config.zookeeper_connected: self.unit.status = WaitingStatus("waiting for zookeeper relation") return @@ -113,19 +99,21 @@ def _on_kafka_pebble_ready(self, event: EventBase) -> None: # do not start units until SCRAM users have been added to ZooKeeper for server-server auth if self.unit.is_leader() and self.kafka_config.sync_password: + kafka_auth = KafkaAuth( + opts=[self.kafka_config.extra_args], + zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), + container=self.container, + ) try: - self.add_user_to_zookeeper( - username="sync", password=self.kafka_config.sync_password - ) + kafka_auth.add_user(username="sync", password=self.kafka_config.sync_password) self.peer_relation.data[self.app].update({"broker-creds": "added"}) - except ExecError: - # command to add users fails sometimes for unknown reasons. Retry seems to fix it. + except ExecError as e: + logger.debug(str(e)) event.defer() return # for non-leader units - if not self.peer_relation.data[self.app].get("broker-creds", None): - logger.debug("broker-creds not yet added to zookeeper") + if not self.ready_to_start: event.defer() return @@ -144,6 +132,35 @@ def _on_kafka_pebble_ready(self, event: EventBase) -> None: self.unit.status = BlockedStatus("kafka unit not connected to ZooKeeper") return + def _on_config_changed(self, event: ConfigChangedEvent) -> None: + """Generic handler for most `config_changed` events across relations.""" + if not self.ready_to_start: + event.defer() + return + + # Load current properties set in the charm workload + raw_properties = None + try: + raw_properties = self.container.pull(self.kafka_config.properties_filepath) + except (ProtocolError, PathError) as e: + logger.debug(str(e)) + if not raw_properties: + # Event fired before charm has properly started + event.defer() + return + + if set(list(raw_properties)) ^ set(self.kafka_config.server_properties): + logger.info( + ( + 'Broker {self.unit.name.split("/")[1]} updating config - ' + "OLD PROPERTIES = {set(properties) - set(self.kafka_config.server_properties)=}, " + "NEW PROPERTIES = {set(self.kafka_config.server_properties) - set(properties)=}" + ) + ) + self.kafka_config.set_server_properties() + + self.on[self.restart.name].acquire_lock.emit() + def _on_leader_elected(self, _) -> None: """Handler for `leader_elected` event, ensuring sync_passwords gets set.""" sync_password = self.kafka_config.sync_password @@ -192,8 +209,13 @@ def _set_password_action(self, event: ActionEvent): return # Update the user + kafka_auth = KafkaAuth( + opts=[self.kafka_config.extra_args], + zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), + container=self.container, + ) try: - self.add_user_to_zookeeper(username=username, password=new_password) + kafka_auth.add_user(username="sync", password=new_password) except ExecError as e: logger.error(str(e)) event.fail(str(e)) @@ -203,39 +225,27 @@ def _set_password_action(self, event: ActionEvent): self.peer_relation.data[self.app].update({f"{username}_password": new_password}) event.set_results({f"{username}-password": new_password}) - def add_user_to_zookeeper(self, username: str, password: str) -> None: - """Adds user credentials to ZooKeeper for authorising clients and brokers. + def _restart(self, event: EventBase) -> None: + """Handler for `rolling_ops` restart events.""" + if not self.ready_to_start: + event.defer() + return - Raises: - ops.pebble.ExecError: If the command failed - """ - command = [ - f"--zookeeper={self.kafka_config.zookeeper_config['connect']}", - "--alter", - "--entity-type=users", - f"--entity-name={username}", - f"--add-config=SCRAM-SHA-512=[password={password}]", - ] - self.run_bin_command( - bin_keyword="configs", bin_args=command, extra_args=self.kafka_config.extra_args - ) + self.container.restart() - def delete_user_from_zookeeper(self, username: str) -> None: - """Deletes user credentials from ZooKeeper for authorising clients and brokers. + @property + def ready_to_start(self) -> bool: + """Check for active ZooKeeper relation and adding of inter-broker auth username. - Raises: - ops.pebble.ExecError: If the command failed + Returns: + True if ZK is related and `sync` user has been added. False otherwise. """ - command = [ - f"--zookeeper={self.kafka_config.zookeeper_config['connect']}", - "--alter", - "--entity-type=users", - f"--entity-name={username}", - "--delete-config=SCRAM-SHA-512", - ] - self.run_bin_command( - bin_keyword="configs", bin_args=command, extra_args=self.kafka_config.extra_args - ) + if not self.kafka_config.zookeeper_connected or not self.peer_relation.data[self.app].get( + "broker-creds", None + ): + return False + + return True if __name__ == "__main__": diff --git a/src/config.py b/src/config.py index a2060b97..dcbb1496 100644 --- a/src/config.py +++ b/src/config.py @@ -8,8 +8,9 @@ from typing import Dict, List, Optional from ops.charm import CharmBase +from ops.model import Unit -from literals import CHARM_KEY, PEER, ZOOKEEPER_REL_NAME +from literals import CHARM_KEY, PEER, REL_NAME, ZOOKEEPER_REL_NAME logger = logging.getLogger(__name__) @@ -21,7 +22,6 @@ security.inter.broker.protocol=SASL_PLAINTEXT authorizer.class.name=kafka.security.authorizer.AclAuthorizer allow.everyone.if.no.acl.found=false -super.users=User:sync listener.name.sasl_plaintext.sasl.enabled.mechanisms=SCRAM-SHA-512 """ @@ -62,12 +62,27 @@ def zookeeper_config(self) -> Dict[str, str]: break if zookeeper_config: - zookeeper_config["connect"] = ( - zookeeper_config["uris"].replace(zookeeper_config["chroot"], "") - + zookeeper_config["chroot"] + sorted_uris = sorted( + zookeeper_config["uris"].replace(zookeeper_config["chroot"], "").split(",") ) + sorted_uris[-1] = sorted_uris[-1] + zookeeper_config["chroot"] + zookeeper_config["connect"] = ",".join(sorted_uris) + return zookeeper_config + @property + def zookeeper_connected(self) -> bool: + """Checks if there is an active ZooKeeper relation. + + Returns: + True if ZooKeeper is currently related with sufficient relation data + for a broker to connect with. False otherwise. + """ + if self.zookeeper_config.get("connect", None): + return True + + return False + @property def extra_args(self) -> str: """Collection of Java config arguments for SASL auth. @@ -79,6 +94,19 @@ def extra_args(self) -> str: return extra_args + @property + def bootstrap_server(self) -> List[str]: + """The current Kafka uris formatted for the `bootstrap-server` command flag. + + Returns: + List of `bootstrap-server` servers + """ + units: List[Unit] = list( + set([self.charm.unit] + list(self.charm.model.get_relation(PEER).units)) + ) + hosts = [self.get_host_from_unit(unit=unit) for unit in units] + return [f"{host}:9092" for host in hosts] + @property def kafka_command(self) -> str: """The run command for starting the Kafka service. @@ -116,7 +144,7 @@ def auth_properties(self) -> List[str]: List of properties to be set """ broker_id = self.charm.unit.name.split("/")[1] - host = f"{self.charm.app.name}-{broker_id}.{self.charm.app.name}-endpoints" + host = self.get_host_from_unit(unit=self.charm.unit) return [ f"broker.id={broker_id}", @@ -125,8 +153,56 @@ def auth_properties(self) -> List[str]: f'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="{self.sync_password}";', ] + @property + def super_users(self) -> str: + """Generates all users with super/admin permissions for the cluster from relations. + + Formatting allows passing to the `super.users` property. + + Returns: + Semi-colon delimited string of current super users + """ + super_users = ["sync"] + for relation in self.charm.model.relations[REL_NAME]: + extra_user_roles = relation.data[relation.app].get("extra-user-roles", "") + password = ( + self.charm.model.get_relation(PEER) + .data[self.charm.app] + .get(f"relation-{relation.id}", None) + ) + # if passwords are set for client admins, they're good to load + if "admin" in extra_user_roles and password is not None: + super_users.append(f"relation-{relation.id}") + + super_users_arg = [f"User:{user}" for user in super_users] + + return ";".join(super_users_arg) + + @property + def server_properties(self) -> List[str]: + """Builds all properties necessary for starting Kafka service. + + This includes charm config, replication, SASL/SCRAM auth and default properties. + + Returns: + List of properties to be set + """ + return ( + [ + f"data.dir={self.charm.config['data-dir']}", + f"log.dir={self.charm.config['log-dir']}", + f"offsets.retention.minutes={self.charm.config['offsets-retention-minutes']}", + f"log.retention.hours={self.charm.config['log-retention-hours']}", + f"auto.create.topics={self.charm.config['auto-create-topics']}", + f"super.users={self.super_users}", + ] + + self.default_replication_properties + + self.auth_properties + + DEFAULT_CONFIG_OPTIONS.split("\n") + ) + def push(self, content: str, path: str) -> None: - """Simple wrapper for writing a file and contents to a container. + """Wrapper for writing a file and contents to a container. Args: content: the text content to write to a file path @@ -134,6 +210,10 @@ def push(self, content: str, path: str) -> None: """ self.container.push(path, content, make_dirs=True) + def set_server_properties(self) -> None: + """Sets all kafka config properties to the `server.properties` path.""" + self.push(content="\n".join(self.server_properties), path=self.properties_filepath) + def set_jaas_config(self) -> None: """Sets the Kafka JAAS config using zookeeper relation data.""" jaas_config = f""" @@ -145,19 +225,15 @@ def set_jaas_config(self) -> None: """ self.push(content=jaas_config, path=self.jaas_filepath) - def set_server_properties(self) -> None: - """Sets all kafka config properties to the server.properties path.""" - server_properties = ( - [ - f"data.dir={self.charm.config['data-dir']}", - f"log.dir={self.charm.config['log-dir']}", - f"offsets.retention.minutes={self.charm.config['offsets-retention-minutes']}", - f"log.retention.hours={self.charm.config['log-retention-hours']}", - f"auto.create.topics={self.charm.config['auto-create-topics']}", - ] - + self.default_replication_properties - + self.auth_properties - + DEFAULT_CONFIG_OPTIONS.split("\n") - ) + def get_host_from_unit(self, unit: Unit) -> str: + """Builds K8s host address for a given `Unit`. + + Args: + unit: the desired unit + + Returns: + String of host address + """ + broker_id = unit.name.split("/")[1] - self.push(content="\n".join(server_properties), path=self.properties_filepath) + return f"{self.charm.app.name}-{broker_id}.{self.charm.app.name}-endpoints" diff --git a/src/connection_check.py b/src/connection_check.py deleted file mode 100644 index 42edbbbb..00000000 --- a/src/connection_check.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Collection of helper methods for checking active connections between ZK and Kafka.""" - -import logging -from typing import Dict - -from charms.zookeeper.v0.client import ZooKeeperManager -from kazoo.exceptions import AuthFailedError, NoNodeError -from ops.charm import CharmBase -from ops.model import Unit -from tenacity import retry -from tenacity.retry import retry_if_not_result -from tenacity.stop import stop_after_attempt -from tenacity.wait import wait_fixed - -logger = logging.getLogger(__name__) - - -def zookeeper_connected(charm: CharmBase) -> bool: - """Flag for if required zookeeper config exists in the relation data. - - Returns: - True if config exits i.e successful relation. False otherwise - """ - if not getattr(charm, "kafka_config").zookeeper_config: - return False - - return True - - -@retry( - # retry to give ZK time to update its broker zNodes before failing - wait=wait_fixed(5), - stop=stop_after_attempt(3), - retry_error_callback=(lambda state: state.outcome.result()), - retry=retry_if_not_result(lambda result: True if result else False), -) -def broker_active(unit: Unit, zookeeper_config: Dict[str, str]) -> bool: - """Checks ZooKeeper for client connections, checks for specific broker id. - - Args: - unit: the `Unit` to check connection of - data: the relation data provided by ZooKeeper - - Returns: - True if broker id is recognised as active by ZooKeeper. Otherwise False. - """ - broker_id = unit.name.split("/")[1] - chroot = zookeeper_config.get("chroot", "") - hosts = zookeeper_config.get("endpoints", "").split(",") - username = zookeeper_config.get("username", "") - password = zookeeper_config.get("password", "") - - zk = ZooKeeperManager(hosts=hosts, username=username, password=password) - path = f"{chroot}/brokers/ids/" - - try: - brokers = zk.leader_znodes(path=path) - # auth might not be ready with ZK after relation yet - except (NoNodeError, AuthFailedError) as e: - logger.debug(str(e)) - return False - - return f"{chroot}/brokers/ids/{broker_id}" in brokers diff --git a/src/utils.py b/src/utils.py index 4d43c8da..e118af7d 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,4 +1,4 @@ -# !/usr/bin/env python3 +#!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. @@ -7,10 +7,70 @@ import logging import secrets import string +from typing import Dict, List, Set + +from charms.zookeeper.v0.client import ZooKeeperManager +from kazoo.exceptions import AuthFailedError, NoNodeError +from ops.model import Container, Unit +from ops.pebble import ExecError +from tenacity import retry +from tenacity.retry import retry_if_not_result +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_fixed logger = logging.getLogger(__name__) +@retry( + # retry to give ZK time to update its broker zNodes before failing + wait=wait_fixed(5), + stop=stop_after_attempt(6), + retry_error_callback=(lambda state: state.outcome.result()), + retry=retry_if_not_result(lambda result: True if result else False), +) +def broker_active(unit: Unit, zookeeper_config: Dict[str, str]) -> bool: + """Checks ZooKeeper for client connections, checks for specific broker id. + + Args: + unit: the `Unit` to check connection of + zookeeper_config: the relation provided by ZooKeeper + + Returns: + True if broker id is recognised as active by ZooKeeper. Otherwise False. + """ + broker_id = unit.name.split("/")[1] + brokers = get_active_brokers(zookeeper_config=zookeeper_config) + chroot = zookeeper_config.get("chroot", "") + return f"{chroot}/brokers/ids/{broker_id}" in brokers + + +def get_active_brokers(zookeeper_config: Dict[str, str]) -> Set[str]: + """Gets all brokers currently connected to ZooKeeper. + + Args: + zookeeper_config: the relation data provided by ZooKeeper + + Returns: + Set of active broker ids + """ + chroot = zookeeper_config.get("chroot", "") + hosts = zookeeper_config.get("endpoints", "").split(",") + username = zookeeper_config.get("username", "") + password = zookeeper_config.get("password", "") + + zk = ZooKeeperManager(hosts=hosts, username=username, password=password) + path = f"{chroot}/brokers/ids/" + + try: + brokers = zk.leader_znodes(path=path) + # auth might not be ready with ZK after relation yet + except (NoNodeError, AuthFailedError) as e: + logger.debug(str(e)) + return set() + + return brokers + + def generate_password() -> str: """Creates randomized string for use as app passwords. @@ -18,3 +78,31 @@ def generate_password() -> str: String of 32 randomized letter+digit characters """ return "".join([secrets.choice(string.ascii_letters + string.digits) for _ in range(32)]) + + +def run_bin_command( + container: Container, bin_keyword: str, bin_args: List[str], extra_args: str +) -> str: + """Runs kafka bin command with desired args. + + Args: + container: the container to run on + bin_keyword: the kafka shell script to run + e.g `configs`, `topics` etc + bin_args: the shell command args + extra_args (optional): the desired `KAFKA_OPTS` env var values for the command + + Returns: + String of kafka bin command output + """ + environment = {"KAFKA_OPTS": extra_args} + command = [f"/opt/kafka/bin/kafka-{bin_keyword}.sh"] + bin_args + + try: + process = container.exec(command=command, environment=environment) + output, _ = process.wait_output() + logger.debug(f"{output=}") + return output + except (ExecError) as e: + logger.debug(f"cmd failed:\ncommand={e.command}\nstdout={e.stdout}\nstderr={e.stderr}") + raise e diff --git a/tests/integration/test_kafka_provider.py b/tests/integration/test_kafka_provider.py index 71858a03..7c15b87c 100644 --- a/tests/integration/test_kafka_provider.py +++ b/tests/integration/test_kafka_provider.py @@ -36,11 +36,7 @@ async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): "zookeeper-k8s", channel="edge", application_name=ZK_NAME, num_units=3 ), ops_test.model.deploy( -<<<<<<< HEAD kafka_charm, -======= - zk_charm, ->>>>>>> 9026d42 (fix integration tests) application_name=APP_NAME, num_units=1, resources={"kafka-image": KAFKA_CONTAINER}, From 52658eacb141ea7bcbf8b7e321122a109f51f5a5 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 15 Aug 2022 18:06:51 +0100 Subject: [PATCH 08/17] chore: update literal name for ZK --- tests/integration/test_charm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index b59f6f7e..44d8eaa6 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -4,7 +4,6 @@ import asyncio import logging -import time import pytest from helpers import APP_NAME, KAFKA_CONTAINER, ZK_NAME, check_application_status From 3f32e2d8fc65e2bb5f11968e5536a81d748f5753 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 15 Aug 2022 18:13:44 +0100 Subject: [PATCH 09/17] feat: add kafka_client relation interface logic --- src/provider.py | 187 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 131 insertions(+), 56 deletions(-) diff --git a/src/provider.py b/src/provider.py index 1906a825..ede2ad7f 100644 --- a/src/provider.py +++ b/src/provider.py @@ -5,15 +5,21 @@ """KafkaProvider class and methods.""" import logging -import secrets -import string from typing import Dict -from ops.charm import RelationBrokenEvent, RelationJoinedEvent +from ops.charm import ( + RelationBrokenEvent, + RelationChangedEvent, + RelationCreatedEvent, + RelationEvent, +) from ops.framework import Object from ops.model import Relation -from literals import PEER, REL_NAME +from auth import KafkaAuth +from config import KafkaConfig +from literals import CHARM_KEY, PEER, REL_NAME +from utils import generate_password logger = logging.getLogger(__name__) @@ -22,24 +28,40 @@ class KafkaProvider(Object): """Implements the provider-side logic for client applications relating to Kafka.""" def __init__(self, charm) -> None: - super().__init__(charm, "client") - + super().__init__(charm, "kafka_client") self.charm = charm - - self.framework.observe( - self.charm.on[REL_NAME].relation_joined, self._on_client_relation_joined - ) - self.framework.observe( - self.charm.on[REL_NAME].relation_broken, self._on_client_relation_broken + self.kafka_config = KafkaConfig(self.charm) + self.kafka_auth = KafkaAuth( + container=self.charm.get_container(CHARM_KEY), + opts=[self.kafka_config.extra_args], + zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), ) + self.framework.observe(self.charm.on[REL_NAME].relation_created, self._on_relation_created) + self.framework.observe(self.charm.on[REL_NAME].relation_changed, self.update_acls) + self.framework.observe(self.charm.on[REL_NAME].relation_broken, self._on_relation_broken) + @property - def app_relation(self) -> Relation: + def peer_relation(self) -> Relation: """The Kafka cluster's peer relation.""" return self.charm.model.get_relation(PEER) - def relation_config(self, relation: Relation) -> Dict[str, str]: - """Builds necessary relation data for a given relation. + def requirer_relation_config(self, event: RelationEvent) -> Dict[str, str]: + """Builds necessary client relation data for a given relation event. + + Args: + event: the event needing config + + Returns: + Dict with keys `topic` and `extra_user_roles` + """ + return { + "extra_user_roles": event.relation.data[event.app].get("extra-user-roles", ""), + "topic": event.relation.data[event.app].get("topic"), + } + + def provider_relation_config(self, event: RelationEvent) -> Dict[str, str]: + """Builds necessary provider relation data for a given relation event. Args: event: the event needing config @@ -47,65 +69,118 @@ def relation_config(self, relation: Relation) -> Dict[str, str]: Returns: Dict of `username`, `password` and `endpoints` data for the related app """ + relation = event.relation + username = f"relation-{relation.id}" - password = self.app_relation.data[self.charm.app].get(username, self.generate_password()) - units = set([self.charm.unit] + list(self.app_relation.units)) - endpoints = [ - f"{self.charm.app.name}-{unit.name.split('/')[1]}.{self.charm.app.name}-endpoints" - for unit in units - ] + password = self.peer_relation.data[self.charm.app].get(username) or generate_password() + bootstrap_server = self.charm.kafka_config.bootstrap_server + endpoints = [server.split(":")[0] for server in bootstrap_server] + zookeeper_uris = self.charm.kafka_config.zookeeper_config.get("connect", "") + + relation_config = { + "username": username, + "password": password, + "endpoints": ",".join(endpoints), + "uris": ",".join(bootstrap_server), + "zookeeper-uris": zookeeper_uris, + "consumer-group-prefix": "", + } + + # only set this if `consumer` is set to avoid missing information + if "consumer" in event.relation.data[event.app].get("extra-user-roles", ""): + relation_config["consumer-group-prefix"] = f"{username}-" - return {"username": username, "password": password, "endpoints": ",".join(endpoints)} + return relation_config - def _on_client_relation_joined(self, event: RelationJoinedEvent) -> None: - """Handler for `relation_joined` events.""" + def update_acls(self, event: RelationChangedEvent) -> None: + """Updates cluster ACLs for a given event relation to match client relation data. + + Args: + event: the event from a related client application needing ACLs + """ if not self.charm.unit.is_leader(): return - relation_config = self.relation_config(relation=event.relation) + if not self.charm.kafka_config.zookeeper_connected: + logger.debug("cannot update ACLs, ZooKeeper not yet connected") + event.defer() + return + + provider_relation_config = self.provider_relation_config(event=event) + requirer_relation_config = self.requirer_relation_config(event=event) + + self.kafka_auth.load_current_acls() + + self.kafka_auth.update_user_acls( + username=provider_relation_config["username"], + group=provider_relation_config.get("consumer-group-prefix"), + **requirer_relation_config, + ) + + # non-leader units need cluster_config_changed event to update their super.users + self.charm.model.get_relation(PEER).data[self.charm.app].update( + {"super-users": self.kafka_config.super_users} + ) - self.add_user(username=relation_config["username"], password=relation_config["password"]) - event.relation.data[self.charm.app].update(relation_config) + event.relation.data[self.charm.app].update(provider_relation_config) - def _on_client_relation_broken(self, event: RelationBrokenEvent) -> None: - """Handler for `relation_broken` events.""" + def _on_relation_created(self, event: RelationCreatedEvent) -> None: + """Handler for `kafka-client-relation-created` event. + + Adds new relation users to ZooKeeper. + + Args: + event: the event from a related client application needing a user + """ if not self.charm.unit.is_leader(): return - relation_config = self.relation_config(relation=event.relation) + if not self.charm.ready_to_start: + logger.debug("cannot add user, ZooKeeper not yet connected") + event.defer() + return - self.delete_user(username=relation_config["username"]) + provider_relation_config = self.provider_relation_config(event=event) - def add_user(self, username: str, password: str) -> None: - """Adds/updates users' SCRAM credentials to ZooKeeper. + self.kafka_auth.add_user( + username=provider_relation_config["username"], + password=provider_relation_config["password"], + ) - Args: - username: the user's username - password: the user's password + # non-leader units need cluster_config_changed event to update their super.users + self.charm.model.get_relation(PEER).data[self.charm.app].update( + {provider_relation_config["username"]: provider_relation_config["password"]} + ) - Raises: - ops.pebble.ExecError: if the command failed - """ - self.charm.add_user_to_zookeeper(username=username, password=password) - self.app_relation.data[self.charm.app].update({username: password}) + def _on_relation_broken(self, event: RelationBrokenEvent) -> None: + """Handler for `kafka-client-relation-broken` event. - def delete_user(self, username: str) -> None: - """Deletes users' SCRAM credentials from ZooKeeper. + Removes relation users from ZooKeeper. Args: - username: the user's username - - Raises: - ops.pebble.ExecError: if the command failed + event: the event from a related client application needing a user """ - self.charm.delete_user_from_zookeeper(username=username) - self.app_relation.data[self.charm.app].update({username: ""}) + if not self.charm.unit.is_leader(): + return + + if not self.charm.ready_to_start: + logger.debug("cannot remove user, ZooKeeper not yet connected") + event.defer() + return - @staticmethod - def generate_password(): - """Creates randomized string for use as app passwords. + if ( + event.relation.app != self.charm.app or not self.charm.app.planned_units() == 0 + ): # avoid on own charm during teardown + provider_relation_config = self.provider_relation_config(event=event) - Returns: - String of 32 randomized letter+digit characters - """ - return "".join([secrets.choice(string.ascii_letters + string.digits) for _ in range(32)]) + self.kafka_auth.load_current_acls() + + self.kafka_auth.remove_all_user_acls( + username=provider_relation_config["username"], + ) + self.kafka_auth.delete_user(username=provider_relation_config["username"]) + + # non-leader units need cluster_config_changed event to update their super.users + self.charm.model.get_relation(PEER).data[self.charm.app].update( + {provider_relation_config["username"]: ""} + ) From 6efd82dddc3d89b9ce30223435938b4916b8ed0d Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 29 Aug 2022 02:26:39 +0100 Subject: [PATCH 10/17] update metadata for restart --- metadata.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata.yaml b/metadata.yaml index 501cd96b..71713199 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -23,6 +23,8 @@ resources: peers: cluster: interface: cluster + restart: + interface: rolling_op requires: zookeeper: From a4f261216dac522fd390f7b1ace98e4b5d45977c Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 29 Aug 2022 02:27:14 +0100 Subject: [PATCH 11/17] feat: update config/auth setting --- src/auth.py | 23 ++++++++++++++--------- src/charm.py | 16 ++++++++++------ src/config.py | 8 ++++++-- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/auth.py b/src/auth.py index c97a250b..21153bd3 100644 --- a/src/auth.py +++ b/src/auth.py @@ -39,7 +39,8 @@ def __init__(self, opts: List[str], zookeeper: str, container: Container): def _get_acls_from_cluster(self) -> str: """Loads the currently active ACLs from the Kafka cluster.""" command = [ - f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--authorizer-properties", + f"zookeeper.connect={self.zookeeper}", "--list", ] acls = run_bin_command( @@ -205,7 +206,8 @@ def add_acl( """ if resource_type == "TOPIC": command = [ - f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--authorizer-properties", + f"zookeeper.connect={self.zookeeper}", "--add", f"--allow-principal=User:{username}", f"--operation={operation}", @@ -213,14 +215,15 @@ def add_acl( ] run_bin_command( container=self.container, - bin_keyword="configs", + bin_keyword="acls", bin_args=command, extra_args=self.opts, ) if resource_type == "GROUP": command = [ - f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--authorizer-properties", + f"zookeeper.connect={self.zookeeper}", "--add", f"--allow-principal=User:{username}", f"--operation={operation}", @@ -229,7 +232,7 @@ def add_acl( ] run_bin_command( container=self.container, - bin_keyword="configs", + bin_keyword="acls", bin_args=command, extra_args=self.opts, ) @@ -252,7 +255,8 @@ def remove_acl( """ if resource_type == "TOPIC": command = [ - f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--authorizer-properties", + f"zookeeper.connect={self.zookeeper}", "--remove", f"--allow-principal=User:{username}", f"--operation={operation}", @@ -261,14 +265,15 @@ def remove_acl( ] run_bin_command( container=self.container, - bin_keyword="configs", + bin_keyword="acls", bin_args=command, extra_args=self.opts, ) if resource_type == "GROUP": command = [ - f"--authorizer-properties zookeeper.connect={self.zookeeper}", + "--authorizer-properties", + f"zookeeper.connect={self.zookeeper}", "--remove", f"--allow-principal=User:{username}", f"--operation={operation}", @@ -278,7 +283,7 @@ def remove_acl( ] run_bin_command( container=self.container, - bin_keyword="configs", + bin_keyword="acls", bin_args=command, extra_args=self.opts, ) diff --git a/src/charm.py b/src/charm.py index ad3e3096..a907e061 100755 --- a/src/charm.py +++ b/src/charm.py @@ -141,20 +141,24 @@ def _on_config_changed(self, event: ConfigChangedEvent) -> None: # Load current properties set in the charm workload raw_properties = None try: - raw_properties = self.container.pull(self.kafka_config.properties_filepath) + raw_properties = str(self.container.pull(self.kafka_config.properties_filepath).read()) + properties = raw_properties.splitlines() except (ProtocolError, PathError) as e: logger.debug(str(e)) + event.defer() + return if not raw_properties: # Event fired before charm has properly started event.defer() return - if set(list(raw_properties)) ^ set(self.kafka_config.server_properties): + + if set(properties) ^ set(self.kafka_config.server_properties): logger.info( ( - 'Broker {self.unit.name.split("/")[1]} updating config - ' - "OLD PROPERTIES = {set(properties) - set(self.kafka_config.server_properties)=}, " - "NEW PROPERTIES = {set(self.kafka_config.server_properties) - set(properties)=}" + f'Broker {self.unit.name.split("/")[1]} updating config - ' + f"OLD PROPERTIES = {set(properties) - set(self.kafka_config.server_properties)}, " + f"NEW PROPERTIES = {set(self.kafka_config.server_properties) - set(properties)}" ) ) self.kafka_config.set_server_properties() @@ -231,7 +235,7 @@ def _restart(self, event: EventBase) -> None: event.defer() return - self.container.restart() + self.container.restart(CHARM_KEY) @property def ready_to_start(self) -> bool: diff --git a/src/config.py b/src/config.py index dcbb1496..89cc681c 100644 --- a/src/config.py +++ b/src/config.py @@ -8,7 +8,7 @@ from typing import Dict, List, Optional from ops.charm import CharmBase -from ops.model import Unit +from ops.model import Container, Unit from literals import CHARM_KEY, PEER, REL_NAME, ZOOKEEPER_REL_NAME @@ -31,11 +31,15 @@ class KafkaConfig: def __init__(self, charm: CharmBase): self.charm = charm - self.container = self.charm.unit.get_container(CHARM_KEY) self.default_config_path = f"{self.charm.config['data-dir']}/config" self.properties_filepath = f"{self.default_config_path}/server.properties" self.jaas_filepath = f"{self.default_config_path}/kafka-jaas.cfg" + @property + def container(self) -> Container: + """Grabs the current Kafka container.""" + return getattr(self.charm, "unit").get_container(CHARM_KEY) + @property def sync_password(self) -> Optional[str]: """Returns charm-set sync_password for server-server auth between brokers.""" From 9aaa07e76a63465766a956a61172fe73c631787a Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 29 Aug 2022 02:27:30 +0100 Subject: [PATCH 12/17] feat: implement kafka-client relation --- src/literals.py | 2 +- src/provider.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/literals.py b/src/literals.py index 24b3d4e3..bd2b0fd1 100644 --- a/src/literals.py +++ b/src/literals.py @@ -7,5 +7,5 @@ CHARM_KEY = "kafka" PEER = "cluster" ZOOKEEPER_REL_NAME = "zookeeper" -REL_NAME = "kafka" CHARM_USERS = ["sync"] +REL_NAME = "kafka-client" diff --git a/src/provider.py b/src/provider.py index ede2ad7f..f7203d57 100644 --- a/src/provider.py +++ b/src/provider.py @@ -32,7 +32,7 @@ def __init__(self, charm) -> None: self.charm = charm self.kafka_config = KafkaConfig(self.charm) self.kafka_auth = KafkaAuth( - container=self.charm.get_container(CHARM_KEY), + container=self.charm.unit.get_container(CHARM_KEY), opts=[self.kafka_config.extra_args], zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), ) From f663319b6d41fa1562eb8fd62f065c7beda2c1e6 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Tue, 6 Sep 2022 21:53:44 +0100 Subject: [PATCH 13/17] test: update all tests --- src/charm.py | 1 - tests/fixtures/valid_server.properties | 6 - tests/integration/app-charm/actions.yaml | 11 ++ tests/integration/app-charm/metadata.yaml | 4 +- tests/integration/app-charm/src/charm.py | 31 +++- tests/integration/helpers.py | 64 +++++--- tests/integration/test_provider.py | 178 ++++++++++++++++++++++ tests/integration/test_scaling.py | 72 +++++++++ tests/unit/test_auth.py | 61 ++++++++ tests/unit/test_config.py | 173 ++++++++++++++++----- tests/unit/test_provider.py | 77 ---------- 11 files changed, 532 insertions(+), 146 deletions(-) delete mode 100644 tests/fixtures/valid_server.properties create mode 100644 tests/integration/app-charm/actions.yaml create mode 100644 tests/integration/test_provider.py create mode 100644 tests/integration/test_scaling.py create mode 100644 tests/unit/test_auth.py delete mode 100644 tests/unit/test_provider.py diff --git a/src/charm.py b/src/charm.py index a907e061..2a002b50 100755 --- a/src/charm.py +++ b/src/charm.py @@ -152,7 +152,6 @@ def _on_config_changed(self, event: ConfigChangedEvent) -> None: event.defer() return - if set(properties) ^ set(self.kafka_config.server_properties): logger.info( ( diff --git a/tests/fixtures/valid_server.properties b/tests/fixtures/valid_server.properties deleted file mode 100644 index dccf7741..00000000 --- a/tests/fixtures/valid_server.properties +++ /dev/null @@ -1,6 +0,0 @@ -broker.id=1 -clientPort=2181 -broker.id.generation.enable=true -listeners=PLAINTEXT://:9092 -advertised.listeners=PLAINTEXT://:9092 -log.dirs=/var/lib/kafka/data diff --git a/tests/integration/app-charm/actions.yaml b/tests/integration/app-charm/actions.yaml new file mode 100644 index 00000000..5b694b27 --- /dev/null +++ b/tests/integration/app-charm/actions.yaml @@ -0,0 +1,11 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +make-admin: + description: Adds admin to relation data + +remove-admin: + description: Adds admin to relation data + +change-topic: + description: Changes topic diff --git a/tests/integration/app-charm/metadata.yaml b/tests/integration/app-charm/metadata.yaml index de52389d..49804b95 100644 --- a/tests/integration/app-charm/metadata.yaml +++ b/tests/integration/app-charm/metadata.yaml @@ -13,5 +13,5 @@ peers: interface: cluster requires: - kafka: - interface: kafka + kafka-client: + interface: kafka_client diff --git a/tests/integration/app-charm/src/charm.py b/tests/integration/app-charm/src/charm.py index a30759a2..769058e4 100755 --- a/tests/integration/app-charm/src/charm.py +++ b/tests/integration/app-charm/src/charm.py @@ -19,7 +19,8 @@ CHARM_KEY = "app" PEER = "cluster" -REL_NAME = "kafka" +REL_NAME = "kafka-client" +ZK = "zookeeper" class ApplicationCharm(CharmBase): @@ -30,9 +31,13 @@ def __init__(self, *args): self.name = CHARM_KEY self.framework.observe(getattr(self.on, "start"), self._on_start) + self.framework.observe(self.on[REL_NAME].relation_created, self._set_data) self.framework.observe(self.on[REL_NAME].relation_changed, self._log) self.framework.observe(self.on[REL_NAME].relation_broken, self._log) - self.framework.observe(self.on[REL_NAME].relation_joined, self._set_data) + + self.framework.observe(getattr(self.on, "make_admin_action"), self._make_admin) + self.framework.observe(getattr(self.on, "remove_admin_action"), self._remove_admin) + self.framework.observe(getattr(self.on, "change_topic_action"), self._change_topic) @property def relation(self): @@ -41,8 +46,26 @@ def relation(self): def _on_start(self, _) -> None: self.unit.status = ActiveStatus() - def _set_data(self, _) -> None: - return + def _set_data(self, event: RelationEvent) -> None: + + event.relation.data[self.unit].update({"group": self.unit.name.split("/")[1]}) + + if not self.unit.is_leader(): + return + event.relation.data[self.app].update( + {"extra-user-roles": "consumer", "topic": "test-topic"} + ) + + def _make_admin(self, _): + self.model.get_relation(REL_NAME).data[self.app].update( + {"extra-user-roles": "admin,consumer"} + ) + + def _remove_admin(self, _): + self.model.get_relation(REL_NAME).data[self.app].update({"extra-user-roles": "producer"}) + + def _change_topic(self, _): + self.model.get_relation(REL_NAME).data[self.app].update({"topic": "test-topic-changed"}) def _log(self, event: RelationEvent): return diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 372535e7..d9158c69 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -4,17 +4,47 @@ import re from pathlib import Path from subprocess import PIPE, check_output -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Set, Tuple import yaml from pytest_operator.plugin import OpsTest +from auth import Acl, KafkaAuth +from literals import CHARM_KEY + METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) KAFKA_CONTAINER = METADATA["resources"]["kafka-image"]["upstream-source"] APP_NAME = METADATA["name"] ZK_NAME = "zookeeper-k8s" +def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]: + result = check_output( + f"JUJU_MODEL={model_full_name} juju ssh {CHARM_KEY}/0 'kafka.acls --authorizer-properties zookeeper.connect={zookeeper_uri} --list'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return KafkaAuth._parse_acls(acls=result) + + +def load_super_users(model_full_name: str) -> List[str]: + result = check_output( + f"JUJU_MODEL={model_full_name} juju ssh {CHARM_KEY}/0 'cat /data/kafka/server.properties'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + properties = result.splitlines() + + for prop in properties: + if "super.users" in prop: + return prop.split("=")[1].split(";") + + return [] + + def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" result = check_output( @@ -27,7 +57,7 @@ def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: assert "SCRAM-SHA-512" in result -def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: +def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> str: """Get information related to a user stored on zookeeper.""" container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" result = check_output( @@ -72,21 +102,6 @@ def get_zookeeper_connection(unit_name: str, model_full_name: str) -> Tuple[List raise Exception("config not found") -def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str, str]: - result = show_unit(unit_name=unit_name, model_full_name=model_full_name) - relations_info = result[unit_name]["relation-info"] - - zk_relation_data = {} - for info in relations_info: - if info["endpoint"] == "zookeeper": - zk_relation_data["chroot"] = info["application-data"]["chroot"] - zk_relation_data["endpoints"] = info["application-data"]["endpoints"] - zk_relation_data["password"] = info["application-data"]["password"] - zk_relation_data["uris"] = info["application-data"]["uris"] - zk_relation_data["username"] = info["application-data"]["username"] - return zk_relation_data - - async def set_password(ops_test: OpsTest, username="sync", password=None, num_unit=0) -> str: """Use the charm action to start a password rotation.""" params = {"username": username} @@ -117,3 +132,18 @@ def check_application_status(ops_test: OpsTest, app_name: str) -> str: find_status = list(statuses & set(parts)) if find_status: return find_status[0] + + +def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str, str]: + result = show_unit(unit_name=unit_name, model_full_name=model_full_name) + relations_info = result[unit_name]["relation-info"] + + zk_relation_data = {} + for info in relations_info: + if info["endpoint"] == "zookeeper": + zk_relation_data["chroot"] = info["application-data"]["chroot"] + zk_relation_data["endpoints"] = info["application-data"]["endpoints"] + zk_relation_data["password"] = info["application-data"]["password"] + zk_relation_data["uris"] = info["application-data"]["uris"] + zk_relation_data["username"] = info["application-data"]["username"] + return zk_relation_data diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py new file mode 100644 index 00000000..5d24ee59 --- /dev/null +++ b/tests/integration/test_provider.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging +import time + +import pytest +from pytest_operator.plugin import OpsTest + +from literals import CHARM_KEY, ZK +from tests.integration.helpers import ( + check_user, + get_zookeeper_connection, + load_acls, + load_super_users, +) + +logger = logging.getLogger(__name__) + +DUMMY_NAME_1 = "app" +DUMMY_NAME_2 = "appii" + + +@pytest.fixture(scope="module") +def usernames(): + return set() + + +@pytest.mark.abort_on_fail +async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): + kafka_charm = await ops_test.build_charm(".") + app_charm = await ops_test.build_charm("tests/integration/app-charm") + + await asyncio.gather( + ops_test.model.deploy("zookeeper-k8s", channel="edge", application_name=ZK, num_units=1), + ops_test.model.deploy( + kafka_charm, + application_name=CHARM_KEY, + num_units=1, + resources={"kafka-image": "ubuntu/kafka:latest"}, + ), + ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1), + ) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_1, ZK]) + await ops_test.model.add_relation(CHARM_KEY, ZK) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZK]) + await ops_test.model.add_relation(CHARM_KEY, DUMMY_NAME_1) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_1]) + assert ops_test.model.applications[CHARM_KEY].status == "active" + assert ops_test.model.applications[DUMMY_NAME_1].status == "active" + + # implicitly tests setting of kafka app data + returned_usernames, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name + ) + usernames.update(returned_usernames) + + for username in usernames: + check_user( + username=username, + zookeeper_uri=zookeeper_uri, + model_full_name=ops_test.model_full_name, + ) + + for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): + assert acl.username in usernames + assert acl.operation in ["READ", "DESCRIBE"] + assert acl.resource_type in ["GROUP", "TOPIC"] + if acl.resource_type == "TOPIC": + assert acl.resource_name == "test-topic" + + +@pytest.mark.abort_on_fail +async def test_deploy_multiple_charms_same_topic_relate_active(ops_test: OpsTest, usernames): + appii_charm = await ops_test.build_charm("tests/integration/app-charm") + await ops_test.model.deploy(appii_charm, application_name=DUMMY_NAME_2, num_units=1), + await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2]) + await ops_test.model.add_relation(CHARM_KEY, DUMMY_NAME_2) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) + assert ops_test.model.applications[CHARM_KEY].status == "active" + assert ops_test.model.applications[DUMMY_NAME_1].status == "active" + assert ops_test.model.applications[DUMMY_NAME_2].status == "active" + + returned_usernames, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name + ) + usernames.update(returned_usernames) + + for username in usernames: + check_user( + username=username, + zookeeper_uri=zookeeper_uri, + model_full_name=ops_test.model_full_name, + ) + + for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): + assert acl.username in usernames + assert acl.operation in ["READ", "DESCRIBE"] + assert acl.resource_type in ["GROUP", "TOPIC"] + if acl.resource_type == "TOPIC": + assert acl.resource_name == "test-topic" + + +@pytest.mark.abort_on_fail +async def test_remove_application_removes_user_and_acls(ops_test: OpsTest, usernames): + await ops_test.model.remove_application(DUMMY_NAME_1, block_until_done=True) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY]) + assert ops_test.model.applications[CHARM_KEY].status == "active" + + _, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name + ) + + # checks that old users are removed from active cluster ACLs + acls = load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri) + acl_usernames = set() + for acl in acls: + acl_usernames.add(acl.username) + + assert acl_usernames != usernames + + # checks that past usernames no longer exist in ZooKeeper + with pytest.raises(AssertionError): + for username in usernames: + check_user( + username=username, + zookeeper_uri=zookeeper_uri, + model_full_name=ops_test.model_full_name, + ) + + +@pytest.mark.abort_on_fail +async def test_change_client_topic(ops_test: OpsTest): + action = await ops_test.model.units.get(f"{DUMMY_NAME_2}/0").run_action("change-topic") + await action.wait() + + assert ops_test.model.applications[CHARM_KEY].status == "active" + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) + + _, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name + ) + + for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): + if acl.resource_type == "TOPIC": + assert acl.resource_name == "test-topic-changed" + + +@pytest.mark.abort_on_fail +async def test_admin_added_to_super_users(ops_test: OpsTest): + # ensures only broker user for now + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 1 + + action = await ops_test.model.units.get(f"{DUMMY_NAME_2}/0").run_action("make-admin") + await action.wait() + + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) + assert ops_test.model.applications[CHARM_KEY].status == "active" + + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 2 + + +@pytest.mark.abort_on_fail +async def test_admin_removed_from_super_users(ops_test: OpsTest): + action = await ops_test.model.units.get(f"{DUMMY_NAME_2}/0").run_action("remove-admin") + await action.wait() + + time.sleep(20) + + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) + assert ops_test.model.applications[CHARM_KEY].status == "active" + + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 1 diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py new file mode 100644 index 00000000..8cd92f23 --- /dev/null +++ b/tests/integration/test_scaling.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging +import time + +import pytest +from pytest_operator.plugin import OpsTest + +from literals import CHARM_KEY, ZK +from tests.integration.helpers import get_kafka_zk_relation_data +from utils import get_active_brokers + +logger = logging.getLogger(__name__) + + +@pytest.mark.abort_on_fail +async def test_kafka_simple_scale_up(ops_test: OpsTest): + kafka_charm = await ops_test.build_charm(".") + + await asyncio.gather( + ops_test.model.deploy("zookeeper-k8s", application_name=ZK, num_units=1), + ops_test.model.deploy( + kafka_charm, + application_name=CHARM_KEY, + num_units=1, + resources={"kafka-image": "ubuntu/kafka:latest"}, + ), + ) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZK]) + await ops_test.model.add_relation(CHARM_KEY, ZK) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZK]) + assert ops_test.model.applications[ZK].status == "active" + assert ops_test.model.applications[CHARM_KEY].status == "active" + + await ops_test.model.applications[CHARM_KEY].add_units(count=2) + await ops_test.model.block_until( + lambda: len(ops_test.model.applications[CHARM_KEY].units) == 3 + ) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY], status="active", timeout=1000) + + kafka_zk_relation_data = get_kafka_zk_relation_data( + unit_name=f"{CHARM_KEY}/2", model_full_name=ops_test.model_full_name + ) + active_brokers = get_active_brokers(zookeeper_config=kafka_zk_relation_data) + chroot = kafka_zk_relation_data.get("chroot", "") + assert f"{chroot}/brokers/ids/0" in active_brokers + assert f"{chroot}/brokers/ids/1" in active_brokers + assert f"{chroot}/brokers/ids/2" in active_brokers + + +@pytest.mark.abort_on_fail +async def test_kafka_simple_scale_down(ops_test: OpsTest): + + await ops_test.model.applications[CHARM_KEY].destroy_units(f"{CHARM_KEY}/1") + await ops_test.model.block_until( + lambda: len(ops_test.model.applications[CHARM_KEY].units) == 2 + ) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY], status="active", timeout=1000) + + time.sleep(30) + + kafka_zk_relation_data = get_kafka_zk_relation_data( + unit_name=f"{CHARM_KEY}/2", model_full_name=ops_test.model_full_name + ) + active_brokers = get_active_brokers(zookeeper_config=kafka_zk_relation_data) + chroot = kafka_zk_relation_data.get("chroot", "") + assert f"{chroot}/brokers/ids/0" in active_brokers + assert f"{chroot}/brokers/ids/1" not in active_brokers + assert f"{chroot}/brokers/ids/2" in active_brokers diff --git a/tests/unit/test_auth.py b/tests/unit/test_auth.py new file mode 100644 index 00000000..4817e532 --- /dev/null +++ b/tests/unit/test_auth.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +from auth import Acl, KafkaAuth + + +def test_acl(): + assert sorted(list(Acl.__annotations__.keys())) == sorted( + ["operation", "resource_name", "resource_type", "username"] + ) + assert Acl.__hash__ + + +def test_parse_acls(): + acls = """ + Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=relation-81-*, patternType=LITERAL)`: + (principal=User:relation-81, host=*, operation=READ, permissionType=ALLOW) + + Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`: + (principal=User:relation-81, host=*, operation=WRITE, permissionType=ALLOW) + (principal=User:relation-81, host=*, operation=CREATE, permissionType=ALLOW) + (principal=User:relation-81, host=*, operation=DESCRIBE, permissionType=ALLOW) + (principal=User:relation-81, host=*, operation=READ, permissionType=ALLOW) + """ + + parsed_acls = KafkaAuth._parse_acls(acls=acls) + + assert len(parsed_acls) == 5 + assert type(list(parsed_acls)[0]) == Acl + + +def test_generate_producer_acls(): + generated_acls = KafkaAuth._generate_producer_acls(topic="theonering", username="frodo") + assert len(generated_acls) == 3 + + operations = set() + resource_types = set() + for acl in generated_acls: + operations.add(acl.operation) + resource_types.add(acl.resource_type) + + assert sorted(operations) == sorted(set(["CREATE", "WRITE", "DESCRIBE"])) + assert resource_types == {"TOPIC"} + + +def test_generate_consumer_acls(): + generated_acls = KafkaAuth._generate_consumer_acls(topic="theonering", username="frodo") + assert len(generated_acls) == 3 + + operations = set() + resource_types = set() + for acl in generated_acls: + operations.add(acl.operation) + resource_types.add(acl.resource_type) + + if acl.resource_type == "GROUP": + assert acl.operation == "READ" + + assert sorted(operations) == sorted(set(["READ", "DESCRIBE"])) + assert sorted(resource_types) == sorted(set(["TOPIC", "GROUP"])) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 3e71e6ae..cd283052 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -2,9 +2,8 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. -import unittest - import ops.testing +import pytest from ops.testing import Harness from charm import KafkaK8sCharm @@ -12,40 +11,136 @@ ops.testing.SIMULATE_CAN_CONNECT = True -class TestKafkaConfig(unittest.TestCase): - def setUp(self): - self.harness = Harness(KafkaK8sCharm) - self.addCleanup(self.harness.cleanup) - self.harness.begin_with_initial_hooks() - self.relation_id = self.harness.add_relation("zookeeper", "kafka-k8s") - - def test_zookeeper_config_succeeds_fails_config(self): - self.harness.update_relation_data( - self.relation_id, - self.harness.charm.app.name, - { - "chroot": "/kafka", - "username": "moria", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181,2.2.2.2:2181/kafka", - }, - ) - self.assertDictEqual(self.harness.charm.kafka_config.zookeeper_config, {}) - - def test_zookeeper_config_succeeds_valid_config(self): - self.harness.update_relation_data( - self.relation_id, - self.harness.charm.app.name, - { - "chroot": "/kafka", - "username": "moria", - "password": "mellon", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", - }, - ) - self.assertIn("connect", self.harness.charm.kafka_config.zookeeper_config.keys()) - self.assertEqual( - self.harness.charm.kafka_config.zookeeper_config["connect"], - "1.1.1.1:2181,2.2.2.2:2181/kafka", - ) +@pytest.fixture(scope="function") +def harness(): + harness = Harness(KafkaK8sCharm) + harness.begin_with_initial_hooks() + return harness + + +@pytest.fixture(scope="function") +def zk_relation_id(harness): + relation_id = harness.add_relation("zookeeper", "kafka-k8s") + return relation_id + + +def test_zookeeper_config_succeeds_fails_config(zk_relation_id, harness): + harness.update_relation_data( + zk_relation_id, + harness.charm.app.name, + { + "chroot": "/kafka", + "username": "moria", + "endpoints": "1.1.1.1,2.2.2.2", + "uris": "1.1.1.1:2181,2.2.2.2:2181/kafka", + }, + ) + assert harness.charm.kafka_config.zookeeper_config == {} + assert not harness.charm.kafka_config.zookeeper_connected + + +def test_zookeeper_config_succeeds_valid_config(zk_relation_id, harness): + harness.update_relation_data( + zk_relation_id, + harness.charm.app.name, + { + "chroot": "/kafka", + "username": "moria", + "password": "mellon", + "endpoints": "1.1.1.1,2.2.2.2", + "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", + }, + ) + assert "connect" in harness.charm.kafka_config.zookeeper_config + assert ( + harness.charm.kafka_config.zookeeper_config["connect"] == "1.1.1.1:2181,2.2.2.2:2181/kafka" + ) + assert harness.charm.kafka_config.zookeeper_connected + + +def test_extra_args(harness): + args = harness.charm.kafka_config.extra_args + assert "-Djava.security.auth.login.config" in args + + +def test_bootstrap_server(harness): + peer_relation_id = harness.charm.model.get_relation("cluster").id + harness.add_relation_unit(peer_relation_id, "kafka/1") + + assert len(harness.charm.kafka_config.bootstrap_server) == 2 + for server in harness.charm.kafka_config.bootstrap_server: + assert "9092" in server + + +def test_default_replication_properties_less_than_three(harness): + assert "num.partitions=1" in harness.charm.kafka_config.default_replication_properties + assert ( + "default.replication.factor=1" in harness.charm.kafka_config.default_replication_properties + ) + assert "min.insync.replicas=1" in harness.charm.kafka_config.default_replication_properties + + +def test_default_replication_properties_more_than_three(harness): + peer_relation_id = harness.charm.model.get_relation("cluster").id + harness.add_relation_unit(peer_relation_id, "kafka/1") + harness.add_relation_unit(peer_relation_id, "kafka/2") + harness.add_relation_unit(peer_relation_id, "kafka/3") + harness.add_relation_unit(peer_relation_id, "kafka/4") + harness.add_relation_unit(peer_relation_id, "kafka/5") + + assert "num.partitions=3" in harness.charm.kafka_config.default_replication_properties + assert ( + "default.replication.factor=3" in harness.charm.kafka_config.default_replication_properties + ) + assert "min.insync.replicas=2" in harness.charm.kafka_config.default_replication_properties + + +def test_auth_properties(zk_relation_id, harness): + peer_relation_id = harness.charm.model.get_relation("cluster").id + harness.update_relation_data( + peer_relation_id, harness.charm.app.name, {"sync_password": "mellon"} + ) + harness.update_relation_data( + zk_relation_id, + harness.charm.app.name, + { + "chroot": "/kafka", + "username": "moria", + "password": "mellon", + "endpoints": "1.1.1.1,2.2.2.2", + "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", + }, + ) + + assert "broker.id=0" in harness.charm.kafka_config.auth_properties + assert ( + f"zookeeper.connect={harness.charm.kafka_config.zookeeper_config['connect']}" + in harness.charm.kafka_config.auth_properties + ) + assert ( + 'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="mellon";' + in harness.charm.kafka_config.auth_properties + ) + + +def test_super_users(harness): + assert len(harness.charm.kafka_config.super_users.split(";")) == 1 + + client_relation_id = harness.add_relation("kafka-client", "app") + harness.update_relation_data(client_relation_id, "app", {"extra-user-roles": "admin,producer"}) + client_relation_id = harness.add_relation("kafka-client", "appii") + harness.update_relation_data( + client_relation_id, "appii", {"extra-user-roles": "admin,consumer"} + ) + + peer_relation_id = harness.charm.model.get_relation("cluster").id + + harness.update_relation_data( + peer_relation_id, harness.charm.app.name, {"relation-2": "mellon"} + ) + assert len(harness.charm.kafka_config.super_users.split(";")) == 2 + + harness.update_relation_data( + peer_relation_id, harness.charm.app.name, {"relation-3": "mellon"} + ) + assert len(harness.charm.kafka_config.super_users.split(";")) == 3 diff --git a/tests/unit/test_provider.py b/tests/unit/test_provider.py deleted file mode 100644 index 81621a27..00000000 --- a/tests/unit/test_provider.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -import logging -import unittest -from collections import namedtuple - -import ops.testing -from ops.charm import CharmBase -from ops.testing import Harness - -from provider import KafkaProvider - -ops.testing.SIMULATE_CAN_CONNECT = True - -logger = logging.getLogger(__name__) - -METADATA = """ - name: kafka - peers: - cluster: - interface: cluster - provides: - kafka: - interface: kafka -""" - -CustomRelation = namedtuple("Relation", ["id"]) - - -class DummyKafkaCharm(CharmBase): - def __init__(self, *args): - super().__init__(*args) - self.client_relation = KafkaProvider(self) - - -class TestProvider(unittest.TestCase): - def setUp(self): - self.harness = Harness(DummyKafkaCharm, meta=METADATA) - self.addCleanup(self.harness.cleanup) - self.harness.begin_with_initial_hooks() - - @property - def provider(self): - return self.harness.charm.client_relation - - def test_relation_config_new_relation_no_password(self): - self.harness.set_leader(True) - relation_id = self.harness.add_relation("kafka", "client_app") - - config = self.harness.charm.client_relation.relation_config( - relation=self.harness.charm.model.get_relation( - relation_name="kafka", relation_id=relation_id - ) - ) - - self.assertEqual(sorted(["endpoints", "password", "username"]), sorted(config.keys())) - self.assertEqual(sorted(config["endpoints"].split(",")), ["kafka-0.kafka-endpoints"]) - self.assertEqual(len(config["password"]), 32) - - def test_relation_config_existing_relation_password(self): - self.harness.set_leader(True) - relation_id = self.harness.add_relation("kafka", "client_app") - self.harness.update_relation_data( - self.harness.charm.model.get_relation("cluster").id, - "kafka", - {"relation-1": "keepitsecret"}, - ) - - config = self.harness.charm.client_relation.relation_config( - relation=self.harness.charm.model.get_relation( - relation_name="kafka", relation_id=relation_id - ) - ) - - self.assertEqual(config["password"], "keepitsecret") From 0d8296beafd906c087a61416ec72ec6aa8037419 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Mon, 5 Sep 2022 14:04:24 +0100 Subject: [PATCH 14/17] style: fix typos/docstrings --- src/provider.py | 2 +- tests/integration/app-charm/actions.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/provider.py b/src/provider.py index f7203d57..70811fff 100644 --- a/src/provider.py +++ b/src/provider.py @@ -67,7 +67,7 @@ def provider_relation_config(self, event: RelationEvent) -> Dict[str, str]: event: the event needing config Returns: - Dict of `username`, `password` and `endpoints` data for the related app + Dict of `username`, `password`, `endpoints`, `uris` and `consumer-group-prefix` """ relation = event.relation diff --git a/tests/integration/app-charm/actions.yaml b/tests/integration/app-charm/actions.yaml index 5b694b27..24630763 100644 --- a/tests/integration/app-charm/actions.yaml +++ b/tests/integration/app-charm/actions.yaml @@ -5,7 +5,7 @@ make-admin: description: Adds admin to relation data remove-admin: - description: Adds admin to relation data + description: Removes admin from relation data change-topic: description: Changes topic From 0d74fb897ffc1f1a38ce2b0b48fb30f34b387fe8 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Tue, 6 Sep 2022 21:58:25 +0100 Subject: [PATCH 15/17] fix: fix failing int tests --- tests/integration/helpers.py | 4 +++- tests/integration/test_provider.py | 2 +- tests/integration/test_scaling.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index d9158c69..e79b08fa 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. + import re from pathlib import Path from subprocess import PIPE, check_output @@ -19,8 +20,9 @@ def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]: + container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeeper_uri} --list" result = check_output( - f"JUJU_MODEL={model_full_name} juju ssh {CHARM_KEY}/0 'kafka.acls --authorizer-properties zookeeper.connect={zookeeper_uri} --list'", + f"JUJU_MODEL={model_full_name} juju ssh --container kafka kafka-k8s/0 '{container_command}'", stderr=PIPE, shell=True, universal_newlines=True, diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py index 5d24ee59..435219a7 100644 --- a/tests/integration/test_provider.py +++ b/tests/integration/test_provider.py @@ -34,7 +34,7 @@ async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): app_charm = await ops_test.build_charm("tests/integration/app-charm") await asyncio.gather( - ops_test.model.deploy("zookeeper-k8s", channel="edge", application_name=ZK, num_units=1), + ops_test.model.deploy("zookeeper-k8s", channel="edge", application_name=ZK, num_units=3), ops_test.model.deploy( kafka_charm, application_name=CHARM_KEY, diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py index 8cd92f23..870d223f 100644 --- a/tests/integration/test_scaling.py +++ b/tests/integration/test_scaling.py @@ -21,7 +21,7 @@ async def test_kafka_simple_scale_up(ops_test: OpsTest): kafka_charm = await ops_test.build_charm(".") await asyncio.gather( - ops_test.model.deploy("zookeeper-k8s", application_name=ZK, num_units=1), + ops_test.model.deploy("zookeeper-k8s", application_name=ZK, num_units=3), ops_test.model.deploy( kafka_charm, application_name=CHARM_KEY, From 4c56262d75cdf0d789337c1776e263c35a0bb6bb Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Wed, 7 Sep 2022 16:17:34 +0100 Subject: [PATCH 16/17] fix: remove stale provider tests --- tests/integration/test_kafka_provider.py | 108 ----------------------- tests/integration/test_provider.py | 10 +-- tests/integration/test_scaling.py | 12 +-- 3 files changed, 11 insertions(+), 119 deletions(-) delete mode 100644 tests/integration/test_kafka_provider.py diff --git a/tests/integration/test_kafka_provider.py b/tests/integration/test_kafka_provider.py deleted file mode 100644 index 7c15b87c..00000000 --- a/tests/integration/test_kafka_provider.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -import asyncio -import logging - -import pytest -from helpers import ( - APP_NAME, - KAFKA_CONTAINER, - ZK_NAME, - check_user, - get_zookeeper_connection, -) -from pytest_operator.plugin import OpsTest - -logger = logging.getLogger(__name__) - -DUMMY_NAME_1 = "app" -DUMMY_NAME_2 = "appii" - - -@pytest.fixture(scope="module") -def usernames(): - return set() - - -@pytest.mark.abort_on_fail -async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): - kafka_charm = await ops_test.build_charm(".") - app_charm = await ops_test.build_charm("tests/integration/app-charm") - - await asyncio.gather( - ops_test.model.deploy( - "zookeeper-k8s", channel="edge", application_name=ZK_NAME, num_units=3 - ), - ops_test.model.deploy( - kafka_charm, - application_name=APP_NAME, - num_units=1, - resources={"kafka-image": KAFKA_CONTAINER}, - ), - ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1), - ) - await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3) - await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1, ZK_NAME]) - - await ops_test.model.add_relation(APP_NAME, ZK_NAME) - await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) - - await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_1) - await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) - - assert ops_test.model.applications[APP_NAME].status == "active" - assert ops_test.model.applications[DUMMY_NAME_1].status == "active" - - # implicitly tests setting of kafka app data - returned_usernames, zookeeper_uri = get_zookeeper_connection( - unit_name="kafka-k8s/0", model_full_name=ops_test.model_full_name - ) - usernames.update(returned_usernames) - - for username in usernames: - check_user( - username=username, - zookeeper_uri=zookeeper_uri, - model_full_name=ops_test.model_full_name, - ) - - -@pytest.mark.abort_on_fail -async def test_deploy_multiple_charms_relate_active(ops_test: OpsTest, usernames): - appii_charm = await ops_test.build_charm("tests/integration/app-charm") - await ops_test.model.deploy(appii_charm, application_name=DUMMY_NAME_2, num_units=1), - await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2]) - await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_2) - await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_2]) - - assert ops_test.model.applications[APP_NAME].status == "active" - assert ops_test.model.applications[DUMMY_NAME_1].status == "active" - assert ops_test.model.applications[DUMMY_NAME_2].status == "active" - - returned_usernames, zookeeper_uri = get_zookeeper_connection( - unit_name="kafka-k8s/0", model_full_name=ops_test.model_full_name - ) - usernames.update(returned_usernames) - - for username in usernames: - check_user( - username=username, - zookeeper_uri=zookeeper_uri, - model_full_name=ops_test.model_full_name, - ) - - -@pytest.mark.abort_on_fail -async def test_remove_application_removes_user(ops_test: OpsTest, usernames): - await ops_test.model.applications[DUMMY_NAME_1].remove() - await ops_test.model.applications[DUMMY_NAME_2].remove() - await ops_test.model.wait_for_idle(apps=[APP_NAME]) - assert ops_test.model.applications[APP_NAME].status == "active" - - # checks that past usernames no longer exist in ZooKeeper - with pytest.raises(Exception): - _, _ = get_zookeeper_connection( - unit_name="kafka-k8s/0", model_full_name=ops_test.model_full_name - ) diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py index 435219a7..8dc573f9 100644 --- a/tests/integration/test_provider.py +++ b/tests/integration/test_provider.py @@ -9,7 +9,7 @@ import pytest from pytest_operator.plugin import OpsTest -from literals import CHARM_KEY, ZK +from literals import CHARM_KEY, ZOOKEEPER_REL_NAME from tests.integration.helpers import ( check_user, get_zookeeper_connection, @@ -34,7 +34,7 @@ async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): app_charm = await ops_test.build_charm("tests/integration/app-charm") await asyncio.gather( - ops_test.model.deploy("zookeeper-k8s", channel="edge", application_name=ZK, num_units=3), + ops_test.model.deploy("zookeeper-k8s", channel="edge", application_name=ZOOKEEPER_REL_NAME, num_units=3), ops_test.model.deploy( kafka_charm, application_name=CHARM_KEY, @@ -43,9 +43,9 @@ async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): ), ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1), ) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_1, ZK]) - await ops_test.model.add_relation(CHARM_KEY, ZK) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZK]) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_1, ZOOKEEPER_REL_NAME]) + await ops_test.model.add_relation(CHARM_KEY, ZOOKEEPER_REL_NAME) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZOOKEEPER_REL_NAME]) await ops_test.model.add_relation(CHARM_KEY, DUMMY_NAME_1) await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_1]) assert ops_test.model.applications[CHARM_KEY].status == "active" diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py index 870d223f..cebc334c 100644 --- a/tests/integration/test_scaling.py +++ b/tests/integration/test_scaling.py @@ -9,7 +9,7 @@ import pytest from pytest_operator.plugin import OpsTest -from literals import CHARM_KEY, ZK +from literals import CHARM_KEY, ZOOKEEPER_REL_NAME from tests.integration.helpers import get_kafka_zk_relation_data from utils import get_active_brokers @@ -21,7 +21,7 @@ async def test_kafka_simple_scale_up(ops_test: OpsTest): kafka_charm = await ops_test.build_charm(".") await asyncio.gather( - ops_test.model.deploy("zookeeper-k8s", application_name=ZK, num_units=3), + ops_test.model.deploy("zookeeper-k8s", application_name=ZOOKEEPER_REL_NAME, num_units=1), ops_test.model.deploy( kafka_charm, application_name=CHARM_KEY, @@ -29,10 +29,10 @@ async def test_kafka_simple_scale_up(ops_test: OpsTest): resources={"kafka-image": "ubuntu/kafka:latest"}, ), ) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZK]) - await ops_test.model.add_relation(CHARM_KEY, ZK) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZK]) - assert ops_test.model.applications[ZK].status == "active" + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZOOKEEPER_REL_NAME]) + await ops_test.model.add_relation(CHARM_KEY, ZOOKEEPER_REL_NAME) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZOOKEEPER_REL_NAME]) + assert ops_test.model.applications[ZOOKEEPER_REL_NAME].status == "active" assert ops_test.model.applications[CHARM_KEY].status == "active" await ops_test.model.applications[CHARM_KEY].add_units(count=2) From c2e73b5e658f189e242c3b67135668d125faf5c3 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Wed, 7 Sep 2022 18:23:10 +0100 Subject: [PATCH 17/17] fix: refix failing int-tests, skip scaling --- tests/integration/helpers.py | 3 +- tests/integration/test_provider.py | 144 +++++++++++++++-------------- tests/integration/test_scaling.py | 2 + 3 files changed, 78 insertions(+), 71 deletions(-) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index e79b08fa..4baef3c2 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -11,7 +11,6 @@ from pytest_operator.plugin import OpsTest from auth import Acl, KafkaAuth -from literals import CHARM_KEY METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) KAFKA_CONTAINER = METADATA["resources"]["kafka-image"]["upstream-source"] @@ -33,7 +32,7 @@ def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]: def load_super_users(model_full_name: str) -> List[str]: result = check_output( - f"JUJU_MODEL={model_full_name} juju ssh {CHARM_KEY}/0 'cat /data/kafka/server.properties'", + f"JUJU_MODEL={model_full_name} juju ssh --container kafka {APP_NAME}/0 'cat /data/kafka/config/server.properties'", stderr=PIPE, shell=True, universal_newlines=True, diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py index 8dc573f9..951e2efb 100644 --- a/tests/integration/test_provider.py +++ b/tests/integration/test_provider.py @@ -9,8 +9,10 @@ import pytest from pytest_operator.plugin import OpsTest -from literals import CHARM_KEY, ZOOKEEPER_REL_NAME from tests.integration.helpers import ( + APP_NAME, + KAFKA_CONTAINER, + ZK_NAME, check_user, get_zookeeper_connection, load_acls, @@ -34,26 +36,29 @@ async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): app_charm = await ops_test.build_charm("tests/integration/app-charm") await asyncio.gather( - ops_test.model.deploy("zookeeper-k8s", channel="edge", application_name=ZOOKEEPER_REL_NAME, num_units=3), + ops_test.model.deploy( + "zookeeper-k8s", channel="edge", application_name=ZK_NAME, num_units=3 + ), ops_test.model.deploy( kafka_charm, - application_name=CHARM_KEY, + application_name=APP_NAME, num_units=1, - resources={"kafka-image": "ubuntu/kafka:latest"}, + resources={"kafka-image": KAFKA_CONTAINER}, ), ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1), ) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_1, ZOOKEEPER_REL_NAME]) - await ops_test.model.add_relation(CHARM_KEY, ZOOKEEPER_REL_NAME) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZOOKEEPER_REL_NAME]) - await ops_test.model.add_relation(CHARM_KEY, DUMMY_NAME_1) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_1]) - assert ops_test.model.applications[CHARM_KEY].status == "active" + await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3) + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1, ZK_NAME]) + await ops_test.model.add_relation(APP_NAME, ZK_NAME) + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) + await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_1) + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[DUMMY_NAME_1].status == "active" # implicitly tests setting of kafka app data returned_usernames, zookeeper_uri = get_zookeeper_connection( - unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name ) usernames.update(returned_usernames) @@ -72,19 +77,68 @@ async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): assert acl.resource_name == "test-topic" +@pytest.mark.abort_on_fail +async def test_change_client_topic(ops_test: OpsTest): + action = await ops_test.model.units.get(f"{DUMMY_NAME_1}/0").run_action("change-topic") + await action.wait() + + assert ops_test.model.applications[APP_NAME].status == "active" + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + + _, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + + for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): + if acl.resource_type == "TOPIC": + assert acl.resource_name == "test-topic-changed" + + +@pytest.mark.abort_on_fail +async def test_admin_added_to_super_users(ops_test: OpsTest): + # ensures only broker user for now + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 1 + + action = await ops_test.model.units.get(f"{DUMMY_NAME_1}/0").run_action("make-admin") + await action.wait() + + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + assert ops_test.model.applications[APP_NAME].status == "active" + + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 2 + + +@pytest.mark.abort_on_fail +async def test_admin_removed_from_super_users(ops_test: OpsTest): + action = await ops_test.model.units.get(f"{DUMMY_NAME_1}/0").run_action("remove-admin") + await action.wait() + + time.sleep(20) + + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + assert ops_test.model.applications[APP_NAME].status == "active" + + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 1 + + @pytest.mark.abort_on_fail async def test_deploy_multiple_charms_same_topic_relate_active(ops_test: OpsTest, usernames): appii_charm = await ops_test.build_charm("tests/integration/app-charm") await ops_test.model.deploy(appii_charm, application_name=DUMMY_NAME_2, num_units=1), - await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2]) - await ops_test.model.add_relation(CHARM_KEY, DUMMY_NAME_2) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) - assert ops_test.model.applications[CHARM_KEY].status == "active" + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2], timeout=1000) + await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_2) + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_2], timeout=1000) + + assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[DUMMY_NAME_1].status == "active" assert ops_test.model.applications[DUMMY_NAME_2].status == "active" returned_usernames, zookeeper_uri = get_zookeeper_connection( - unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name ) usernames.update(returned_usernames) @@ -97,20 +151,19 @@ async def test_deploy_multiple_charms_same_topic_relate_active(ops_test: OpsTest for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): assert acl.username in usernames - assert acl.operation in ["READ", "DESCRIBE"] - assert acl.resource_type in ["GROUP", "TOPIC"] if acl.resource_type == "TOPIC": - assert acl.resource_name == "test-topic" + assert acl.resource_name in ["test-topic", "test-topic-changed"] +@pytest.mark.skip # skip until scaling operations work in MicroK8s @pytest.mark.abort_on_fail async def test_remove_application_removes_user_and_acls(ops_test: OpsTest, usernames): await ops_test.model.remove_application(DUMMY_NAME_1, block_until_done=True) - await ops_test.model.wait_for_idle(apps=[CHARM_KEY]) - assert ops_test.model.applications[CHARM_KEY].status == "active" + await ops_test.model.wait_for_idle(apps=[APP_NAME]) + assert ops_test.model.applications[APP_NAME].status == "active" _, zookeeper_uri = get_zookeeper_connection( - unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name ) # checks that old users are removed from active cluster ACLs @@ -129,50 +182,3 @@ async def test_remove_application_removes_user_and_acls(ops_test: OpsTest, usern zookeeper_uri=zookeeper_uri, model_full_name=ops_test.model_full_name, ) - - -@pytest.mark.abort_on_fail -async def test_change_client_topic(ops_test: OpsTest): - action = await ops_test.model.units.get(f"{DUMMY_NAME_2}/0").run_action("change-topic") - await action.wait() - - assert ops_test.model.applications[CHARM_KEY].status == "active" - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) - - _, zookeeper_uri = get_zookeeper_connection( - unit_name=f"{CHARM_KEY}/0", model_full_name=ops_test.model_full_name - ) - - for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): - if acl.resource_type == "TOPIC": - assert acl.resource_name == "test-topic-changed" - - -@pytest.mark.abort_on_fail -async def test_admin_added_to_super_users(ops_test: OpsTest): - # ensures only broker user for now - super_users = load_super_users(model_full_name=ops_test.model_full_name) - assert len(super_users) == 1 - - action = await ops_test.model.units.get(f"{DUMMY_NAME_2}/0").run_action("make-admin") - await action.wait() - - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) - assert ops_test.model.applications[CHARM_KEY].status == "active" - - super_users = load_super_users(model_full_name=ops_test.model_full_name) - assert len(super_users) == 2 - - -@pytest.mark.abort_on_fail -async def test_admin_removed_from_super_users(ops_test: OpsTest): - action = await ops_test.model.units.get(f"{DUMMY_NAME_2}/0").run_action("remove-admin") - await action.wait() - - time.sleep(20) - - await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME_2]) - assert ops_test.model.applications[CHARM_KEY].status == "active" - - super_users = load_super_users(model_full_name=ops_test.model_full_name) - assert len(super_users) == 1 diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py index cebc334c..6145805a 100644 --- a/tests/integration/test_scaling.py +++ b/tests/integration/test_scaling.py @@ -16,6 +16,7 @@ logger = logging.getLogger(__name__) +@pytest.mark.skip # skip until scaling operations work in MicroK8s @pytest.mark.abort_on_fail async def test_kafka_simple_scale_up(ops_test: OpsTest): kafka_charm = await ops_test.build_charm(".") @@ -51,6 +52,7 @@ async def test_kafka_simple_scale_up(ops_test: OpsTest): assert f"{chroot}/brokers/ids/2" in active_brokers +@pytest.mark.skip # skip until scaling operations work in MicroK8s @pytest.mark.abort_on_fail async def test_kafka_simple_scale_down(ops_test: OpsTest):