diff --git a/metadata.yaml b/metadata.yaml index 755bb216..7f854dd5 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -13,14 +13,14 @@ containers: kafka: resource: kafka-image mounts: - - storage: log-data - location: /logs/kafka + - storage: data + location: /var/lib/kafka/data resources: kafka-image: type: oci-image description: OCI Image for Apache Kafka - upstream-source: ghcr.io/canonical/charmed-kafka:3.3.2-22.04_edge + upstream-source: ghcr.io/canonical/charmed-kafka:3-edge peers: cluster: @@ -48,10 +48,11 @@ provides: interface: grafana_dashboard storage: - log-data: + data: type: filesystem description: Directories where the log data is stored minimum-size: 10G + location: /var/lib/kafka/data assumes: - k8s-api diff --git a/src/auth.py b/src/auth.py index 70799956..a82aebde 100644 --- a/src/auth.py +++ b/src/auth.py @@ -7,9 +7,7 @@ import logging import re from dataclasses import asdict, dataclass -from typing import List, Optional, Set - -from ops.model import Container +from typing import Optional, Set from utils import run_bin_command @@ -29,11 +27,11 @@ class Acl: class KafkaAuth: """Object for updating Kafka users and ACLs.""" - def __init__(self, charm, opts: List[str], zookeeper: str, container: Container): + def __init__(self, charm): self.charm = charm - self.opts = " ".join(opts) - self.zookeeper = zookeeper - self.container = container + self.opts = self.charm.kafka_config.auth_args + self.zookeeper = self.charm.kafka_config.zookeeper_config.get("connect", "") + self.container = self.charm.container self.current_acls: Set[Acl] = set() self.new_user_acls: Set[Acl] = set() diff --git a/src/charm.py b/src/charm.py index 1f6a0315..683a347e 100755 --- a/src/charm.py +++ b/src/charm.py @@ -15,17 +15,16 @@ from ops import pebble from ops.charm import ( ActionEvent, - LeaderElectedEvent, - PebbleReadyEvent, + RelationChangedEvent, + RelationCreatedEvent, RelationEvent, - RelationJoinedEvent, StorageAttachedEvent, StorageDetachingEvent, ) from ops.framework import EventBase from ops.main import main -from ops.model import ActiveStatus, BlockedStatus, Container, Relation, WaitingStatus -from ops.pebble import Layer, PathError, ProtocolError +from ops.model import Container, Relation, StatusBase +from ops.pebble import ExecError, Layer, PathError, ProtocolError from auth import KafkaAuth from config import KafkaConfig @@ -33,11 +32,15 @@ ADMIN_USER, CHARM_KEY, CONTAINER, - INTER_BROKER_USER, + INTERNAL_USERS, + JAVA_HOME, JMX_EXPORTER_PORT, + LOGS_PATH, PEER, REL_NAME, ZOOKEEPER_REL_NAME, + DebugLevel, + Status, ) from provider import KafkaProvider from structured_config import CharmConfig @@ -66,22 +69,25 @@ def __init__(self, *args): self.grafana_dashboards = GrafanaDashboardProvider(self) self.loki_push = LogProxyConsumer( self, - log_files=["/opt/kafka/logs/server.log"], + log_files=[f"{LOGS_PATH}/server.log"], relation_name="logging", container_name="kafka", ) self.framework.observe(getattr(self.on, "kafka_pebble_ready"), self._on_kafka_pebble_ready) - self.framework.observe(getattr(self.on, "leader_elected"), self._on_leader_elected) self.framework.observe(getattr(self.on, "config_changed"), self._on_config_changed) + self.framework.observe(getattr(self.on, "update_status"), self._on_update_status) self.framework.observe(self.on[PEER].relation_changed, self._on_config_changed) self.framework.observe( - self.on[ZOOKEEPER_REL_NAME].relation_joined, self._on_zookeeper_joined + self.on[ZOOKEEPER_REL_NAME].relation_created, self._on_zookeeper_created ) self.framework.observe( - 
self.on[ZOOKEEPER_REL_NAME].relation_changed, self._on_config_changed + self.on[ZOOKEEPER_REL_NAME].relation_joined, self._on_zookeeper_changed + ) + self.framework.observe( + self.on[ZOOKEEPER_REL_NAME].relation_changed, self._on_zookeeper_changed ) self.framework.observe( self.on[ZOOKEEPER_REL_NAME].relation_broken, self._on_zookeeper_broken @@ -94,10 +100,10 @@ def __init__(self, *args): ) self.framework.observe( - getattr(self.on, "log_data_storage_attached"), self._on_storage_attached + getattr(self.on, "data_storage_attached"), self._on_storage_attached ) self.framework.observe( - getattr(self.on, "log_data_storage_detaching"), self._on_storage_detaching + getattr(self.on, "data_storage_detaching"), self._on_storage_detaching ) @property @@ -117,7 +123,13 @@ def _kafka_layer(self) -> Layer: "summary": "kafka", "command": self.kafka_config.kafka_command, "startup": "enabled", - "environment": {"KAFKA_OPTS": " ".join(self.kafka_config.extra_args)}, + "user": "kafka", + "group": "kafka", + "environment": { + "KAFKA_OPTS": " ".join(self.kafka_config.extra_args), + "JAVA_HOME": JAVA_HOME, + "LOG_DIR": LOGS_PATH, + }, } }, } @@ -144,93 +156,172 @@ def unit_peer_data(self) -> MutableMapping[str, str]: return self.peer_relation.data[self.unit] + @property + def ready_to_start(self) -> bool: + """Check for active ZooKeeper relation and adding of inter-broker auth username. + + Returns: + True if ZK is related and `sync` user has been added. False otherwise. + """ + if not self.peer_relation: + self._set_status(Status.NO_PEER_RELATION) + return False + + if not self.kafka_config.zookeeper_related: + self._set_status(Status.ZK_NOT_RELATED) + return False + + if not self.kafka_config.zookeeper_connected: + self._set_status(Status.ZK_NO_DATA) + return False + + # TLS must be enabled for Kafka and ZK or disabled for both + if self.tls.enabled ^ ( + self.kafka_config.zookeeper_config.get("tls", "disabled") == "enabled" + ): + self._set_status(Status.ZK_TLS_MISMATCH) + return False + + if not self.kafka_config.internal_user_credentials: + self._set_status(Status.NO_BROKER_CREDS) + return False + + return True + + @property + def healthy(self) -> bool: + """Checks and updates various charm lifecycle states. + + Is slow to fail due to retries, to be used sparingly. + + Returns: + True if service is alive and active. 
Otherwise False + """ + if not self.ready_to_start: + return False + + if not self.container.get_service("kafka").is_running(): + self._set_status(Status.SERVICE_NOT_RUNNING) + return False + + return True + + def _on_update_status(self, _: EventBase) -> None: + """Handler for `update-status` events.""" + if not self.healthy: + return + + if not broker_active( + unit=self.unit, + zookeeper_config=self.kafka_config.zookeeper_config, + ): + self._set_status(Status.ZK_NOT_CONNECTED) + return + + self._set_status(Status.ACTIVE) + def _on_storage_attached(self, event: StorageAttachedEvent) -> None: """Handler for `storage_attached` events.""" # checks first whether the broker is active before warning - if not self.kafka_config.zookeeper_connected or not broker_active( - unit=self.unit, zookeeper_config=self.kafka_config.zookeeper_config - ): + if not self.ready_to_start: return # new dirs won't be used until topic partitions are assigned to it # either automatically for new topics, or manually for existing - message = ( - "manual partition reassignment may be needed for Kafka to utilize new storage volumes" - ) - logger.warning(f"attaching storage - {message}") - self.unit.status = ActiveStatus(message) - + self._set_status(Status.ADDED_STORAGE) self._on_config_changed(event) def _on_storage_detaching(self, event: StorageDetachingEvent) -> None: """Handler for `storage_detaching` events.""" - # checks first whether the broker is active before warning - if not self.kafka_config.zookeeper_connected or not broker_active( - unit=self.unit, zookeeper_config=self.kafka_config.zookeeper_config - ): - return - # in the case where there may be replication recovery may be possible if self.peer_relation and len(self.peer_relation.units): - message = "manual partition reassignment from replicated brokers recommended due to lost partitions on removed storage volumes" - logger.warning(f"removing storage - {message}") - self.unit.status = BlockedStatus(message) + self._set_status(Status.REMOVED_STORAGE) else: - message = "potential log-data loss due to storage removal without replication" - logger.error(f"removing storage - {message}") - self.unit.status = BlockedStatus(message) + self._set_status(Status.REMOVED_STORAGE_NO_REPL) self._on_config_changed(event) - def _on_kafka_pebble_ready(self, event: PebbleReadyEvent) -> None: - """Handler for `kafka_pebble_ready` event.""" + def _on_zookeeper_created(self, event: RelationCreatedEvent) -> None: + """Handler for `zookeeper_relation_created` events.""" + if self.unit.is_leader(): + event.relation.data[self.app].update({"chroot": "/" + self.app.name}) + + def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None: + """Handler for `zookeeper_relation_created/joined/changed` events, ensuring internal users get created.""" if not self.container.can_connect(): event.defer() return if not self.kafka_config.zookeeper_connected: - self.unit.status = WaitingStatus("waiting for zookeeper relation") + self._set_status(Status.ZK_NO_DATA) event.defer() return - # required settings given zookeeper connection config has been created - self.kafka_config.set_server_properties() - self.kafka_config.set_zk_jaas_config() - self.kafka_config.set_client_properties() + # TLS must be enabled for Kafka and ZK or disabled for both + if self.tls.enabled ^ ( + self.kafka_config.zookeeper_config.get("tls", "disabled") == "enabled" + ): + event.defer() + self._set_status(Status.ZK_TLS_MISMATCH) + return - # do not start units until SCRAM users have been added to ZooKeeper for 
server-server auth - # set internal passwords - if self.unit.is_leader(): - for username, password in self.kafka_config.internal_user_credentials.items(): - updated_user = self.update_internal_user(username=username, password=password) - # do not continue until both internal users are updated - if not updated_user: - event.defer() - return + # do not create users until certificate + keystores created + # otherwise unable to authenticate to ZK + if self.tls.enabled and not self.tls.certificate: + event.defer() + self._set_status(Status.NO_CERT) + return + + if not self.kafka_config.internal_user_credentials and self.unit.is_leader(): + # loading the minimum config needed to authenticate to zookeeper + self.kafka_config.set_environment() + self.kafka_config.set_zk_jaas_config() + self.kafka_config.set_server_properties() + + try: + internal_user_credentials = self._create_internal_credentials() + except (KeyError, RuntimeError, ExecError) as e: + logger.warning(str(e)) + event.defer() + return + + # only set to relation data when all set + for username, password in internal_user_credentials: + self.set_secret(scope="app", key=f"{username}-password", value=password) + + self._on_config_changed(event) + + def _on_zookeeper_broken(self, event: RelationEvent) -> None: + """Handler for `zookeeper_relation_departed/broken` events.""" + if not self.container.can_connect(): + event.defer() + return - # updating non-leader units of creation of internal users - self.peer_relation.data[self.app].update({"broker-creds": "added"}) + logger.info("stopping kafka service") + self.container.stop(CONTAINER) + self._set_status(Status.ZK_NOT_RELATED) - # for non-leader units + def _on_kafka_pebble_ready(self, event: EventBase) -> None: + """Handler for `start` event.""" if not self.ready_to_start: event.defer() return + # required settings given zookeeper connection config has been created + self.kafka_config.set_environment() + self.kafka_config.set_server_properties() + self.kafka_config.set_zk_jaas_config() + self.kafka_config.set_client_properties() + # start kafka service self.container.add_layer(CONTAINER, self._kafka_layer, combine=True) self.container.replan() + # flag that the unit has actually started the new layer service, not default + self.unit_peer_data.update({"started": "True"}) + # service_start might fail silently, confirm with ZK if kafka is actually connected - if broker_active( - unit=self.unit, - zookeeper_config=self.kafka_config.zookeeper_config, - ): - logger.info(f'Broker {self.unit.name.split("/")[1]} connected') - self.unit_peer_data.update({"state": "started"}) - self.unit.status = ActiveStatus() - else: - self.unit.status = BlockedStatus("kafka unit not connected to ZooKeeper") - return + self._on_update_status(event) def _on_config_changed(self, event: EventBase) -> None: """Generic handler for most `config_changed` events across relations.""" @@ -272,46 +363,57 @@ def _on_config_changed(self, event: EventBase) -> None: if self.model.relations.get(REL_NAME, None) and self.unit.is_leader(): self.client_relations.update_connection_info() - def _on_leader_elected(self, event: LeaderElectedEvent) -> None: - """Handler for `leader_elected` event, ensuring internal user passwords get set.""" - if not self.peer_relation: - logger.debug("no peer relation") + def _restart(self, event: EventBase) -> None: + """Handler for `rolling_ops` restart events.""" + # only attempt restart if service is already active + if not self.healthy or not self.unit_peer_data.get("started", ""): event.defer() 
return - self.set_internal_passwords() + self.container.restart(CONTAINER) - def _on_zookeeper_joined(self, event: RelationJoinedEvent) -> None: - """Handler for `zookeeper_relation_joined` event, ensuring chroot gets set.""" - if self.unit.is_leader(): - event.relation.data[self.app].update({"chroot": "/" + self.app.name}) + if self.healthy: + logger.info(f'Broker {self.unit.name.split("/")[1]} restarted') + else: + logger.error(f"Broker {self.unit.name.split('/')[1]} failed to restart") - def _on_zookeeper_broken(self, event: RelationEvent) -> None: - """Handler for `zookeeper_relation_departed/broken` events.""" - if not self.container.can_connect(): - event.defer() + def _get_admin_credentials_action(self, event: ActionEvent) -> None: + raw_properties = str( + self.container.pull(self.kafka_config.client_properties_filepath).read() + ) + client_properties = raw_properties.splitlines() + + if not client_properties: + msg = "client.properties file not found on target unit." + logger.error(msg) + event.fail(msg) return - logger.info("stopping kafka service") - self.container.stop(CONTAINER) - self.unit.status = BlockedStatus("missing required zookeeper relation") + admin_properties = set(client_properties) - set(self.kafka_config.tls_properties) + + event.set_results( + { + "username": ADMIN_USER, + "password": self.kafka_config.internal_user_credentials[ADMIN_USER], + "client-properties": "\n".join(admin_properties), + } + ) def _set_password_action(self, event: ActionEvent) -> None: """Handler for set-password action. Set the password for a specific user, if no passwords are passed, generate them. """ - if not self.peer_relation: - logger.debug("no peer relation") - event.defer() - return - if not self.unit.is_leader(): msg = "Password rotation must be called on leader unit" logger.error(msg) event.fail(msg) return + if not self.healthy: + event.defer() + return + username = event.params["username"] new_password = event.params.get("password", generate_password()) @@ -321,125 +423,55 @@ def _set_password_action(self, event: ActionEvent) -> None: event.fail(msg) return - user_updated = self.update_internal_user(username=username, password=new_password) - if not user_updated: - event.fail("Unable to update user.") - return + try: + self._update_internal_user(username=username, password=new_password) + except Exception as e: + logger.error(str(e)) + event.fail(f"unable to set password for {username}") # Store the password on application databag self.set_secret(scope="app", key=f"{username}-password", value=new_password) event.set_results({f"{username}-password": new_password}) - def _restart(self, event: EventBase) -> None: - """Handler for `rolling_ops` restart events.""" - # ensures service isn't referenced before pebble ready - if not self.unit_peer_data.get("state", None) == "started": - event.defer() - return - - self.container.restart(CONTAINER) + def _update_internal_user(self, username: str, password: str) -> None: + """Updates internal SCRAM usernames and passwords. - if broker_active( - unit=self.unit, - zookeeper_config=self.kafka_config.zookeeper_config, - ): - logger.info(f'Broker {self.unit.name.split("/")[1]} restarted') - self.unit.status = ActiveStatus() - else: - self.unit.status = BlockedStatus( - f"Broker {self.unit.name.split('/')[1]} failed to restart" - ) - return - - @property - def ready_to_start(self) -> bool: - """Check for active ZooKeeper relation and adding of inter-broker auth username. - - Returns: - True if ZK is related and `sync` user has been added. 
False otherwise. + Raises: + RuntimeError if called from non-leader unit + KeyError if attempted to update non-leader unit + ExecError if command to ZooKeeper failed """ - if not self.peer_relation: - logger.debug("no peer relation") - return False - - # TLS must be enabled for Kafka and ZK or disabled for both - if self.tls.enabled ^ ( - self.kafka_config.zookeeper_config.get("tls", "disabled") == "enabled" - ): - msg = "TLS must be enabled for Zookeeper and Kafka" - logger.error(msg) - self.unit.status = BlockedStatus(msg) - return False - - if not self.kafka_config.zookeeper_connected or not self.peer_relation.data[self.app].get( - "broker-creds", None - ): - return False - - return True - - def _get_admin_credentials_action(self, event: ActionEvent) -> None: - raw_properties = str( - self.container.pull(self.kafka_config.client_properties_filepath).read() - ) - client_properties = raw_properties.splitlines() - - if not client_properties: - msg = "client.properties file not found on target unit." - logger.error(msg) - event.fail(msg) - return - - admin_properties = set(client_properties) - set(self.kafka_config.tls_properties) - - event.set_results( - { - "username": ADMIN_USER, - "password": self.kafka_config.internal_user_credentials[ADMIN_USER], - "client-properties": "\n".join(admin_properties), - } - ) - - def update_internal_user(self, username: str, password: str) -> bool: - """Updates internal SCRAM usernames and passwords.""" if not self.unit.is_leader(): - logger.debug("Cannot update internal user from non-leader unit.") - return False + raise RuntimeError("Cannot update internal user from non-leader unit.") - if username not in [INTER_BROKER_USER, ADMIN_USER]: - msg = f"Can only update internal charm users: {INTER_BROKER_USER} or {ADMIN_USER}, not {username}." - logger.error(msg) - return False + if username not in INTERNAL_USERS: + raise KeyError( + f"Can only update internal charm users: {INTERNAL_USERS}, not {username}." + ) # do not start units until SCRAM users have been added to ZooKeeper for server-server auth - kafka_auth = KafkaAuth( - self, - opts=self.kafka_config.auth_args, - zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), - container=self.container, + kafka_auth = KafkaAuth(self) + kafka_auth.add_user( + username=username, + password=password, ) - try: - kafka_auth.add_user( - username=username, - password=password, - ) - return True - except Exception as e: - # command to add users fails if attempted too early - logger.info(f"Exception: {str(e)}") - return False + def _create_internal_credentials(self) -> list[tuple[str, str]]: + """Creates internal SCRAM users during cluster start. 
- def set_internal_passwords(self) -> None: - """Sets inter-broker and admin user passwords to app relation data.""" - if not self.unit.is_leader(): - return + Returns: + List of (username, password) for all internal users - for password in [f"{INTER_BROKER_USER}-password", f"{ADMIN_USER}-password"]: - current_password = self.get_secret(scope="app", key=password) - self.set_secret( - scope="app", key=password, value=(current_password or generate_password()) - ) + Raises: + RuntimeError if called from non-leader unit + KeyError if attempted to update non-leader unit + subprocess.CalledProcessError if command to ZooKeeper failed + """ + credentials = [(username, generate_password()) for username in INTERNAL_USERS] + for username, password in credentials: + self._update_internal_user(username=username, password=password) + + return credentials def get_secret(self, scope: str, key: str) -> Optional[str]: """Get TLS secret from the secret storage. @@ -480,6 +512,14 @@ def set_secret(self, scope: str, key: str, value: Optional[str]) -> None: else: raise RuntimeError("Unknown secret scope.") + def _set_status(self, key: Status) -> None: + """Sets charm status.""" + status: StatusBase = key.value.status + log_level: DebugLevel = key.value.log_level + + getattr(logger, log_level.lower())(status.message) + self.unit.status = status + if __name__ == "__main__": main(KafkaK8sCharm) diff --git a/src/config.py b/src/config.py index 1b2ae64e..b21a831d 100644 --- a/src/config.py +++ b/src/config.py @@ -11,10 +11,14 @@ from literals import ( ADMIN_USER, + BINARIES_PATH, + CONF_PATH, CONTAINER, - DATA_DIR, INTER_BROKER_USER, + INTERNAL_USERS, + JAVA_HOME, JMX_EXPORTER_PORT, + LOGS_PATH, PEER, REL_NAME, SECURITY_PROTOCOL_PORTS, @@ -102,28 +106,32 @@ class KafkaConfig: def __init__(self, charm): self.charm = charm - self.default_config_path = f"{DATA_DIR}/config" - self.server_properties_filepath = f"{self.default_config_path}/server.properties" - self.client_properties_filepath = f"{self.default_config_path}/client.properties" - self.zk_jaas_filepath = f"{self.default_config_path}/kafka-jaas.cfg" - self.keystore_filepath = f"{self.default_config_path}/keystore.p12" - self.truststore_filepath = f"{self.default_config_path}/truststore.jks" - self.jmx_exporter_filepath = "/opt/kafka/extra/jmx_prometheus_javaagent.jar" - self.jmx_config_filepath = "/opt/kafka/default-config/jmx_prometheus.yaml" + self.server_properties_filepath = f"{CONF_PATH}/server.properties" + self.client_properties_filepath = f"{CONF_PATH}/client.properties" + self.zk_jaas_filepath = f"{CONF_PATH}/kafka-jaas.cfg" + self.keystore_filepath = f"{CONF_PATH}/keystore.p12" + self.truststore_filepath = f"{CONF_PATH}/truststore.jks" + self.jmx_exporter_filepath = f"{BINARIES_PATH}/jmx_prometheus_javaagent.jar" + self.jmx_config_filepath = f"{CONF_PATH}/jmx_prometheus.yaml" @property def internal_user_credentials(self) -> Dict[str, str]: """The charm internal usernames and passwords, e.g `sync` and `admin`. Returns: - Dict of usernames and passwords. 
+ Dict of usernames and passwords """ - return { + credentials = { user: password - for user in [INTER_BROKER_USER, ADMIN_USER] + for user in INTERNAL_USERS if (password := self.charm.get_secret(scope="app", key=f"{user}-password")) } + if not len(credentials) == len(INTERNAL_USERS): + return {} + + return credentials + @property def container(self) -> Container: """Grabs the current Kafka container.""" @@ -145,6 +153,9 @@ def zookeeper_config(self) -> Dict[str, str]: zookeeper_config = {} # loop through all relations to ZK, attempt to find all needed config for relation in self.charm.model.relations[ZOOKEEPER_REL_NAME]: + if not relation.app: + continue + zk_keys = ["username", "password", "endpoints", "chroot", "uris", "tls"] missing_config = any( relation.data[relation.app].get(key, None) is None for key in zk_keys @@ -167,6 +178,15 @@ def zookeeper_config(self) -> Dict[str, str]: return zookeeper_config + @property + def zookeeper_related(self) -> bool: + """Checks if there is a relation with ZooKeeper. + + Returns: + True if there is a ZooKeeper relation. Otherwise False + """ + return bool(self.charm.model.relations[ZOOKEEPER_REL_NAME]) + @property def zookeeper_connected(self) -> bool: """Checks if there is an active ZooKeeper relation. @@ -227,7 +247,7 @@ def kafka_command(self) -> str: Returns: String of startup command and expected config filepath """ - entrypoint = "/opt/kafka/bin/kafka-server-start.sh" + entrypoint = f"{BINARIES_PATH}/bin/kafka-server-start.sh" return f"{entrypoint} {self.server_properties_filepath}" @property @@ -299,15 +319,18 @@ def scram_properties(self) -> List[str]: Returns: list of scram properties to be set. """ + username = INTER_BROKER_USER + password = self.internal_user_credentials.get(INTER_BROKER_USER, "") + scram_properties = [ - f'listener.name.{self.internal_listener.name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{INTER_BROKER_USER}" password="{self.internal_user_credentials[INTER_BROKER_USER]}";' + f'listener.name.{self.internal_listener.name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";' ] client_scram = [ auth.name for auth in self.client_listeners if auth.protocol.startswith("SASL_") ] for name in client_scram: scram_properties.append( - f'listener.name.{name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{INTER_BROKER_USER}" password="{self.internal_user_credentials[INTER_BROKER_USER]}";' + f'listener.name.{name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";' ) return scram_properties @@ -489,11 +512,11 @@ def set_client_properties(self) -> None: container=self.container, ) - def set_kafka_opts(self) -> None: + def set_environment(self) -> None: """Writes the env-vars needed for SASL/SCRAM auth to `/etc/environment` on the unit.""" opts_string = " ".join(self.extra_args) push( - content=f"KAFKA_OPTS={opts_string}", + content=f"KAFKA_OPTS={opts_string}\nLOG_DIR={LOGS_PATH}\nJAVA_HOME={JAVA_HOME}", path="/etc/environment", container=self.container, ) diff --git a/src/literals.py b/src/literals.py index 07ad3fc9..85977280 100644 --- a/src/literals.py +++ b/src/literals.py @@ -5,8 +5,11 @@ """Literals used by the Kafka K8s charm.""" from dataclasses import dataclass +from enum import Enum from typing 
import Dict, Literal +from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, StatusBase, WaitingStatus + CHARM_KEY = "kafka-k8s" ZK_NAME = "zookeeper-k8s" PEER = "cluster" @@ -15,18 +18,22 @@ REL_NAME = "kafka-client" TLS_RELATION = "certificates" CONTAINER = "kafka" -STORAGE = "log-data" -DATA_DIR = "/data/kafka" -LOG_DIR = "/logs/kafka" +STORAGE = "data" JMX_EXPORTER_PORT = 9101 +CONF_PATH = "/etc/kafka" +DATA_PATH = "/var/lib/kafka" +LOGS_PATH = "/var/log/kafka" +BINARIES_PATH = "/opt/kafka" +JAVA_HOME = "/usr/lib/jvm/java-17-openjdk-amd64" + INTER_BROKER_USER = "sync" ADMIN_USER = "admin" - INTERNAL_USERS = [INTER_BROKER_USER, ADMIN_USER] AuthMechanism = Literal["SASL_PLAINTEXT", "SASL_SSL", "SSL"] Scope = Literal["INTERNAL", "CLIENT"] +DebugLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"] @dataclass @@ -40,3 +47,39 @@ class Ports: "SASL_SSL": Ports(9093, 19093), "SSL": Ports(9094, 19094), } + + +@dataclass +class StatusLevel: + status: StatusBase + log_level: DebugLevel + + +class Status(Enum): + ACTIVE = StatusLevel(ActiveStatus(), "DEBUG") + NO_PEER_RELATION = StatusLevel(MaintenanceStatus("no peer relation yet"), "DEBUG") + ZK_NOT_RELATED = StatusLevel(BlockedStatus("missing required zookeeper relation"), "ERROR") + ZK_NOT_CONNECTED = StatusLevel(BlockedStatus("unit not connected to zookeeper"), "ERROR") + ZK_TLS_MISMATCH = StatusLevel( + BlockedStatus("tls must be enabled on both kafka and zookeeper"), "ERROR" + ) + ZK_NO_DATA = StatusLevel(WaitingStatus("zookeeper credentials not created yet"), "INFO") + ADDED_STORAGE = StatusLevel( + ActiveStatus("manual partition reassignment may be needed to utilize new storage volumes"), + "WARNING", + ) + REMOVED_STORAGE = StatusLevel( + ActiveStatus( + "manual partition reassignment from replicated brokers recommended due to lost partitions on removed storage volumes" + ), + "ERROR", + ) + REMOVED_STORAGE_NO_REPL = StatusLevel( + ActiveStatus("potential data loss due to storage removal without replication"), + "ERROR", + ) + NO_BROKER_CREDS = StatusLevel( + WaitingStatus("internal broker credentials not yet added"), "INFO" + ) + NO_CERT = StatusLevel(WaitingStatus("unit waiting for signed certificates"), "INFO") + SERVICE_NOT_RUNNING = StatusLevel(BlockedStatus("service not running"), "WARNING") diff --git a/src/provider.py b/src/provider.py index 53c19d78..6dd941a9 100644 --- a/src/provider.py +++ b/src/provider.py @@ -5,14 +5,17 @@ """KafkaProvider class and methods.""" import logging +from typing import Optional from charms.data_platform_libs.v0.data_interfaces import KafkaProvides, TopicRequestedEvent from ops.charm import RelationBrokenEvent, RelationCreatedEvent from ops.framework import Object +from ops.model import Relation +from ops.pebble import ExecError from auth import KafkaAuth from config import KafkaConfig -from literals import CONTAINER, REL_NAME +from literals import PEER, REL_NAME from utils import generate_password logger = logging.getLogger(__name__) @@ -27,9 +30,6 @@ def __init__(self, charm) -> None: self.kafka_config = KafkaConfig(self.charm) self.kafka_auth = KafkaAuth( charm, - container=self.charm.unit.get_container(CONTAINER), - opts=self.kafka_config.auth_args, - zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), ) self.kafka_provider = KafkaProvides(self.charm, REL_NAME) @@ -40,6 +40,11 @@ def __init__(self, charm) -> None: self.framework.observe(self.kafka_provider.on.topic_requested, self.on_topic_requested) + @property + def peer_relation(self) -> Optional[Relation]: + """The 
Kafka cluster's peer relation.""" + return self.charm.model.get_relation(PEER) + def _on_relation_created(self, event: RelationCreatedEvent) -> None: """Handler for `kafka-client-relation-created` event.""" # this will trigger kafka restart (if needed) before granting credentials @@ -47,30 +52,21 @@ def _on_relation_created(self, event: RelationCreatedEvent) -> None: def on_topic_requested(self, event: TopicRequestedEvent): """Handle the on topic requested event.""" - if not self.charm.unit.is_leader(): - return - - if not self.charm.ready_to_start: - logger.debug("cannot add user, ZooKeeper not yet connected") - event.defer() - return - - if not self.charm.kafka_config.zookeeper_connected: - logger.debug("cannot update ACLs, ZooKeeper not yet connected") + if not self.charm.healthy: event.defer() return + # on all unit update the server properties to enable client listener if needed self.charm._on_config_changed(event) - extra_user_roles = event.extra_user_roles - topic = event.topic + if not self.charm.unit.is_leader() or not self.peer_relation: + return + extra_user_roles = event.extra_user_roles or "" + topic = event.topic or "" relation = event.relation - username = f"relation-{relation.id}" - password = ( - self.charm.peer_relation.data[self.charm.app].get(username) or generate_password() - ) + password = self.peer_relation.data[self.charm.app].get(username) or generate_password() bootstrap_server = self.charm.kafka_config.bootstrap_server zookeeper_uris = self.charm.kafka_config.zookeeper_config.get("connect", "") tls = "enabled" if self.charm.tls.enabled else "disabled" @@ -78,18 +74,20 @@ def on_topic_requested(self, event: TopicRequestedEvent): consumer_group_prefix = ( event.consumer_group_prefix or f"{username}-" if "consumer" in extra_user_roles else "" ) + + # catching error here in case listeners not established for bootstrap-server auth try: self.kafka_auth.add_user( username=username, password=password, ) - except Exception: - logger.warning("unable to create user just yet") + except ExecError: + logger.warning("unable to create internal user just yet") event.defer() return # non-leader units need cluster_config_changed event to update their super.users - self.charm.peer_relation.data[self.charm.app].update({username: password}) + self.peer_relation.data[self.charm.app].update({username: password}) self.kafka_auth.load_current_acls() @@ -101,7 +99,7 @@ def on_topic_requested(self, event: TopicRequestedEvent): ) # non-leader units need cluster_config_changed event to update their super.users - self.charm.peer_relation.data[self.charm.app].update( + self.peer_relation.data[self.charm.app].update( {"super-users": self.kafka_config.super_users} ) diff --git a/src/tls.py b/src/tls.py index b1e431f9..d5c2285f 100644 --- a/src/tls.py +++ b/src/tls.py @@ -19,7 +19,7 @@ from ops.model import Container, Relation from ops.pebble import ExecError -from literals import TLS_RELATION +from literals import CONF_PATH, TLS_RELATION from utils import generate_password, parse_tls_file, push logger = logging.getLogger(__name__) @@ -242,7 +242,7 @@ def set_server_key(self) -> None: push( container=self.charm.container, content=self.private_key, - path=f"{self.charm.kafka_config.default_config_path}/server.key", + path=f"{CONF_PATH}/server.key", ) def set_ca(self) -> None: @@ -254,7 +254,7 @@ def set_ca(self) -> None: push( container=self.charm.container, content=self.ca, - path=f"{self.charm.kafka_config.default_config_path}/ca.pem", + path=f"{CONF_PATH}/ca.pem", ) def set_certificate(self) -> 
None: @@ -266,7 +266,7 @@ def set_certificate(self) -> None: push( container=self.charm.container, content=self.certificate, - path=f"{self.charm.kafka_config.default_config_path}/server.pem", + path=f"{CONF_PATH}/server.pem", ) def set_truststore(self) -> None: @@ -287,8 +287,10 @@ def set_truststore(self) -> None: f"{self.truststore_password}", "-noprompt", ], - working_dir=self.charm.kafka_config.default_config_path, + working_dir=CONF_PATH, ).wait_output() + self.container.exec(["chown", "kafka:kafka", f"{CONF_PATH}/truststore.jks"]) + self.container.exec(["chmod", "770", f"{CONF_PATH}/truststore.jks"]) except ExecError as e: # in case this reruns and fails expected_error_string = "alias already exists" @@ -320,8 +322,10 @@ def set_keystore(self) -> None: "-password", f"pass:{self.keystore_password}", ], - working_dir=self.charm.kafka_config.default_config_path, + working_dir=CONF_PATH, ).wait_output() + self.container.exec(["chown", "kafka:kafka", f"{CONF_PATH}/keystore.p12"]) + self.container.exec(["chmod", "770", f"{CONF_PATH}/keystore.p12"]) except ExecError as e: logger.error(str(e.stdout)) raise @@ -338,7 +342,7 @@ def remove_stores(self) -> None: "*.p12", "*.jks", ], - working_dir=self.charm.zookeeper_config.default_config_path, + working_dir=CONF_PATH, ) logger.debug(str(proc.wait_output()[1])) except ExecError as e: diff --git a/src/utils.py b/src/utils.py index 6fd74d8c..cd27d9ab 100644 --- a/src/utils.py +++ b/src/utils.py @@ -20,6 +20,8 @@ from tenacity.stop import stop_after_attempt from tenacity.wait import wait_fixed +from literals import BINARIES_PATH, CONF_PATH, JAVA_HOME + logger = logging.getLogger(__name__) @@ -109,10 +111,10 @@ def run_bin_command( Returns: String of kafka bin command output """ - zk_tls_config_file = zk_tls_config_filepath or "/data/kafka/config/server.properties" - environment = {"KAFKA_OPTS": extra_args} + zk_tls_config_file = zk_tls_config_filepath or f"{CONF_PATH}/server.properties" + environment = {"KAFKA_OPTS": " ".join(extra_args), "JAVA_HOME": JAVA_HOME} command = ( - [f"/opt/kafka/bin/kafka-{bin_keyword}.sh"] + [f"{BINARIES_PATH}/bin/kafka-{bin_keyword}.sh"] + bin_args + [f"--zk-tls-config-file={zk_tls_config_file}"] ) @@ -120,10 +122,9 @@ def run_bin_command( try: process = container.exec(command=command, environment=environment) output, _ = process.wait_output() - logger.debug(f"{output=}") return output except ExecError as e: - logger.debug(f"cmd failed:\ncommand={e.command}\nstdout={e.stdout}\nstderr={e.stderr}") + logger.error(str(e.stderr)) raise e diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index eee9ed1f..e1bb43a8 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -7,14 +7,21 @@ import socket from contextlib import closing from pathlib import Path -from subprocess import PIPE, check_output +from subprocess import PIPE, CalledProcessError, check_output from typing import Any, Dict, List, Optional, Set, Tuple import yaml from pytest_operator.plugin import OpsTest from auth import Acl, KafkaAuth -from literals import REL_NAME, SECURITY_PROTOCOL_PORTS +from literals import ( + BINARIES_PATH, + CONF_PATH, + DATA_PATH, + REL_NAME, + SECURITY_PROTOCOL_PORTS, + STORAGE, +) logger = logging.getLogger(__name__) @@ -28,7 +35,7 @@ def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]: - container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeeper_uri} 
--list" + container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config={CONF_PATH}/kafka-jaas.cfg {BINARIES_PATH}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeeper_uri} --list" result = check_output( f"JUJU_MODEL={model_full_name} juju ssh --container kafka kafka-k8s/0 '{container_command}'", stderr=PIPE, @@ -41,7 +48,7 @@ def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]: def load_super_users(model_full_name: str) -> List[str]: result = check_output( - f"JUJU_MODEL={model_full_name} juju ssh --container kafka {APP_NAME}/0 'cat /data/kafka/config/server.properties'", + f"JUJU_MODEL={model_full_name} juju ssh --container kafka {APP_NAME}/0 'cat {CONF_PATH}/server.properties'", stderr=PIPE, shell=True, universal_newlines=True, @@ -56,7 +63,7 @@ def load_super_users(model_full_name: str) -> List[str]: def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: - container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" + container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config={CONF_PATH}/kafka-jaas.cfg {BINARIES_PATH}/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" result = check_output( f"JUJU_MODEL={model_full_name} juju ssh --container kafka kafka-k8s/0 '{container_command}'", stderr=PIPE, @@ -69,15 +76,14 @@ def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> str: """Get information related to a user stored on zookeeper.""" - container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg ./opt/kafka/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" - logger.info(f"Container command: {container_command}") + container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config={CONF_PATH}/kafka-jaas.cfg {BINARIES_PATH}/bin/kafka-configs.sh --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}" result = check_output( f"JUJU_MODEL={model_full_name} juju ssh --container kafka kafka-k8s/0 '{container_command}'", stderr=PIPE, shell=True, universal_newlines=True, - ).splitlines() - return result[-1] + ) + return result def show_unit(unit_name: str, model_full_name: str) -> Any: @@ -188,18 +194,21 @@ def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str return zk_relation_data -def check_tls(ip: str, port: int) -> None: - result = check_output( - f"echo | openssl s_client -connect {ip}:{port}", - stderr=PIPE, - shell=True, - universal_newlines=True, - ) - - # FIXME: The server cannot be validated, we would need to try to connect using the CA - # from self-signed certificates. This is indication enough that the server is sending a - # self-signed key. - assert "CN = kafka" in result +def check_tls(ip: str, port: int) -> bool: + try: + result = check_output( + f"echo | openssl s_client -connect {ip}:{port}", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + # FIXME: The server cannot be validated, we would need to try to connect using the CA + # from self-signed certificates. This is indication enough that the server is sending a + # self-signed key. 
+ return "CN = kafka" in result + except CalledProcessError as e: + logger.error(f"command '{e.cmd}' return with error (code {e.returncode}): {e.output}") + return False def get_provider_data( @@ -237,21 +246,19 @@ def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None: AssertionError: if logs aren't found for desired topic """ logs = check_output( - f"JUJU_MODEL={model_full_name} juju ssh --container kafka {kafka_unit_name} 'find /var/lib/juju/storage/log-data'", + f"JUJU_MODEL={model_full_name} juju ssh --container kafka {kafka_unit_name} 'find {DATA_PATH}/{STORAGE}'", stderr=PIPE, shell=True, universal_newlines=True, ).splitlines() - logger.debug(f"{logs=}") - passed = False for log in logs: if topic and "index" in log: passed = True break - assert passed, "logs not found" + assert logs and passed, "logs not found" def check_socket(host: str, port: int) -> bool: @@ -266,7 +273,7 @@ async def run_client_properties(ops_test: OpsTest) -> str: + f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}" ) - container_command = f"./opt/kafka/bin/kafka-configs.sh --bootstrap-server {bootstrap_server} --describe --all --command-config /data/kafka/config/client.properties --entity-type users" + container_command = f"KAFKA_OPTS=-Djava.security.auth.login.config={CONF_PATH}/kafka-jaas.cfg {BINARIES_PATH}/bin/kafka-configs.sh --bootstrap-server {bootstrap_server} --describe --all --command-config {CONF_PATH}/client.properties --entity-type users" result = check_output( f"JUJU_MODEL={ops_test.model_full_name} juju ssh --container kafka kafka-k8s/0 '{container_command}'", diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index e65d7284..2cd02470 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -83,14 +83,21 @@ async def test_listeners(ops_test: OpsTest, app_charm): await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[DUMMY_NAME].status == "active" - await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME, DUMMY_NAME]) + + await ops_test.model.wait_for_idle( + apps=[APP_NAME, ZK_NAME, DUMMY_NAME], idle_period=30, status="active", timeout=600 + ) + # check that client listener is active assert check_socket(address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT"].client) # remove relation and check that client listerner is not active await ops_test.model.applications[APP_NAME].remove_relation( f"{APP_NAME}:{REL_NAME}", f"{DUMMY_NAME}:{REL_NAME_ADMIN}" ) - await ops_test.model.wait_for_idle(apps=[APP_NAME]) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], idle_period=30, status="active", timeout=600 + ) + assert not check_socket(address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT"].client) @@ -99,7 +106,11 @@ async def test_client_properties_makes_admin_connection(ops_test: OpsTest): await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[DUMMY_NAME].status == "active" - await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME, DUMMY_NAME]) + + await ops_test.model.wait_for_idle( + apps=[APP_NAME, ZK_NAME, DUMMY_NAME], idle_period=30, status="active", timeout=600 + ) + result = await run_client_properties(ops_test=ops_test) assert result assert len(result.strip().split("\n")) == 3 diff --git a/tests/integration/test_password_rotation.py 
b/tests/integration/test_password_rotation.py index 8d94dd04..73cd7c51 100644 --- a/tests/integration/test_password_rotation.py +++ b/tests/integration/test_password_rotation.py @@ -49,7 +49,7 @@ async def test_build_and_deploy(ops_test: OpsTest): async with ops_test.fast_forward(fast_interval="30s"): await ops_test.model.wait_for_idle( - apps=[APP_NAME, ZK_NAME], idle_period=20, status="active", timeout=2000 + apps=[APP_NAME, ZK_NAME], idle_period=60, status="active", timeout=2000 ) assert ops_test.model.applications[APP_NAME].status == "active" diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py index 61dc0b8c..ee7d5b8b 100644 --- a/tests/integration/test_provider.py +++ b/tests/integration/test_provider.py @@ -103,7 +103,7 @@ async def test_deploy_multiple_charms_same_topic_relate_active( """Test relation with multiple applications.""" await ops_test.model.deploy( app_charm, application_name=DUMMY_NAME_2, num_units=1, series="jammy" - ), + ) await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2], timeout=1000, idle_period=30) await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME_2}:{REL_NAME_CONSUMER}") await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_2], timeout=1000, idle_period=30) diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index 7f6ab7e7..e70b3ab2 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -91,14 +91,14 @@ async def test_kafka_tls(ops_test: OpsTest): await ops_test.model.add_relation(APP_NAME, TLS_NAME) logger.info("Relate Kafka to TLS") - async with ops_test.fast_forward(fast_interval="30s"): + async with ops_test.fast_forward(): await ops_test.model.wait_for_idle( - apps=[APP_NAME, ZK_NAME, TLS_NAME], idle_period=20, timeout=2000, status="active" + apps=[APP_NAME, ZK_NAME, TLS_NAME], idle_period=30, timeout=2000, status="active" ) kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME) logger.info("Check for Kafka TLS") - check_tls(ip=kafka_address, port=19093) + assert check_tls(ip=kafka_address, port=19093) # Rotate credentials new_private_key = generate_private_key().decode("utf-8") @@ -142,4 +142,4 @@ async def test_kafka_tls_scaling(ops_test: OpsTest): """ kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=2) - check_tls(ip=kafka_address, port=19093) + assert check_tls(ip=kafka_address, port=19093) diff --git a/tests/unit/test_auth.py b/tests/unit/test_auth.py index 28d4168e..ad7b133e 100644 --- a/tests/unit/test_auth.py +++ b/tests/unit/test_auth.py @@ -112,9 +112,6 @@ def test_get_acls_tls_adds_zk_tls_flag(harness): harness.update_relation_data(peer_rel_id, CHARM_KEY, {"tls": "enabled"}) auth = KafkaAuth( harness.charm, - opts=["mordor"], - zookeeper="server.1:gandalf.the.grey", - container=harness.charm.container, ) with patch("ops.model.Container.exec", return_value=DummyExec()) as patched_exec: @@ -148,9 +145,6 @@ def test_add_user_adds_zk_tls_flag(harness): harness.update_relation_data(peer_rel_id, CHARM_KEY, {"tls": "enabled"}) auth = KafkaAuth( harness.charm, - opts=["mordor"], - zookeeper="server.1:gandalf.the.grey", - container=harness.charm.container, ) with patch("ops.model.Container.exec", return_value=DummyExec()) as patched_exec: @@ -183,9 +177,6 @@ def test_delete_user_adds_zk_tls_flag(harness): harness.update_relation_data(peer_rel_id, CHARM_KEY, {"tls": "enabled"}) auth = KafkaAuth( harness.charm, - opts=["mordor"], - zookeeper="server.1:gandalf.the.grey", - container=harness.charm.container, ) 
with patch("ops.model.Container.exec", return_value=DummyExec()) as patched_exec: diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 7c641b0c..0ee48f5f 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -9,16 +9,13 @@ import pytest import yaml -from ops.model import BlockedStatus, WaitingStatus +from ops.model import BlockedStatus from ops.testing import Harness -from tenacity.wait import wait_none from charm import KafkaK8sCharm from literals import ( - ADMIN_USER, CHARM_KEY, CONTAINER, - INTER_BROKER_USER, PEER, REL_NAME, STORAGE, @@ -56,19 +53,11 @@ def test_opts_in_pebble_layer(harness): assert layer["services"][CONTAINER].get("environment", {}).get("KAFKA_OPTS") -def test_pebble_ready_waits_until_zookeeper_relation(harness): +def test_pebble_ready_blocks_until_zookeeper_relation(harness): """Checks unit goes to WaitingStatus without ZK relation on install hook.""" + harness.add_relation(PEER, CHARM_KEY) harness.container_pebble_ready(CONTAINER) - assert isinstance(harness.charm.unit.status, WaitingStatus) - - -def test_leader_elected_sets_passwords(harness): - """Checks inter-broker passwords are created on leaderelected hook.""" - peer_rel_id = harness.add_relation(PEER, CHARM_KEY) - harness.add_relation_unit(peer_rel_id, f"{CHARM_KEY}/0") - harness.set_leader(True) - - assert harness.charm.app_peer_data.get("sync-password", None) + assert isinstance(harness.charm.unit.status, BlockedStatus) def test_zookeeper_joined_sets_chroot(harness): @@ -122,6 +111,8 @@ def test_pebble_ready_sets_necessary_config(harness): ) with ( + patch("charm.KafkaK8sCharm.ready_to_start", return_value=True), + patch("charm.broker_active", return_value=True), patch("config.KafkaConfig.set_zk_jaas_config") as patched_jaas, patch("config.KafkaConfig.set_server_properties") as patched_server_properties, patch("config.KafkaConfig.set_client_properties") as patched_client_properties, @@ -132,49 +123,6 @@ def test_pebble_ready_sets_necessary_config(harness): patched_client_properties.assert_called_once() -def test_pebble_ready_sets_auth_and_broker_creds_on_leader(harness): - """Checks inter-broker user is created on leader on pebble_ready hook.""" - peer_rel_id = harness.add_relation(PEER, CHARM_KEY) - zk_rel_id = harness.add_relation(ZOOKEEPER_REL_NAME, ZOOKEEPER_REL_NAME) - harness.add_relation_unit(zk_rel_id, "zookeeper/0") - harness.update_relation_data( - zk_rel_id, - ZOOKEEPER_REL_NAME, - { - "username": "relation-1", - "password": "mellon", - "endpoints": "123.123.123", - "chroot": "/kafka", - "uris": "123.123.123/kafka", - "tls": "disabled", - }, - ) - harness.update_relation_data(peer_rel_id, CHARM_KEY, {"sync-password": "mellon"}) - - with ( - patch("auth.KafkaAuth.add_user") as patched_add_user, - patch("config.KafkaConfig.set_zk_jaas_config"), - patch("config.KafkaConfig.set_server_properties"), - patch("config.KafkaConfig.set_client_properties"), - patch("charm.broker_active") as patched_broker_active, - ): - # verify non-leader does not set creds - patched_broker_active.retry.wait = wait_none - harness.container_pebble_ready(CONTAINER) - patched_add_user.assert_not_called() - assert not harness.charm.app_peer_data.get("broker-creds", None) - - # verify leader sets creds - harness.set_leader(True) - harness.container_pebble_ready(CONTAINER) - patched_add_user.assert_called() - - for call in patched_add_user.call_args_list: - assert call.kwargs["username"] in [INTER_BROKER_USER, ADMIN_USER] - - assert harness.charm.app_peer_data.get("broker-creds", None) - - def 
test_pebble_ready_does_not_start_if_not_ready(harness): """Checks service does not start before ready on pebble_ready hook.""" peer_rel_id = harness.add_relation(PEER, CHARM_KEY) @@ -407,6 +355,9 @@ def test_config_changed_restarts(harness): ), patch("charm.KafkaK8sCharm.ready_to_start", new_callable=PropertyMock, return_value=True), patch("ops.model.Container.pull", return_value=io.StringIO("gandalf=white")), + patch("config.KafkaConfig.set_zk_jaas_config"), + patch("config.KafkaConfig.set_client_properties"), + patch("config.KafkaConfig.set_server_properties"), patch("utils.push", return_value=None), patch("ops.model.Container.restart") as patched_restart, patch("charm.broker_active", return_value=True), diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index ff82d991..dab68d2e 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -9,7 +9,15 @@ from ops.testing import Harness from charm import KafkaK8sCharm -from literals import ADMIN_USER, INTER_BROKER_USER, INTERNAL_USERS, STORAGE +from literals import ( + ADMIN_USER, + BINARIES_PATH, + CONF_PATH, + INTER_BROKER_USER, + INTERNAL_USERS, + JMX_EXPORTER_PORT, + STORAGE, +) ops.testing.SIMULATE_CAN_CONNECT = True @@ -138,13 +146,13 @@ def test_zookeeper_config_succeeds_valid_config(zk_relation_id, harness): def test_auth_args(harness): args = harness.charm.kafka_config.auth_args - assert "-Djava.security.auth.login.config=/data/kafka/config/kafka-jaas.cfg" in args + assert f"-Djava.security.auth.login.config={CONF_PATH}/kafka-jaas.cfg" in args def test_extra_args(harness): args = harness.charm.kafka_config.extra_args assert ( - "-javaagent:/opt/kafka/extra/jmx_prometheus_javaagent.jar=9101:/opt/kafka/default-config/jmx_prometheus.yaml" + f"-javaagent:{BINARIES_PATH}/jmx_prometheus_javaagent.jar={JMX_EXPORTER_PORT}:{CONF_PATH}/jmx_prometheus.yaml" in args ) diff --git a/tests/unit/test_provider.py b/tests/unit/test_provider.py index 85cbb5fd..aaa27b38 100644 --- a/tests/unit/test_provider.py +++ b/tests/unit/test_provider.py @@ -12,7 +12,7 @@ from ops.testing import Harness from charm import KafkaK8sCharm -from literals import CHARM_KEY, PEER, REL_NAME +from literals import CHARM_KEY, CONTAINER, PEER, REL_NAME from .helpers import DummyExec @@ -26,6 +26,7 @@ @pytest.fixture def harness(): harness = Harness(KafkaK8sCharm, meta=METADATA) + harness.set_can_connect(CONTAINER, True) harness.add_relation("restart", CHARM_KEY) harness._update_config( { @@ -79,6 +80,8 @@ def test_client_relation_created_adds_user(harness): new_callable=PropertyMock, return_value={"connect": "yes"}, ), + patch("charm.KafkaK8sCharm.healthy", return_value=True), + patch("ops.model.Container.restart"), ): harness.set_leader(True) client_rel_id = harness.add_relation(REL_NAME, "app") @@ -113,6 +116,8 @@ def test_client_relation_broken_removes_user(harness): new_callable=PropertyMock, return_value={"connect": "yes"}, ), + patch("charm.KafkaK8sCharm.healthy", return_value=True), + patch("ops.model.Container.restart"), ): harness.set_leader(True) client_rel_id = harness.add_relation(REL_NAME, "app") @@ -153,6 +158,8 @@ def test_client_relation_joined_sets_necessary_relation_data(harness): new_callable=PropertyMock, return_value={"connect": "yes"}, ), + patch("charm.KafkaK8sCharm.healthy", return_value=True), + patch("ops.model.Container.restart"), ): harness.set_leader(True) client_rel_id = harness.add_relation(REL_NAME, "app")
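
Review note: the charm.py and literals.py hunks above replace ad-hoc `BlockedStatus(...)` / `WaitingStatus(...)` assignments scattered through the handlers with a `Status` enum of `StatusLevel` entries dispatched through a single `_set_status()` helper, which both logs the message at the configured level and sets the unit status. The following is a minimal standalone sketch of that pattern for reviewers; it mirrors the names used in the diff, but `FakeUnit` and the plain-string status are illustrative stand-ins rather than the real `ops` StatusBase objects the charm uses.

# Minimal sketch of the Status / StatusLevel / _set_status pattern introduced in this PR.
# `FakeUnit` and the string-valued status are stand-ins for ops.model objects.
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Literal

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

DebugLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]


@dataclass
class StatusLevel:
    message: str          # in the charm this field is an ops StatusBase, e.g. BlockedStatus(...)
    log_level: DebugLevel


class Status(Enum):
    ACTIVE = StatusLevel("", "DEBUG")
    ZK_NOT_RELATED = StatusLevel("missing required zookeeper relation", "ERROR")
    SERVICE_NOT_RUNNING = StatusLevel("service not running", "WARNING")


class FakeUnit:
    """Stand-in for ops.model.Unit; only carries a status attribute."""
    status: str = ""


def set_status(unit: FakeUnit, key: Status) -> None:
    """Log the enum's message at its configured level, then surface it as the unit status."""
    level: DebugLevel = key.value.log_level
    getattr(logger, level.lower())(key.value.message)
    unit.status = key.value.message


unit = FakeUnit()
set_status(unit, Status.ZK_NOT_RELATED)   # logs at ERROR and sets the unit status in one call

The practical effect in the charm is that every lifecycle check (`ready_to_start`, `healthy`, the zookeeper and storage handlers) reports state the same way, so the log level and the displayed unit status can no longer drift apart.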