diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a2e7d75c..ea222bff 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -71,6 +71,8 @@ jobs: - integration-upgrade - integration-balancer-single - integration-balancer-multi + - integration-kraft-single + - integration-kraft-multi name: ${{ matrix.tox-environments }} needs: - lint diff --git a/actions.yaml b/actions.yaml index 34fe94dd..cd1fc20c 100644 --- a/actions.yaml +++ b/actions.yaml @@ -36,6 +36,9 @@ get-admin-credentials: The returned client_properties can be used for Kafka bin commands using `--bootstrap-server` and `--command-config` for admin level administration This action must be called on the leader unit. +get-listeners: + description: Get all active listeners and their port allocations + rebalance: description: Trigger a rebalance of cluster partitions based on configured goals params: diff --git a/config.yaml b/config.yaml index 69e76fa0..5948b03b 100644 --- a/config.yaml +++ b/config.yaml @@ -5,7 +5,7 @@ options: roles: description: | Comma separated list of the roles assigned to the nodes of this cluster. - This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control). + This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control), 'controller' (KRaft mode). type: string default: broker compression_type: @@ -92,6 +92,10 @@ options: description: Config options to add extra-sans to the ones used when requesting server certificates. The extra-sans are specified by comma-separated names to be added when requesting signed certificates. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1 when requesting the certificate. type: string default: "" + extra_listeners: + description: "Config options to add extra SANs to the ones used when requesting server certificates, and to define custom `advertised.listeners` and ports for clients external to the Juju model. These items are comma-separated. Use '{unit}' as a placeholder to be filled with the unit number if necessary. For port allocations, providing the port for a given listener will offset the generated port number by that amount, with an accepted value range of 20001-50000. For example, a provided value of 'worker-{unit}.domain.com:30000' will generate listeners for unit 0 with name 'worker-0.domain.com', and be allocated ports 39092, 39093 etc for each authentication scheme." + type: string + default: "" log_level: description: "Level of logging for the different components operated by the charm. Possible values: ERROR, WARNING, INFO, DEBUG" type: string diff --git a/metadata.yaml b/metadata.yaml index d86be201..5206e0bc 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -34,7 +34,7 @@ resources: kafka-image: type: oci-image description: OCI Image for Apache Kafka - upstream-source: ghcr.io/canonical/charmed-kafka@sha256:67d2729ca6c4f158682c481a512c37e555bae6edc30e8f0acdb0460fcbeffe88 + upstream-source: ghcr.io/canonical/charmed-kafka@sha256:0f180540572828e3152ab6a39b80891c6dfb2d6abc79914ec19646e6b487b1c8 peers: cluster: diff --git a/src/charm.py b/src/charm.py index 32d10ab4..680c4529 100755 --- a/src/charm.py +++ b/src/charm.py @@ -99,7 +99,10 @@ def _on_roles_changed(self, _): This handler is in charge of stopping the workloads, since the sub-operators would not be instantiated if roles are changed. 
""" - if not self.state.runs_broker and self.broker.workload.active(): + if ( + not (self.state.runs_broker or self.state.runs_controller) + and self.broker.workload.active() + ): self.broker.workload.stop() if ( diff --git a/src/core/cluster.py b/src/core/cluster.py index ed0bebe6..f10ee7dd 100644 --- a/src/core/cluster.py +++ b/src/core/cluster.py @@ -23,6 +23,7 @@ from lightkube.core.exceptions import ApiError as LightKubeApiError from ops import Object, Relation from ops.model import Unit +from tenacity import retry, retry_if_exception_cause_type, stop_after_attempt, wait_fixed from core.models import ( BrokerCapacities, @@ -37,7 +38,10 @@ ADMIN_USER, BALANCER, BROKER, + CONTROLLER, + CONTROLLER_PORT, INTERNAL_USERS, + KRAFT_NODE_ID_OFFSET, MIN_REPLICAS, OAUTH_REL_NAME, PEER, @@ -86,7 +90,7 @@ class PeerClusterData(ProviderData, RequirerData): """Broker provider data model.""" SECRET_LABEL_MAP = SECRET_LABEL_MAP - SECRET_FIELDS = BALANCER.requested_secrets + SECRET_FIELDS = list(set(BALANCER.requested_secrets) | set(CONTROLLER.requested_secrets)) class ClusterState(Object): @@ -138,45 +142,48 @@ def peer_cluster_relation(self) -> Relation | None: @property def peer_cluster_orchestrator(self) -> PeerCluster: """The state for the related `peer-cluster-orchestrator` application that this charm is requiring from.""" - balancer_kwargs: dict[str, Any] = ( - { - "balancer_username": self.cluster.balancer_username, - "balancer_password": self.cluster.balancer_password, - "balancer_uris": self.cluster.balancer_uris, - } - if self.runs_balancer - else {} - ) + extra_kwargs: dict[str, Any] = {} + + if self.runs_balancer: + extra_kwargs.update( + { + "balancer_username": self.cluster.balancer_username, + "balancer_password": self.cluster.balancer_password, + "balancer_uris": self.cluster.balancer_uris, + } + ) + + if self.runs_controller: + extra_kwargs.update( + { + "controller_quorum_uris": self.cluster.controller_quorum_uris, + } + ) + return PeerCluster( relation=self.peer_cluster_relation, data_interface=PeerClusterData(self.model, PEER_CLUSTER_RELATION), - **balancer_kwargs, + **extra_kwargs, ) @property def peer_cluster(self) -> PeerCluster: - """The state for the related `peer-cluster` application that this charm is providing to.""" - return PeerCluster( - relation=self.peer_cluster_orchestrator_relation, - data_interface=PeerClusterOrchestratorData( - self.model, PEER_CLUSTER_ORCHESTRATOR_RELATION - ), - ) - - @property - def balancer(self) -> PeerCluster: """The state for the `peer-cluster-orchestrator` related balancer application.""" - balancer_kwargs: dict[str, Any] = ( - { - "balancer_username": self.cluster.balancer_username, - "balancer_password": self.cluster.balancer_password, - "balancer_uris": self.cluster.balancer_uris, - } - if self.runs_balancer - else {} - ) + extra_kwargs: dict[str, Any] = {} - if self.runs_broker: # must be providing, initialise with necessary broker data + if self.runs_controller or self.runs_balancer: + extra_kwargs.update( + { + "balancer_username": self.cluster.balancer_username, + "balancer_password": self.cluster.balancer_password, + "balancer_uris": self.cluster.balancer_uris, + "controller_quorum_uris": self.cluster.controller_quorum_uris, + } + ) + + # FIXME: `cluster_manager` check instead of running broker + # must be providing, initialise with necessary broker data + if self.runs_broker: return PeerCluster( relation=self.peer_cluster_orchestrator_relation, # if same app, this will be None and OK data_interface=PeerClusterOrchestratorData( @@ 
-185,12 +192,13 @@ def balancer(self) -> PeerCluster: broker_username=ADMIN_USER, broker_password=self.cluster.internal_user_credentials.get(ADMIN_USER, ""), broker_uris=self.bootstrap_server, + cluster_uuid=self.cluster.cluster_uuid, racks=self.racks, broker_capacities=self.broker_capacities, zk_username=self.zookeeper.username, zk_password=self.zookeeper.password, zk_uris=self.zookeeper.uris, - **balancer_kwargs, # in case of roles=broker,balancer on this app + **extra_kwargs, # in case of roles=broker,[balancer,controller] on this app ) else: # must be roles=balancer only then, only load with necessary balancer data @@ -346,7 +354,11 @@ def default_auth(self) -> AuthMap: def enabled_auth(self) -> list[AuthMap]: """The currently enabled auth.protocols and their auth.mechanisms, based on related applications.""" enabled_auth = [] - if self.client_relations or self.runs_balancer or self.peer_cluster_orchestrator_relation: + if ( + self.client_relations + or self.runs_balancer + or BALANCER.value in self.peer_cluster_orchestrator.roles + ): enabled_auth.append(self.default_auth) if self.oauth_relation: enabled_auth.append(AuthMap(self.default_auth.protocol, "OAUTHBEARER")) @@ -356,6 +368,12 @@ def enabled_auth(self) -> list[AuthMap]: return enabled_auth @property + @retry( + wait=wait_fixed(5), + stop=stop_after_attempt(3), + retry=retry_if_exception_cause_type(LightKubeApiError), + reraise=True, + ) def bootstrap_servers_external(self) -> str: """Comma-delimited string of `bootstrap-server` for external access.""" return ",".join( @@ -394,6 +412,21 @@ def bootstrap_server(self) -> str: ) ) + @property + def controller_quorum_uris(self) -> str: + """The current controller quorum uris when running KRaft mode.""" + # FIXME: when running broker node.id will be unit-id + 100. If unit is only running + # the controller node.id == unit-id. This way we can keep a human readable mapping of ids. + if self.runs_controller: + node_offset = KRAFT_NODE_ID_OFFSET if self.runs_broker else 0 + return ",".join( + [ + f"{broker.unit_id + node_offset}@{broker.internal_address}:{CONTROLLER_PORT}" + for broker in self.brokers + ] + ) + return "" + @property def log_dirs(self) -> str: """Builds the necessary log.dirs based on mounted storage volumes. @@ -446,7 +479,7 @@ def ready_to_start(self) -> Status: # noqa: C901 if not self.peer_relation: return Status.NO_PEER_RELATION - for status in [self._broker_status, self._balancer_status]: + for status in [self._broker_status, self._balancer_status, self._controller_status]: if status != Status.ACTIVE: return status @@ -461,29 +494,40 @@ def _balancer_status(self) -> Status: if not self.peer_cluster_relation and not self.runs_broker: return Status.NO_PEER_CLUSTER_RELATION - if not self.balancer.broker_connected: + if not self.peer_cluster.broker_connected: return Status.NO_BROKER_DATA - if len(self.balancer.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS: + if len(self.peer_cluster.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS: return Status.NOT_ENOUGH_BROKERS return Status.ACTIVE @property - def _broker_status(self) -> Status: + def _broker_status(self) -> Status: # noqa: C901 """Checks for role=broker specific readiness.""" if not self.runs_broker: return Status.ACTIVE - if not self.zookeeper: - return Status.ZK_NOT_RELATED + # Neither ZooKeeper or KRaft are active + if self.kraft_mode is None: + return Status.MISSING_MODE + + if self.kraft_mode: + if not self.peer_cluster.controller_quorum_uris: # FIXME: peer_cluster or cluster? 
+ return Status.NO_QUORUM_URIS + if not self.cluster.cluster_uuid: + return Status.NO_CLUSTER_UUID - if not self.zookeeper.zookeeper_connected: - return Status.ZK_NO_DATA + if self.kraft_mode == False: # noqa: E712 + if not self.zookeeper: + return Status.ZK_NOT_RELATED - # TLS must be enabled for Kafka and ZK or disabled for both - if self.cluster.tls_enabled ^ self.zookeeper.tls: - return Status.ZK_TLS_MISMATCH + if not self.zookeeper.zookeeper_connected: + return Status.ZK_NO_DATA + + # TLS must be enabled for Kafka and ZK or disabled for both + if self.cluster.tls_enabled ^ self.zookeeper.tls: + return Status.ZK_TLS_MISMATCH if self.cluster.tls_enabled and not self.unit_broker.certificate: return Status.NO_CERT @@ -493,6 +537,37 @@ def _broker_status(self) -> Status: return Status.ACTIVE + @property + def _controller_status(self) -> Status: + """Checks for role=controller specific readiness.""" + if not self.runs_controller: + return Status.ACTIVE + + if not self.peer_cluster_relation and not self.runs_broker: + return Status.NO_PEER_CLUSTER_RELATION + + if not self.peer_cluster.broker_connected_kraft_mode: + return Status.NO_BROKER_DATA + + return Status.ACTIVE + + @property + def kraft_mode(self) -> bool | None: + """Is the deployment running in KRaft mode? + + Returns: + True if Kraft mode, False if ZooKeeper, None when undefined. + """ + # NOTE: self.roles when running colocated, peer_cluster.roles when multiapp + if CONTROLLER.value in (self.roles + self.peer_cluster.roles): + return True + if self.zookeeper_relation: + return False + + # FIXME raise instead of none. `not kraft_mode` is falsy + # NOTE: if previous checks are not met, we don't know yet how the charm is being deployed + return None + @property def runs_balancer(self) -> bool: """Is the charm enabling the balancer?""" @@ -502,3 +577,8 @@ def runs_balancer(self) -> bool: def runs_broker(self) -> bool: """Is the charm enabling the broker(s)?""" return BROKER.value in self.roles + + @property + def runs_controller(self) -> bool: + """Is the charm enabling the controller?""" + return CONTROLLER.value in self.roles diff --git a/src/core/models.py b/src/core/models.py index be8aa834..bd6eacf1 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -93,6 +93,8 @@ def __init__( broker_username: str = "", broker_password: str = "", broker_uris: str = "", + cluster_uuid: str = "", + controller_quorum_uris: str = "", racks: int = 0, broker_capacities: BrokerCapacities = {}, zk_username: str = "", @@ -106,6 +108,8 @@ def __init__( self._broker_username = broker_username self._broker_password = broker_password self._broker_uris = broker_uris + self._cluster_uuid = cluster_uuid + self._controller_quorum_uris = controller_quorum_uris self._racks = racks self._broker_capacities = broker_capacities self._zk_username = zk_username @@ -174,6 +178,38 @@ def broker_uris(self) -> str: fields=BALANCER.requested_secrets, ).get("broker-uris", "") + @property + def controller_quorum_uris(self) -> str: + """The quorum voters in KRaft mode.""" + if self._controller_quorum_uris: + return self._controller_quorum_uris + + if not self.relation or not self.relation.app: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="controller-quorum-uris" + ) + or "" + ) + + @property + def cluster_uuid(self) -> str: + """The cluster uuid used to format storages in KRaft mode.""" + if self._cluster_uuid: + return self._cluster_uuid + + if not self.relation or not self.relation.app: + return "" + + 
return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="cluster-uuid" + ) + or "" + ) + @property def racks(self) -> int: """The number of racks for the brokers.""" @@ -303,6 +339,7 @@ def balancer_uris(self) -> str: @property def broker_connected(self) -> bool: """Checks if there is an active broker relation with all necessary data.""" + # FIXME rename to specify balancer-broker connection if not all( [ self.broker_username, @@ -319,6 +356,14 @@ def broker_connected(self) -> bool: return True + @property + def broker_connected_kraft_mode(self) -> bool: + """Checks for necessary data required by a controller.""" + if not all([self.broker_username, self.broker_password, self.cluster_uuid]): + return False + + return True + class KafkaCluster(RelationState): """State collection metadata for the peer relation.""" @@ -407,6 +452,16 @@ def balancer_uris(self) -> str: """Persisted balancer uris.""" return self.relation_data.get("balancer-uris", "") + @property + def controller_quorum_uris(self) -> str: + """Persisted controller quorum voters.""" + return self.relation_data.get("controller-quorum-uris", "") + + @property + def cluster_uuid(self) -> str: + """Cluster uuid used for initializing storages.""" + return self.relation_data.get("cluster-uuid", "") + class KafkaBroker(RelationState): """State collection metadata for a unit.""" @@ -448,14 +503,6 @@ def internal_address(self) -> str: return addr - @property - def host(self) -> str: - """Return the hostname of a unit.""" - if self.substrate == "vm": - return self.internal_address - else: - return self.node_ip or self.internal_address - # --- TLS --- @property @@ -701,7 +748,7 @@ def zookeeper_version(self) -> str: # retry to give ZK time to update its broker zNodes before failing @retry( wait=wait_fixed(5), - stop=stop_after_attempt(3), + stop=stop_after_attempt(10), retry=retry_if_result(lambda result: result is False), retry_error_callback=lambda _: False, ) diff --git a/src/core/structured_config.py b/src/core/structured_config.py index cd9a9e45..cd794996 100644 --- a/src/core/structured_config.py +++ b/src/core/structured_config.py @@ -10,7 +10,7 @@ from charms.data_platform_libs.v0.data_models import BaseConfigModel from pydantic import Field, validator -from literals import BALANCER, BROKER, SUBSTRATE +from literals import BALANCER, BROKER, CONTROLLER, SUBSTRATE logger = logging.getLogger(__name__) @@ -74,6 +74,7 @@ class CharmConfig(BaseConfigModel): zookeeper_ssl_cipher_suites: str | None profile: str certificate_extra_sans: str | None + extra_listeners: list[str] log_level: str network_bandwidth: int = Field(default=50000, validate_default=False, gt=0) cruisecontrol_balance_threshold: float = Field(default=1.1, validate_default=False, ge=1) @@ -264,7 +265,49 @@ def roles_values(cls, value: str) -> str: """Check roles values.""" roles = set(map(str.strip, value.split(","))) - if unknown_roles := roles - {BROKER.value, BALANCER.value}: + if unknown_roles := roles - {BROKER.value, BALANCER.value, CONTROLLER.value}: raise ValueError("Unknown role(s):", unknown_roles) return ",".join(sorted(roles)) # this has to be a string as it goes in to properties + + @validator("certificate_extra_sans") + @classmethod + def certificate_extra_sans_values(cls, value: str) -> list[str]: + """Formats certificate_extra_sans values to a list.""" + return value.split(",") if value else [] + + @validator("extra_listeners", pre=True) + @classmethod + def extra_listeners_port_values(cls, value: str) -> list[str]: + """Check 
extra_listeners port values for each listener, and format values to a list.""" + if not value: + return [] + + listeners = value.split(",") + + ports = [] + for listener in listeners: + if ":" not in listener or not listener.split(":")[1].isdigit(): + raise ValueError("Value for listener does not contain a valid port.") + + port = int(listener.split(":")[1]) + if not 20000 < port < 50000: + raise ValueError( + "Value for port out of accepted values. Accepted values for port are greater than 20000 and less than 50000" + ) + + ports.append(port) + + current_port = 0 + for port in ports: + if not current_port - 100 < int(port) > current_port + 100: + raise ValueError( + "Value for port is too close to other value for port. Accepted values must be at least 100 apart from each other" + ) + + current_port = int(port) + + if len(ports) != len(set(ports)): + raise ValueError("Value for port is not unique for each listener.") + + return listeners diff --git a/src/events/password_actions.py b/src/events/actions.py similarity index 76% rename from src/events/password_actions.py rename to src/events/actions.py index 2049d408..9e360a0a 100644 --- a/src/events/password_actions.py +++ b/src/events/actions.py @@ -2,7 +2,8 @@ # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Event handlers for password-related Juju Actions.""" +"""Event handlers for Juju Actions.""" + import logging from typing import TYPE_CHECKING @@ -18,11 +19,11 @@ logger = logging.getLogger(__name__) -class PasswordActionEvents(Object): - """Event handlers for password-related Juju Actions.""" +class ActionEvents(Object): + """Event handlers for Juju Actions.""" def __init__(self, dependent: "BrokerOperator") -> None: - super().__init__(dependent, "password_events") + super().__init__(dependent, "action_events") self.dependent = dependent self.charm: "KafkaCharm" = dependent.charm @@ -33,6 +34,31 @@ def __init__(self, dependent: "BrokerOperator") -> None: getattr(self.charm.on, "get_admin_credentials_action"), self._get_admin_credentials_action, ) + self.framework.observe( + getattr(self.charm.on, "get_listeners_action"), self._get_listeners_action + ) + + def _get_listeners_action(self, event: ActionEvent) -> None: + """Handler for get-listeners action.""" + listeners = self.dependent.config_manager.all_listeners + + result = {} + for listener in listeners: + key = listener.name.replace("_", "-").lower() + result.update( + { + key: { + "name": listener.name, + "scope": listener.scope, + "port": listener.port, + "protocol": listener.protocol, + "auth-mechanism": listener.mechanism, + "advertised-listener": listener.advertised_listener, + } + } + ) + + event.set_results(result) def _set_password_action(self, event: ActionEvent) -> None: """Handler for set-password action. 
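Note for reviewers: the `_get_listeners_action` handler above keys its results by the lower-cased, dash-separated listener name. A minimal, illustrative sketch of a single result entry follows; the port is the internal port from SECURITY_PROTOCOL_PORTS, while the advertised address/host shown is a placeholder assumption, not a value taken from this diff.

# Illustrative only: approximate shape of one `get-listeners` result entry for the
# internal SCRAM listener. The advertised address below is a placeholder.
example_entry = {
    "internal-sasl-plaintext-scram-sha-512": {
        "name": "INTERNAL_SASL_PLAINTEXT_SCRAM_SHA_512",
        "scope": "INTERNAL",
        "port": 19092,  # SECURITY_PROTOCOL_PORTS[AuthMap("SASL_PLAINTEXT", "SCRAM-SHA-512")].internal
        "protocol": "SASL_PLAINTEXT",
        "auth-mechanism": "SCRAM-SHA-512",
        "advertised-listener": "INTERNAL_SASL_PLAINTEXT_SCRAM_SHA_512://<unit-address>:19092",
    }
}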
diff --git a/src/events/balancer.py b/src/events/balancer.py index e4efc661..d46c8832 100644 --- a/src/events/balancer.py +++ b/src/events/balancer.py @@ -111,10 +111,11 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: payload = { "balancer-username": BALANCER_WEBSERVER_USER, "balancer-password": self.charm.workload.generate_password(), - "balancer-uris": f"{self.charm.state.unit_broker.host}:{BALANCER_WEBSERVER_PORT}", + "balancer-uris": f"{self.charm.state.unit_broker.internal_address}:{BALANCER_WEBSERVER_PORT}", } # Update relation data intra & extra cluster (if it exists) self.charm.state.cluster.update(payload) + if self.charm.state.peer_cluster_orchestrator: self.charm.state.peer_cluster_orchestrator.update(payload) @@ -169,7 +170,7 @@ def _on_config_changed(self, _: EventBase) -> None: content_changed = True # On k8s, adding/removing a broker does not change the bootstrap server property if exposed by nodeport - broker_capacities = self.charm.state.balancer.broker_capacities + broker_capacities = self.charm.state.peer_cluster.broker_capacities if ( file_content := json.loads( "".join(self.workload.read(self.workload.paths.capacity_jbod_json)) @@ -200,8 +201,8 @@ def rebalance(self, event: ActionEvent) -> None: available_brokers = [broker.unit_id for broker in self.charm.state.brokers] else: brokers = ( - [broker.name for broker in self.charm.state.balancer.relation.units] - if self.charm.state.balancer.relation + [broker.name for broker in self.charm.state.peer_cluster.relation.units] + if self.charm.state.peer_cluster.relation else [] ) available_brokers = [int(broker.split("/")[1]) for broker in brokers] diff --git a/src/events/broker.py b/src/events/broker.py index 0d0065ba..ebd5de77 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -23,8 +23,8 @@ UpdateStatusEvent, ) +from events.actions import ActionEvents from events.oauth import OAuthHandler -from events.password_actions import PasswordActionEvents from events.provider import KafkaProvider from events.upgrade import KafkaDependencyModel, KafkaUpgrade from events.zookeeper import ZooKeeperHandler @@ -32,12 +32,13 @@ from literals import ( BROKER, CONTAINER, + CONTROLLER, DEPENDENCIES, GROUP, + INTERNAL_USERS, PEER, PROFILE_TESTING, REL_NAME, - STORAGE, USER, Status, ) @@ -75,7 +76,7 @@ def __init__(self, charm) -> None: ) # Fast exit after workload instantiation, but before any event observer - if BROKER.value not in self.charm.config.roles: + if not any(role in self.charm.config.roles for role in [BROKER.value, CONTROLLER.value]): return self.health = KafkaHealth(self) if self.charm.substrate == "vm" else None @@ -86,8 +87,11 @@ def __init__(self, charm) -> None: **DEPENDENCIES # pyright: ignore[reportArgumentType] ), ) - self.password_action_events = PasswordActionEvents(self) - self.zookeeper = ZooKeeperHandler(self) + self.action_events = ActionEvents(self) + + if not self.charm.state.runs_controller: + self.zookeeper = ZooKeeperHandler(self) + self.provider = KafkaProvider(self) self.oauth = OAuthHandler(self) @@ -148,7 +152,7 @@ def _on_install(self, event: InstallEvent) -> None: f"{TESTING_OPTIONS}" ) - def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: + def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 """Handler for `start` or `pebble-ready` events.""" if not self.workload.container_can_connect: event.defer() @@ -163,14 +167,22 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: if not self.upgrade.idle: return - 
self.update_external_services() + if self.charm.state.kraft_mode: + self._init_kraft_mode() + # FIXME ready to start probably needs to account for credentials being created beforehand current_status = self.charm.state.ready_to_start if current_status is not Status.ACTIVE: self.charm._set_status(current_status) event.defer() return + if self.charm.state.kraft_mode: + self.config_manager.set_server_properties() + self._format_storages() + + self.update_external_services() + # required settings given zookeeper connection config has been created self.config_manager.set_server_properties() self.config_manager.set_zk_jaas_config() @@ -193,6 +205,20 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: self.workload.start() logger.info("Kafka service started") + # TODO: Update users. Not sure if this is the best place, as cluster might be still + # stabilizing. + # if self.charm.state.kraft_mode and self.charm.state.runs_broker: + # for username, password in self.charm.state.cluster.internal_user_credentials.items(): + # try: + # self.auth_manager.add_user( + # username=username, password=password, zk_auth=False, internal=True, + # ) + # except subprocess.CalledProcessError: + # logger.warning("Error adding users, cluster might not be ready yet") + # logger.error(f"\n\tOn start:\nAdding user {username} failed. Let the rest of the hook run\n") + # # event.defer() + # continue + # service_start might fail silently, confirm with ZK if kafka is actually connected self.charm.on.update_status.emit() @@ -287,7 +313,7 @@ def _on_config_changed(self, event: EventBase) -> None: if self.model.relations.get(REL_NAME, None) and self.charm.unit.is_leader(): self.update_client_data() - if self.charm.state.peer_cluster_relation and self.charm.unit.is_leader(): + if self.charm.state.peer_cluster_orchestrator_relation and self.charm.unit.is_leader(): self.update_peer_cluster_data() def _on_update_status(self, _: UpdateStatusEvent) -> None: @@ -295,9 +321,10 @@ def _on_update_status(self, _: UpdateStatusEvent) -> None: if not self.upgrade.idle or not self.healthy: return - if not self.charm.state.zookeeper.broker_active(): - self.charm._set_status(Status.ZK_NOT_CONNECTED) - return + if not self.charm.state.kraft_mode: + if not self.charm.state.zookeeper.broker_active(): + self.charm._set_status(Status.ZK_NOT_CONNECTED) + return # NOTE for situations like IP change and late integration with rack-awareness charm. # If properties have changed, the broker will restart. 
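Note for reviewers: pulling the `_on_start` changes above together, the KRaft path roughly orders itself as sketched below. This is a simplified illustration assembled from the hunks in this file, not the actual handler; defers, status checks, the JAAS/client config writes and error handling are omitted.

# Simplified sketch of the KRaft start sequence in `_on_start` (illustrative only).
def kraft_start_sequence(broker) -> None:
    broker._init_kraft_mode()                      # leader: internal creds, cluster-uuid, quorum uris
    broker.config_manager.set_server_properties()  # server.properties must exist before formatting
    broker._format_storages()                      # kafka-storage.sh format --cluster-id <uuid> [--add-scram ...]
    broker.update_external_services()
    broker.config_manager.set_server_properties()  # re-rendered again on the common path
    broker.workload.start()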
@@ -333,14 +360,13 @@ def _on_storage_attached(self, event: StorageAttachedEvent) -> None: self.charm.state.unit_broker.update({"storages": self.balancer_manager.storages}) - if self.charm.substrate == "vm": + # FIXME: if KRaft, don't execute + if self.charm.substrate == "vm" and not self.charm.state.kraft_mode: # new dirs won't be used until topic partitions are assigned to it # either automatically for new topics, or manually for existing # set status only for running services, not on startup + # FIXME re-add this self.workload.exec(["chmod", "-R", "750", f"{self.workload.paths.data_path}"]) - self.workload.exec( - ["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"] - ) self.workload.exec( [ "bash", @@ -349,13 +375,12 @@ def _on_storage_attached(self, event: StorageAttachedEvent) -> None: ] ) - if self.charm.substrate == "k8s": - self.workload.exec( - ["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}/{STORAGE}"] - ) - self.workload.exec( - ["rm", "-rf", f"{self.workload.paths.data_path}/{STORAGE}/lost+found"] - ) + # all mounted data dirs should have correct ownership + self.workload.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"]) + + # run this regardless of role, needed for cloud storages + ceph + for storage in self.charm.state.log_dirs.split(","): + self.workload.exec(["rm", "-rf", f"{storage}/lost+found"]) # checks first whether the broker is active before warning if self.workload.active(): @@ -396,6 +421,51 @@ def healthy(self) -> bool: return True + def _init_kraft_mode(self) -> None: + """Initialize the server when running controller mode.""" + # NOTE: checks for `runs_broker` in this method should be `is_cluster_manager` in + # the large deployment feature. + if not self.model.unit.is_leader(): + return + + if not self.charm.state.cluster.internal_user_credentials and self.charm.state.runs_broker: + credentials = [ + (username, self.charm.workload.generate_password()) for username in INTERNAL_USERS + ] + for username, password in credentials: + self.charm.state.cluster.update({f"{username}-password": password}) + + # cluster-uuid is only created on the broker (`cluster-manager` in large deployments) + if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker: + uuid = self.workload.run_bin_command( + bin_keyword="storage", bin_args=["random-uuid"] + ).strip() + + self.charm.state.cluster.update({"cluster-uuid": uuid}) + self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) + + # Controller is tasked with populating quorum uris + if self.charm.state.runs_controller: + quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} + self.charm.state.cluster.update(quorum_uris) + + if self.charm.state.peer_cluster_orchestrator: + self.charm.state.peer_cluster_orchestrator.update(quorum_uris) + + def _format_storages(self) -> None: + """Format storages provided relevant keys exist.""" + if self.charm.state.runs_broker: + credentials = self.charm.state.cluster.internal_user_credentials + elif self.charm.state.runs_controller: + credentials = { + self.charm.state.peer_cluster.broker_username: self.charm.state.peer_cluster.broker_password + } + + self.workload.format_storages( + uuid=self.charm.state.peer_cluster.cluster_uuid, + internal_user_credentials=credentials, + ) + def update_external_services(self) -> None: """Attempts to update any external Kubernetes services.""" if not self.charm.substrate == "k8s": @@ -441,17 +511,18 @@ def update_peer_cluster_data(self) -> 
None: if not self.charm.unit.is_leader() or not self.healthy: return - self.charm.state.balancer.update( + self.charm.state.peer_cluster.update( { "roles": self.charm.state.roles, - "broker-username": self.charm.state.balancer.broker_username, - "broker-password": self.charm.state.balancer.broker_password, - "broker-uris": self.charm.state.balancer.broker_uris, - "racks": str(self.charm.state.balancer.racks), - "broker-capacities": json.dumps(self.charm.state.balancer.broker_capacities), - "zk-uris": self.charm.state.balancer.zk_uris, - "zk-username": self.charm.state.balancer.zk_username, - "zk-password": self.charm.state.balancer.zk_password, + "broker-username": self.charm.state.peer_cluster.broker_username, + "broker-password": self.charm.state.peer_cluster.broker_password, + "broker-uris": self.charm.state.peer_cluster.broker_uris, + "cluster-uuid": self.charm.state.peer_cluster.cluster_uuid, + "racks": str(self.charm.state.peer_cluster.racks), + "broker-capacities": json.dumps(self.charm.state.peer_cluster.broker_capacities), + "zk-uris": self.charm.state.peer_cluster.zk_uris, + "zk-username": self.charm.state.peer_cluster.zk_username, + "zk-password": self.charm.state.peer_cluster.zk_password, } ) diff --git a/src/events/peer_cluster.py b/src/events/peer_cluster.py index 78bd9e22..1b9e066c 100644 --- a/src/events/peer_cluster.py +++ b/src/events/peer_cluster.py @@ -16,13 +16,20 @@ diff, set_encoded_field, ) -from ops.charm import RelationChangedEvent, RelationCreatedEvent, RelationEvent, SecretChangedEvent +from ops.charm import ( + RelationBrokenEvent, + RelationChangedEvent, + RelationCreatedEvent, + RelationEvent, + SecretChangedEvent, +) from ops.framework import Object from core.cluster import custom_secret_groups from literals import ( BALANCER, BROKER, + CONTROLLER, PEER_CLUSTER_ORCHESTRATOR_RELATION, PEER_CLUSTER_RELATION, ) @@ -72,18 +79,20 @@ def _on_peer_cluster_created(self, event: RelationCreatedEvent) -> None: if not self.charm.unit.is_leader() or not event.relation.app: return - requested_secrets = ( - BALANCER.requested_secrets - if self.charm.state.runs_balancer - else BROKER.requested_secrets - ) or [] + requested_secrets = set() + if self.charm.state.runs_balancer: + requested_secrets |= set(BALANCER.requested_secrets) + if self.charm.state.runs_controller: + requested_secrets |= set(CONTROLLER.requested_secrets) + if self.charm.state.runs_broker: + requested_secrets |= set(BROKER.requested_secrets) # request secrets for the relation set_encoded_field( event.relation, self.charm.state.cluster.app, REQ_SECRET_FIELDS, - requested_secrets, + list(requested_secrets), ) # explicitly update the roles early, as we can't determine which model to instantiate @@ -92,21 +101,19 @@ def _on_peer_cluster_created(self, event: RelationCreatedEvent) -> None: def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None: """Generic handler for peer-cluster `relation-changed` events.""" - if ( - not self.charm.unit.is_leader() - or not self.charm.state.runs_balancer # only balancer need to handle this event - or not self.charm.state.balancer.roles # ensures secrets have set-up before writing - ): + # ensures secrets have set-up before writing + if not self.charm.unit.is_leader() or not self.charm.state.peer_cluster.roles: return self._default_relation_changed(event) # will no-op if relation does not exist - self.charm.state.balancer.update( + self.charm.state.peer_cluster.update( { - "balancer-username": self.charm.state.balancer.balancer_username, - "balancer-password": 
self.charm.state.balancer.balancer_password, - "balancer-uris": self.charm.state.balancer.balancer_uris, + "balancer-username": self.charm.state.peer_cluster.balancer_username, + "balancer-password": self.charm.state.peer_cluster.balancer_password, + "balancer-uris": self.charm.state.peer_cluster.balancer_uris, + "controller-quorum-uris": self.charm.state.peer_cluster.controller_quorum_uris, } ) @@ -114,33 +121,65 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None: def _on_peer_cluster_orchestrator_changed(self, event: RelationChangedEvent) -> None: """Generic handler for peer-cluster-orchestrator `relation-changed` events.""" + # TODO: `cluster_manager` check instead of runs_broker if ( not self.charm.unit.is_leader() or not self.charm.state.runs_broker # only broker needs handle this event - or "balancer" - not in self.charm.state.balancer.roles # ensures secret have set-up before writing, and only writing to balancers + or not any( + role in self.charm.state.peer_cluster.roles + for role in [BALANCER.value, CONTROLLER.value] + ) # ensures secrets have set-up before writing, and only writing to controller,balancers ): return self._default_relation_changed(event) # will no-op if relation does not exist - self.charm.state.balancer.update( + self.charm.state.peer_cluster.update( { "roles": self.charm.state.roles, - "broker-username": self.charm.state.balancer.broker_username, - "broker-password": self.charm.state.balancer.broker_password, - "broker-uris": self.charm.state.balancer.broker_uris, - "racks": str(self.charm.state.balancer.racks), - "broker-capacities": json.dumps(self.charm.state.balancer.broker_capacities), - "zk-uris": self.charm.state.balancer.zk_uris, - "zk-username": self.charm.state.balancer.zk_username, - "zk-password": self.charm.state.balancer.zk_password, + "broker-username": self.charm.state.peer_cluster.broker_username, + "broker-password": self.charm.state.peer_cluster.broker_password, + "broker-uris": self.charm.state.peer_cluster.broker_uris, + "cluster-uuid": self.charm.state.peer_cluster.cluster_uuid, + "racks": str(self.charm.state.peer_cluster.racks), + "broker-capacities": json.dumps(self.charm.state.peer_cluster.broker_capacities), + "zk-uris": self.charm.state.peer_cluster.zk_uris, + "zk-username": self.charm.state.peer_cluster.zk_username, + "zk-password": self.charm.state.peer_cluster.zk_password, } ) self.charm.on.config_changed.emit() # ensure both broker+balancer get a changed event + def _on_peer_cluster_broken(self, _: RelationBrokenEvent): + """Handle the required logic to remove.""" + if self.charm.state.kraft_mode is not None: + return + + self.charm.workload.stop() + logger.info(f'Service {self.model.unit.name.split("/")[1]} stopped') + + # FIXME: probably a mix between cluster_manager and broker + if self.charm.state.runs_broker: + # Kafka keeps a meta.properties in every log.dir with a unique ClusterID + # this ID is provided by ZK, and removing it on relation-broken allows + # re-joining to another ZK cluster. 
+ for storage in self.charm.model.storages["data"]: + self.charm.workload.exec( + [ + "rm", + f"{storage.location}/meta.properties", + f"{storage.location}/__cluster_metadata-0/quorum-state", + ] + ) + + if self.charm.unit.is_leader(): + # other charm methods assume credentials == ACLs + # necessary to clean-up credentials once ZK relation is lost + for username in self.charm.state.cluster.internal_user_credentials: + self.charm.state.cluster.update({f"{username}-password": ""}) + def _default_relation_changed(self, event: RelationChangedEvent): """Implements required logic from multiple 'handled' events from the `data-interfaces` library.""" if not isinstance(event, RelationEvent) or not event.relation or not event.relation.app: diff --git a/src/events/tls.py b/src/events/tls.py index abedc4bd..9970b5e3 100644 --- a/src/events/tls.py +++ b/src/events/tls.py @@ -9,6 +9,7 @@ import logging import os import re +import warnings from typing import TYPE_CHECKING from charms.tls_certificates_interface.v1.tls_certificates import ( @@ -151,7 +152,9 @@ def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None: relation_id=event.relation.id, ) subject = ( - os.uname()[1] if self.charm.substrate == "k8s" else self.charm.state.unit_broker.host + os.uname()[1] + if self.charm.substrate == "k8s" + else self.charm.state.unit_broker.internal_address ) sans = self.charm.broker.tls_manager.build_sans() csr = ( @@ -294,6 +297,13 @@ def _request_certificate(self): sans = self.charm.broker.tls_manager.build_sans() + # only warn during certificate creation, not every event if in structured_config + if self.charm.config.certificate_extra_sans: + warnings.warn( + "'certificate_extra_sans' config option is deprecated, use 'extra_listeners' instead", + DeprecationWarning, + ) + csr = generate_csr( private_key=self.charm.state.unit_broker.private_key.encode("utf-8"), subject=self.charm.state.unit_broker.relation_data.get("private-address", ""), diff --git a/src/literals.py b/src/literals.py index d6cae208..387bf8dd 100644 --- a/src/literals.py +++ b/src/literals.py @@ -66,11 +66,12 @@ class Ports: client: int internal: int external: int + extra: int = 0 AuthProtocol = Literal["SASL_PLAINTEXT", "SASL_SSL", "SSL"] AuthMechanism = Literal["SCRAM-SHA-512", "OAUTHBEARER", "SSL"] -Scope = Literal["INTERNAL", "CLIENT", "EXTERNAL"] +Scope = Literal["INTERNAL", "CLIENT", "EXTERNAL", "EXTRA"] AuthMap = NamedTuple("AuthMap", protocol=AuthProtocol, mechanism=AuthMechanism) SECURITY_PROTOCOL_PORTS: dict[AuthMap, Ports] = { @@ -80,6 +81,13 @@ class Ports: AuthMap("SASL_PLAINTEXT", "OAUTHBEARER"): Ports(9095, 19095, 29095), AuthMap("SASL_SSL", "OAUTHBEARER"): Ports(9096, 19096, 29096), } +# FIXME this port should exist on the previous abstraction +CONTROLLER_PORT = 9097 +CONTROLLER_LISTENER_NAME = "INTERNAL_CONTROLLER" + +# FIXME: when running broker node.id will be unit-id + 100. If unit is only running +# the controller node.id == unit-id. This way we can keep a human readable mapping of ids. 
+KRAFT_NODE_ID_OFFSET = 100 DebugLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"] DatabagScope = Literal["unit", "app"] @@ -117,7 +125,7 @@ class Role: service: str paths: dict[str, str] relation: str - requested_secrets: list[str] | None = None + requested_secrets: list[str] def __eq__(self, value: object, /) -> bool: """Provide an easy comparison to the configuration key.""" @@ -135,9 +143,19 @@ def __eq__(self, value: object, /) -> bool: "balancer-uris", ], ) +CONTROLLER = Role( + value="controller", + service="daemon", + paths=PATHS["kafka"], + relation=PEER_CLUSTER_RELATION, + requested_secrets=[ + "broker-username", + "broker-password", + ], +) BALANCER = Role( value="balancer", - service="balancer", + service="cruise-control", paths=PATHS["cruise-control"], relation=PEER_CLUSTER_RELATION, requested_secrets=[ @@ -175,10 +193,7 @@ def __eq__(self, value: object, /) -> bool: "CpuCapacity", "ReplicaDistribution", ] - -BALANCER_GOALS_TESTING = [ - "ReplicaDistribution", -] +BALANCER_GOALS_TESTING = ["ReplicaDistribution"] MODE_FULL = "full" @@ -208,6 +223,9 @@ class Status(Enum): BROKER_NOT_RUNNING = StatusLevel(BlockedStatus("Broker not running"), "WARNING") NOT_ALL_RELATED = StatusLevel(MaintenanceStatus("not all units related"), "DEBUG") CC_NOT_RUNNING = StatusLevel(BlockedStatus("Cruise Control not running"), "WARNING") + MISSING_MODE = StatusLevel(BlockedStatus("Application needs ZooKeeper or KRaft mode"), "DEBUG") + NO_CLUSTER_UUID = StatusLevel(WaitingStatus("Waiting for cluster uuid"), "DEBUG") + NO_QUORUM_URIS = StatusLevel(WaitingStatus("Waiting for quorum uris"), "DEBUG") ZK_NOT_RELATED = StatusLevel(BlockedStatus("missing required zookeeper relation"), "DEBUG") ZK_NOT_CONNECTED = StatusLevel(BlockedStatus("unit not connected to zookeeper"), "ERROR") ZK_TLS_MISMATCH = StatusLevel( diff --git a/src/managers/auth.py b/src/managers/auth.py index c26e131e..99890139 100644 --- a/src/managers/auth.py +++ b/src/managers/auth.py @@ -13,6 +13,7 @@ from core.cluster import ClusterState from core.workload import WorkloadBase +from literals import SECURITY_PROTOCOL_PORTS logger = logging.getLogger(__name__) @@ -136,7 +137,9 @@ def _generate_consumer_acls(topic: str, username: str, group: str | None = None) return consumer_acls - def add_user(self, username: str, password: str, zk_auth: bool = False) -> None: + def add_user( + self, username: str, password: str, zk_auth: bool = False, internal: bool = False + ) -> None: """Adds new user credentials to ZooKeeper. 
Args: @@ -144,6 +147,7 @@ def add_user(self, username: str, password: str, zk_auth: bool = False) -> None: password: the user password zk_auth: flag to specify adding users using ZooKeeper authorizer For use before cluster start + internal: flag to use internal ports or client ones Raises: `(subprocess.CalledProcessError | ops.pebble.ExecError)`: if the error returned a non-zero exit code @@ -164,8 +168,13 @@ def add_user(self, username: str, password: str, zk_auth: bool = False) -> None: ] opts = [self.kafka_opts] else: + bootstrap_server = ( + f"{self.state.unit_broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.state.default_auth].internal}" + if internal + else self.state.bootstrap_server + ) command = base_command + [ - f"--bootstrap-server={self.state.bootstrap_server}", + f"--bootstrap-server={bootstrap_server}", f"--command-config={self.workload.paths.client_properties}", ] opts = [] @@ -229,12 +238,20 @@ def add_acl( ] if resource_type == "TOPIC": - command += [f"--topic={resource_name}"] + if len(resource_name) > 3 and resource_name.endswith("*"): + pattern = "PREFIXED" + resource_name = resource_name[:-1] + else: + pattern = "LITERAL" + + command += [f"--topic={resource_name}", f"--resource-pattern-type={pattern}"] + if resource_type == "GROUP": command += [ f"--group={resource_name}", "--resource-pattern-type=PREFIXED", ] + logger.info(f"CREATE ACL - {command}") self.workload.run_bin_command(bin_keyword="acls", bin_args=command, opts=[self.log4j_opts]) def remove_acl( @@ -263,7 +280,14 @@ def remove_acl( ] if resource_type == "TOPIC": - command += [f"--topic={resource_name}"] + if len(resource_name) > 3 and resource_name.endswith("*"): + pattern = "PREFIXED" + resource_name = resource_name[:-1] + else: + pattern = "LITERAL" + + command += [f"--topic={resource_name}", f"--resource-pattern-type={pattern}"] + if resource_type == "GROUP": command += [ f"--group={resource_name}", diff --git a/src/managers/balancer.py b/src/managers/balancer.py index 870e6e18..8104518b 100644 --- a/src/managers/balancer.py +++ b/src/managers/balancer.py @@ -137,8 +137,8 @@ def __init__(self, dependent: "BrokerOperator | BalancerOperator") -> None: def cruise_control(self) -> CruiseControlClient: """Client for the CruiseControl REST API.""" return CruiseControlClient( - username=self.charm.state.balancer.balancer_username, - password=self.charm.state.balancer.balancer_password, + username=self.charm.state.peer_cluster.balancer_username, + password=self.charm.state.peer_cluster.balancer_password, ) @property @@ -160,7 +160,7 @@ def storages(self) -> str: def create_internal_topics(self) -> None: """Create Cruise Control topics.""" - bootstrap_servers = self.charm.state.balancer.broker_uris + bootstrap_servers = self.charm.state.peer_cluster.broker_uris property_file = f'{BALANCER.paths["CONF"]}/cruisecontrol.properties' for topic in BALANCER_TOPICS: diff --git a/src/managers/config.py b/src/managers/config.py index 5b53b4dd..63f86c2d 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -24,6 +24,8 @@ BALANCER, BALANCER_GOALS_TESTING, BROKER, + CONTROLLER_LISTENER_NAME, + CONTROLLER_PORT, DEFAULT_BALANCER_GOALS, HARD_BALANCER_GOALS, INTER_BROKER_USER, @@ -31,6 +33,7 @@ JMX_EXPORTER_PORT, JVM_MEM_MAX_GB, JVM_MEM_MIN_GB, + KRAFT_NODE_ID_OFFSET, PROFILE_TESTING, SECURITY_PROTOCOL_PORTS, AuthMap, @@ -41,7 +44,6 @@ DEFAULT_CONFIG_OPTIONS = """ sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 -authorizer.class.name=kafka.security.authorizer.AclAuthorizer 
allow.everyone.if.no.acl.found=false auto.create.topics.enable=false """ @@ -70,7 +72,14 @@ num.partition.metrics.windows=3 num.broker.metrics.windows=10 """ -SERVER_PROPERTIES_BLACKLIST = ["profile", "log_level", "certificate_extra_sans"] +SERVER_PROPERTIES_BLACKLIST = [ + "profile", + "log_level", + "certificate_extra_sans", + "extra_listeners", + "roles", + "expose_external", +] class Listener: @@ -78,19 +87,28 @@ class Listener: Args: auth_map: AuthMap representing the auth.protocol and auth.mechanism for the listener - scope: scope of the listener, CLIENT, INTERNAL or EXTERNAL + scope: scope of the listener, CLIENT, INTERNAL, EXTERNAL or EXTRA host: string with the host that will be announced + baseport (optional): integer port to offset CLIENT port numbers for EXTRA listeners node_port (optional): the node-port for the listener if scope=EXTERNAL """ def __init__( - self, auth_map: AuthMap, scope: Scope, host: str = "", node_port: int | None = None + self, + auth_map: AuthMap, + scope: Scope, + host: str = "", + baseport: int = 30000, + extra_count: int = -1, + node_port: int | None = None, ): self.auth_map = auth_map self.protocol = auth_map.protocol self.mechanism = auth_map.mechanism self.host = host self.scope = scope + self.baseport = baseport + self.extra_count = extra_count self.node_port = node_port @property @@ -101,8 +119,8 @@ def scope(self) -> Scope: @scope.setter def scope(self, value): """Internal scope validator.""" - if value not in ["CLIENT", "INTERNAL", "EXTERNAL"]: - raise ValueError("Only CLIENT, INTERNAL and EXTERNAL scopes are accepted") + if value not in ["CLIENT", "INTERNAL", "EXTERNAL", "EXTRA"]: + raise ValueError("Only CLIENT, INTERNAL, EXTERNAL and EXTRA scopes are accepted") self._scope = value @@ -113,12 +131,18 @@ def port(self) -> int: Returns: Integer of port number """ + # generates ports 39092, 39192, 39292 etc for listener auth if baseport=30000 + if self.scope == "EXTRA": + return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], "client") + self.baseport + return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], self.scope.lower()) @property def name(self) -> str: """Name of the listener.""" - return f"{self.scope}_{self.protocol}_{self.mechanism.replace('-', '_')}" + return f"{self.scope}_{self.protocol}_{self.mechanism.replace('-', '_')}" + ( + f"_{self.extra_count}" if self.extra_count >= 0 else "" + ) @property def protocol_map(self) -> str: @@ -273,9 +297,11 @@ def __init__( @property @override def kafka_opts(self) -> str: - opts = [ - f"-Djava.security.auth.login.config={self.workload.paths.zk_jaas}", - ] + opts = [] + if not self.state.runs_controller: + opts = [ + f"-Djava.security.auth.login.config={self.workload.paths.zk_jaas}", + ] http_proxy = os.environ.get("JUJU_CHARM_HTTP_PROXY") https_proxy = os.environ.get("JUJU_CHARM_HTTPS_PROXY") @@ -317,6 +343,9 @@ def auth_properties(self) -> list[str]: Returns: List of properties to be set """ + if self.state.kraft_mode: + return [] + return [ f"broker.id={self.state.unit_broker.unit_id}", f"zookeeper.connect={self.state.zookeeper.connect}", @@ -370,7 +399,7 @@ def scram_properties(self) -> list[str]: f'listener.name.{listener_name}.{listener_mechanism}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";', f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}", ] - for auth in self.client_listeners + self.external_listeners: + for auth in self.client_listeners + self.external_listeners 
+ self.extra_listeners: if not auth.mechanism.startswith("SCRAM"): continue @@ -445,8 +474,39 @@ def internal_listener(self) -> Listener: ) @property - def client_listeners(self) -> list[Listener]: + def controller_listener(self) -> None: + """Return the controller listener.""" + pass # TODO: No good abstraction in place for the controller use case + + @property + def extra_listeners(self) -> list[Listener]: """Return a list of extra listeners.""" + extra_host_baseports = [ + tuple(listener.split(":")) for listener in self.config.extra_listeners + ] + + extra_listeners = [] + extra_count = 0 + for host, baseport in extra_host_baseports: + for auth_map in self.state.enabled_auth: + host = host.replace("{unit}", str(self.state.unit_broker.unit_id)) + extra_listeners.append( + Listener( + host=host, + auth_map=auth_map, + scope="EXTRA", + baseport=int(baseport), + extra_count=extra_count, + ) + ) + + extra_count += 1 + + return extra_listeners + + @property + def client_listeners(self) -> list[Listener]: + """Return a list of client listeners.""" return [ Listener( host=self.state.unit_broker.internal_address, auth_map=auth_map, scope="CLIENT" @@ -478,7 +538,7 @@ def external_listeners(self) -> list[Listener]: Listener( auth_map=auth, scope="EXTERNAL", - host=self.state.unit_broker.host, + host=self.state.unit_broker.node_ip, # default in case service not created yet during cluster init # will resolve during config-changed node_port=node_port, @@ -490,7 +550,12 @@ def external_listeners(self) -> list[Listener]: @property def all_listeners(self) -> list[Listener]: """Return a list with all expected listeners.""" - return [self.internal_listener] + self.client_listeners + self.external_listeners + return ( + [self.internal_listener] + + self.client_listeners + + self.external_listeners + + self.extra_listeners + ) @property def inter_broker_protocol_version(self) -> str: @@ -567,6 +632,41 @@ def metrics_reporter_properties(self) -> list[str]: username=ADMIN_USER, prefix="cruise.control.metrics.reporter" ) + @property + def authorizer_class(self) -> list[str]: + """Return the authorizer Java class used on Kafka.""" + if self.state.kraft_mode: + # return ["authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer"] + return [] + return ["authorizer.class.name=kafka.security.authorizer.AclAuthorizer"] + + @property + def controller_properties(self) -> list[str]: + """Builds all properties necessary for starting Kafka controller service. + + Returns: + List of properties to be set + """ + if self.state.kraft_mode == False: # noqa: E712 + return [] + + roles = [] + node_id = self.state.unit_broker.unit_id + if self.state.runs_broker: + roles.append("broker") + node_id += KRAFT_NODE_ID_OFFSET + if self.state.runs_controller: + roles.append("controller") + + properties = [ + f"process.roles={','.join(roles)}", + f"node.id={node_id}", + f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}", + f"controller.listener.names={CONTROLLER_LISTENER_NAME}", + ] + + return properties + @property def server_properties(self) -> list[str]: """Builds all properties necessary for starting Kafka service. 
@@ -583,6 +683,24 @@ def server_properties(self) -> list[str]: listeners_repr = [listener.listener for listener in self.all_listeners] advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] + if self.state.kraft_mode: + controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" + controller_listener = f"{CONTROLLER_LISTENER_NAME}://0.0.0.0:{CONTROLLER_PORT}" + + # NOTE: Case where the controller is running standalone. Early return with a + # smaller subset of config options + if not self.state.runs_broker: + properties = ( + [f"log.dirs={self.state.log_dirs}", f"listeners={controller_listener}"] + + self.controller_properties + # + self.authorizer_class + ) + return properties + + protocol_map.append(controller_protocol_map) + if self.state.runs_controller: + listeners_repr.append(controller_listener) + properties = ( [ f"super.users={self.state.super_users}", @@ -594,16 +712,20 @@ def server_properties(self) -> list[str]: f"inter.broker.protocol.version={self.inter_broker_protocol_version}", ] + self.scram_properties + + self.auth_properties + self.oauth_properties + self.config_properties + self.default_replication_properties - + self.auth_properties + self.rack_properties + DEFAULT_CONFIG_OPTIONS.split("\n") + + self.authorizer_class + + self.controller_properties ) if self.state.cluster.tls_enabled and self.state.unit_broker.certificate: - properties += self.tls_properties + self.zookeeper_tls_properties + properties += self.tls_properties + if self.state.kraft_mode == False: # noqa: E712 + properties += self.zookeeper_tls_properties if self.state.runs_balancer or BALANCER.value in self.state.peer_cluster.roles: properties += KAFKA_CRUISE_CONTROL_OPTIONS.splitlines() @@ -738,10 +860,12 @@ def goals(self) -> list[str]: if self.config.profile == PROFILE_TESTING: goals = BALANCER_GOALS_TESTING - if self.state.balancer.racks: + if self.state.peer_cluster.racks: if ( - min([3, len(self.state.balancer.broker_capacities.get("brokerCapacities", []))]) - > self.state.balancer.racks + min( + [3, len(self.state.peer_cluster.broker_capacities.get("brokerCapacities", []))] + ) + > self.state.peer_cluster.racks ): # replication-factor > racks is not ideal goals = goals + ["RackAwareDistribution"] else: @@ -795,10 +919,10 @@ def cruise_control_properties(self) -> list[str]: """ properties = ( [ - f"bootstrap.servers={self.state.balancer.broker_uris}", - f"zookeeper.connect={self.state.balancer.zk_uris}", + f"bootstrap.servers={self.state.peer_cluster.broker_uris}", + f"zookeeper.connect={self.state.peer_cluster.zk_uris}", "zookeeper.security.enabled=true", - f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{self.state.balancer.broker_username}" password="{self.state.balancer.broker_password}";', + f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{self.state.peer_cluster.broker_username}" password="{self.state.peer_cluster.broker_password}";', f"sasl.mechanism={self.state.default_auth.mechanism}", f"security.protocol={self.state.default_auth.protocol}", f"capacity.config.file={self.workload.paths.capacity_jbod_json}", @@ -824,8 +948,8 @@ def jaas_config(self) -> str: f""" Client {{ org.apache.zookeeper.server.auth.DigestLoginModule required - username="{self.state.balancer.zk_username}" - password="{self.state.balancer.zk_password}"; + username="{self.state.peer_cluster.zk_username}" + password="{self.state.peer_cluster.zk_password}"; }}; """ ) @@ -844,7 +968,7 @@ def 
set_cruise_control_properties(self) -> None: def set_broker_capacities(self) -> None: """Writes all broker storage capacities to `capacityJBOD.json`.""" self.workload.write( - content=json.dumps(self.state.balancer.broker_capacities), + content=json.dumps(self.state.peer_cluster.broker_capacities), path=self.workload.paths.capacity_jbod_json, ) diff --git a/src/managers/tls.py b/src/managers/tls.py index b6b64efa..fd937a1c 100644 --- a/src/managers/tls.py +++ b/src/managers/tls.py @@ -129,14 +129,11 @@ def remove_cert(self, alias: str) -> None: def _build_extra_sans(self) -> list[str]: """Parse the certificate_extra_sans config option.""" - extra_sans = self.config.certificate_extra_sans or "" - parsed_sans = [] - - if extra_sans == "": - return parsed_sans - - for sans in extra_sans.split(","): - parsed_sans.append(sans.replace("{unit}", str(self.state.unit_broker.unit_id))) + extra_sans = self.config.extra_listeners or self.config.certificate_extra_sans or [] + clean_sans = [san.split(":")[0] for san in extra_sans] + parsed_sans = [ + san.replace("{unit}", str(self.state.unit_broker.unit_id)) for san in clean_sans + ] return parsed_sans @@ -145,7 +142,7 @@ def build_sans(self) -> Sans: if self.substrate == "vm": return { "sans_ip": [ - self.state.unit_broker.host, + self.state.unit_broker.internal_address, ], "sans_dns": [self.state.unit_broker.unit.name, socket.getfqdn()] + self._build_extra_sans(), diff --git a/src/workload.py b/src/workload.py index ccce0948..96cfefd1 100644 --- a/src/workload.py +++ b/src/workload.py @@ -114,6 +114,32 @@ def run_bin_command( command = f"{self.paths.binaries_path}/bin/kafka-{bin_keyword}.sh {' '.join(bin_args)}" return self.exec(command=command.split(), env=parsed_opts or None) + def format_storages( + self, uuid: str, internal_user_credentials: dict[str, str] | None = None + ) -> None: + """Use a passed uuid to format storages.""" + # NOTE data dirs have changed permissions by storage_attached hook. For some reason + # storage command bin needs these locations to be root owned. Momentarily raise permissions + # during the format phase. + self.exec(["chown", "-R", "root:root", f"{self.paths.data_path}"]) + + command = [ + "format", + "--ignore-formatted", + "--cluster-id", + uuid, + "-c", + self.paths.server_properties, + ] + if internal_user_credentials: + for user, password in internal_user_credentials.items(): + command += ["--add-scram", f"SCRAM-SHA-512=[name={user},password={password}]"] + self.run_bin_command(bin_keyword="storage", bin_args=command) + + # Drop permissions again for the main process + self.exec(["chmod", "-R", "750", f"{self.paths.data_path}"]) + self.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.paths.data_path}"]) + # ------- Kafka vm specific ------- def install(self) -> None: diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py new file mode 100644 index 00000000..33bb91af --- /dev/null +++ b/tests/integration/test_kraft.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +import asyncio +import logging +import os + +import pytest +from pytest_operator.plugin import OpsTest + +from literals import ( + CONTROLLER_PORT, + PEER_CLUSTER_ORCHESTRATOR_RELATION, + PEER_CLUSTER_RELATION, + SECURITY_PROTOCOL_PORTS, +) + +from .helpers import ( + APP_NAME, + KAFKA_CONTAINER, + get_address, + netcat, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.kraft + +CONTROLLER_APP = "controller" +PRODUCER_APP = "producer" + + +class TestKRaft: + + deployment_strat: str = os.environ.get("DEPLOYMENT", "multi") + controller_app: str = {"single": APP_NAME, "multi": CONTROLLER_APP}[deployment_strat] + + @pytest.mark.abort_on_fail + async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm): + await asyncio.gather( + ops_test.model.deploy( + kafka_charm, + application_name=APP_NAME, + num_units=1, + series="jammy", + config={ + "roles": "broker,controller" if self.controller_app == APP_NAME else "broker", + "profile": "testing", + }, + resources={"kafka-image": KAFKA_CONTAINER}, + trust=True, + ), + ops_test.model.deploy( + "kafka-test-app", + application_name=PRODUCER_APP, + channel="edge", + num_units=1, + series="jammy", + config={ + "topic_name": "HOT-TOPIC", + "num_messages": 100000, + "role": "producer", + "partitions": 20, + "replication_factor": "1", + }, + trust=True, + ), + ) + + if self.controller_app != APP_NAME: + await ops_test.model.deploy( + kafka_charm, + application_name=self.controller_app, + num_units=1, + series="jammy", + config={ + "roles": self.controller_app, + "profile": "testing", + }, + resources={"kafka-image": KAFKA_CONTAINER}, + trust=True, + ) + + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), + idle_period=30, + timeout=1800, + raise_on_error=False, + ) + if self.controller_app != APP_NAME: + assert ops_test.model.applications[APP_NAME].status == "blocked" + assert ops_test.model.applications[self.controller_app].status == "blocked" + else: + assert ops_test.model.applications[APP_NAME].status == "active" + + @pytest.mark.abort_on_fail + async def test_integrate(self, ops_test: OpsTest): + if self.controller_app != APP_NAME: + await ops_test.model.add_relation( + f"{APP_NAME}:{PEER_CLUSTER_ORCHESTRATOR_RELATION}", + f"{CONTROLLER_APP}:{PEER_CLUSTER_RELATION}", + ) + + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), idle_period=30 + ) + + async with ops_test.fast_forward(fast_interval="40s"): + await asyncio.sleep(120) + + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[self.controller_app].status == "active" + + @pytest.mark.abort_on_fail + async def test_listeners(self, ops_test: OpsTest): + print("SLEEPING") + logger.info("SLEEPING") + await asyncio.sleep(300) + address = await get_address(ops_test=ops_test) + assert netcat( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal + ) # Internal listener + + # Client listener should not be enabled if there is no relations + assert not netcat( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client + ) + + # Check controller socket + if self.controller_app != APP_NAME: + address = await get_address(ops_test=ops_test, app_name=self.controller_app) + + assert netcat(address, CONTROLLER_PORT) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index f467f62b..ce8adbae 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -136,7 +136,7 @@ def 
test_ready_to_start_blocks_no_zookeeper_relation(ctx: Context, base_state: S state_out = ctx.run(ctx.on.start(), state_in) # Then - assert state_out.unit_status == Status.ZK_NOT_RELATED.value.status + assert state_out.unit_status == Status.MISSING_MODE.value.status def test_ready_to_start_waits_no_zookeeper_data(ctx: Context, base_state: State) -> None: @@ -581,7 +581,7 @@ def test_storage_add_disableenables_and_starts( def test_zookeeper_changed_sets_passwords_and_creates_users_with_zk( - ctx: Context, base_state: State, zk_data: dict[str, str], passwords_data: dict[str, str] + ctx: Context, base_state: State, zk_data: dict[str, str] ) -> None: """Checks inter-broker passwords are created on zookeeper-changed hook using zk auth.""" # Given @@ -795,7 +795,7 @@ def test_config_changed_updates_client_data(ctx: Context, base_state: State) -> patched_update_client_data.assert_called_once() -def test_config_changed_restarts(ctx: Context, base_state: State, monkeypatch) -> None: +def test_config_changed_restarts(ctx: Context, base_state: State) -> None: """Checks units rolling-restat on config changed hook.""" # Given cluster_peer = PeerRelation(PEER, PEER) @@ -841,7 +841,7 @@ def test_on_remove_sysctl_is_deleted(ctx: Context, base_state: State): patched_sysctl_remove.assert_called_once() -def test_workload_version_is_setted(ctx: Context, base_state: State, monkeypatch): +def test_workload_version_is_setted(ctx: Context, base_state: State): # Given output_bin_install = "3.6.0-ubuntu0" output_bin_changed = "3.6.1-ubuntu0" diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 91fd845e..bb94d261 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -128,7 +128,7 @@ def test_log_dirs_in_server_properties(ctx: Context, base_state: State) -> None: def test_listeners_in_server_properties( - charm_configuration: dict, base_state: State, zk_data: dict[str, str], monkeypatch + charm_configuration: dict, base_state: State, zk_data: dict[str, str] ) -> None: """Checks that listeners are split into INTERNAL, CLIENT and EXTERNAL.""" # Given @@ -196,6 +196,97 @@ def test_listeners_in_server_properties( assert listener in advertised_listeners +def test_extra_listeners_in_server_properties( + charm_configuration: dict, base_state: State, zk_data: dict[str, str] +): + """Checks that the extra-listeners are properly set from config.""" + # Given + charm_configuration["options"]["extra_listeners"][ + "default" + ] = "worker-{unit}.foo.com:30000,worker-{unit}.bar.com:40000" + cluster_peer = PeerRelation(PEER, PEER, local_unit_data={"private-address": "treebeard"}) + zk_relation = Relation(ZK, ZK, remote_app_data=zk_data) + client_relation = Relation( + REL_NAME, "app", remote_app_data={"extra-user-roles": "admin,producer"} + ) + state_in = dataclasses.replace( + base_state, relations=[cluster_peer, zk_relation, client_relation] + ) + ctx = Context( + KafkaCharm, meta=METADATA, config=charm_configuration, actions=ACTIONS, unit_id=0 + ) + expected_listener_names = { + "INTERNAL_SASL_PLAINTEXT_SCRAM_SHA_512", + "CLIENT_SASL_PLAINTEXT_SCRAM_SHA_512", + "CLIENT_SSL_SSL", + "EXTRA_SASL_PLAINTEXT_SCRAM_SHA_512_0", + "EXTRA_SASL_PLAINTEXT_SCRAM_SHA_512_1", + "EXTRA_SSL_SSL_0", + "EXTRA_SSL_SSL_1", + } + + # When + with ctx(ctx.on.config_changed(), state_in) as manager: + charm = cast(KafkaCharm, manager.charm) + + # Then + # 2 extra, 1 internal, 1 client + assert len(charm.broker.config_manager.all_listeners) == 4 + + # Adding SSL + cluster_peer = dataclasses.replace(cluster_peer, 
local_app_data={"tls": "enabled"}) + state_in = dataclasses.replace(base_state, relations=[cluster_peer, client_relation]) + + # When + with ctx(ctx.on.config_changed(), state_in) as manager: + charm = cast(KafkaCharm, manager.charm) + + # Then + # 2 extra, 1 internal, 1 client + assert len(charm.broker.config_manager.all_listeners) == 4 + + # Adding SSL + cluster_peer = dataclasses.replace( + cluster_peer, local_app_data={"tls": "enabled", "mtls": "enabled"} + ) + state_in = dataclasses.replace(base_state, relations=[cluster_peer, client_relation]) + + # When + with ctx(ctx.on.config_changed(), state_in) as manager: + charm = cast(KafkaCharm, manager.charm) + + # Then + # 2 extra sasl_ssl, 2 extra ssl, 1 internal, 2 client + assert len(charm.broker.config_manager.all_listeners) == 7 + + advertised_listeners_prop = "" + for prop in charm.broker.config_manager.server_properties: + if "advertised.listener" in prop: + advertised_listeners_prop = prop + + # validating every expected listener is present + for name in expected_listener_names: + assert name in advertised_listeners_prop + + # validating their allocated ports are expected + ports = [] + for listener in advertised_listeners_prop.split("=")[1].split(","): + name, _, port = listener.split(":") + + if name.endswith("_0") or name.endswith("_1"): + # verifying allocation uses the baseport + digit = 10**4 + assert int(port) // digit * digit in (30000, 40000) + + # verifying allocation is in steps of 100 + digit = 10**2 + assert int(port) // digit * digit in (39000, 39100, 49000, 49100) + + # verifying all ports are unique + assert port not in ports + ports.append(port) + + def test_oauth_client_listeners_in_server_properties(ctx: Context, base_state: State) -> None: """Checks that oauth client listeners are properly set when a relating through oauth.""" # Given diff --git a/tests/unit/test_kraft.py b/tests/unit/test_kraft.py new file mode 100644 index 00000000..7c6b6911 --- /dev/null +++ b/tests/unit/test_kraft.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +import dataclasses +import json +import logging +from pathlib import Path +from unittest.mock import patch + +import pytest +import yaml +from ops import ActiveStatus +from ops.testing import Container, Context, PeerRelation, Relation, State + +from charm import KafkaCharm +from literals import ( + CONTAINER, + PEER, + PEER_CLUSTER_ORCHESTRATOR_RELATION, + PEER_CLUSTER_RELATION, + SUBSTRATE, + Status, +) + +pytestmark = pytest.mark.kraft + +logger = logging.getLogger(__name__) + + +CONFIG = yaml.safe_load(Path("./config.yaml").read_text()) +ACTIONS = yaml.safe_load(Path("./actions.yaml").read_text()) +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) + + +@pytest.fixture() +def charm_configuration(): + """Enable direct mutation on configuration dict.""" + return json.loads(json.dumps(CONFIG)) + + +@pytest.fixture() +def base_state(): + + if SUBSTRATE == "k8s": + state = State(leader=True, containers=[Container(name=CONTAINER, can_connect=True)]) + + else: + state = State(leader=True) + + return state + + +def test_ready_to_start_maintenance_no_peer_relation(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "controller" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + state_in = base_state + + # When + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_PEER_RELATION.value.status + + +def test_ready_to_start_no_peer_cluster(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "controller" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + state_in = dataclasses.replace(base_state, relations=[cluster_peer]) + + # When + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_PEER_CLUSTER_RELATION.value.status + + +def test_ready_to_start_missing_data_as_controller(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "controller" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + peer_cluster = Relation(PEER_CLUSTER_RELATION, "peer_cluster") + state_in = dataclasses.replace(base_state, relations=[cluster_peer, peer_cluster]) + + # When + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_BROKER_DATA.value.status + + +def test_ready_to_start_missing_data_as_broker(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "broker" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + peer_cluster = Relation( + PEER_CLUSTER_ORCHESTRATOR_RELATION, "peer_cluster", remote_app_data={"roles": "controller"} + ) + state_in = dataclasses.replace(base_state, relations=[cluster_peer, peer_cluster]) + + # When + with patch("workload.KafkaWorkload.run_bin_command", return_value="cluster-uuid-number"): + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_QUORUM_URIS.value.status + + +def test_ready_to_start(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "broker,controller" + ctx = Context( + KafkaCharm, + 
meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + state_in = dataclasses.replace(base_state, relations=[cluster_peer]) + + # When + with ( + patch( + "workload.KafkaWorkload.run_bin_command", return_value="cluster-uuid-number" + ) as patched_run_bin_command, + patch("health.KafkaHealth.machine_configured", return_value=True), + patch("workload.KafkaWorkload.start"), + patch("workload.KafkaWorkload.active", return_value=True), + patch("charms.operator_libs_linux.v1.snap.SnapCache"), + ): + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + # Second call of format will have to pass "cluster-uuid-number" as set above + assert "cluster-uuid-number" in patched_run_bin_command.call_args_list[1][1]["bin_args"] + assert "cluster-uuid" in state_out.get_relations(PEER)[0].local_app_data + assert "controller-quorum-uris" in state_out.get_relations(PEER)[0].local_app_data + # Only the internal users should be created. + assert "admin-password" in next(iter(state_out.secrets)).latest_content + assert "sync-password" in next(iter(state_out.secrets)).latest_content + assert state_out.unit_status == ActiveStatus() diff --git a/tests/unit/test_structured_config.py b/tests/unit/test_structured_config.py index fbe810d7..c9c6b284 100644 --- a/tests/unit/test_structured_config.py +++ b/tests/unit/test_structured_config.py @@ -174,3 +174,14 @@ def test_incorrect_roles(): valid_values = ["broker", "balancer", "balancer,broker", "broker, balancer "] check_invalid_values("roles", erroneus_values) check_valid_values("roles", valid_values) + + +def test_incorrect_extra_listeners(): + erroneus_values = [ + "missing.port", + "low.port:15000", + "high.port:60000", + "non.unique:30000,other.non.unique:30000", + "close.port:30000,other.close.port:30001", + ] + check_invalid_values("extra_listeners", erroneus_values) diff --git a/tests/unit/test_tls.py b/tests/unit/test_tls.py index bc351d95..c5172af1 100644 --- a/tests/unit/test_tls.py +++ b/tests/unit/test_tls.py @@ -94,18 +94,31 @@ def test_mtls_added(ctx: Context, base_state: State) -> None: @pytest.mark.parametrize( - ["extra_sans", "expected"], + ["config_option", "extra_sans", "expected"], [ - ("", []), - ("worker{unit}.com", ["worker0.com"]), - ("worker{unit}.com,{unit}.example", ["worker0.com", "0.example"]), + ("certificate_extra_sans", "", []), + ("certificate_extra_sans", "worker{unit}.com", ["worker0.com"]), + ( + "certificate_extra_sans", + "worker{unit}.com,{unit}.example", + ["worker0.com", "0.example"], + ), + ( + "extra_listeners", + "worker{unit}.com:30000,{unit}.example:40000,nonunit.domain.com:45000", + ["worker0.com", "0.example", "nonunit.domain.com"], + ), ], ) def test_extra_sans_config( - charm_configuration: dict, base_state: State, extra_sans: str, expected: list[str] + charm_configuration: dict, + base_state: State, + config_option: str, + extra_sans: str, + expected: list[str], ) -> None: # Given - charm_configuration["options"]["certificate_extra_sans"]["default"] = extra_sans + charm_configuration["options"][config_option]["default"] = extra_sans cluster_peer = PeerRelation( PEER, PEER, diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py index 5992501c..d4e6ba0c 100644 --- a/tests/unit/test_upgrade.py +++ b/tests/unit/test_upgrade.py @@ -13,11 +13,11 @@ import yaml from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError, DependencyModel from kazoo.client import KazooClient -from ops.testing import ActionFailed, Container, Context, Harness, 
PeerRelation, State +from ops.testing import ActionFailed, Container, Context, PeerRelation, State from charm import KafkaCharm from events.upgrade import KafkaDependencyModel -from literals import CHARM_KEY, CONTAINER, DEPENDENCIES, PEER, SUBSTRATE, ZK +from literals import CONTAINER, DEPENDENCIES, PEER, SUBSTRATE logger = logging.getLogger(__name__) @@ -61,34 +61,6 @@ def upgrade_func() -> str: return "_on_upgrade_granted" -@pytest.fixture -def harness(zk_data): - harness = Harness(KafkaCharm, meta=str(METADATA), config=str(CONFIG), actions=str(ACTIONS)) - harness.add_relation("restart", CHARM_KEY) - harness.add_relation("upgrade", CHARM_KEY) - - if SUBSTRATE == "k8s": - harness.set_can_connect(CONTAINER, True) - - peer_rel_id = harness.add_relation(PEER, CHARM_KEY) - zk_rel_id = harness.add_relation(ZK, ZK) - harness._update_config( - { - "log_retention_ms": "-1", - "compression_type": "producer", - } - ) - harness.begin() - with harness.hooks_disabled(): - harness.add_relation_unit(peer_rel_id, f"{CHARM_KEY}/0") - harness.update_relation_data( - peer_rel_id, f"{CHARM_KEY}/0", {"private-address": "000.000.000"} - ) - harness.update_relation_data(zk_rel_id, ZK, zk_data) - - return harness - - def test_pre_upgrade_check_raises_not_stable(ctx: Context, base_state: State) -> None: # Given state_in = base_state diff --git a/tox.ini b/tox.ini index c01d6502..94e96431 100644 --- a/tox.ini +++ b/tox.ini @@ -31,6 +31,9 @@ set_env = ha: TEST_FILE=ha/test_ha.py balancer-single: DEPLOYMENT=single balancer-multi: DEPLOYMENT=multi + kraft-single: DEPLOYMENT=single + kraft-multi: DEPLOYMENT=multi + pass_env = PYTHONPATH CHARM_BUILD_DIR @@ -82,6 +85,18 @@ commands = poetry run coverage report poetry run coverage xml + +[testenv:integration] +description = Run integration tests +pass_env = + {[testenv]pass_env} + CI + CI_PACKED_CHARMS +commands = + poetry install --no-root --with integration + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ + + [testenv:integration-{charm,provider,scaling,password-rotation,tls,upgrade,ha}] description = Run integration tests pass_env = @@ -99,3 +114,16 @@ pass_env = commands = poetry install --no-root --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_balancer.py + +[testenv:integration-kraft-{single,multi}] +description = Run KRaft mode tests +set_env = + {[testenv]set_env} + # Workaround for https://github.com/python-poetry/poetry/issues/6958 + POETRY_INSTALLER_PARALLEL = false +pass_env = + {[testenv]pass_env} + CI +commands = + poetry install --no-root --with integration + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_kraft.py
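
Reviewer note (not part of the changeset): a minimal sketch of the port arithmetic that the new test_extra_listeners_in_server_properties unit test exercises, assuming the base port given in an extra_listeners entry is added to the standard client port and each additional authentication scheme on the same extra listener is spaced by 100. The helper name and constants below are hypothetical illustrations, not the charm's implementation.

# Illustrative sketch only -- reproduces the arithmetic asserted by the unit test,
# where "worker-{unit}.foo.com:30000" yields advertised ports in the 39000/39100
# ranges and "worker-{unit}.bar.com:40000" yields ports in the 49000/49100 ranges.

STANDARD_CLIENT_PORT = 9092  # assumed SASL_PLAINTEXT/SCRAM-SHA-512 client port
SCHEME_STEP = 100            # assumed spacing between auth schemes on one listener


def allocate_extra_port(base_offset: int, scheme_index: int) -> int:
    """Return a hypothetical advertised port for an extra listener.

    base_offset: the port given in the extra_listeners config entry (20001-50000).
    scheme_index: 0 for the first enabled auth scheme, 1 for the next, and so on.
    """
    return base_offset + STANDARD_CLIENT_PORT + scheme_index * SCHEME_STEP


if __name__ == "__main__":
    # base 30000 -> 39092, 39192; base 40000 -> 49092, 49192, which is why the
    # test accepts ports that round down (in steps of 100) to 39000, 39100,
    # 49000 and 49100, and (in steps of 10000) to 30000 and 40000.
    for base in (30000, 40000):
        print([allocate_extra_port(base, i) for i in range(2)])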