diff --git a/actions.yaml b/actions.yaml index b1445114..850346c0 100644 --- a/actions.yaml +++ b/actions.yaml @@ -1,2 +1,15 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. + +set-password: + description: Change the system user's password, which is used by the charm. + It is for internal charm users and SHOULD NOT be used by applications. + This action must be called on the leader unit. + params: + username: + type: string + description: The username, the default value 'operator'. + Possible values - operator + password: + type: string + description: The password will be auto-generated if this option is not specified. diff --git a/pyproject.toml b/pyproject.toml index 0e04ddba..bcd209ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ show_missing = true [tool.pytest.ini_options] minversion = "6.0" log_cli_level = "INFO" +asyncio_mode = "auto" # Formatting tools configuration [tool.black] diff --git a/src/charm.py b/src/charm.py index b03baddc..33d7062a 100755 --- a/src/charm.py +++ b/src/charm.py @@ -5,11 +5,9 @@ """Charmed Machine Operator for Apache Kafka.""" import logging -import secrets -import string from typing import List -from ops.charm import CharmBase, RelationEvent, RelationJoinedEvent +from ops.charm import ActionEvent, CharmBase, RelationEvent, RelationJoinedEvent from ops.framework import EventBase from ops.main import main from ops.model import ActiveStatus, BlockedStatus, Container, Relation, WaitingStatus @@ -17,8 +15,9 @@ from config import KafkaConfig from connection_check import broker_active, zookeeper_connected -from literals import CHARM_KEY, PEER, ZOOKEEPER_REL_NAME +from literals import CHARM_KEY, CHARM_USERS, PEER, ZOOKEEPER_REL_NAME from provider import KafkaProvider +from utils import generate_password logger = logging.getLogger(__name__) @@ -44,6 +43,8 @@ def __init__(self, *args): self.on[ZOOKEEPER_REL_NAME].relation_broken, self._on_zookeeper_broken ) + self.framework.observe(self.on.set_password_action, self._set_password_action) + @property def container(self) -> Container: """Grabs the current Kafka container.""" @@ -146,14 +147,9 @@ def _on_kafka_pebble_ready(self, event: EventBase) -> None: def _on_leader_elected(self, _) -> None: """Handler for `leader_elected` event, ensuring sync_passwords gets set.""" sync_password = self.kafka_config.sync_password - if not sync_password: - self.peer_relation.data[self.app].update( - { - "sync_password": "".join( - [secrets.choice(string.ascii_letters + string.digits) for _ in range(32)] - ) - } - ) + self.peer_relation.data[self.app].update( + {"sync_password": sync_password or generate_password()} + ) def _on_zookeeper_joined(self, event: RelationJoinedEvent) -> None: """Handler for `zookeeper_relation_joined` event, ensuring chroot gets set.""" @@ -170,6 +166,43 @@ def _on_zookeeper_broken(self, event: RelationEvent) -> None: self.container.stop(CHARM_KEY) self.unit.status = BlockedStatus("missing required zookeeper relation") + def _set_password_action(self, event: ActionEvent): + """Handler for set-password action. + + Set the password for a specific user, if no passwords are passed, generate them. + """ + if not self.unit.is_leader(): + msg = "Password rotation must be called on leader unit" + logger.error(msg) + event.fail(msg) + return + + username = event.params.get("username", "sync") + if username not in CHARM_USERS: + msg = f"The action can be run only for users used by the charm: {CHARM_USERS} not {username}." + logger.error(msg) + event.fail(msg) + return + + new_password = event.params.get("password", generate_password()) + + if new_password == self.kafka_config.sync_password: + event.log("The old and new passwords are equal.") + event.set_results({f"{username}-password": new_password}) + return + + # Update the user + try: + self.add_user_to_zookeeper(username=username, password=new_password) + except ExecError as e: + logger.error(str(e)) + event.fail(str(e)) + return + + # Store the password on application databag + self.peer_relation.data[self.app].update({f"{username}_password": new_password}) + event.set_results({f"{username}-password": new_password}) + def add_user_to_zookeeper(self, username: str, password: str) -> None: """Adds user credentials to ZooKeeper for authorising clients and brokers. diff --git a/src/literals.py b/src/literals.py index 30a56936..24b3d4e3 100644 --- a/src/literals.py +++ b/src/literals.py @@ -8,3 +8,4 @@ PEER = "cluster" ZOOKEEPER_REL_NAME = "zookeeper" REL_NAME = "kafka" +CHARM_USERS = ["sync"] diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 00000000..4d43c8da --- /dev/null +++ b/src/utils.py @@ -0,0 +1,20 @@ +# !/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Collection of helper methods for checking active connections between ZK and Kafka.""" + +import logging +import secrets +import string + +logger = logging.getLogger(__name__) + + +def generate_password() -> str: + """Creates randomized string for use as app passwords. + + Returns: + String of 32 randomized letter+digit characters + """ + return "".join([secrets.choice(string.ascii_letters + string.digits) for _ in range(32)]) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index d8b78b11..372535e7 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -4,7 +4,7 @@ import re from pathlib import Path from subprocess import PIPE, check_output -from typing import Any, List, Tuple +from typing import Any, Dict, List, Tuple import yaml from pytest_operator.plugin import OpsTest @@ -27,6 +27,19 @@ def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: assert "SCRAM-SHA-512" in result +def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: + """Get information related to a user stored on zookeeper.""" + container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" + result = check_output( + f"JUJU_MODEL={model_full_name} juju ssh --container kafka kafka-k8s/0 '{container_command}'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return result + + def show_unit(unit_name: str, model_full_name: str) -> Any: result = check_output( f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}", @@ -59,6 +72,34 @@ def get_zookeeper_connection(unit_name: str, model_full_name: str) -> Tuple[List raise Exception("config not found") +def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str, str]: + result = show_unit(unit_name=unit_name, model_full_name=model_full_name) + relations_info = result[unit_name]["relation-info"] + + zk_relation_data = {} + for info in relations_info: + if info["endpoint"] == "zookeeper": + zk_relation_data["chroot"] = info["application-data"]["chroot"] + zk_relation_data["endpoints"] = info["application-data"]["endpoints"] + zk_relation_data["password"] = info["application-data"]["password"] + zk_relation_data["uris"] = info["application-data"]["uris"] + zk_relation_data["username"] = info["application-data"]["username"] + return zk_relation_data + + +async def set_password(ops_test: OpsTest, username="sync", password=None, num_unit=0) -> str: + """Use the charm action to start a password rotation.""" + params = {"username": username} + if password: + params["password"] = password + + action = await ops_test.model.units.get(f"{APP_NAME}/{num_unit}").run_action( + "set-password", **params + ) + password = await action.wait() + return password.results + + def check_application_status(ops_test: OpsTest, app_name: str) -> str: """Return the application status for an app name.""" model_name = ops_test.model.info.name diff --git a/tests/integration/test_password_rotation.py b/tests/integration/test_password_rotation.py new file mode 100644 index 00000000..e4f321bb --- /dev/null +++ b/tests/integration/test_password_rotation.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging + +import pytest +from helpers import ( + APP_NAME, + KAFKA_CONTAINER, + ZK_NAME, + check_application_status, + get_kafka_zk_relation_data, + get_user, + set_password, +) +from pytest_operator.plugin import OpsTest + +logger = logging.getLogger(__name__) + + +@pytest.mark.abort_on_fail +@pytest.mark.skip_if_deployed +async def test_build_and_deploy(ops_test: OpsTest): + kafka_charm = await ops_test.build_charm(".") + await asyncio.gather( + ops_test.model.deploy(ZK_NAME, channel="edge", application_name=ZK_NAME, num_units=3), + ops_test.model.deploy( + kafka_charm, + application_name=APP_NAME, + resources={"kafka-image": KAFKA_CONTAINER}, + num_units=1, + ), + ) + await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3) + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], timeout=1000) + + assert check_application_status(ops_test, APP_NAME) == "waiting" + assert ops_test.model.applications[ZK_NAME].status == "active" + + await ops_test.model.add_relation(APP_NAME, ZK_NAME) + + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) + + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[ZK_NAME].status == "active" + + +async def test_password_rotation(ops_test: OpsTest): + """Check that password stored on ZK has changed after a password rotation.""" + relation_data = get_kafka_zk_relation_data( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + uri = relation_data["uris"].split(",")[-1] + + initial_sync_user = get_user( + username="sync", + zookeeper_uri=uri, + model_full_name=ops_test.model_full_name, + ) + + result = await set_password(ops_test, username="sync", num_unit=0) + assert "sync-password" in result.keys() + + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) + + new_sync_user = get_user( + username="sync", + zookeeper_uri=uri, + model_full_name=ops_test.model_full_name, + ) + + assert initial_sync_user != new_sync_user