diff --git a/lib/charms/zookeeper/v0/client.py b/lib/charms/zookeeper/v0/client.py index e7f03cc5..d0b2d78a 100644 --- a/lib/charms/zookeeper/v0/client.py +++ b/lib/charms/zookeeper/v0/client.py @@ -74,7 +74,7 @@ def update_cluster(new_members: List[str], event: EventBase) -> None: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 6 +LIBPATCH = 8 logger = logging.getLogger(__name__) @@ -101,6 +101,12 @@ class QuorumLeaderNotFoundError(Exception): pass +class NoUnitFoundError(Exception): + """Generic exception for when there are no running zk unit in the app.""" + + pass + + class ZooKeeperManager: """Handler for performing ZK commands.""" @@ -114,6 +120,7 @@ def __init__( keyfile_path: Optional[str] = "", keyfile_password: Optional[str] = "", certfile_path: Optional[str] = "", + read_only: bool = True, ): self.hosts = hosts self.username = username @@ -123,12 +130,21 @@ def __init__( self.keyfile_path = keyfile_path self.keyfile_password = keyfile_password self.certfile_path = certfile_path - self.leader = "" + self.zk_host = "" + self.read_only = read_only - try: - self.leader = self.get_leader() - except RetryError: - raise QuorumLeaderNotFoundError("quorum leader not found") + if not read_only: + try: + self.zk_host = self.get_leader() + except RetryError: + raise QuorumLeaderNotFoundError("quorum leader not found") + + else: + try: + self.zk_host = self.get_any_unit() + + except RetryError: + raise NoUnitFoundError @retry( wait=wait_fixed(3), @@ -170,6 +186,35 @@ def get_leader(self) -> str: return leader or "" + @retry( + wait=wait_fixed(3), + stop=stop_after_attempt(2), + retry=retry_if_not_result(lambda result: True if result else False), + ) + def get_any_unit(self) -> str: + any_host = None + for host in self.hosts: + try: + with ZooKeeperClient( + host=host, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + response = zk.srvr + if response: + any_host = host + break + except KazooTimeoutError: # in the case of having a dead unit in relation data + logger.debug(f"TIMEOUT - {host}") + continue + + return any_host or "" + @property def server_members(self) -> Set[str]: """The current members within the ZooKeeper quorum. @@ -179,7 +224,7 @@ def server_members(self) -> Set[str]: e.g {"server.1=10.141.78.207:2888:3888:participant;0.0.0.0:2181"} """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -200,7 +245,7 @@ def config_version(self) -> int: The zookeeper config version decoded from base16 """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -221,7 +266,7 @@ def members_syncing(self) -> bool: True if any members are syncing. Otherwise False. """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -305,7 +350,7 @@ def add_members(self, members: Iterable[str]) -> None: # specific connection to leader with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -330,7 +375,7 @@ def remove_members(self, members: Iterable[str]) -> None: for member in members: member_id = re.findall(r"server.([0-9]+)", member)[0] with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -356,7 +401,7 @@ def leader_znodes(self, path: str) -> Set[str]: Set of all nested child zNodes """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -369,7 +414,7 @@ def leader_znodes(self, path: str) -> Set[str]: return all_znode_children - def create_znode_leader(self, path: str, acls: List[ACL]) -> None: + def create_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None: """Creates a new zNode on the current quorum leader with given ACLs. Args: @@ -377,7 +422,7 @@ def create_znode_leader(self, path: str, acls: List[ACL]) -> None: acls: the ACLs to be set on that path """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -388,7 +433,7 @@ def create_znode_leader(self, path: str, acls: List[ACL]) -> None: ) as zk: zk.create_znode(path=path, acls=acls) - def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None: + def set_acls_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None: """Updates ACLs for an existing zNode on the current quorum leader. Args: @@ -396,7 +441,7 @@ def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None: acls: the new ACLs to be set on that path """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -414,7 +459,7 @@ def delete_znode_leader(self, path: str) -> None: path: the zNode path to delete """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -432,7 +477,7 @@ def get_version(self) -> str: String of ZooKeeper service version """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -577,7 +622,7 @@ def delete_znode(self, path: str) -> None: return self.client.delete(path, recursive=True) - def create_znode(self, path: str, acls: List[ACL]) -> None: + def create_znode(self, path: str, acls: List[ACL] | None = None) -> None: """Create new znode. Args: @@ -599,7 +644,7 @@ def get_acls(self, path: str) -> List[ACL]: return acl_list if acl_list else [] - def set_acls(self, path: str, acls: List[ACL]) -> None: + def set_acls(self, path: str, acls: List[ACL] | None = None) -> None: """Sets acls for a desired znode path. Args: diff --git a/requirements.txt b/requirements.txt index c4e99f5d..5a13f4e0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ charset-normalizer==3.4.0 ; python_version >= "3.10" and python_version < "4.0" cryptography==43.0.3 ; python_version >= "3.10" and python_version < "4.0" exceptiongroup==1.2.2 ; python_version >= "3.10" and python_version < "3.11" h11==0.14.0 ; python_version >= "3.10" and python_version < "4.0" -httpcore==1.0.6 ; python_version >= "3.10" and python_version < "4.0" +httpcore==1.0.7 ; python_version >= "3.10" and python_version < "4.0" httpx==0.27.2 ; python_version >= "3.10" and python_version < "4.0" idna==3.10 ; python_version >= "3.10" and python_version < "4.0" jsonschema-specifications==2024.10.1 ; python_version >= "3.10" and python_version < "4.0" @@ -17,11 +17,11 @@ lightkube==0.15.0 ; python_version >= "3.10" and python_version < "4.0" ops==2.17.0 ; python_version >= "3.10" and python_version < "4.0" pure-sasl==0.6.2 ; python_version >= "3.10" and python_version < "4.0" pycparser==2.22 ; python_version >= "3.10" and python_version < "4.0" and platform_python_implementation != "PyPy" -pydantic==1.10.18 ; python_version >= "3.10" and python_version < "4.0" +pydantic==1.10.19 ; python_version >= "3.10" and python_version < "4.0" pyyaml==6.0.2 ; python_version >= "3.10" and python_version < "4.0" referencing==0.35.1 ; python_version >= "3.10" and python_version < "4.0" requests==2.32.3 ; python_version >= "3.10" and python_version < "4.0" -rpds-py==0.18.1 ; python_version >= "3.10" and python_version < "4.0" +rpds-py==0.21.0 ; python_version >= "3.10" and python_version < "4.0" sniffio==1.3.1 ; python_version >= "3.10" and python_version < "4.0" tenacity==9.0.0 ; python_version >= "3.10" and python_version < "4.0" typing-extensions==4.12.2 ; python_version >= "3.10" and python_version < "4.0" diff --git a/src/core/models.py b/src/core/models.py index bd6eacf1..ace4752d 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -15,7 +15,11 @@ DataPeerData, DataPeerUnitData, ) -from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager +from charms.zookeeper.v0.client import ( + NoUnitFoundError, + QuorumLeaderNotFoundError, + ZooKeeperManager, +) from kazoo.client import AuthFailedError, ConnectionLoss, NoNodeError from kazoo.exceptions import NoAuthError from lightkube.resources.core_v1 import Node, Pod @@ -689,23 +693,6 @@ def chroot(self) -> str: or "" ) - @property - def uris(self) -> str: - """Comma separated connection string, containing endpoints + chroot.""" - if not self.relation: - return "" - - return ",".join( - sorted( # sorting as they may be disordered - ( - self.data_interface.fetch_relation_field( - relation_id=self.relation.id, field="uris" - ) - or "" - ).split(",") - ) - ) - @property def tls(self) -> bool: """Check if TLS is enabled on ZooKeeper.""" @@ -737,11 +724,45 @@ def zookeeper_connected(self) -> bool: return True + @property + def hosts(self) -> list[str]: + """Get the hosts from the databag.""" + return [host.split(":")[0] for host in self.endpoints.split(",")] + + @property + def uris(self): + """Comma separated connection string, containing endpoints + chroot.""" + return f"{self.endpoints.removesuffix('/')}/{self.database.removeprefix('/')}" + + @property + def port(self) -> int: + """Get the port in use from the databag. + + We can extract from: + - host1:port,host2:port + - host1,host2:port + """ + try: + port = next( + iter([int(host.split(":")[1]) for host in reversed(self.endpoints.split(","))]), + 2181, + ) + except IndexError: + # compatibility with older zk versions + port = 2181 + + return port + @property def zookeeper_version(self) -> str: """Get running zookeeper version.""" - hosts = self.endpoints.split(",") - zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password) + zk = ZooKeeperManager( + hosts=self.hosts, + client_port=self.port, + username=self.username, + password=self.password, + use_ssl=self.tls, + ) return zk.get_version() @@ -755,16 +776,22 @@ def zookeeper_version(self) -> str: def broker_active(self) -> bool: """Checks if broker id is recognised as active by ZooKeeper.""" broker_id = self.data_interface.local_unit.name.split("/")[1] - hosts = self.endpoints.split(",") path = f"{self.database}/brokers/ids/" - zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password) try: + zk = ZooKeeperManager( + hosts=self.hosts, + client_port=self.port, + username=self.username, + password=self.password, + use_ssl=self.tls, + ) brokers = zk.leader_znodes(path=path) except ( NoNodeError, AuthFailedError, QuorumLeaderNotFoundError, + NoUnitFoundError, ConnectionLoss, NoAuthError, ) as e: diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index fcd9bbcd..6272ee94 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -8,7 +8,7 @@ import subprocess from pathlib import Path from subprocess import PIPE, CalledProcessError, check_output -from typing import Any, Dict, List, Optional, Set +from typing import Any, List, Optional, Set import yaml from charms.kafka.client import KafkaClient @@ -485,7 +485,7 @@ def get_provider_data( return provider_relation_data | user_secret | tls_secret -def get_active_brokers(config: Dict) -> Set[str]: +def get_active_brokers(config: dict[str, str]) -> set[str]: """Gets all brokers currently connected to ZooKeeper. Args: @@ -495,9 +495,9 @@ def get_active_brokers(config: Dict) -> Set[str]: Set of active broker ids """ chroot = config.get("database", config.get("chroot", "")) - hosts = config.get("endpoints", "").split(",") username = config.get("username", "") password = config.get("password", "") + hosts = [host.split(":")[0] for host in config.get("endpoints", "").split(",")] zk = ZooKeeperManager(hosts=hosts, username=username, password=password) path = f"{chroot}/brokers/ids/" diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index bb94d261..321330dd 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -392,8 +392,7 @@ def test_zookeeper_config_succeeds_fails_config(ctx: Context, base_state: State) "database": "/kafka", "chroot": "/kafka", "username": "moria", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181,2.2.2.2:2181/kafka", + "endpoints": "1.1.1.1:2181,2.2.2.2:2181", "tls": "disabled", }, ) @@ -418,8 +417,7 @@ def test_zookeeper_config_succeeds_valid_config(ctx: Context, base_state: State) "chroot": "/kafka", "username": "moria", "password": "mellon", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", + "endpoints": "1.1.1.1:2181,2.2.2.2:2181", "tls": "disabled", }, ) diff --git a/tox.ini b/tox.ini index 94e96431..d660be5d 100644 --- a/tox.ini +++ b/tox.ini @@ -60,6 +60,7 @@ commands = --skip {tox_root}/.git \ --skip {tox_root}/.tox \ --skip {tox_root}/build \ + --skip {tox_root}/docs \ --skip {tox_root}/lib \ --skip {tox_root}/tests/integration/*/lib \ --skip {tox_root}/venv \