diff --git a/lib/charms/opensearch/v0/constants_charm.py b/lib/charms/opensearch/v0/constants_charm.py
index bac758da5..8c64fcb86 100644
--- a/lib/charms/opensearch/v0/constants_charm.py
+++ b/lib/charms/opensearch/v0/constants_charm.py
@@ -14,6 +14,9 @@
 LIBPATCH = 1
 
+SERVICE_MANAGER = "service"
+
+
 # Blocked statuses
 WaitingToStart = "Waiting for OpenSearch to start..."
 InstallError = "Could not install OpenSearch."
@@ -57,6 +60,7 @@
 PClusterWrongNodesCountForQuorum = (
     "Even number of members in quorum if current unit started. Add or remove 1 unit."
 )
+LockIsBlockedOnUnit = "Lock in {} is blocked on unit: {}"
 
 # Wait status
 RequestUnitServiceOps = "Requesting lock on operation: {}"
diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py
index 0718e0401..625a03bc0 100644
--- a/lib/charms/opensearch/v0/opensearch_base_charm.py
+++ b/lib/charms/opensearch/v0/opensearch_base_charm.py
@@ -10,6 +10,7 @@
 from charms.grafana_agent.v0.cos_agent import COSAgentProvider
 from charms.opensearch.v0.constants_charm import (
+    SERVICE_MANAGER,
     AdminUserInitProgress,
     CertsExpirationError,
     ClientRelationName,
@@ -61,7 +62,7 @@
 from charms.opensearch.v0.opensearch_locking import (
     OpenSearchOpsLock,
     OpenSearchRetryLockLaterException,
-    RollingOpsManagerWithExclusions,
+    OpenSearchRollingOpsManager,
 )
 from charms.opensearch.v0.opensearch_nodes_exclusions import (
     ALLOCS_TO_DELETE,
@@ -114,7 +115,6 @@
 LIBPATCH = 2
 
-SERVICE_MANAGER = "service"
 STORAGE_NAME = "opensearch-data"
 
@@ -154,7 +154,7 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None):
         self.plugin_manager = OpenSearchPluginManager(self)
         self.backup = OpenSearchBackup(self)
 
-        self.service_manager = RollingOpsManagerWithExclusions(
+        self.service_manager = OpenSearchRollingOpsManager(
             self, relation=SERVICE_MANAGER, callback=self._restart_opensearch
         )
         self.user_manager = OpenSearchUserManager(self)
@@ -646,14 +646,12 @@ def _start_opensearch(self, _) -> None:  # noqa: C901
         # Retrieve the nodes of the cluster, needed to configure this node
         nodes = self._get_nodes(False)
-        # validate the roles prior to starting
-        self.opensearch_peer_cm.validate_roles(nodes, on_new_unit=True)
 
         logger.debug("_start_opensearch: _set_node_conf is being called")
         # Set the configuration of the node
         self._set_node_conf(nodes)
 
-        logger.debug("_start_opensearch: roles validated")
+        logger.debug("_start_opensearch: start service")
 
         self.opensearch.start(
             wait_until_http_200=(
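
Aside: with this change the restart is no longer invoked directly by event handlers; a unit asks the rolling-ops relation for the lock and `_restart_opensearch` runs as the callback once the lock is granted. The sketch below is illustrative only, not part of the patch: it shows that wiring in a minimal charm, assuming the standard `charms.rolling_ops` API; `MiniCharm` and `_restart_service` are made-up names.

```python
# Illustrative sketch only -- not part of the patch; names are simplified.
from ops.charm import CharmBase
from ops.main import main
from charms.rolling_ops.v0.rollingops import RollingOpsManager


class MiniCharm(CharmBase):
    """Minimal wiring of a restart callback behind a rolling-ops lock."""

    def __init__(self, *args):
        super().__init__(*args)
        # The callback only runs on the unit that currently holds the lock.
        self.service_manager = RollingOpsManager(
            self, relation="service", callback=self._restart_service
        )
        self.framework.observe(self.on.config_changed, self._on_config_changed)

    def _on_config_changed(self, _):
        # Do not restart inline: request a turn on the "service" lock instead.
        self.on[self.service_manager.name].acquire_lock.emit()

    def _restart_service(self, event):
        # Stop/start logic goes here; the OpenSearch subclass raises
        # OpenSearchRetryLockLaterException from this callback to retry later.
        pass


if __name__ == "__main__":
    main(MiniCharm)
```
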
@@ -726,19 +724,32 @@ def _restart_opensearch(self, event: EventBase) -> None:
             service_was_stopped = True
             logger.debug("Rolling Ops Manager: stop_opensearch called")
 
+            # Retrieve the nodes of the cluster, needed to configure this node
+            nodes = self._get_nodes(False)
+            # Validate the roles prior to starting.
+            # We only want to do this once: starting the service changes the
+            # node count, and _start_opensearch may be retried a couple of
+            # times while the service itself comes up.
+            self.opensearch_peer_cm.validate_roles(nodes, on_new_unit=True)
+
             self._start_opensearch(event)
+        except OpenSearchProvidedRolesException as e:
+            logger.error("Restart failed: provided roles are wrong")
+            self.app.status = BlockedStatus(str(e))
+            # We do not restart the service.
+            # We want to review the provided roles first.
+            retry_restart_later = False
         except OpenSearchError as e:
             # An error happened: no python-native exception
             # In this case, we want to retry later
-            logger.error(f"Rolling Ops Manager: Restarting OpenSearch failed: {e}")
+            logger.error(f"Restarting OpenSearch failed: {e}")
             retry_restart_later = True
-        finally:
             # in any error, we want to get the service up and running if it
             # was the case before. That tries to assure we did not lose a node
             # because we did not restart it correctly in the event of a failure.
             if service_was_stopped and not self.opensearch.is_active():
                 self.opensearch.start()
-
+        finally:
             if retry_restart_later:
                 # Message the lock manager we want to retry this lock later.
                 raise OpenSearchRetryLockLaterException()
diff --git a/lib/charms/opensearch/v0/opensearch_distro.py b/lib/charms/opensearch/v0/opensearch_distro.py
index f3d7badb4..10f5367e5 100644
--- a/lib/charms/opensearch/v0/opensearch_distro.py
+++ b/lib/charms/opensearch/v0/opensearch_distro.py
@@ -17,6 +17,7 @@
 import requests
 import urllib3.exceptions
+from charms.opensearch.v0.constants_charm import SERVICE_MANAGER
 from charms.opensearch.v0.constants_secrets import ADMIN_PW
 from charms.opensearch.v0.helper_cluster import Node
 from charms.opensearch.v0.helper_conf_setter import YamlConfigSetter
@@ -24,6 +25,7 @@
     get_host_ip,
     is_reachable,
     reachable_hosts,
+    unit_ip,
 )
 from charms.opensearch.v0.opensearch_exceptions import (
     OpenSearchCmdError,
@@ -161,6 +163,16 @@ def is_node_up(self) -> bool:
         except (OpenSearchHttpError, Exception):
             return False
 
+    def is_remote_node_up(self, unit, relation: str = SERVICE_MANAGER) -> bool:
+        """Get the status of a remote node. This assumes OpenSearch is running."""
+        try:
+            resp_code = self.request(
+                "GET", "/_nodes", host=unit_ip(self._charm, unit, relation), resp_status_code=True
+            )
+            return resp_code < 400
+        except (OpenSearchHttpError, Exception):
+            return False
+
     def run_bin(self, bin_script_name: str, args: str = None, stdin: str = None) -> str:
         """Run opensearch provided bin command, relative to OPENSEARCH_BIN.
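
Aside: `is_remote_node_up` lets the leader probe peers over HTTP rather than trusting relation data alone. The snippet below is an illustrative stand-alone equivalent, not part of the patch: it assumes a reachable OpenSearch HTTP endpoint and uses plain `requests` with placeholder host and port.

```python
# Illustrative sketch only -- not part of the patch. Probe a node's /_nodes endpoint
# and treat any response below 400 as "the node is up"; any transport error as "down".
import requests


def remote_node_is_up(host: str, port: int = 9200, timeout: float = 5.0) -> bool:
    """Return True when the node at host:port answers /_nodes successfully."""
    try:
        resp = requests.get(f"https://{host}:{port}/_nodes", timeout=timeout, verify=False)
        return resp.status_code < 400
    except requests.RequestException:
        # Connection refused, timeout, TLS failure, ... -> treat the node as down.
        return False
```
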
""" import logging +from enum import Enum -from charms.opensearch.v0.constants_charm import LockRetryLater, PeerRelationName +from charms.opensearch.v0.constants_charm import ( + LockIsBlockedOnUnit, + LockRetryLater, + PeerRelationName, +) from charms.opensearch.v0.opensearch_exceptions import ( OpenSearchError, OpenSearchHttpError, OpenSearchOpsLockAlreadyAcquiredError, ) from charms.opensearch.v0.opensearch_internal_data import Scope -from charms.rolling_ops.v0.rollingops import Lock, RollingOpsManager -from ops.model import WaitingStatus +from charms.rolling_ops.v0.rollingops import Lock, Locks, LockState, RollingOpsManager +from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, WaitingStatus from tenacity import retry, stop_after_attempt, wait_fixed # The unique Charmhub library identifier, never change it @@ -55,11 +62,175 @@ logger = logging.getLogger(__name__) +class OpenSearchLockState(Enum): + """Reports the status of a given OpenSearch node for locking purposes. + + Besides the original: ACQUIRE, RELEASE, GRANTED, IDLE, also count the cases: + - DEPARTING: node has requested a lock via ops_lock during storage-detaching + - ACQUIRE_WITH_SVC_STOPPED: requested the lock while its service is stopped + - STOPPED_BUT_NOT_REQUESTED: the service is stopped, but the lock was not requested + - RETRY_LOCK: the lock should be retried later + """ + + ACQUIRE = "acquire" + RELEASE = "release" + GRANTED = "granted" + IDLE = "idle" + + # New states + DEPARTING = "departing" + RETRY_LOCK = "retry-lock" + # TODO: the following states are not really necessary if shards are healthy. + # That means, it is not about the service being stopped, but rather + # if the current cluster can handle yet another node going away, and which. + # We should improve this method to check for the shard health. + # Last Note: the cluster health is also not a good indicator. We may have + # to restart nodes EVEN if the health is not green. + # For the moment: take the easy way out - not providing keys if we have stopped + # nodes in the cluster AND these nodes are not requesting the lock. + ACQUIRE_WITH_SVC_STOPPED = "acquire-with-svc-stopped" + STOPPED_BUT_NOT_REQUESTED = "stopped-but-not-requested" + + class OpenSearchRetryLockLaterException(OpenSearchError): """Exception thrown when the lock should be retried later.""" -class RollingOpsManagerWithExclusions(RollingOpsManager): +class OpenSearchLock(Lock): + """Class for controlling the locks in OpenSearch Charm.""" + + def __init__(self, manager, unit): + super().__init__(manager, unit=unit) + self._opensearch = manager.charm.opensearch + self._ops_lock = manager.charm.ops_lock.is_held() + self.charm = manager.charm + # Run it once, so we can store the status of the unit. + self.unit_is_up = self._opensearch.is_remote_node_up(self.unit) + if self.charm.unit == self.unit and self.unit_is_up: + self.started() + + @property + def _state(self) -> OpenSearchLockState: + """Method for getting the lock state.""" + if self._ops_lock: + # Simple case, we will not process locks until the unit is removed. + return OpenSearchLockState.DEPARTING + + app_state = OpenSearchLockState(super()._state.value) + if app_state in [OpenSearchLockState.RELEASE, OpenSearchLockState.GRANTED]: + return app_state + + # Now, we need to figure out if the extra status of the lock. + # Currently, we know the lock is either ACQUIRE or IDLE. + if self.retrial_count > 0: + # The unit is requesting another retry. + return OpenSearchLockState.RETRY_LOCK + + # Is the unit down? 
+
+    @_state.setter
+    def _state(self, s: LockState):
+        """Method for setting the lock state.
+
+        Although the lock may have more states, these are calculated in the _state getter.
+        The set of stored states remains the same as in the parent class.
+        """
+        state = OpenSearchLockState(s.value)
+        if state in [
+            OpenSearchLockState.ACQUIRE,
+            OpenSearchLockState.DEPARTING,
+            OpenSearchLockState.RETRY_LOCK,
+            OpenSearchLockState.ACQUIRE_WITH_SVC_STOPPED,
+        ]:
+            self.relation.data[self.unit].update({"state": LockState.ACQUIRE.value})
+        elif state == OpenSearchLockState.RELEASE:
+            self.relation.data[self.unit].update({"state": LockState.RELEASE.value})
+        elif state == OpenSearchLockState.GRANTED:
+            self.relation.data[self.app].update({str(self.unit): LockState.GRANTED.value})
+        elif state in [
+            OpenSearchLockState.IDLE,
+            OpenSearchLockState.STOPPED_BUT_NOT_REQUESTED,
+        ]:
+            self.relation.data[self.app].update({str(self.unit): LockState.IDLE.value})
+
+    def is_blocked(self) -> bool:
+        """Method for checking if the lock is blocked."""
+        return self.has_started() and (
+            self._state == OpenSearchLockState.DEPARTING
+            or self._state == OpenSearchLockState.STOPPED_BUT_NOT_REQUESTED
+        )
+
+    def is_held(self):
+        """This unit holds the lock."""
+        return self._state == OpenSearchLockState.GRANTED
+
+    def release_requested(self):
+        """A unit has reported that they are finished with the lock."""
+        return self._state == OpenSearchLockState.RELEASE
+
+    def is_pending(self):
+        """Is this unit waiting for a lock?"""
+        return self._state in [
+            OpenSearchLockState.ACQUIRE,
+            OpenSearchLockState.DEPARTING,
+            OpenSearchLockState.RETRY_LOCK,
+            OpenSearchLockState.ACQUIRE_WITH_SVC_STOPPED,
+        ]
+
+    def has_started(self) -> bool:
+        """Method for checking if the unit has started."""
+        return self.relation.data[self.unit].get("has_started") == "True"
+
+    def started(self):
+        """Sets the started flag.
+
+        Should be called once the unit has finished its start process.
+        """
+        self.relation.data[self.unit]["has_started"] = "True"
+
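
Aside: Juju relation databags only store strings, which is why `has_started` is persisted as the literal `"True"` and the retry counter as a stringified integer. The sketch below is illustrative only, with a plain dict standing in for the databag; it shows the same flag and counter handling.

```python
# Illustrative sketch only -- not part of the patch. A plain dict stands in for the
# unit's relation databag, which can only hold string keys and string values.
def set_flag(databag: dict, key: str = "has_started") -> None:
    databag[key] = "True"


def get_flag(databag: dict, key: str = "has_started") -> bool:
    return databag.get(key) == "True"


def bump_counter(databag: dict, key: str = "retry-lock") -> int:
    count = int(databag.get(key, 0)) + 1
    databag[key] = str(count)
    return count


bag = {}
set_flag(bag)
assert get_flag(bag)
assert bump_counter(bag) == 1 and bag["retry-lock"] == "1"
```
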
+    def retry(self) -> bool:
+        """Method for checking if the lock should be retried."""
+        return self._state == OpenSearchLockState.RETRY_LOCK
+
+    @property
+    def retrial_count(self) -> int:
+        """Method for getting the retrial count."""
+        return int(self.relation.data[self.unit].get(OpenSearchLockState.RETRY_LOCK.value, 0))
+
+    @retrial_count.setter
+    def retrial_count(self, count: int):
+        """Method for setting the retrial count."""
+        self.relation.data[self.unit][OpenSearchLockState.RETRY_LOCK.value] = str(count)
+
+    def acquire_with_stopped_service(self) -> bool:
+        """Method for checking if the lock was acquired with the service stopped."""
+        return self._state == OpenSearchLockState.ACQUIRE_WITH_SVC_STOPPED
+
+
+class OpenSearchLocks(Locks):
+    """Generator that returns a list of locks."""
+
+    def __init__(self, manager):
+        super().__init__(manager)
+
+    def __iter__(self):
+        """Yields a lock for each unit we can find on the relation."""
+        for unit in self.units:
+            yield OpenSearchLock(self.manager, unit=unit)
+
+
+class OpenSearchRollingOpsManager(RollingOpsManager):
     """Class for controlling the locks in OpenSearch Charm.
 
     It differs from the main RollingOpsManager in two ways:
@@ -70,10 +241,8 @@ class RollingOpsManagerWithExclusions(RollingOpsManager):
     whenever a given unit depends on the charm leader, for example, to progress.
     """
 
-    RETRY_LOCK = "retry-lock-counter"
-
     def __init__(self, charm, relation, callback):
-        """Constructor for RollingOpsManagerWithExclusions."""
+        """Constructor for the manager."""
         super().__init__(charm, relation, callback)
 
         self.ops_lock = charm.ops_lock
@@ -93,15 +262,13 @@ def __init__(self, charm, relation, callback):
             callback = self.relation.data[self.charm.unit].get("callback_override", "")
             charm.on[self.name].acquire_lock.emit(
                 callback_override=self.relation.data[self.charm.unit].update(
-                    {
-                        "callback_override": callback
-                    }
+                    {"callback_override": callback}
                 )
             )
 
     def _on_acquire_lock(self, event):
         """Method for acquiring the lock. Restart the retry-lock counter."""
-        self.relation.data[self.charm.model.unit][self.RETRY_LOCK] = "0"  # reset counter
+        OpenSearchLock(self, self.charm.unit).retrial_count = 0
         return super()._on_acquire_lock(event)
 
     def _should_lock_be_reacquired(self):
@@ -110,55 +277,125 @@ def _should_lock_be_reacquired(self):
         For that, the unit has registered the restart-repeatable flag in the
         service relation data and the lock is not held or pending anymore.
         """
+        lock = OpenSearchLock(self, self.charm.unit)
         return (
             # TODO: consider cases with a limitation in the amount of retries
-            int(self.relation.data[self.charm.model.unit].get(self.RETRY_LOCK, 0)) > 0
-            and not (Lock(self).is_held() or Lock(self).is_pending())
+            lock.retry()
+            and not (lock.is_held() or lock.is_pending())
         )
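
Aside: the retry mechanism works in two halves: `_on_run_with_lock` (below) releases the lock and bumps the counter, while a later hook re-emits `acquire_lock` with the saved `callback_override`. The sketch below is illustrative only, not part of the patch; it condenses the re-acquire half, assuming the rolling-ops `acquire_lock` event shown elsewhere in this diff and made-up helper names.

```python
# Illustrative sketch only -- not part of the patch; unit_databag is the unit's peer data.
def maybe_reacquire(charm, manager_name: str, unit_databag: dict) -> bool:
    """Re-request the lock if a previous run asked for a retry."""
    if int(unit_databag.get("retry-lock", 0)) <= 0:
        return False
    saved_callback = unit_databag.get("callback_override", "")
    # Same event the charm emits when it first requests the operation.
    charm.on[manager_name].acquire_lock.emit(callback_override=saved_callback)
    return True
```
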
""" + lock = OpenSearchLock(self, self.charm.unit) return ( # TODO: consider cases with a limitation in the amount of retries - int(self.relation.data[self.charm.model.unit].get(self.RETRY_LOCK, 0)) > 0 - and not (Lock(self).is_held() or Lock(self).is_pending()) + lock.retry() + and not (lock.is_held() or lock.is_pending()) ) def _on_run_with_lock(self, event): """Method for running with lock.""" + lock = OpenSearchLock(self, self.charm.unit) try: super()._on_run_with_lock(event) - if self.model.unit.status.message == LockRetryLater.format(self.name): - self.charm.status.clear(LockRetryLater.format(self.name)) + self.charm.status.clear(LockRetryLater.format(self.name)) return except OpenSearchRetryLockLaterException: logger.info("Retrying to acquire the lock later.") - self.relation.data[self.charm.model.unit][self.RETRY_LOCK] = str( - int(self.relation.data[self.charm.model.unit].get(self.RETRY_LOCK, 0)) + 1 - ) - except: + lock.retrial_count = lock.retrial_count + 1 + except Exception: raise # A retriable error happened, raised by the callback method + # It means the logic after callback execution was not ran. # Release the lock now, so we can reissue it later - lock = Lock(self) lock.release() # Updates relation data + # cleanup old callback overrides: # we do not clean up the callback override, so we can reissue it later # self.relation.data[self.charm.unit].update({"callback_override": ""}) if self.model.unit.status.message == f"Executing {self.name} operation": - self.charm.status.set(WaitingStatus(LockRetryLater.format(self.name))) + self.model.unit.status = ActiveStatus() + self.charm.status.set(WaitingStatus(LockRetryLater.format(self.name))) - def _on_process_locks(self, event): + def _on_process_locks(self, _): # noqa: C901 """Method for processing the locks. - We should only grant a lock here if the ops_lock is free and then, - check with the parent RollingOpsManager. + There are certain special rules to be considered when providing the lock: + 1) The node is trying to acquire it + 2) There is no node departing in the cluster + 3) There is no node with the service stopped in the cluster + + We build the lock following the original _on_process_locks scheme. Then, + we should check the ops_lock and ensure it is not held. If it is, we abandon + the event and wait for the next peer relation departed to reprocess. + + We check that each node is reachable and healthy. + If not and node is requesting lock, then the it is set to: ACQUIRE_WITH_SVC_STOPPED - We need to consider the fact that storage-detaching may be happening. - In this case, we should not grant the lock until ops_lock is released. + Finally, if we have any of the following: + 1) Nodes helding this lock + 2) At least one node departing via ops_lock + 3) Nodes with stopped service that did not try to acquire the lock + We abandon the process and do not restart the cluster any further. """ - if not self.charm.model.unit.is_leader(): + if not self.charm.unit.is_leader(): return - if self.ops_lock.is_held(): - logger.info("Another unit is being removed, skipping the rolling ops.") + pending = [] + + # First pass: + # Find if we can process locks or should we wait for the next event. + # Build a list of units that are pending. + for lock in OpenSearchLocks(self): + if lock.is_held(): + # One of our units has the lock -- return without further processing. 
diff --git a/lib/charms/opensearch/v0/opensearch_plugin_manager.py b/lib/charms/opensearch/v0/opensearch_plugin_manager.py
index 1043796f4..8f3317b8c 100644
--- a/lib/charms/opensearch/v0/opensearch_plugin_manager.py
+++ b/lib/charms/opensearch/v0/opensearch_plugin_manager.py
@@ -268,26 +268,35 @@ def apply_config(self, config: OpenSearchPluginConfig) -> bool:
         """
         self._keystore.delete(config.secret_entries_to_del)
         self._keystore.add(config.secret_entries_to_add)
+        if config.secret_entries_to_del or config.secret_entries_to_add:
+            self._keystore.reload_keystore()
+
         # Add and remove configuration if applies
-        settings = ClusterTopology.get_cluster_settings(
+        current_settings = ClusterTopology.get_cluster_settings(
             self._charm.opensearch,
             include_defaults=True,
         )
-        should_restart = False
-        # Do we have the option set?
-        if any([key in config.config_entries_to_del for key in settings.keys()]):
-            self._opensearch_config.delete_plugin(config.config_entries_to_del)
-            should_restart = True
+        to_remove = config.config_entries_to_del
+        if isinstance(config.config_entries_to_del, list):
+            # No key should legitimately have the value None, so mapping the keys to None
+            # guarantees the merge below differs from current_settings if any is still set.
+            to_remove = dict(
+                zip(config.config_entries_to_del, [None] * len(config.config_entries_to_del))
+            )
 
+        if current_settings == {
+            **current_settings,
+            **to_remove,
+            **config.config_entries_to_add,
+        }:
+            # Nothing to do here
+            return False
+        # Update the configuration
+        if config.config_entries_to_del:
+            self._opensearch_config.delete_plugin(config.config_entries_to_del)
         if config.config_entries_to_add:
             self._opensearch_config.add_plugin(config.config_entries_to_add)
-            should_restart = True
-
-        if config.secret_entries_to_del or config.secret_entries_to_add:
-            self._keystore.reload_keystore()
-
-        # Return True if some configuration entries changed
-        return should_restart
+        return True
 
     def status(self, plugin: OpenSearchPlugin) -> PluginState:
         """Returns the status for a given plugin."""
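
Aside: the new idempotency check in `apply_config` relies on a dict-merge idiom: merge the desired removals (keys mapped to `None`) and additions into the current settings and compare with the original; equality means there is nothing to apply. A toy demonstration with made-up setting names (illustrative only, not part of the patch):

```python
# Illustrative sketch only -- toy dicts, made-up setting names.
current = {"plugin.enabled": "true", "plugin.mode": "strict", "plugin.legacy_flag": None}

# Desired state already satisfied: the key to drop is already effectively unset and
# the key to add already holds the target value -> the merge equals the original.
no_op = current == {**current, **{"plugin.legacy_flag": None}, **{"plugin.mode": "strict"}}
assert no_op

# A real change: "plugin.enabled" is still set, so mapping it to None alters the merge.
needs_update = current != {**current, **{"plugin.enabled": None}}
assert needs_update
```
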
diff --git a/tests/integration/ha/test_large_deployments.py b/tests/integration/ha/test_large_deployments.py
index 0eec7a8dc..7326fb29e 100644
--- a/tests/integration/ha/test_large_deployments.py
+++ b/tests/integration/ha/test_large_deployments.py
@@ -120,6 +120,8 @@ async def test_set_roles_manually(
         assert node.temperature is None, "Node temperature was erroneously set."
 
     # change cluster name and roles + temperature, should trigger a rolling restart
+
+    logger.info("Changing cluster name and roles + temperature.")
     await ops_test.model.applications[app].set_config(
         {"cluster_name": "new_cluster_name", "roles": "cluster_manager, data.cold"}
     )
@@ -131,6 +133,8 @@
         wait_for_exact_units=len(nodes),
         idle_period=IDLE_PERIOD,
     )
+
+    logger.info("Checking if the cluster name and roles + temperature were changed.")
     assert await check_cluster_formation_successful(
         ops_test, leader_unit_ip, get_application_unit_names(ops_test, app=app)
     )
diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py
index 401eb13d4..4ff4dc970 100644
--- a/tests/integration/test_charm.py
+++ b/tests/integration/test_charm.py
@@ -3,6 +3,7 @@
 # See LICENSE file for licensing details.
 
 import logging
+import subprocess
 
 import pytest
 from pytest_operator.plugin import OpsTest
@@ -92,6 +93,34 @@ async def test_actions_get_admin_password(ops_test: OpsTest) -> None:
         assert result.status == "failed"
 
 
+@pytest.mark.abort_on_fail
+async def test_check_number_of_restarts(ops_test: OpsTest) -> None:
+    """Check the number of service restarts.
+
+    The OpenSearch service should have been restarted exactly once on each unit.
+    """
+    for unit in range(DEFAULT_NUM_UNITS):
+        result = int(
+            subprocess.check_output(
+                [
+                    "juju",
+                    "ssh",
+                    f"{APP_NAME}/{unit}",
+                    "--",
+                    "sudo",
+                    "systemctl",
+                    "show",
+                    "snap.opensearch.daemon.service",
+                    "--property=NRestarts",
+                ]
+            )
+            .decode()
+            .split("NRestarts=")[1]
+            .split("\n")[0]
+        )
+        assert result == 1
+
+
 @pytest.mark.abort_on_fail
 async def test_actions_rotate_admin_password(ops_test: OpsTest) -> None:
     """Test the rotation and change of admin password."""
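
Aside: the new integration test shells out to `systemctl show <unit> --property=NRestarts`, which prints a `NRestarts=<count>` line that is then parsed. The helper below is illustrative only, not part of the patch; it shows the same parsing against a captured string.

```python
# Illustrative sketch only -- not part of the patch.
def parse_nrestarts(systemctl_output: str) -> int:
    """Extract the restart counter from `systemctl show --property=NRestarts` output."""
    for line in systemctl_output.splitlines():
        if line.startswith("NRestarts="):
            return int(line.split("=", 1)[1])
    raise ValueError("NRestarts property not found in systemctl output")


assert parse_nrestarts("NRestarts=1\n") == 1
```
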