diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 40b9cbee..89055769 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -46,4 +46,4 @@ jobs: # This is needed until https://bugs.launchpad.net/juju/+bug/1977582 is fixed bootstrap-options: "--agent-version 2.9.29" - name: Run integration tests - run: tox -e integration \ No newline at end of file + run: tox -e integration diff --git a/config.yaml b/config.yaml index 2b79250a..f76b8bc6 100644 --- a/config.yaml +++ b/config.yaml @@ -22,87 +22,3 @@ options: description: enables auto creation of topic on the server type: boolean default: false - - # log.dirs=/var/snap/kafka/common/log - # - # # networking - # clientPort=2181 - # listeners=SASL_PLAINTEXT://:9092 - # - # # offsets - # offsets.topic.num.partitions=50 - # offsets.commit.required.acks=-1 - # offsets.retention.minutes=10080 - # - # # topic - # auto.leader.rebalance.enable=true - # # to be changed when necessary - # delete.topic.enable=true - # unclean.leader.election.enable=false - # auto.create.topics.enable=false - # # helpful - # group.initial.rebalance.delay.ms=3000 - # - # # auth - # sasl.enabled.mechanisms=SCRAM-SHA-512 - # sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 - # security.inter.broker.protocol=SASL_PLAINTEXT - # authorizer.class.name=kafka.security.authorizer.AclAuthorizer - # allow.everyone.if.no.acl.found=false - # super.users=User:sync - # listener.name.sasl_plaintext.sasl.enabled.mechanisms=SCRAM-SHA-512 - # # zookeeper.set.acl=true - - - - ## Backup - # background.threads=10 - # compression.type=producer - # leader.imbalance.check.interval.seconds=300 - # leader.imbalance.per.broker.percentage=10 - # log.retention.bytes=-1 - # log.roll.hours=168 - # log.roll.jitter.hours=0 - # log.segment.bytes=1073741824 - # log.segment.delete.delay.ms=60000 - # message.max.bytes=1000012 - # num.io.threads=8 - # num.network.threads=3 - # num.recovery.threads.per.data.dir=1 - # num.replica.fetchers=1 - # 
offset.metadata.max.bytes=4096 - # offsets.commit.timeout.ms=5000 - # offsets.load.buffer.size=5242880 - # offsets.retention.check.interval.ms=600000 - # offsets.topic.compression.codec=0 - # offsets.topic.segment.bytes=104857600 - # queued.max.requests=500 - # quota.consumer.default=9223372036854775807 - # quota.producer.default=9223372036854775807 - # replica.fetch.min.bytes=1 - # replica.fetch.wait.max.ms=500 - # replica.high.watermark.checkpoint.interval.ms=5000 - # replica.lag.time.max.ms=10000 - # replica.socket.receive.buffer.bytes=65536 - # replica.socket.timeout.ms=30000 - # request.timeout.ms=30000 - # socket.receive.buffer.bytes=102400 - # socket.request.max.bytes=104857600 - # socket.send.buffer.bytes=102400 - # zookeeper.session.timeout.ms=6000 - # connections.max.idle.ms=600000 - # controlled.shutdown.enable=true - # controlled.shutdown.max.retries=3 - # controlled.shutdown.retry.backoff.ms=5000 - # controller.socket.timeout.ms=30000 - # fetch.purgatory.purge.interval.requests=1000 - # group.max.session.timeout.ms=300000 - # group.min.session.timeout.ms=600 - # producer.purgatory.purge.interval.requests=1000 - # replica.fetch.backoff.ms=1000 - # replica.fetch.max.bytes=1048576 - # replica.fetch.response.max.bytes=10485760 - # reserved.broker.max.id=1000 - # num.partitions=1 - # group.initial.rebalance.delay.ms=0 - # zookeeper.connection.timeout.ms=18000 diff --git a/lib/charms/rolling_ops/v0/rollingops.py b/lib/charms/rolling_ops/v0/rollingops.py new file mode 100644 index 00000000..7f4bd1b8 --- /dev/null +++ b/lib/charms/rolling_ops/v0/rollingops.py @@ -0,0 +1,390 @@ +# Copyright 2022 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This library enables "rolling" operations across units of a charmed Application. + +For example, a charm author might use this library to implement a "rolling restart", in +which all units in an application restart their workload, but no two units execute the +restart at the same time. + +To implement the rolling restart, a charm author would do the following: + +1. Add a peer relation called 'restart' to a charm's `metadata.yaml`: +```yaml +peers: + restart: + interface: rolling_op +``` + +Import this library into src/charm.py, and initialize a RollingOpsManager in the Charm's +`__init__`. The Charm should also define a callback routine, which will be executed when +a unit holds the distributed lock: + +src/charm.py +```python +# ... +from charms.rolling_ops.v0.rollingops import RollingOpsManager +# ... +class SomeCharm(...): + def __init__(...) + # ... + self.restart_manager = RollingOpsManager( + charm=self, relation="restart", callback=self._restart + ) + # ... + def _restart(self, event): + systemd.service_restart('foo') +``` + +To kick off the rolling restart, emit this library's AcquireLock event. The simplest way +to do so would be with an action, though it might make sense to acquire the lock in +response to another event. + +```python + def _on_trigger_restart(self, event): + self.charm.on[self.restart_manager.name].acquire_lock.emit() +``` + +In order to trigger the restart, a human operator would execute the following command on +the CLI: + +``` +juju run-action some-charm/0 some-charm/1 <... 
class LockNoRelationError(Exception):
    """Raised if we are trying to process a lock, but do not appear to have a relation yet."""


class LockState(Enum):
    """Possible states for our distributed lock.

    `ACQUIRE` and `RELEASE` are written to a unit's relation data; `GRANTED` and `IDLE`
    are written to the application's relation data (see `Lock._state`).
    """

    ACQUIRE = "acquire"
    RELEASE = "release"
    GRANTED = "granted"
    IDLE = "idle"


class Lock:
    """A class that keeps track of a single asynchronous lock.

    Warning: a Lock has permission to update relation data, which means that there are
    side effects to invoking the .acquire, .release, .clear and .grant methods. Running
    any one of them writes to the peer relation, which may trigger a RelationChanged
    event.

    This class tracks state across the cloud by implementing a peer relation
    interface. There are two parts to the interface:

    1) The data on a unit's peer relation (defined in metadata.yaml.) Each unit can update
       this data. The only meaningful values are "acquire" and "release", which represent
       a request to acquire the lock, and a request to release the lock, respectively.

    2) The application data in the relation. This tracks whether the lock has been
       "granted", or has been released (and reverted to "idle"). An unset value is
       treated as "idle". If a lock is in the "granted" state, a unit should emit a
       RunWithLock event and then release the lock.

       If a lock is "idle", this means that a unit has not yet requested the lock, or
       that the request has been completed.

    In more detail, here is the relation structure:

    relation.data:
        <unit n>:
            state: 'acquire|release'
        <application>:
            <unit n>: 'granted|idle'

    Note that this class makes no attempts to timestamp the locks and thus handle multiple
    requests in a row. If a unit re-requests a lock before being granted the lock, the
    lock will simply stay in the "acquire" state. If a unit wishes to clear its lock, it
    simply needs to call lock.release().
    """

    def __init__(self, manager, unit=None):
        """Bind the lock to the manager's peer relation.

        Args:
            manager: object exposing `.name` (the relation name) and `.model`.
            unit: the unit this lock belongs to; defaults to the current unit.

        Raises:
            LockNoRelationError: if the peer relation does not exist yet.
        """
        relations = manager.model.relations[manager.name]
        # Check for an empty list *before* indexing: indexing first raised IndexError,
        # so callers catching LockNoRelationError never got a chance to defer.
        if not relations or not relations[0]:
            raise LockNoRelationError()

        self.relation = relations[0]
        self.unit = unit or manager.model.unit
        self.app = manager.model.app

    @property
    def _state(self) -> LockState:
        """Return the effective state, combining unit data and application data.

        Unit data holds "acquire" or "release" (unset is treated as idle); application
        data holds "granted" or "idle" (unset is treated as idle).
        """
        unit_state = LockState(self.relation.data[self.unit].get("state", LockState.IDLE.value))
        app_state = LockState(
            self.relation.data[self.app].get(str(self.unit), LockState.IDLE.value)
        )

        if app_state == LockState.GRANTED and unit_state == LockState.RELEASE:
            # Active release request.
            return LockState.RELEASE

        if app_state == LockState.IDLE and unit_state == LockState.ACQUIRE:
            # Active acquire request.
            return LockState.ACQUIRE

        return app_state  # Granted or unset/released

    @_state.setter
    def _state(self, state: LockState):
        """Set the given state.

        Requests (acquire/release) are written to the unit's data, decisions
        (granted/idle) to the application's data. Since we update the relation data,
        this may fire off a RelationChanged event.
        """
        if state in (LockState.ACQUIRE, LockState.RELEASE):
            self.relation.data[self.unit].update({"state": state.value})
        elif state in (LockState.GRANTED, LockState.IDLE):
            self.relation.data[self.app].update({str(self.unit): state.value})

    def acquire(self):
        """Request that a lock be acquired."""
        self._state = LockState.ACQUIRE

    def release(self):
        """Request that a lock be released."""
        self._state = LockState.RELEASE

    def clear(self):
        """Unset a lock."""
        self._state = LockState.IDLE

    def grant(self):
        """Grant a lock to a unit."""
        self._state = LockState.GRANTED

    def is_held(self):
        """This unit holds the lock."""
        return self._state == LockState.GRANTED

    def release_requested(self):
        """A unit has reported that they are finished with the lock."""
        return self._state == LockState.RELEASE

    def is_pending(self):
        """Is this unit waiting for a lock?"""
        return self._state == LockState.ACQUIRE
class Locks:
    """Iterable that yields one `Lock` per unit on the peer relation, plus our own unit."""

    def __init__(self, manager):
        self.manager = manager

        # Gather all the units.
        relation = manager.model.relations[manager.name][0]
        units = [unit for unit in relation.units]

        # Plus our unit ...
        units.append(manager.model.unit)

        self.units = units

    def __iter__(self):
        """Yields a lock for each unit we can find on the relation."""
        for unit in self.units:
            yield Lock(self.manager, unit=unit)


class RunWithLock(EventBase):
    """Event to signal that this unit should run the callback."""

    pass


class AcquireLock(EventBase):
    """Signals that this unit wants to acquire a lock."""

    pass


class ProcessLocks(EventBase):
    """Used to tell the leader to process all locks."""

    pass


class RollingOpsManager(Object):
    """Emitters and handlers for rolling ops."""

    def __init__(self, charm: CharmBase, relation: AnyStr, callback: Callable):
        """Register our custom events.

        params:
            charm: the charm we are attaching this to.
            relation: an identifier, by convention based on the name of the relation in the
                metadata.yaml, which identifies this instance of RollingOpsManager,
                distinct from other instances that may be handling other events.
            callback: a callable to run when we have a lock. It is invoked with the
                RunWithLock event as its only argument.
        """
        # "Inherit" from the charm's class. This gives us access to the framework as
        # self.framework, as well as the self.model shortcut.
        super().__init__(charm, None)

        self.name = relation
        self._callback = callback
        self.charm = charm  # Maintain a reference to charm, so we can emit events.

        # The custom events are defined on the charm, namespaced by the relation name.
        charm.on.define_event("{}_run_with_lock".format(self.name), RunWithLock)
        charm.on.define_event("{}_acquire_lock".format(self.name), AcquireLock)
        charm.on.define_event("{}_process_locks".format(self.name), ProcessLocks)

        # Watch those events (plus the built in relation event).
        self.framework.observe(charm.on[self.name].relation_changed, self._on_relation_changed)
        self.framework.observe(charm.on[self.name].acquire_lock, self._on_acquire_lock)
        self.framework.observe(charm.on[self.name].run_with_lock, self._on_run_with_lock)
        self.framework.observe(charm.on[self.name].process_locks, self._on_process_locks)

    def _callback(self: CharmBase, event: EventBase) -> None:
        """Placeholder for the function that actually runs our event.

        Shadowed by the `callback` instance attribute assigned in `__init__`.
        """
        raise NotImplementedError

    def _on_relation_changed(self: CharmBase, event: RelationChangedEvent):
        """Process relation changed.

        First, if this unit is waiting for the lock, reflect that in the unit status.

        Then, if this unit has been granted the lock, emit a RunWithLock event.

        Finally, if we are the leader, fire off a process locks event.

        """
        lock = Lock(self)

        if lock.is_pending():
            self.model.unit.status = WaitingStatus("Awaiting {} operation".format(self.name))

        if lock.is_held():
            self.charm.on[self.name].run_with_lock.emit()

        if self.model.unit.is_leader():
            self.charm.on[self.name].process_locks.emit()

    def _on_process_locks(self: CharmBase, event: ProcessLocks):
        """Process locks.

        Runs only on the leader. Clears released locks and grants the lock to at most
        one pending unit.

        """
        if not self.model.unit.is_leader():
            return

        pending = []

        for lock in Locks(self):
            if lock.is_held():
                # One of our units has the lock -- return without further processing.
                return

            if lock.release_requested():
                lock.clear()  # Updates relation data

            if lock.is_pending():
                if lock.unit == self.model.unit:
                    # Always run on the leader last: the lock is granted to pending[-1],
                    # so the leader's own request goes to the front of the list.
                    pending.insert(0, lock)
                else:
                    pending.append(lock)

        # If we reach this point, and we have pending units, we want to grant a lock to
        # one of them.
        if pending:
            self.model.app.status = MaintenanceStatus("Beginning rolling {}".format(self.name))
            lock = pending[-1]
            lock.grant()
            if lock.unit == self.model.unit:
                # It's time for the leader to run with lock.
                self.charm.on[self.name].run_with_lock.emit()
            return

        self.model.app.status = ActiveStatus()

    def _on_acquire_lock(self: CharmBase, event: ActionEvent):
        """Request a lock, deferring the event if the peer relation is not available yet."""
        try:
            Lock(self).acquire()  # Updates relation data
            # emit relation changed event in the edge case where acquire does not
            relation = self.model.get_relation(self.name)
            self.charm.on[self.name].relation_changed.emit(relation)
        except LockNoRelationError:
            logger.debug("No {} peer relation yet. Delaying rolling op.".format(self.name))
            event.defer()

    def _on_run_with_lock(self: CharmBase, event: RunWithLock):
        """Run the callback while holding the lock, then request its release."""
        lock = Lock(self)
        self.model.unit.status = MaintenanceStatus("Executing {} operation".format(self.name))
        self._callback(event)
        lock.release()  # Updates relation data
        # `Lock(self)` defaults to this unit, so this emits on every run; the
        # process_locks handler itself returns early on non-leader units.
        if lock.unit == self.model.unit:
            self.charm.on[self.name].process_locks.emit()

        self.model.unit.status = ActiveStatus()
@dataclass(unsafe_hash=True)
class Acl:
    """Convenience object for representing a Kafka ACL."""

    resource_name: str
    resource_type: str
    operation: str
    username: str


class KafkaAuth:
    """Object for updating Kafka users and ACLs.

    Every command is executed through `run_bin_command`; errors raised by it are not
    handled here and propagate to the caller.
    """

    def __init__(self, opts: List[str], zookeeper: str, container: "Container"):
        """Store connection details for later commands.

        Args:
            opts: values joined with spaces and passed as `extra_args` to every command
            zookeeper: the ZooKeeper connection string
            container: the workload container commands are run in
        """
        self.opts = " ".join(opts)
        self.zookeeper = zookeeper
        self.container = container
        self.current_acls: Set[Acl] = set()
        self.new_user_acls: Set[Acl] = set()

    def _get_acls_from_cluster(self) -> str:
        """Loads the currently active ACLs from the Kafka cluster."""
        command = [
            "--authorizer-properties",
            f"zookeeper.connect={self.zookeeper}",
            "--list",
        ]
        return run_bin_command(
            container=self.container,
            bin_keyword="acls",
            bin_args=command,
            extra_args=self.opts,
        )

    @staticmethod
    def _parse_acls(acls: str) -> Set[Acl]:
        """Parses output from raw ACLs provided by the cluster.

        The resource type and name are remembered across lines, so a resource header
        line applies to every following principal/operation line until the next header.
        Only lines carrying an `operation=` field produce an `Acl`.
        """
        current_acls = set()
        resource_type, name, user, operation = None, None, None, None
        for line in acls.splitlines():
            resource_search = re.search(r"resourceType=([^\,]+),", line)
            if resource_search:
                resource_type = resource_search[1]

            name_search = re.search(r"name=([^\,]+),", line)
            if name_search:
                name = name_search[1]

            user_search = re.search(r"principal=User\:([^\,]+),", line)
            if user_search:
                user = user_search[1]

            operation_search = re.search(r"operation=([^\,]+),", line)
            if not operation_search:
                # header (or unrelated) line: nothing to record yet
                continue
            operation = operation_search[1]

            if resource_type and name and user and operation:
                current_acls.add(
                    Acl(
                        resource_type=resource_type,
                        resource_name=name,
                        username=user,
                        operation=operation,
                    )
                )

        return current_acls

    def load_current_acls(self) -> None:
        """Sets the current cluster ACLs to the instance state.

        State is set to `KafkaAuth.current_acls`.
        """
        acls = self._get_acls_from_cluster()

        self.current_acls = self._parse_acls(acls=acls)

    @staticmethod
    def _generate_producer_acls(topic: str, username: str, **_) -> Set[Acl]:
        """Generates expected set of `Acl`s for a producer client application."""
        return {
            Acl(
                resource_type="TOPIC",
                resource_name=topic,
                username=username,
                operation=operation,
            )
            for operation in ("CREATE", "WRITE", "DESCRIBE")
        }

    @staticmethod
    def _generate_consumer_acls(
        topic: str, username: str, group: Optional[str] = None
    ) -> Set[Acl]:
        """Generates expected set of `Acl`s for a consumer client application."""
        group = group or f"{username}-"  # not needed, just for safety

        consumer_acls = {
            Acl(
                resource_type="TOPIC",
                resource_name=topic,
                username=username,
                operation=operation,
            )
            for operation in ("READ", "DESCRIBE")
        }
        consumer_acls.add(
            Acl(
                resource_type="GROUP",
                resource_name=group,
                username=username,
                operation="READ",
            )
        )

        return consumer_acls

    def _alter_user(self, username: str, config_flag: str) -> None:
        """Runs the `configs` command altering the SCRAM config of a single user."""
        command = [
            f"--zookeeper={self.zookeeper}",
            "--alter",
            "--entity-type=users",
            f"--entity-name={username}",
            config_flag,
        ]
        run_bin_command(
            container=self.container, bin_keyword="configs", bin_args=command, extra_args=self.opts
        )

    def add_user(self, username: str, password: str) -> None:
        """Adds new user credentials to ZooKeeper.

        Args:
            username: the user name to add
            password: the user password
        """
        self._alter_user(username, f"--add-config=SCRAM-SHA-512=[password={password}]")

    def delete_user(self, username: str) -> None:
        """Deletes user credentials from ZooKeeper.

        Args:
            username: the user name to delete
        """
        self._alter_user(username, "--delete-config=SCRAM-SHA-512")

    @staticmethod
    def _resource_flags(resource_type: str, resource_name: str) -> Optional[List[str]]:
        """Returns the resource flags for an ACL command, or None if the type is unsupported."""
        if resource_type == "TOPIC":
            return [f"--topic={resource_name}"]
        if resource_type == "GROUP":
            # group ACLs always use a PREFIXED pattern
            return [f"--group={resource_name}", "--resource-pattern-type=PREFIXED"]
        return None

    def _alter_acl(
        self,
        action: str,
        username: str,
        operation: str,
        resource_type: str,
        resource_name: str,
        trailing: Tuple[str, ...] = (),
    ) -> None:
        """Runs the `acls` command with `action` (`--add` / `--remove`) for one ACL rule."""
        resource_flags = self._resource_flags(resource_type, resource_name)
        if resource_flags is None:
            # resource types other than TOPIC and GROUP are ignored
            return

        command = [
            "--authorizer-properties",
            f"zookeeper.connect={self.zookeeper}",
            action,
            f"--allow-principal=User:{username}",
            f"--operation={operation}",
            *resource_flags,
            *trailing,
        ]
        run_bin_command(
            container=self.container,
            bin_keyword="acls",
            bin_args=command,
            extra_args=self.opts,
        )

    def add_acl(
        self, username: str, operation: str, resource_type: str, resource_name: str
    ) -> None:
        """Adds new ACL rule for the cluster.

        `GROUP` rules are added with a PREFIXED resource pattern, so `resource_name` is
        treated as a group-name prefix (e.g `<username>-`).

        Args:
            username: the user name to add ACLs for
            operation: the operation to grant
                e.g `READ`, `WRITE`, `DESCRIBE`
            resource_type: the resource type to grant ACLs for
                e.g `GROUP`, `TOPIC`
            resource_name: the name of the resource to grant ACLs for
        """
        self._alter_acl("--add", username, operation, resource_type, resource_name)

    def remove_acl(
        self, username: str, operation: str, resource_type: str, resource_name: str
    ) -> None:
        """Removes ACL rule for the cluster.

        Args:
            username: the user name to remove ACLs for
            operation: the operation to remove
                e.g `READ`, `WRITE`, `DESCRIBE`
            resource_type: the resource type to remove ACLs for
                e.g `GROUP`, `TOPIC`
            resource_name: the name of the resource to remove ACLs for
        """
        self._alter_acl(
            "--remove", username, operation, resource_type, resource_name, trailing=("--force",)
        )

    def remove_all_user_acls(self, username: str) -> None:
        """Removes all active ACLs for a given user.

        Only ACLs present in `KafkaAuth.current_acls` are considered.

        Args:
            username: the user name to remove ACLs for
        """
        # getting subset of all cluster ACLs for only the provided user
        current_user_acls = {acl for acl in self.current_acls if acl.username == username}

        for acl in current_user_acls:
            self.remove_acl(**asdict(acl))

    def update_user_acls(
        self, username: str, topic: str, extra_user_roles: str, group: Optional[str], **_
    ) -> None:
        """Compares data passed from the client relation, and updates cluster ACLs to match.

        `producer`s are granted CREATE, WRITE and DESCRIBE access for a given topic
        `consumer`s are granted READ, DESCRIBE access for a given topic, and READ access for a
            consumer group

        If new ACLs provided do not match existing ACLs set for the cluster, existing ACLs will
            be revoked

        Args:
            username: the user name to update ACLs for
            topic: the topic to update ACLs for
            extra_user_roles: the `extra-user-roles` for the user
            group: the consumer group
        """
        if "producer" in extra_user_roles:
            self.new_user_acls.update(self._generate_producer_acls(topic=topic, username=username))
        if "consumer" in extra_user_roles:
            self.new_user_acls.update(
                self._generate_consumer_acls(topic=topic, username=username, group=group)
            )

        # `new_user_acls` accumulates across calls, so both sides of the comparison must
        # be narrowed to the provided user; otherwise ACLs generated for other users in
        # earlier calls would be re-added here.
        new_user_acls = {acl for acl in self.new_user_acls if acl.username == username}
        current_user_acls = {acl for acl in self.current_acls if acl.username == username}

        for acl in new_user_acls - current_user_acls:
            self.add_acl(**asdict(acl))

        for acl in current_user_acls - new_user_acls:
            self.remove_acl(**asdict(acl))
from config import KafkaConfig -from connection_check import broker_active, zookeeper_connected from literals import CHARM_KEY, CHARM_USERS, PEER, ZOOKEEPER_REL_NAME from provider import KafkaProvider -from utils import generate_password +from utils import broker_active, generate_password logger = logging.getLogger(__name__) @@ -30,9 +36,13 @@ def __init__(self, *args): self.name = CHARM_KEY self.kafka_config = KafkaConfig(self) self.client_relations = KafkaProvider(self) + self.restart = RollingOpsManager(self, relation="restart", callback=self._restart) self.framework.observe(getattr(self.on, "kafka_pebble_ready"), self._on_kafka_pebble_ready) self.framework.observe(getattr(self.on, "leader_elected"), self._on_leader_elected) + self.framework.observe(getattr(self.on, "config_changed"), self._on_config_changed) + self.framework.observe(self.on[PEER].relation_changed, self._on_config_changed) + self.framework.observe( self.on[ZOOKEEPER_REL_NAME].relation_joined, self._on_zookeeper_joined ) @@ -43,7 +53,7 @@ def __init__(self, *args): self.on[ZOOKEEPER_REL_NAME].relation_broken, self._on_zookeeper_broken ) - self.framework.observe(self.on.set_password_action, self._set_password_action) + self.framework.observe(getattr(self.on, "set_password_action"), self._set_password_action) @property def container(self) -> Container: @@ -70,40 +80,16 @@ def _kafka_layer(self) -> Layer: @property def peer_relation(self) -> Relation: - """The Kafka peer relation.""" + """The Kafka cluster relation.""" return self.model.get_relation(PEER) - def run_bin_command(self, bin_keyword: str, bin_args: List[str], extra_args: str) -> str: - """Runs kafka bin command with desired args. 
- - Args: - bin_keyword: the kafka shell script to run - e.g `configs`, `topics` etc - bin_args: the shell command args - extra_args (optional): the desired `KAFKA_OPTS` env var values for the command - - Returns: - String of kafka bin command output - """ - environment = {"KAFKA_OPTS": extra_args} - command = [f"/opt/kafka/bin/kafka-{bin_keyword}.sh"] + bin_args - - try: - process = self.container.exec(command=command, environment=environment) - output, _ = process.wait_output() - logger.debug(f"{output=}") - return output - except (ExecError) as e: - logger.debug(f"cmd failed:\ncommand={e.command}\nstdout={e.stdout}\nstderr={e.stderr}") - raise e - def _on_kafka_pebble_ready(self, event: EventBase) -> None: """Handler for `kafka_pebble_ready` event.""" if not self.container.can_connect(): event.defer() return - if not zookeeper_connected(charm=self): + if not self.kafka_config.zookeeper_connected: self.unit.status = WaitingStatus("waiting for zookeeper relation") return @@ -113,19 +99,21 @@ def _on_kafka_pebble_ready(self, event: EventBase) -> None: # do not start units until SCRAM users have been added to ZooKeeper for server-server auth if self.unit.is_leader() and self.kafka_config.sync_password: + kafka_auth = KafkaAuth( + opts=[self.kafka_config.extra_args], + zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), + container=self.container, + ) try: - self.add_user_to_zookeeper( - username="sync", password=self.kafka_config.sync_password - ) + kafka_auth.add_user(username="sync", password=self.kafka_config.sync_password) self.peer_relation.data[self.app].update({"broker-creds": "added"}) - except ExecError: - # command to add users fails sometimes for unknown reasons. Retry seems to fix it. 
def _on_config_changed(self, event: ConfigChangedEvent) -> None:
    """Generic handler for most `config_changed` events across relations.

    Registered for both `config_changed` and the peer relation's `relation_changed`.
    Compares the `server.properties` currently in the workload container with the
    properties the charm would generate; if they differ, the new properties are written
    and a rolling restart is requested.

    The event is deferred when the charm is not ready to start, when the properties file
    cannot be pulled, or when the file is empty.
    """
    if not self.ready_to_start:
        event.defer()
        return

    # Load current properties set in the charm workload
    raw_properties = None
    try:
        raw_properties = str(self.container.pull(self.kafka_config.properties_filepath).read())
        properties = raw_properties.splitlines()
    except (ProtocolError, PathError) as e:
        logger.debug(str(e))
        event.defer()
        return
    if not raw_properties:
        # Event fired before charm has properly started
        event.defer()
        return

    # Symmetric difference: non-empty if either side has a line the other lacks.
    if set(properties) ^ set(self.kafka_config.server_properties):
        logger.info(
            (
                f'Broker {self.unit.name.split("/")[1]} updating config - '
                f"OLD PROPERTIES = {set(properties) - set(self.kafka_config.server_properties)}, "
                f"NEW PROPERTIES = {set(self.kafka_config.server_properties) - set(properties)}"
            )
        )
        self.kafka_config.set_server_properties()

        # NOTE(review): indentation was lost in the flattened diff; the restart request
        # is assumed to sit inside this `if` (restart only on change) -- confirm.
        self.on[self.restart.name].acquire_lock.emit()
@property
def ready_to_start(self) -> bool:
    """Check for active ZooKeeper relation and adding of inter-broker auth username.

    Returns:
        True if ZK is related and the `broker-creds` flag has been set in the peer
            relation application data. False otherwise, including when the peer
            relation does not exist yet.
    """
    if not self.kafka_config.zookeeper_connected:
        return False

    # `peer_relation` comes from `model.get_relation`, which yields `None` until the
    # peer relation is established; treat that as "not ready" instead of crashing.
    peer_relation = self.peer_relation
    if peer_relation is None:
        return False

    return bool(peer_relation.data[self.app].get("broker-creds", None))
zookeeper_config["chroot"] + sorted_uris = sorted( + zookeeper_config["uris"].replace(zookeeper_config["chroot"], "").split(",") ) + sorted_uris[-1] = sorted_uris[-1] + zookeeper_config["chroot"] + zookeeper_config["connect"] = ",".join(sorted_uris) + return zookeeper_config + @property + def zookeeper_connected(self) -> bool: + """Checks if there is an active ZooKeeper relation. + + Returns: + True if ZooKeeper is currently related with sufficient relation data + for a broker to connect with. False otherwise. + """ + if self.zookeeper_config.get("connect", None): + return True + + return False + @property def extra_args(self) -> str: """Collection of Java config arguments for SASL auth. @@ -79,6 +98,19 @@ def extra_args(self) -> str: return extra_args + @property + def bootstrap_server(self) -> List[str]: + """The current Kafka uris formatted for the `bootstrap-server` command flag. + + Returns: + List of `bootstrap-server` servers + """ + units: List[Unit] = list( + set([self.charm.unit] + list(self.charm.model.get_relation(PEER).units)) + ) + hosts = [self.get_host_from_unit(unit=unit) for unit in units] + return [f"{host}:9092" for host in hosts] + @property def kafka_command(self) -> str: """The run command for starting the Kafka service. @@ -116,7 +148,7 @@ def auth_properties(self) -> List[str]: List of properties to be set """ broker_id = self.charm.unit.name.split("/")[1] - host = f"{self.charm.app.name}-{broker_id}.{self.charm.app.name}-endpoints" + host = self.get_host_from_unit(unit=self.charm.unit) return [ f"broker.id={broker_id}", @@ -125,8 +157,56 @@ def auth_properties(self) -> List[str]: f'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="{self.sync_password}";', ] + @property + def super_users(self) -> str: + """Generates all users with super/admin permissions for the cluster from relations. 
+ + Formatting allows passing to the `super.users` property. + + Returns: + Semi-colon delimited string of current super users + """ + super_users = ["sync"] + for relation in self.charm.model.relations[REL_NAME]: + extra_user_roles = relation.data[relation.app].get("extra-user-roles", "") + password = ( + self.charm.model.get_relation(PEER) + .data[self.charm.app] + .get(f"relation-{relation.id}", None) + ) + # if passwords are set for client admins, they're good to load + if "admin" in extra_user_roles and password is not None: + super_users.append(f"relation-{relation.id}") + + super_users_arg = [f"User:{user}" for user in super_users] + + return ";".join(super_users_arg) + + @property + def server_properties(self) -> List[str]: + """Builds all properties necessary for starting Kafka service. + + This includes charm config, replication, SASL/SCRAM auth and default properties. + + Returns: + List of properties to be set + """ + return ( + [ + f"data.dir={self.charm.config['data-dir']}", + f"log.dir={self.charm.config['log-dir']}", + f"offsets.retention.minutes={self.charm.config['offsets-retention-minutes']}", + f"log.retention.hours={self.charm.config['log-retention-hours']}", + f"auto.create.topics={self.charm.config['auto-create-topics']}", + f"super.users={self.super_users}", + ] + + self.default_replication_properties + + self.auth_properties + + DEFAULT_CONFIG_OPTIONS.split("\n") + ) + def push(self, content: str, path: str) -> None: - """Simple wrapper for writing a file and contents to a container. + """Wrapper for writing a file and contents to a container. 
Args: content: the text content to write to a file path @@ -134,6 +214,10 @@ def push(self, content: str, path: str) -> None: """ self.container.push(path, content, make_dirs=True) + def set_server_properties(self) -> None: + """Sets all kafka config properties to the `server.properties` path.""" + self.push(content="\n".join(self.server_properties), path=self.properties_filepath) + def set_jaas_config(self) -> None: """Sets the Kafka JAAS config using zookeeper relation data.""" jaas_config = f""" @@ -145,19 +229,15 @@ def set_jaas_config(self) -> None: """ self.push(content=jaas_config, path=self.jaas_filepath) - def set_server_properties(self) -> None: - """Sets all kafka config properties to the server.properties path.""" - server_properties = ( - [ - f"data.dir={self.charm.config['data-dir']}", - f"log.dir={self.charm.config['log-dir']}", - f"offsets.retention.minutes={self.charm.config['offsets-retention-minutes']}", - f"log.retention.hours={self.charm.config['log-retention-hours']}", - f"auto.create.topics={self.charm.config['auto-create-topics']}", - ] - + self.default_replication_properties - + self.auth_properties - + DEFAULT_CONFIG_OPTIONS.split("\n") - ) + def get_host_from_unit(self, unit: Unit) -> str: + """Builds K8s host address for a given `Unit`. + + Args: + unit: the desired unit + + Returns: + String of host address + """ + broker_id = unit.name.split("/")[1] - self.push(content="\n".join(server_properties), path=self.properties_filepath) + return f"{self.charm.app.name}-{broker_id}.{self.charm.app.name}-endpoints" diff --git a/src/connection_check.py b/src/connection_check.py deleted file mode 100644 index 42edbbbb..00000000 --- a/src/connection_check.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. 
- -"""Collection of helper methods for checking active connections between ZK and Kafka.""" - -import logging -from typing import Dict - -from charms.zookeeper.v0.client import ZooKeeperManager -from kazoo.exceptions import AuthFailedError, NoNodeError -from ops.charm import CharmBase -from ops.model import Unit -from tenacity import retry -from tenacity.retry import retry_if_not_result -from tenacity.stop import stop_after_attempt -from tenacity.wait import wait_fixed - -logger = logging.getLogger(__name__) - - -def zookeeper_connected(charm: CharmBase) -> bool: - """Flag for if required zookeeper config exists in the relation data. - - Returns: - True if config exits i.e successful relation. False otherwise - """ - if not getattr(charm, "kafka_config").zookeeper_config: - return False - - return True - - -@retry( - # retry to give ZK time to update its broker zNodes before failing - wait=wait_fixed(5), - stop=stop_after_attempt(3), - retry_error_callback=(lambda state: state.outcome.result()), - retry=retry_if_not_result(lambda result: True if result else False), -) -def broker_active(unit: Unit, zookeeper_config: Dict[str, str]) -> bool: - """Checks ZooKeeper for client connections, checks for specific broker id. - - Args: - unit: the `Unit` to check connection of - data: the relation data provided by ZooKeeper - - Returns: - True if broker id is recognised as active by ZooKeeper. Otherwise False. 
- """ - broker_id = unit.name.split("/")[1] - chroot = zookeeper_config.get("chroot", "") - hosts = zookeeper_config.get("endpoints", "").split(",") - username = zookeeper_config.get("username", "") - password = zookeeper_config.get("password", "") - - zk = ZooKeeperManager(hosts=hosts, username=username, password=password) - path = f"{chroot}/brokers/ids/" - - try: - brokers = zk.leader_znodes(path=path) - # auth might not be ready with ZK after relation yet - except (NoNodeError, AuthFailedError) as e: - logger.debug(str(e)) - return False - - return f"{chroot}/brokers/ids/{broker_id}" in brokers diff --git a/src/literals.py b/src/literals.py index 24b3d4e3..bd2b0fd1 100644 --- a/src/literals.py +++ b/src/literals.py @@ -7,5 +7,5 @@ CHARM_KEY = "kafka" PEER = "cluster" ZOOKEEPER_REL_NAME = "zookeeper" -REL_NAME = "kafka" CHARM_USERS = ["sync"] +REL_NAME = "kafka-client" diff --git a/src/provider.py b/src/provider.py index 1906a825..70811fff 100644 --- a/src/provider.py +++ b/src/provider.py @@ -5,15 +5,21 @@ """KafkaProvider class and methods.""" import logging -import secrets -import string from typing import Dict -from ops.charm import RelationBrokenEvent, RelationJoinedEvent +from ops.charm import ( + RelationBrokenEvent, + RelationChangedEvent, + RelationCreatedEvent, + RelationEvent, +) from ops.framework import Object from ops.model import Relation -from literals import PEER, REL_NAME +from auth import KafkaAuth +from config import KafkaConfig +from literals import CHARM_KEY, PEER, REL_NAME +from utils import generate_password logger = logging.getLogger(__name__) @@ -22,90 +28,159 @@ class KafkaProvider(Object): """Implements the provider-side logic for client applications relating to Kafka.""" def __init__(self, charm) -> None: - super().__init__(charm, "client") - + super().__init__(charm, "kafka_client") self.charm = charm - - self.framework.observe( - self.charm.on[REL_NAME].relation_joined, self._on_client_relation_joined - ) - self.framework.observe( 
- self.charm.on[REL_NAME].relation_broken, self._on_client_relation_broken + self.kafka_config = KafkaConfig(self.charm) + self.kafka_auth = KafkaAuth( + container=self.charm.unit.get_container(CHARM_KEY), + opts=[self.kafka_config.extra_args], + zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), ) + self.framework.observe(self.charm.on[REL_NAME].relation_created, self._on_relation_created) + self.framework.observe(self.charm.on[REL_NAME].relation_changed, self.update_acls) + self.framework.observe(self.charm.on[REL_NAME].relation_broken, self._on_relation_broken) + @property - def app_relation(self) -> Relation: + def peer_relation(self) -> Relation: """The Kafka cluster's peer relation.""" return self.charm.model.get_relation(PEER) - def relation_config(self, relation: Relation) -> Dict[str, str]: - """Builds necessary relation data for a given relation. + def requirer_relation_config(self, event: RelationEvent) -> Dict[str, str]: + """Builds necessary client relation data for a given relation event. + + Args: + event: the event needing config + + Returns: + Dict with keys `topic` and `extra_user_roles` + """ + return { + "extra_user_roles": event.relation.data[event.app].get("extra-user-roles", ""), + "topic": event.relation.data[event.app].get("topic"), + } + + def provider_relation_config(self, event: RelationEvent) -> Dict[str, str]: + """Builds necessary provider relation data for a given relation event. 
Args: event: the event needing config Returns: - Dict of `username`, `password` and `endpoints` data for the related app + Dict of `username`, `password`, `endpoints`, `uris` and `consumer-group-prefix` """ + relation = event.relation + username = f"relation-{relation.id}" - password = self.app_relation.data[self.charm.app].get(username, self.generate_password()) - units = set([self.charm.unit] + list(self.app_relation.units)) - endpoints = [ - f"{self.charm.app.name}-{unit.name.split('/')[1]}.{self.charm.app.name}-endpoints" - for unit in units - ] + password = self.peer_relation.data[self.charm.app].get(username) or generate_password() + bootstrap_server = self.charm.kafka_config.bootstrap_server + endpoints = [server.split(":")[0] for server in bootstrap_server] + zookeeper_uris = self.charm.kafka_config.zookeeper_config.get("connect", "") + + relation_config = { + "username": username, + "password": password, + "endpoints": ",".join(endpoints), + "uris": ",".join(bootstrap_server), + "zookeeper-uris": zookeeper_uris, + "consumer-group-prefix": "", + } + + # only set this if `consumer` is set to avoid missing information + if "consumer" in event.relation.data[event.app].get("extra-user-roles", ""): + relation_config["consumer-group-prefix"] = f"{username}-" - return {"username": username, "password": password, "endpoints": ",".join(endpoints)} + return relation_config - def _on_client_relation_joined(self, event: RelationJoinedEvent) -> None: - """Handler for `relation_joined` events.""" + def update_acls(self, event: RelationChangedEvent) -> None: + """Updates cluster ACLs for a given event relation to match client relation data. 
+ + Args: + event: the event from a related client application needing ACLs + """ if not self.charm.unit.is_leader(): return - relation_config = self.relation_config(relation=event.relation) + if not self.charm.kafka_config.zookeeper_connected: + logger.debug("cannot update ACLs, ZooKeeper not yet connected") + event.defer() + return + + provider_relation_config = self.provider_relation_config(event=event) + requirer_relation_config = self.requirer_relation_config(event=event) + + self.kafka_auth.load_current_acls() + + self.kafka_auth.update_user_acls( + username=provider_relation_config["username"], + group=provider_relation_config.get("consumer-group-prefix"), + **requirer_relation_config, + ) + + # non-leader units need cluster_config_changed event to update their super.users + self.charm.model.get_relation(PEER).data[self.charm.app].update( + {"super-users": self.kafka_config.super_users} + ) - self.add_user(username=relation_config["username"], password=relation_config["password"]) - event.relation.data[self.charm.app].update(relation_config) + event.relation.data[self.charm.app].update(provider_relation_config) - def _on_client_relation_broken(self, event: RelationBrokenEvent) -> None: - """Handler for `relation_broken` events.""" + def _on_relation_created(self, event: RelationCreatedEvent) -> None: + """Handler for `kafka-client-relation-created` event. + + Adds new relation users to ZooKeeper. + + Args: + event: the event from a related client application needing a user + """ if not self.charm.unit.is_leader(): return - relation_config = self.relation_config(relation=event.relation) + if not self.charm.ready_to_start: + logger.debug("cannot add user, ZooKeeper not yet connected") + event.defer() + return - self.delete_user(username=relation_config["username"]) + provider_relation_config = self.provider_relation_config(event=event) - def add_user(self, username: str, password: str) -> None: - """Adds/updates users' SCRAM credentials to ZooKeeper. 
+ self.kafka_auth.add_user( + username=provider_relation_config["username"], + password=provider_relation_config["password"], + ) - Args: - username: the user's username - password: the user's password + # non-leader units need cluster_config_changed event to update their super.users + self.charm.model.get_relation(PEER).data[self.charm.app].update( + {provider_relation_config["username"]: provider_relation_config["password"]} + ) - Raises: - ops.pebble.ExecError: if the command failed - """ - self.charm.add_user_to_zookeeper(username=username, password=password) - self.app_relation.data[self.charm.app].update({username: password}) + def _on_relation_broken(self, event: RelationBrokenEvent) -> None: + """Handler for `kafka-client-relation-broken` event. - def delete_user(self, username: str) -> None: - """Deletes users' SCRAM credentials from ZooKeeper. + Removes relation users from ZooKeeper. Args: - username: the user's username - - Raises: - ops.pebble.ExecError: if the command failed + event: the event from a related client application needing a user """ - self.charm.delete_user_from_zookeeper(username=username) - self.app_relation.data[self.charm.app].update({username: ""}) + if not self.charm.unit.is_leader(): + return + + if not self.charm.ready_to_start: + logger.debug("cannot remove user, ZooKeeper not yet connected") + event.defer() + return - @staticmethod - def generate_password(): - """Creates randomized string for use as app passwords. 
+ if ( + event.relation.app != self.charm.app or not self.charm.app.planned_units() == 0 + ): # avoid on own charm during teardown + provider_relation_config = self.provider_relation_config(event=event) - Returns: - String of 32 randomized letter+digit characters - """ - return "".join([secrets.choice(string.ascii_letters + string.digits) for _ in range(32)]) + self.kafka_auth.load_current_acls() + + self.kafka_auth.remove_all_user_acls( + username=provider_relation_config["username"], + ) + self.kafka_auth.delete_user(username=provider_relation_config["username"]) + + # non-leader units need cluster_config_changed event to update their super.users + self.charm.model.get_relation(PEER).data[self.charm.app].update( + {provider_relation_config["username"]: ""} + ) diff --git a/src/utils.py b/src/utils.py index 4d43c8da..e118af7d 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,4 +1,4 @@ -# !/usr/bin/env python3 +#!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. @@ -7,10 +7,70 @@ import logging import secrets import string +from typing import Dict, List, Set + +from charms.zookeeper.v0.client import ZooKeeperManager +from kazoo.exceptions import AuthFailedError, NoNodeError +from ops.model import Container, Unit +from ops.pebble import ExecError +from tenacity import retry +from tenacity.retry import retry_if_not_result +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_fixed logger = logging.getLogger(__name__) +@retry( + # retry to give ZK time to update its broker zNodes before failing + wait=wait_fixed(5), + stop=stop_after_attempt(6), + retry_error_callback=(lambda state: state.outcome.result()), + retry=retry_if_not_result(lambda result: True if result else False), +) +def broker_active(unit: Unit, zookeeper_config: Dict[str, str]) -> bool: + """Checks ZooKeeper for client connections, checks for specific broker id. 
+ + Args: + unit: the `Unit` to check connection of + zookeeper_config: the relation provided by ZooKeeper + + Returns: + True if broker id is recognised as active by ZooKeeper. Otherwise False. + """ + broker_id = unit.name.split("/")[1] + brokers = get_active_brokers(zookeeper_config=zookeeper_config) + chroot = zookeeper_config.get("chroot", "") + return f"{chroot}/brokers/ids/{broker_id}" in brokers + + +def get_active_brokers(zookeeper_config: Dict[str, str]) -> Set[str]: + """Gets all brokers currently connected to ZooKeeper. + + Args: + zookeeper_config: the relation data provided by ZooKeeper + + Returns: + Set of active broker ids + """ + chroot = zookeeper_config.get("chroot", "") + hosts = zookeeper_config.get("endpoints", "").split(",") + username = zookeeper_config.get("username", "") + password = zookeeper_config.get("password", "") + + zk = ZooKeeperManager(hosts=hosts, username=username, password=password) + path = f"{chroot}/brokers/ids/" + + try: + brokers = zk.leader_znodes(path=path) + # auth might not be ready with ZK after relation yet + except (NoNodeError, AuthFailedError) as e: + logger.debug(str(e)) + return set() + + return brokers + + def generate_password() -> str: """Creates randomized string for use as app passwords. @@ -18,3 +78,31 @@ def generate_password() -> str: String of 32 randomized letter+digit characters """ return "".join([secrets.choice(string.ascii_letters + string.digits) for _ in range(32)]) + + +def run_bin_command( + container: Container, bin_keyword: str, bin_args: List[str], extra_args: str +) -> str: + """Runs kafka bin command with desired args. 
+ + Args: + container: the container to run on + bin_keyword: the kafka shell script to run + e.g `configs`, `topics` etc + bin_args: the shell command args + extra_args (optional): the desired `KAFKA_OPTS` env var values for the command + + Returns: + String of kafka bin command output + """ + environment = {"KAFKA_OPTS": extra_args} + command = [f"/opt/kafka/bin/kafka-{bin_keyword}.sh"] + bin_args + + try: + process = container.exec(command=command, environment=environment) + output, _ = process.wait_output() + logger.debug(f"{output=}") + return output + except (ExecError) as e: + logger.debug(f"cmd failed:\ncommand={e.command}\nstdout={e.stdout}\nstderr={e.stderr}") + raise e diff --git a/tests/fixtures/valid_server.properties b/tests/fixtures/valid_server.properties deleted file mode 100644 index dccf7741..00000000 --- a/tests/fixtures/valid_server.properties +++ /dev/null @@ -1,6 +0,0 @@ -broker.id=1 -clientPort=2181 -broker.id.generation.enable=true -listeners=PLAINTEXT://:9092 -advertised.listeners=PLAINTEXT://:9092 -log.dirs=/var/lib/kafka/data diff --git a/tests/integration/app-charm/actions.yaml b/tests/integration/app-charm/actions.yaml new file mode 100644 index 00000000..24630763 --- /dev/null +++ b/tests/integration/app-charm/actions.yaml @@ -0,0 +1,11 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +make-admin: + description: Adds admin to relation data + +remove-admin: + description: Removes admin from relation data + +change-topic: + description: Changes topic diff --git a/tests/integration/app-charm/metadata.yaml b/tests/integration/app-charm/metadata.yaml index de52389d..49804b95 100644 --- a/tests/integration/app-charm/metadata.yaml +++ b/tests/integration/app-charm/metadata.yaml @@ -13,5 +13,5 @@ peers: interface: cluster requires: - kafka: - interface: kafka + kafka-client: + interface: kafka_client diff --git a/tests/integration/app-charm/src/charm.py b/tests/integration/app-charm/src/charm.py index a30759a2..769058e4 100755 --- a/tests/integration/app-charm/src/charm.py +++ b/tests/integration/app-charm/src/charm.py @@ -19,7 +19,8 @@ CHARM_KEY = "app" PEER = "cluster" -REL_NAME = "kafka" +REL_NAME = "kafka-client" +ZK = "zookeeper" class ApplicationCharm(CharmBase): @@ -30,9 +31,13 @@ def __init__(self, *args): self.name = CHARM_KEY self.framework.observe(getattr(self.on, "start"), self._on_start) + self.framework.observe(self.on[REL_NAME].relation_created, self._set_data) self.framework.observe(self.on[REL_NAME].relation_changed, self._log) self.framework.observe(self.on[REL_NAME].relation_broken, self._log) - self.framework.observe(self.on[REL_NAME].relation_joined, self._set_data) + + self.framework.observe(getattr(self.on, "make_admin_action"), self._make_admin) + self.framework.observe(getattr(self.on, "remove_admin_action"), self._remove_admin) + self.framework.observe(getattr(self.on, "change_topic_action"), self._change_topic) @property def relation(self): @@ -41,8 +46,26 @@ def relation(self): def _on_start(self, _) -> None: self.unit.status = ActiveStatus() - def _set_data(self, _) -> None: - return + def _set_data(self, event: RelationEvent) -> None: + + event.relation.data[self.unit].update({"group": self.unit.name.split("/")[1]}) + + if not self.unit.is_leader(): + return + event.relation.data[self.app].update( + {"extra-user-roles": 
"consumer", "topic": "test-topic"} + ) + + def _make_admin(self, _): + self.model.get_relation(REL_NAME).data[self.app].update( + {"extra-user-roles": "admin,consumer"} + ) + + def _remove_admin(self, _): + self.model.get_relation(REL_NAME).data[self.app].update({"extra-user-roles": "producer"}) + + def _change_topic(self, _): + self.model.get_relation(REL_NAME).data[self.app].update({"topic": "test-topic-changed"}) def _log(self, event: RelationEvent): return diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 372535e7..4baef3c2 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1,20 +1,51 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. + import re from pathlib import Path from subprocess import PIPE, check_output -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Set, Tuple import yaml from pytest_operator.plugin import OpsTest +from auth import Acl, KafkaAuth + METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) KAFKA_CONTAINER = METADATA["resources"]["kafka-image"]["upstream-source"] APP_NAME = METADATA["name"] ZK_NAME = "zookeeper-k8s" +def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]: + container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeeper_uri} --list" + result = check_output( + f"JUJU_MODEL={model_full_name} juju ssh --container kafka kafka-k8s/0 '{container_command}'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return KafkaAuth._parse_acls(acls=result) + + +def load_super_users(model_full_name: str) -> List[str]: + result = check_output( + f"JUJU_MODEL={model_full_name} juju ssh --container kafka {APP_NAME}/0 'cat /data/kafka/config/server.properties'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + properties = 
result.splitlines() + + for prop in properties: + if "super.users" in prop: + return prop.split("=")[1].split(";") + + return [] + + def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" result = check_output( @@ -27,7 +58,7 @@ def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: assert "SCRAM-SHA-512" in result -def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: +def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> str: """Get information related to a user stored on zookeeper.""" container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" result = check_output( @@ -72,21 +103,6 @@ def get_zookeeper_connection(unit_name: str, model_full_name: str) -> Tuple[List raise Exception("config not found") -def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str, str]: - result = show_unit(unit_name=unit_name, model_full_name=model_full_name) - relations_info = result[unit_name]["relation-info"] - - zk_relation_data = {} - for info in relations_info: - if info["endpoint"] == "zookeeper": - zk_relation_data["chroot"] = info["application-data"]["chroot"] - zk_relation_data["endpoints"] = info["application-data"]["endpoints"] - zk_relation_data["password"] = info["application-data"]["password"] - zk_relation_data["uris"] = info["application-data"]["uris"] - zk_relation_data["username"] = info["application-data"]["username"] - return zk_relation_data - - async def set_password(ops_test: OpsTest, username="sync", password=None, num_unit=0) -> str: """Use the charm action to start a 
password rotation.""" params = {"username": username} @@ -117,3 +133,18 @@ def check_application_status(ops_test: OpsTest, app_name: str) -> str: find_status = list(statuses & set(parts)) if find_status: return find_status[0] + + +def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str, str]: + result = show_unit(unit_name=unit_name, model_full_name=model_full_name) + relations_info = result[unit_name]["relation-info"] + + zk_relation_data = {} + for info in relations_info: + if info["endpoint"] == "zookeeper": + zk_relation_data["chroot"] = info["application-data"]["chroot"] + zk_relation_data["endpoints"] = info["application-data"]["endpoints"] + zk_relation_data["password"] = info["application-data"]["password"] + zk_relation_data["uris"] = info["application-data"]["uris"] + zk_relation_data["username"] = info["application-data"]["username"] + return zk_relation_data diff --git a/tests/integration/test_kafka_provider.py b/tests/integration/test_kafka_provider.py deleted file mode 100644 index 7c15b87c..00000000 --- a/tests/integration/test_kafka_provider.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. 
- -import asyncio -import logging - -import pytest -from helpers import ( - APP_NAME, - KAFKA_CONTAINER, - ZK_NAME, - check_user, - get_zookeeper_connection, -) -from pytest_operator.plugin import OpsTest - -logger = logging.getLogger(__name__) - -DUMMY_NAME_1 = "app" -DUMMY_NAME_2 = "appii" - - -@pytest.fixture(scope="module") -def usernames(): - return set() - - -@pytest.mark.abort_on_fail -async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): - kafka_charm = await ops_test.build_charm(".") - app_charm = await ops_test.build_charm("tests/integration/app-charm") - - await asyncio.gather( - ops_test.model.deploy( - "zookeeper-k8s", channel="edge", application_name=ZK_NAME, num_units=3 - ), - ops_test.model.deploy( - kafka_charm, - application_name=APP_NAME, - num_units=1, - resources={"kafka-image": KAFKA_CONTAINER}, - ), - ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1), - ) - await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3) - await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1, ZK_NAME]) - - await ops_test.model.add_relation(APP_NAME, ZK_NAME) - await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) - - await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_1) - await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) - - assert ops_test.model.applications[APP_NAME].status == "active" - assert ops_test.model.applications[DUMMY_NAME_1].status == "active" - - # implicitly tests setting of kafka app data - returned_usernames, zookeeper_uri = get_zookeeper_connection( - unit_name="kafka-k8s/0", model_full_name=ops_test.model_full_name - ) - usernames.update(returned_usernames) - - for username in usernames: - check_user( - username=username, - zookeeper_uri=zookeeper_uri, - model_full_name=ops_test.model_full_name, - ) - - -@pytest.mark.abort_on_fail -async def test_deploy_multiple_charms_relate_active(ops_test: OpsTest, usernames): - appii_charm 
= await ops_test.build_charm("tests/integration/app-charm") - await ops_test.model.deploy(appii_charm, application_name=DUMMY_NAME_2, num_units=1), - await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2]) - await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_2) - await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_2]) - - assert ops_test.model.applications[APP_NAME].status == "active" - assert ops_test.model.applications[DUMMY_NAME_1].status == "active" - assert ops_test.model.applications[DUMMY_NAME_2].status == "active" - - returned_usernames, zookeeper_uri = get_zookeeper_connection( - unit_name="kafka-k8s/0", model_full_name=ops_test.model_full_name - ) - usernames.update(returned_usernames) - - for username in usernames: - check_user( - username=username, - zookeeper_uri=zookeeper_uri, - model_full_name=ops_test.model_full_name, - ) - - -@pytest.mark.abort_on_fail -async def test_remove_application_removes_user(ops_test: OpsTest, usernames): - await ops_test.model.applications[DUMMY_NAME_1].remove() - await ops_test.model.applications[DUMMY_NAME_2].remove() - await ops_test.model.wait_for_idle(apps=[APP_NAME]) - assert ops_test.model.applications[APP_NAME].status == "active" - - # checks that past usernames no longer exist in ZooKeeper - with pytest.raises(Exception): - _, _ = get_zookeeper_connection( - unit_name="kafka-k8s/0", model_full_name=ops_test.model_full_name - ) diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py new file mode 100644 index 00000000..951e2efb --- /dev/null +++ b/tests/integration/test_provider.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +import asyncio +import logging +import time + +import pytest +from pytest_operator.plugin import OpsTest + +from tests.integration.helpers import ( + APP_NAME, + KAFKA_CONTAINER, + ZK_NAME, + check_user, + get_zookeeper_connection, + load_acls, + load_super_users, +) + +logger = logging.getLogger(__name__) + +DUMMY_NAME_1 = "app" +DUMMY_NAME_2 = "appii" + + +@pytest.fixture(scope="module") +def usernames(): + return set() + + +@pytest.mark.abort_on_fail +async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): + kafka_charm = await ops_test.build_charm(".") + app_charm = await ops_test.build_charm("tests/integration/app-charm") + + await asyncio.gather( + ops_test.model.deploy( + "zookeeper-k8s", channel="edge", application_name=ZK_NAME, num_units=3 + ), + ops_test.model.deploy( + kafka_charm, + application_name=APP_NAME, + num_units=1, + resources={"kafka-image": KAFKA_CONTAINER}, + ), + ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1), + ) + await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3) + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1, ZK_NAME]) + await ops_test.model.add_relation(APP_NAME, ZK_NAME) + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) + await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_1) + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[DUMMY_NAME_1].status == "active" + + # implicitly tests setting of kafka app data + returned_usernames, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + usernames.update(returned_usernames) + + for username in usernames: + check_user( + username=username, + zookeeper_uri=zookeeper_uri, + model_full_name=ops_test.model_full_name, + ) + + for acl in load_acls(model_full_name=ops_test.model_full_name, 
zookeeper_uri=zookeeper_uri): + assert acl.username in usernames + assert acl.operation in ["READ", "DESCRIBE"] + assert acl.resource_type in ["GROUP", "TOPIC"] + if acl.resource_type == "TOPIC": + assert acl.resource_name == "test-topic" + + +@pytest.mark.abort_on_fail +async def test_change_client_topic(ops_test: OpsTest): + action = await ops_test.model.units.get(f"{DUMMY_NAME_1}/0").run_action("change-topic") + await action.wait() + + assert ops_test.model.applications[APP_NAME].status == "active" + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + + _, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + + for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): + if acl.resource_type == "TOPIC": + assert acl.resource_name == "test-topic-changed" + + +@pytest.mark.abort_on_fail +async def test_admin_added_to_super_users(ops_test: OpsTest): + # ensures only broker user for now + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 1 + + action = await ops_test.model.units.get(f"{DUMMY_NAME_1}/0").run_action("make-admin") + await action.wait() + + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + assert ops_test.model.applications[APP_NAME].status == "active" + + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 2 + + +@pytest.mark.abort_on_fail +async def test_admin_removed_from_super_users(ops_test: OpsTest): + action = await ops_test.model.units.get(f"{DUMMY_NAME_1}/0").run_action("remove-admin") + await action.wait() + + time.sleep(20) + + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1]) + assert ops_test.model.applications[APP_NAME].status == "active" + + super_users = load_super_users(model_full_name=ops_test.model_full_name) + assert len(super_users) == 1 + + +@pytest.mark.abort_on_fail +async def 
test_deploy_multiple_charms_same_topic_relate_active(ops_test: OpsTest, usernames): + appii_charm = await ops_test.build_charm("tests/integration/app-charm") + await ops_test.model.deploy(appii_charm, application_name=DUMMY_NAME_2, num_units=1), + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2], timeout=1000) + await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_2) + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_2], timeout=1000) + + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[DUMMY_NAME_1].status == "active" + assert ops_test.model.applications[DUMMY_NAME_2].status == "active" + + returned_usernames, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + usernames.update(returned_usernames) + + for username in usernames: + check_user( + username=username, + zookeeper_uri=zookeeper_uri, + model_full_name=ops_test.model_full_name, + ) + + for acl in load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri): + assert acl.username in usernames + if acl.resource_type == "TOPIC": + assert acl.resource_name in ["test-topic", "test-topic-changed"] + + +@pytest.mark.skip # skip until scaling operations work in MicroK8s +@pytest.mark.abort_on_fail +async def test_remove_application_removes_user_and_acls(ops_test: OpsTest, usernames): + await ops_test.model.remove_application(DUMMY_NAME_1, block_until_done=True) + await ops_test.model.wait_for_idle(apps=[APP_NAME]) + assert ops_test.model.applications[APP_NAME].status == "active" + + _, zookeeper_uri = get_zookeeper_connection( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + + # checks that old users are removed from active cluster ACLs + acls = load_acls(model_full_name=ops_test.model_full_name, zookeeper_uri=zookeeper_uri) + acl_usernames = set() + for acl in acls: + 
acl_usernames.add(acl.username) + + assert acl_usernames != usernames + + # checks that past usernames no longer exist in ZooKeeper + with pytest.raises(AssertionError): + for username in usernames: + check_user( + username=username, + zookeeper_uri=zookeeper_uri, + model_full_name=ops_test.model_full_name, + ) diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py new file mode 100644 index 00000000..6145805a --- /dev/null +++ b/tests/integration/test_scaling.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging +import time + +import pytest +from pytest_operator.plugin import OpsTest + +from literals import CHARM_KEY, ZOOKEEPER_REL_NAME +from tests.integration.helpers import get_kafka_zk_relation_data +from utils import get_active_brokers + +logger = logging.getLogger(__name__) + + +@pytest.mark.skip # skip until scaling operations work in MicroK8s +@pytest.mark.abort_on_fail +async def test_kafka_simple_scale_up(ops_test: OpsTest): + kafka_charm = await ops_test.build_charm(".") + + await asyncio.gather( + ops_test.model.deploy("zookeeper-k8s", application_name=ZOOKEEPER_REL_NAME, num_units=1), + ops_test.model.deploy( + kafka_charm, + application_name=CHARM_KEY, + num_units=1, + resources={"kafka-image": "ubuntu/kafka:latest"}, + ), + ) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZOOKEEPER_REL_NAME]) + await ops_test.model.add_relation(CHARM_KEY, ZOOKEEPER_REL_NAME) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY, ZOOKEEPER_REL_NAME]) + assert ops_test.model.applications[ZOOKEEPER_REL_NAME].status == "active" + assert ops_test.model.applications[CHARM_KEY].status == "active" + + await ops_test.model.applications[CHARM_KEY].add_units(count=2) + await ops_test.model.block_until( + lambda: len(ops_test.model.applications[CHARM_KEY].units) == 3 + ) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY], status="active", 
timeout=1000) + + kafka_zk_relation_data = get_kafka_zk_relation_data( + unit_name=f"{CHARM_KEY}/2", model_full_name=ops_test.model_full_name + ) + active_brokers = get_active_brokers(zookeeper_config=kafka_zk_relation_data) + chroot = kafka_zk_relation_data.get("chroot", "") + assert f"{chroot}/brokers/ids/0" in active_brokers + assert f"{chroot}/brokers/ids/1" in active_brokers + assert f"{chroot}/brokers/ids/2" in active_brokers + + +@pytest.mark.skip # skip until scaling operations work in MicroK8s +@pytest.mark.abort_on_fail +async def test_kafka_simple_scale_down(ops_test: OpsTest): + + await ops_test.model.applications[CHARM_KEY].destroy_units(f"{CHARM_KEY}/1") + await ops_test.model.block_until( + lambda: len(ops_test.model.applications[CHARM_KEY].units) == 2 + ) + await ops_test.model.wait_for_idle(apps=[CHARM_KEY], status="active", timeout=1000) + + time.sleep(30) + + kafka_zk_relation_data = get_kafka_zk_relation_data( + unit_name=f"{CHARM_KEY}/2", model_full_name=ops_test.model_full_name + ) + active_brokers = get_active_brokers(zookeeper_config=kafka_zk_relation_data) + chroot = kafka_zk_relation_data.get("chroot", "") + assert f"{chroot}/brokers/ids/0" in active_brokers + assert f"{chroot}/brokers/ids/1" not in active_brokers + assert f"{chroot}/brokers/ids/2" in active_brokers diff --git a/tests/unit/test_auth.py b/tests/unit/test_auth.py new file mode 100644 index 00000000..4817e532 --- /dev/null +++ b/tests/unit/test_auth.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +from auth import Acl, KafkaAuth + + +def test_acl(): + assert sorted(list(Acl.__annotations__.keys())) == sorted( + ["operation", "resource_name", "resource_type", "username"] + ) + assert Acl.__hash__ + + +def test_parse_acls(): + acls = """ + Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=relation-81-*, patternType=LITERAL)`: + (principal=User:relation-81, host=*, operation=READ, permissionType=ALLOW) + + Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`: + (principal=User:relation-81, host=*, operation=WRITE, permissionType=ALLOW) + (principal=User:relation-81, host=*, operation=CREATE, permissionType=ALLOW) + (principal=User:relation-81, host=*, operation=DESCRIBE, permissionType=ALLOW) + (principal=User:relation-81, host=*, operation=READ, permissionType=ALLOW) + """ + + parsed_acls = KafkaAuth._parse_acls(acls=acls) + + assert len(parsed_acls) == 5 + assert type(list(parsed_acls)[0]) == Acl + + +def test_generate_producer_acls(): + generated_acls = KafkaAuth._generate_producer_acls(topic="theonering", username="frodo") + assert len(generated_acls) == 3 + + operations = set() + resource_types = set() + for acl in generated_acls: + operations.add(acl.operation) + resource_types.add(acl.resource_type) + + assert sorted(operations) == sorted(set(["CREATE", "WRITE", "DESCRIBE"])) + assert resource_types == {"TOPIC"} + + +def test_generate_consumer_acls(): + generated_acls = KafkaAuth._generate_consumer_acls(topic="theonering", username="frodo") + assert len(generated_acls) == 3 + + operations = set() + resource_types = set() + for acl in generated_acls: + operations.add(acl.operation) + resource_types.add(acl.resource_type) + + if acl.resource_type == "GROUP": + assert acl.operation == "READ" + + assert sorted(operations) == sorted(set(["READ", "DESCRIBE"])) + assert sorted(resource_types) == sorted(set(["TOPIC", "GROUP"])) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py 
index 3e71e6ae..cd283052 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -2,9 +2,8 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. -import unittest - import ops.testing +import pytest from ops.testing import Harness from charm import KafkaK8sCharm @@ -12,40 +11,136 @@ ops.testing.SIMULATE_CAN_CONNECT = True -class TestKafkaConfig(unittest.TestCase): - def setUp(self): - self.harness = Harness(KafkaK8sCharm) - self.addCleanup(self.harness.cleanup) - self.harness.begin_with_initial_hooks() - self.relation_id = self.harness.add_relation("zookeeper", "kafka-k8s") - - def test_zookeeper_config_succeeds_fails_config(self): - self.harness.update_relation_data( - self.relation_id, - self.harness.charm.app.name, - { - "chroot": "/kafka", - "username": "moria", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181,2.2.2.2:2181/kafka", - }, - ) - self.assertDictEqual(self.harness.charm.kafka_config.zookeeper_config, {}) - - def test_zookeeper_config_succeeds_valid_config(self): - self.harness.update_relation_data( - self.relation_id, - self.harness.charm.app.name, - { - "chroot": "/kafka", - "username": "moria", - "password": "mellon", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", - }, - ) - self.assertIn("connect", self.harness.charm.kafka_config.zookeeper_config.keys()) - self.assertEqual( - self.harness.charm.kafka_config.zookeeper_config["connect"], - "1.1.1.1:2181,2.2.2.2:2181/kafka", - ) +@pytest.fixture(scope="function") +def harness(): + harness = Harness(KafkaK8sCharm) + harness.begin_with_initial_hooks() + return harness + + +@pytest.fixture(scope="function") +def zk_relation_id(harness): + relation_id = harness.add_relation("zookeeper", "kafka-k8s") + return relation_id + + +def test_zookeeper_config_succeeds_fails_config(zk_relation_id, harness): + harness.update_relation_data( + zk_relation_id, + harness.charm.app.name, + { + "chroot": "/kafka", + "username": 
"moria", + "endpoints": "1.1.1.1,2.2.2.2", + "uris": "1.1.1.1:2181,2.2.2.2:2181/kafka", + }, + ) + assert harness.charm.kafka_config.zookeeper_config == {} + assert not harness.charm.kafka_config.zookeeper_connected + + +def test_zookeeper_config_succeeds_valid_config(zk_relation_id, harness): + harness.update_relation_data( + zk_relation_id, + harness.charm.app.name, + { + "chroot": "/kafka", + "username": "moria", + "password": "mellon", + "endpoints": "1.1.1.1,2.2.2.2", + "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", + }, + ) + assert "connect" in harness.charm.kafka_config.zookeeper_config + assert ( + harness.charm.kafka_config.zookeeper_config["connect"] == "1.1.1.1:2181,2.2.2.2:2181/kafka" + ) + assert harness.charm.kafka_config.zookeeper_connected + + +def test_extra_args(harness): + args = harness.charm.kafka_config.extra_args + assert "-Djava.security.auth.login.config" in args + + +def test_bootstrap_server(harness): + peer_relation_id = harness.charm.model.get_relation("cluster").id + harness.add_relation_unit(peer_relation_id, "kafka/1") + + assert len(harness.charm.kafka_config.bootstrap_server) == 2 + for server in harness.charm.kafka_config.bootstrap_server: + assert "9092" in server + + +def test_default_replication_properties_less_than_three(harness): + assert "num.partitions=1" in harness.charm.kafka_config.default_replication_properties + assert ( + "default.replication.factor=1" in harness.charm.kafka_config.default_replication_properties + ) + assert "min.insync.replicas=1" in harness.charm.kafka_config.default_replication_properties + + +def test_default_replication_properties_more_than_three(harness): + peer_relation_id = harness.charm.model.get_relation("cluster").id + harness.add_relation_unit(peer_relation_id, "kafka/1") + harness.add_relation_unit(peer_relation_id, "kafka/2") + harness.add_relation_unit(peer_relation_id, "kafka/3") + harness.add_relation_unit(peer_relation_id, "kafka/4") + harness.add_relation_unit(peer_relation_id, 
"kafka/5") + + assert "num.partitions=3" in harness.charm.kafka_config.default_replication_properties + assert ( + "default.replication.factor=3" in harness.charm.kafka_config.default_replication_properties + ) + assert "min.insync.replicas=2" in harness.charm.kafka_config.default_replication_properties + + +def test_auth_properties(zk_relation_id, harness): + peer_relation_id = harness.charm.model.get_relation("cluster").id + harness.update_relation_data( + peer_relation_id, harness.charm.app.name, {"sync_password": "mellon"} + ) + harness.update_relation_data( + zk_relation_id, + harness.charm.app.name, + { + "chroot": "/kafka", + "username": "moria", + "password": "mellon", + "endpoints": "1.1.1.1,2.2.2.2", + "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", + }, + ) + + assert "broker.id=0" in harness.charm.kafka_config.auth_properties + assert ( + f"zookeeper.connect={harness.charm.kafka_config.zookeeper_config['connect']}" + in harness.charm.kafka_config.auth_properties + ) + assert ( + 'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="mellon";' + in harness.charm.kafka_config.auth_properties + ) + + +def test_super_users(harness): + assert len(harness.charm.kafka_config.super_users.split(";")) == 1 + + client_relation_id = harness.add_relation("kafka-client", "app") + harness.update_relation_data(client_relation_id, "app", {"extra-user-roles": "admin,producer"}) + client_relation_id = harness.add_relation("kafka-client", "appii") + harness.update_relation_data( + client_relation_id, "appii", {"extra-user-roles": "admin,consumer"} + ) + + peer_relation_id = harness.charm.model.get_relation("cluster").id + + harness.update_relation_data( + peer_relation_id, harness.charm.app.name, {"relation-2": "mellon"} + ) + assert len(harness.charm.kafka_config.super_users.split(";")) == 2 + + harness.update_relation_data( + peer_relation_id, harness.charm.app.name, 
{"relation-3": "mellon"} + ) + assert len(harness.charm.kafka_config.super_users.split(";")) == 3 diff --git a/tests/unit/test_provider.py b/tests/unit/test_provider.py deleted file mode 100644 index 81621a27..00000000 --- a/tests/unit/test_provider.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -import logging -import unittest -from collections import namedtuple - -import ops.testing -from ops.charm import CharmBase -from ops.testing import Harness - -from provider import KafkaProvider - -ops.testing.SIMULATE_CAN_CONNECT = True - -logger = logging.getLogger(__name__) - -METADATA = """ - name: kafka - peers: - cluster: - interface: cluster - provides: - kafka: - interface: kafka -""" - -CustomRelation = namedtuple("Relation", ["id"]) - - -class DummyKafkaCharm(CharmBase): - def __init__(self, *args): - super().__init__(*args) - self.client_relation = KafkaProvider(self) - - -class TestProvider(unittest.TestCase): - def setUp(self): - self.harness = Harness(DummyKafkaCharm, meta=METADATA) - self.addCleanup(self.harness.cleanup) - self.harness.begin_with_initial_hooks() - - @property - def provider(self): - return self.harness.charm.client_relation - - def test_relation_config_new_relation_no_password(self): - self.harness.set_leader(True) - relation_id = self.harness.add_relation("kafka", "client_app") - - config = self.harness.charm.client_relation.relation_config( - relation=self.harness.charm.model.get_relation( - relation_name="kafka", relation_id=relation_id - ) - ) - - self.assertEqual(sorted(["endpoints", "password", "username"]), sorted(config.keys())) - self.assertEqual(sorted(config["endpoints"].split(",")), ["kafka-0.kafka-endpoints"]) - self.assertEqual(len(config["password"]), 32) - - def test_relation_config_existing_relation_password(self): - self.harness.set_leader(True) - relation_id = self.harness.add_relation("kafka", "client_app") - 
self.harness.update_relation_data( - self.harness.charm.model.get_relation("cluster").id, - "kafka", - {"relation-1": "keepitsecret"}, - ) - - config = self.harness.charm.client_relation.relation_config( - relation=self.harness.charm.model.get_relation( - relation_name="kafka", relation_id=relation_id - ) - ) - - self.assertEqual(config["password"], "keepitsecret")