From ce3b16a6896a95a977af29cc0f05bbfad75fe91c Mon Sep 17 00:00:00 2001 From: Carl Csaposs Date: Wed, 17 Apr 2024 11:57:54 +0200 Subject: [PATCH] Only acquire opensearch lock if 2+ units online (instead of 1+ units) Context: https://chat.canonical.com/canonical/pl/4pheeofpab8f7dpmoddprh4too --- .../opensearch/v0/opensearch_locking.py | 142 ++++++++++-------- 1 file changed, 83 insertions(+), 59 deletions(-) diff --git a/lib/charms/opensearch/v0/opensearch_locking.py b/lib/charms/opensearch/v0/opensearch_locking.py index fc3c16c8e..99a2ef1b7 100644 --- a/lib/charms/opensearch/v0/opensearch_locking.py +++ b/lib/charms/opensearch/v0/opensearch_locking.py @@ -7,6 +7,7 @@ import typing import ops +from charms.opensearch.v0.helper_cluster import ClusterTopology from charms.opensearch.v0.opensearch_exceptions import OpenSearchHttpError if typing.TYPE_CHECKING: @@ -168,68 +169,91 @@ def acquired(self) -> bool: # noqa: C901 host = None alt_hosts = [host for host in self._charm.alt_hosts if self._opensearch.is_node_up(host)] if host or alt_hosts: - logger.debug("[Node lock] Using opensearch for lock") - # Acquire opensearch lock - # Create index if it doesn't exist + logger.debug("[Node lock] 1+ opensearch nodes online") try: - self._opensearch.request( - "PUT", - endpoint=f"/{self._OPENSEARCH_INDEX}", - host=host, - alt_hosts=alt_hosts, - retries=3, - payload={"settings": {"index": {"auto_expand_replicas": "0-all"}}}, - ) - except OpenSearchHttpError as e: - if ( - e.response_code == 400 - and e.response_body.get("error", {}).get("type") - == "resource_already_exists_exception" - ): - # Index already created - pass - else: - logger.exception("Error creating OpenSearch lock index") - return False - # Attempt to create document id 0 - try: - self._opensearch.request( - "PUT", - endpoint=f"/{self._OPENSEARCH_INDEX}/_create/0?refresh=true", - host=host, - alt_hosts=alt_hosts, - retries=3, - payload={"unit-name": self._charm.unit.name}, + online_nodes = len( + ClusterTopology.nodes( + self._opensearch, use_localhost=host is not None, hosts=alt_hosts + ) ) - except OpenSearchHttpError as e: - if e.response_code == 409 and "document already exists" in e.response_body.get( - "error", {} - ).get("reason", ""): - # Document already created - if (unit := self._unit_with_lock(host)) != self._charm.unit.name: - # Another unit has lock - # (Or document deleted after last request & before request in - # `self._lock_acquired()`) - logger.debug( - f"[Node lock] Not acquired. Unit with opensearch lock: {unit}" - ) + except OpenSearchHttpError: + logger.exception("Error getting OpenSearch nodes") + return False + logger.debug(f"[Node lock] Opensearch {online_nodes=}") + assert online_nodes > 0 + if online_nodes >= 2: + logger.debug("[Node lock] Attempting to acquire opensearch lock") + # Acquire opensearch lock + # Create index if it doesn't exist + try: + self._opensearch.request( + "PUT", + endpoint=f"/{self._OPENSEARCH_INDEX}", + host=host, + alt_hosts=alt_hosts, + retries=3, + payload={"settings": {"index": {"auto_expand_replicas": "0-all"}}}, + ) + except OpenSearchHttpError as e: + if ( + e.response_code == 400 + and e.response_body.get("error", {}).get("type") + == "resource_already_exists_exception" + ): + # Index already created + pass + else: + logger.exception("Error creating OpenSearch lock index") return False - else: - logger.exception("Error creating OpenSearch lock document") - return False - # Lock acquired - # Release peer databag lock, if any - logger.debug("[Node lock] Acquired via opensearch") - self._peer.release() - logger.debug("[Node lock] Released redundant peer lock (if held)") - return True - else: - logger.debug("[Node lock] Using peer databag for lock") - # Request peer databag lock - # If return value is True: - # - Lock granted in previous Juju event - # - OR, unit is leader & lock granted in this Juju event - return self._peer.acquired + # Attempt to create document id 0 + try: + self._opensearch.request( + "PUT", + endpoint=f"/{self._OPENSEARCH_INDEX}/_create/0?refresh=true", + host=host, + alt_hosts=alt_hosts, + retries=3, + payload={"unit-name": self._charm.unit.name}, + ) + except OpenSearchHttpError as e: + if e.response_code == 409 and "document already exists" in e.response_body.get( + "error", {} + ).get("reason", ""): + # Document already created + pass + else: + logger.exception("Error creating OpenSearch lock document") + return False + unit = self._unit_with_lock(host) + if unit == self._charm.unit.name: + # Lock acquired + # Release peer databag lock, if any + logger.debug("[Node lock] Acquired via opensearch") + self._peer.release() + logger.debug("[Node lock] Released redundant peer lock (if held)") + return True + if unit or online_nodes >= 2: + # Another unit has lock + # (Or document deleted after request to create document & before request in + # `self._unit_with_lock()`) + logger.debug(f"[Node lock] Not acquired. Unit with opensearch lock: {unit}") + return False + # If online_nodes == 1, we should acquire the lock via the peer databag. + # If we acquired the lock via OpenSearch and this unit was stopping, we would be unable + # to release the OpenSearch lock. For example, when scaling to 0. + # Then, when 1+ OpenSearch nodes are online, a unit that no longer exists could hold + # the lock. + # Note: if online_nodes > 1, this situation is still possible (e.g. if this unit was + # stopping and another unit went offline simultaneously)—but it's an edge case we don't + # support (to reduce complexity & improve robustness in other cases). + # If online_nodes > 1, we should re-attempt to acquire the OpenSearch lock. + logger.debug("[Node lock] No unit has opensearch lock") + logger.debug("[Node lock] Using peer databag for lock") + # Request peer databag lock + # If return value is True: + # - Lock granted in previous Juju event + # - OR, unit is leader & lock granted in this Juju event + return self._peer.acquired def release(self): """Release lock.