From 3304eb02d8e0456508f411419a90b3d6b8615cb8 Mon Sep 17 00:00:00 2001 From: Mehdi Bendriss Date: Tue, 10 Jan 2023 23:04:35 +0100 Subject: [PATCH] [Refactoring] initial round: v0 (#23) (#24) --- .coveragerc | 23 + lib/charms/opensearch/v0/helper_charm.py | 57 ++ lib/charms/opensearch/v0/helper_cluster.py | 2 +- .../opensearch/v0/helper_conf_setter.py | 127 +++- lib/charms/opensearch/v0/helper_databag.py | 169 +++++- .../opensearch/v0/opensearch_base_charm.py | 560 +++++++++++++++--- lib/charms/opensearch/v0/opensearch_config.py | 4 +- pyproject.toml | 2 +- requirements.txt | 7 +- src/charm.py | 495 +--------------- src/opensearch.py | 9 + tests/unit/test_charm.py | 228 +++---- tests/unit/test_helper_charm.py | 49 ++ tests/unit/test_helper_databag.py | 61 ++ tests/unit/test_helper_networking.py | 3 +- tests/unit/test_opensearch_base_charm.py | 160 ++++- 16 files changed, 1198 insertions(+), 758 deletions(-) create mode 100644 .coveragerc create mode 100644 lib/charms/opensearch/v0/helper_charm.py create mode 100644 tests/unit/test_helper_charm.py diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 000000000..701ef829e --- /dev/null +++ b/.coveragerc @@ -0,0 +1,23 @@ +[report] +exclude_lines = + # Skip any pass lines such as may be used for @abstractmethod + pass + + # Ignore abstract methods + @abstractmethod + @abc.abstractmethod + + # Have to re-enable the standard pragma + pragma: no cover + + # Don't complain about missing debug-only code: + def __repr__ + if self\.debug + + # Don't complain if tests don't hit defensive assertion code: + raise AssertionError + raise NotImplementedError + + # Don't complain if non-runnable code isn't run: + if 0: + if __name__ == .__main__.: diff --git a/lib/charms/opensearch/v0/helper_charm.py b/lib/charms/opensearch/v0/helper_charm.py new file mode 100644 index 000000000..f095f831c --- /dev/null +++ b/lib/charms/opensearch/v0/helper_charm.py @@ -0,0 +1,57 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Utility functions for charms related operations.""" +import re + +from charms.opensearch.v0.helper_enums import BaseStrEnum +from ops.model import ActiveStatus + +# The unique Charmhub library identifier, never change it +LIBID = "293db55a2d8949f8aa5906d04cd541ba" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + + +class Status: + """Class for managing the various status changes in a charm.""" + + class CheckPattern(BaseStrEnum): + """Enum for types of status comparison.""" + + Equal = "equal" + Start = "start" + End = "end" + Contain = "contain" + Interpolated = "interpolated" + + def __init__(self, charm): + self.charm = charm + + def clear(self, status_message: str, pattern: CheckPattern = CheckPattern.Equal): + """Resets the unit status if it was previously blocked/maintenance with message.""" + unit = self.charm.unit + + condition: bool + match pattern: + case Status.CheckPattern.Equal: + condition = unit.status.message == status_message + case Status.CheckPattern.Start: + condition = unit.status.message.startswith(status_message) + case Status.CheckPattern.End: + condition = unit.status.message.endswith(status_message) + case Status.CheckPattern.Interpolated: + condition = ( + re.fullmatch(status_message.replace("{}", "(?s:.*?)"), status_message) + is not None + ) + case _: + condition = status_message in unit.status.message + + if condition: + unit.status = ActiveStatus() diff --git a/lib/charms/opensearch/v0/helper_cluster.py b/lib/charms/opensearch/v0/helper_cluster.py index 4adf36a2c..cd91c7ca1 100644 --- a/lib/charms/opensearch/v0/helper_cluster.py +++ b/lib/charms/opensearch/v0/helper_cluster.py @@ -1,7 +1,7 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. -"""Utility class for getting cluster info, configuration info and suggestions.""" +"""Utility classes and methods for getting cluster info, configuration info and suggestions.""" from typing import Dict, List diff --git a/lib/charms/opensearch/v0/helper_conf_setter.py b/lib/charms/opensearch/v0/helper_conf_setter.py index 4de0b8832..0d76cc160 100755 --- a/lib/charms/opensearch/v0/helper_conf_setter.py +++ b/lib/charms/opensearch/v0/helper_conf_setter.py @@ -6,12 +6,14 @@ import re import sys import uuid +from abc import ABC, abstractmethod from collections.abc import Mapping from enum import Enum from io import StringIO from os.path import exists from typing import Dict, List +from overrides import override from ruamel.yaml import YAML, CommentedSeq from ruamel.yaml.comments import CommentedSet @@ -42,10 +44,11 @@ def __str__(self): return self.value -class YamlConfigSetter: - """Utility class for updating YAML config, supporting diverse object types and nestedness. +class ConfigSetter(ABC): + """Base class for manipulating YAML Config, of multiple types and any depth level. + + conf_setter = YamlConfigSetter() or another config setter - conf_setter = YamlConfigSetter() put("file.yml", "a.b", "new_name") put("file.yml", "a.b/c.obj/key3/key1.a/obj", {"a": "new_name_1", "b": ["hello", "world"]}) put("file.yml", "a.b/c.arr.simple/[0]", "hello") @@ -57,9 +60,111 @@ class YamlConfigSetter: def __init__(self, base_path: str = None): """base_path: if set, where to look for files relatively on "load/put/delete" methods.""" - self.yaml = YAML() self.base_path = self.__clean_base_path(base_path) + @abstractmethod + def load(self, config_file: str) -> Dict[str, any]: + """Load the content of a YAML file.""" + pass + + @abstractmethod + def put( + self, + config_file: str, + key_path: str, + val: any, + sep="/", + output_type: OutputType = OutputType.file, + inline_array: bool = False, + output_file: str = None, + ) -> Dict[str, any]: + """Add or update the value of a key (or content of array at index / key) if it exists. + + Args: + config_file (str): Path to the source config file + key_path (str): The path of the YAML key to target + val (any): The value to store for the passed key + sep (str): The separator / delimiter character to use in the key_path + output_type (OutputType): The type of output we're expecting from this operation, + i.e, set OutputType.all to have the output on both the console and target file + inline_array (bool): whether the operation should format arrays in: + - multiline fashion (false) + - between brackets (true) + output_file: Target file for the result config, by default same as config_file + + Returns: + Dict[str, any]: The final version of the YAML config. + """ + pass + + @abstractmethod + def delete( + self, + config_file: str, + key_path: str, + sep="/", + output_type: OutputType = OutputType.file, + output_file: str = None, + ) -> Dict[str, any]: + """Delete the value of a key (or content of array at index / key) if it exists. + + Args: + config_file (str): Path to the source config file + key_path (str): The path of the YAML key to target + sep (str): The separator / delimiter character to use in the key_path + output_type (OutputType): The type of output we're expecting from this operation, + i.e, set OutputType.all to have the output on both the console and target file + output_file: Target file for the result config, by default same as config_file + + Returns: + Dict[str, any]: The final version of the YAML config. + """ + pass + + @abstractmethod + def replace( + self, + config_file: str, + old_val: str, + new_val: any, + regex: bool = False, + output_type: OutputType = OutputType.file, + output_file: str = None, + ) -> None: + """Replace any substring in a text file. + + Args: + config_file (str): Path to the source config file + old_val (str): The value we wish to replace + new_val (any): The new value to replace old_val + regex (bool): Whether to treat old_val as a regex. + output_type (OutputType): The type of output we're expecting from this operation, + i.e, set OutputType.all to have the output on both the console and target file + output_file: Target file for the result config, by default same as config_file + """ + pass + + @staticmethod + def __clean_base_path(base_path: str): + if base_path is None: + return "" + + base_path = base_path.strip() + if not base_path.endswith("/"): + base_path = f"{base_path}/" + + return base_path + + +class YamlConfigSetter(ConfigSetter): + """Class for updating YAML config on the file system.""" + + def __init__(self, base_path: str = None): + """base_path: if set, where to look for files relatively on "load/put/delete" methods.""" + super().__init__(base_path) + self.yaml = YAML() + + @override def load(self, config_file: str) -> Dict[str, any]: """Load the content of a YAML file.""" path = f"{self.base_path}{config_file}" @@ -78,6 +183,7 @@ def load(self, config_file: str) -> Dict[str, any]: return data + @override def put( self, config_file: str, @@ -104,6 +210,7 @@ def put( return data + @override def delete( self, config_file: str, @@ -125,6 +232,7 @@ def delete( return data + @override def replace( self, config_file: str, @@ -315,14 +423,3 @@ def __flow_style() -> CommentedSeq: ret = CommentedSeq() ret.fa.set_flow_style() return ret - - @staticmethod - def __clean_base_path(base_path: str): - if base_path is None: - return "" - - base_path = base_path.strip() - if not base_path.endswith("/"): - base_path = f"{base_path}/" - - return base_path diff --git a/lib/charms/opensearch/v0/helper_databag.py b/lib/charms/opensearch/v0/helper_databag.py index c1e102c06..f7fb457be 100644 --- a/lib/charms/opensearch/v0/helper_databag.py +++ b/lib/charms/opensearch/v0/helper_databag.py @@ -4,9 +4,13 @@ """Utility classes for app / unit data bag related operations.""" import json -from typing import Dict, Optional +from abc import ABC, abstractmethod +from ast import literal_eval +from typing import Dict, Optional, Union +from charms.opensearch.v0.constants_tls import CertType from charms.opensearch.v0.helper_enums import BaseStrEnum +from overrides import override # The unique Charmhub library identifier, never change it LIBID = "e28df77e11504aef9a537b351fd4cf37" @@ -26,28 +30,91 @@ class Scope(BaseStrEnum): UNIT = "unit" -class Store: - """Class representing a relation data store for a charm. - - Requires the following 2 properties on the charm: - - app_peers_data - - unit_peers_data - """ +class DataStore(ABC): + """Class representing a data store used in the OPs code of the charm.""" def __init__(self, charm): self._charm = charm - def put(self, scope: Scope, key: str, value: Optional[str]) -> None: - """Put object into the relation data store.""" + @abstractmethod + def put(self, scope: Scope, key: str, value: Optional[any]) -> None: + """Put string into the data store.""" + pass + + @abstractmethod + def put_object( + self, scope: Scope, key: str, value: Dict[str, any], merge: bool = False + ) -> None: + """Put object into the data store.""" + pass + + @abstractmethod + def has(self, scope: Scope, key: str): + """Check if the said key is contained in the store.""" + pass + + @abstractmethod + def all(self, scope: Scope) -> Dict[str, str]: + """Get all content of a store.""" + pass + + @abstractmethod + def get( + self, scope: Scope, key: str, default: Optional[Union[int, float, str, bool]] = None + ) -> Optional[Union[int, float, str, bool]]: + """Get string from the data store.""" + pass + + @abstractmethod + def get_object(self, scope: Scope, key: str) -> Optional[Dict[str, any]]: + """Get dict / json object from the data store.""" + pass + + @abstractmethod + def delete(self, scope: Scope, key: str): + """Delete object from the data store.""" + pass + + @staticmethod + def cast(str_val: str) -> Union[bool, int, float, str]: + """Cast a string to the corresponding primitive type.""" + try: + typed_val = literal_eval(str_val.capitalize()) + if type(typed_val) not in {bool, int, float, str}: + return str_val + + return typed_val + except (ValueError, SyntaxError): + return str_val + + @staticmethod + def put_or_delete(data: Dict[str, str], key: str, value: Optional[str]): + """Put data into the key/val data store or delete if value is None.""" + if value is None: + del data[key] + return + + data.update({key: str(value)}) + + +class RelationDataStore(DataStore): + """Class representing a relation data store for a charm.""" + + def __init__(self, charm, relation_name: str): + super(RelationDataStore, self).__init__(charm) + self.relation_name = relation_name + + @override + def put(self, scope: Scope, key: str, value: Optional[Union[any]]) -> None: + """Put string into the relation data store.""" if scope is None: raise ValueError("Scope undefined.") - data = self._charm.unit_peers_data - if scope == Scope.APP: - data = self._charm.app_peers_data + data = self._get_relation_data(scope) self.put_or_delete(data, key, value) + @override def put_object( self, scope: Scope, key: str, value: Dict[str, any], merge: bool = False ) -> None: @@ -65,17 +132,46 @@ def put_object( self.put(scope, key, payload_str) - def get(self, scope: Scope, key: str) -> Optional[str]: + @override + def has(self, scope: Scope, key: str): + """Check if the said key is contained in the relation data.""" + if scope is None: + raise ValueError("Scope undefined.") + + return key in self._get_relation_data(scope) + + @override + def all(self, scope: Scope) -> Dict[str, str]: + """Get all content of a store.""" + if scope is None: + raise ValueError("Scope undefined.") + + return self._get_relation_data(scope) + + @override + def get( + self, + scope: Scope, + key: str, + default: Optional[Union[int, float, str, bool]] = None, + auto_casting: bool = True, + ) -> Optional[Union[int, float, str, bool]]: """Get string from the relation data store.""" if scope is None: raise ValueError("Scope undefined.") - data = self._charm.unit_peers_data - if scope == Scope.APP: - data = self._charm.app_peers_data + data = self._get_relation_data(scope) + + value = data.get(key) + if value is None: + return default + + if not auto_casting: + return value - return data.get(key, None) + return self.cast(value) + @override def get_object(self, scope: Scope, key: str) -> Optional[Dict[str, any]]: """Get dict / json object from the relation data store.""" data = self.get(scope, key) @@ -84,24 +180,43 @@ def get_object(self, scope: Scope, key: str) -> Optional[Dict[str, any]]: return json.loads(data) + @override def delete(self, scope: Scope, key: str): """Delete object from the relation data store.""" self.put(scope, key, None) - @staticmethod - def put_or_delete(peers_data: Dict[str, str], key: str, value: Optional[str]): - """Put data into the relation data store or delete if value is None.""" - if value is None: - del peers_data[key] - return + def _get_relation_data(self, scope: Scope) -> Dict[str, str]: + """Relation data object.""" + relation = self._charm.model.get_relation(self.relation_name) + if relation is None: + return {} - peers_data.update({key: value}) + relation_scope = self._charm.app if scope == Scope.APP else self._charm.unit + return relation.data[relation_scope] -class SecretStore(Store): + +class SecretsDataStore(RelationDataStore): """Class representing a secret store for a charm. For now, it is simply a base class for regular Relation data store """ - pass + def get_unit_certificates(self) -> Dict[CertType, str]: + """Retrieve the list of certificates for this unit.""" + certs = {} + + transport_secrets = self.get_object(Scope.UNIT, CertType.UNIT_TRANSPORT.val) + if transport_secrets and "cert" in transport_secrets: + certs[CertType.UNIT_TRANSPORT] = transport_secrets["cert"] + + http_secrets = self.get_object(Scope.UNIT, CertType.UNIT_HTTP.val) + if http_secrets and "cert" in http_secrets: + certs[CertType.UNIT_HTTP] = http_secrets["cert"] + + if self._charm.unit.is_leader(): + admin_secrets = self.get_object(Scope.APP, CertType.APP_ADMIN.val) + if admin_secrets and "cert" in admin_secrets: + certs[CertType.APP_ADMIN] = admin_secrets["cert"] + + return certs diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index c38fcd8d1..d71da6d07 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -4,25 +4,62 @@ """Base class for the OpenSearch Operators.""" import logging import random -import re -from typing import Dict, Set, Type +from abc import abstractmethod +from datetime import datetime +from typing import Dict, List, Optional, Set, Type from charms.opensearch.v0.constants_charm import ( + AdminUserInitProgress, AllocationExclusionFailed, + CertsExpirationError, HorizontalScaleUpSuggest, + RequestUnitServiceOps, + SecurityIndexInitProgress, + ServiceIsStopping, + ServiceStartError, + ServiceStopFailed, + ServiceStopped, + TLSNotFullyConfigured, + TLSRelationBrokenError, + WaitingForBusyShards, + WaitingToStart, ) from charms.opensearch.v0.constants_tls import TLS_RELATION, CertType -from charms.opensearch.v0.helper_databag import Scope, SecretStore, Store -from charms.opensearch.v0.helper_enums import BaseStrEnum +from charms.opensearch.v0.helper_charm import Status +from charms.opensearch.v0.helper_cluster import ClusterState, ClusterTopology, Node +from charms.opensearch.v0.helper_databag import ( + RelationDataStore, + Scope, + SecretsDataStore, +) from charms.opensearch.v0.helper_networking import get_host_ip, units_ips +from charms.opensearch.v0.helper_security import ( + cert_expiration_remaining_hours, + generate_hashed_password, +) from charms.opensearch.v0.opensearch_config import OpenSearchConfig -from charms.opensearch.v0.opensearch_distro import OpenSearchDistribution +from charms.opensearch.v0.opensearch_distro import ( + OpenSearchDistribution, + OpenSearchHttpError, + OpenSearchStartError, + OpenSearchStopError, +) from charms.opensearch.v0.opensearch_tls import OpenSearchTLS +from charms.rolling_ops.v0.rollingops import RollingOpsManager from charms.tls_certificates_interface.v1.tls_certificates import ( CertificateAvailableEvent, ) -from ops.charm import CharmBase -from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus +from ops.charm import ( + ActionEvent, + CharmBase, + LeaderElectedEvent, + RelationChangedEvent, + RelationJoinedEvent, + StartEvent, + UpdateStatusEvent, +) +from ops.framework import EventBase +from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus # The unique Charmhub library identifier, never change it LIBID = "cba015bae34642baa1b6bb27bb35a2f7" @@ -42,16 +79,6 @@ logger = logging.getLogger(__name__) -class StatusCheckPattern(BaseStrEnum): - """Enum for types of status comparison.""" - - Equal = "equal" - Start = "start" - End = "end" - Contain = "contain" - Interpolated = "interpolated" - - class OpenSearchBaseCharm(CharmBase): """Base class for OpenSearch charms.""" @@ -63,9 +90,432 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): self.opensearch = distro(self, PEER) self.opensearch_config = OpenSearchConfig(self.opensearch) - self.secrets = SecretStore(self) - self.relation_store = Store(self) + self.peers_data = RelationDataStore(self, PEER) + self.secrets = SecretsDataStore(self, PEER) self.tls = OpenSearchTLS(self, TLS_RELATION) + self.status = Status(self) + self.service_manager = RollingOpsManager( + self, relation=SERVICE_MANAGER, callback=self._start_opensearch + ) + + self.framework.observe(self.on.leader_elected, self._on_leader_elected) + self.framework.observe(self.on.start, self._on_start) + + self.framework.observe(self.on[PEER].relation_joined, self._on_peer_relation_joined) + self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed) + + self.framework.observe(self.on.update_status, self._on_update_status) + + self.framework.observe(self.on.get_admin_secrets_action, self._on_get_admin_secrets_action) + + def _on_leader_elected(self, _: LeaderElectedEvent): + """Handle leader election event.""" + if self.peers_data.get(Scope.APP, "security_index_initialised"): + return + + if not self.peers_data.get(Scope.APP, "admin_user_initialized"): + self.unit.status = MaintenanceStatus(AdminUserInitProgress) + self._initialize_admin_user() + self.peers_data.put(Scope.APP, "admin_user_initialized", True) + self.status.clear(AdminUserInitProgress) + + def _on_start(self, event: StartEvent): + """Triggered when on start. Set the right node role.""" + if self.opensearch.is_started(): + if self.peers_data.get(Scope.APP, "security_index_initialised"): + # in the case where it was on WaitingToStart status, event got deferred + # and the service started in between, put status back to active + self.status.clear(WaitingToStart) + + # cleanup bootstrap conf in the node if existing + if self.peers_data.get(Scope.UNIT, "bootstrap_contributor"): + self._cleanup_bootstrap_conf_if_applies() + + return + + if not self._is_tls_fully_configured(): + self.unit.status = BlockedStatus(TLSNotFullyConfigured) + event.defer() + return + + # configure clients auth + self.opensearch_config.set_client_auth() + + # request the start of OpenSearch + self.unit.status = WaitingStatus(RequestUnitServiceOps.format("start")) + self.on[self.service_manager.name].acquire_lock.emit(callback_override="_start_opensearch") + + def _on_peer_relation_joined(self, event: RelationJoinedEvent): + """New node joining the cluster.""" + current_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) + + # In the case of the first units before TLS is initialized + if not current_secrets: + if not self.unit.is_leader(): + event.defer() + return + + # in the case the cluster was bootstrapped with multiple units at the same time + # and the certificates have not been generated yet + if not current_secrets.get("cert") or not current_secrets.get("chain"): + event.defer() + return + + # Store the "Admin" certificate, key and CA on the disk of the new unit + self._store_tls_resources(CertType.APP_ADMIN, current_secrets, override_admin=False) + + def _on_peer_relation_changed(self, event: RelationChangedEvent): + """Handle peer relation changes.""" + if self.unit.is_leader(): + data = event.relation.data.get(event.unit) + if data: + exclusions_to_remove = data.get("remove_from_allocation_exclusions") + if exclusions_to_remove: + self.append_allocation_exclusion_to_remove(exclusions_to_remove) + + if data.get("bootstrap_contributor"): + contributor_count = self.peers_data.get( + Scope.APP, "bootstrap_contributors_count", 0 + ) + self.peers_data.put( + Scope.APP, "bootstrap_contributors_count", contributor_count + 1 + ) + + # Restart node when cert renewal for the transport layer + if self.peers_data.get(Scope.UNIT, "must_reboot_node"): + try: + self.opensearch.restart() + self.peers_data.delete(Scope.UNIT, "must_reboot_node") + except OpenSearchStartError: + event.defer() + + def _on_update_status(self, event: UpdateStatusEvent): + """On update status event. + + We want to periodically check for 2 things: + 1- The system requirements are still met + 2- every 6 hours check if certs are expiring soon (in 7 days), + as a safeguard in case relation broken. As there will be data loss + without the user noticing in case the cert of the unit transport layer expires. + So we want to stop opensearch in that case, since it cannot be recovered from. + """ + # if there are missing system requirements defer + missing_sys_reqs = self.opensearch.missing_sys_requirements() + if len(missing_sys_reqs) > 0: + self.unit.status = BlockedStatus(" - ".join(missing_sys_reqs)) + return + + # If relation broken - leave + if self.model.get_relation("certificates") is not None: + return + + # if node already shutdown - leave + if not self.opensearch.is_node_up(): + return + + # See if the last check was made less than 6h ago, if yes - leave + date_format = "%Y-%m-%d %H:%M:%S" + last_cert_check = datetime.strptime( + self.peers_data.get(Scope.UNIT, "certs_exp_checked_at", "1970-01-01 00:00:00"), + date_format, + ) + if (datetime.now() - last_cert_check).seconds < 6 * 3600: + return + + certs = self.secrets.get_unit_certificates() + + # keep certificates that are expiring in less than 24h + for cert_type, cert in certs.items(): + hours = cert_expiration_remaining_hours(cert) + if hours > 24 * 7: + del certs[cert_type] + + if certs: + missing = [cert.val for cert in certs.keys()] + self.unit.status = BlockedStatus(CertsExpirationError.format(", ".join(missing))) + + # stop opensearch in case the Node-transport certificate expires. + if certs.get(CertType.UNIT_TRANSPORT) is not None: + self._stop_opensearch(event) + + self.peers_data.put( + Scope.UNIT, "certs_exp_checked_at", datetime.now().strftime(date_format) + ) + + def _on_get_admin_secrets_action(self, event: ActionEvent): + """Return the password and cert chain for the admin user of the cluster.""" + password = self.secrets.get(Scope.APP, "admin_password") + + chain = "" + admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) + if admin_secrets and admin_secrets.get("chain"): + chain = "\n".join(admin_secrets["chain"][::-1]) + + event.set_results({"password": password if password else "", "chain": chain}) + + def on_tls_conf_set( + self, _: CertificateAvailableEvent, scope: Scope, cert_type: CertType, renewal: bool + ): + """Called after certificate ready and stored on the corresponding scope databag. + + - Store the cert on the file system, on all nodes for APP certificates + - Update the corresponding yaml conf files + - Run the security admin script + """ + # Get the list of stored secrets for this cert + current_secrets = self.secrets.get_object(scope, cert_type.val) + + # In case of renewal of the unit transport layer cert - stop opensearch + should_restart = False + if renewal and cert_type == CertType.UNIT_TRANSPORT: + self.opensearch.stop() + should_restart = True + + # Store cert/key on disk - must happen after opensearch stop for transport certs renewal + self._store_tls_resources(cert_type, current_secrets) + + if scope == Scope.UNIT: + # node http or transport cert + self.opensearch_config.set_node_tls_conf(cert_type, current_secrets) + else: + # write the admin cert conf on all units, in case there is a leader loss + cert renewal + self.opensearch_config.set_admin_tls_conf(current_secrets) + + if should_restart: + self.peers_data.put(Scope.UNIT, "must_reboot_node", True) + + def on_tls_relation_broken(self): + """As long as all certificates are produced, we don't do anything.""" + if self._is_tls_fully_configured(): + return + + # Otherwise, we block. + self.unit.status = BlockedStatus(TLSRelationBrokenError) + self.opensearch.stop() + + def _is_tls_fully_configured(self) -> bool: + """Check if TLS fully configured meaning the admin user configured & 3 certs present.""" + # In case the initialisation of the admin user is not finished yet + if not self.peers_data.get(Scope.APP, "admin_user_initialized"): + return False + + admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) + if not admin_secrets or not admin_secrets.get("cert") or not admin_secrets.get("chain"): + return False + + unit_transport_secrets = self.secrets.get_object(Scope.UNIT, CertType.UNIT_TRANSPORT.val) + if not unit_transport_secrets or not unit_transport_secrets.get("cert"): + return False + + unit_http_secrets = self.secrets.get_object(Scope.UNIT, CertType.UNIT_HTTP.val) + if not unit_http_secrets or not unit_http_secrets.get("cert"): + return False + + return self._are_all_tls_resources_stored() + + def _start_opensearch(self, event: EventBase) -> None: + """Start OpenSearch if all resources configured.""" + if not self._can_service_start(): + event.defer() + return + + try: + # Retrieve the nodes of the cluster, needed to configure this node + nodes = self._get_nodes() + except OpenSearchHttpError: + event.defer() + return + + # Set the configuration of the node + self._set_node_conf(nodes) + + try: + self.unit.status = BlockedStatus(WaitingToStart) + self.opensearch.start() + self.status.clear(WaitingToStart) + except OpenSearchStartError as e: + logger.debug(e) + self.unit.status = BlockedStatus(ServiceStartError) + event.defer() + return + + # initialize the security index if the admin certs are written on disk + if self.unit.is_leader(): + if not self.peers_data.get(Scope.APP, "security_index_initialised"): + admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) + self._initialize_security_index(admin_secrets) + self.peers_data.put(Scope.APP, "security_index_initialised", True) + + self.peers_data.put(Scope.APP, "leader_ip", self.unit_ip) + + # store the exclusions that previously failed to be stored when no units online + if self.peers_data.get(Scope.APP, "remove_from_allocation_exclusions"): + self.opensearch.remove_allocation_exclusions( + self.peers_data.get(Scope.APP, "remove_from_allocation_exclusions") + ) + + # cleanup bootstrap conf in the node + if self.peers_data.get(Scope.UNIT, "bootstrap_contributor"): + self._cleanup_bootstrap_conf_if_applies() + + def _stop_opensearch(self, event: EventBase) -> None: + """Stop OpenSearch if possible.""" + try: + self.unit.status = WaitingStatus(ServiceIsStopping) + self.opensearch.stop() + self.unit.status = WaitingStatus(ServiceStopped) + except OpenSearchStopError as e: + logger.debug(e) + self.unit.status = BlockedStatus(ServiceStopFailed) + event.defer() + + def _restart_opensearch(self, event: EventBase) -> None: + """Restart OpenSearch if possible.""" + self._stop_opensearch(event) + self._start_opensearch(event) + + def _can_service_start(self): + """Return if the opensearch service can start.""" + # if there are any missing system requirements leave + missing_sys_reqs = self.opensearch.missing_sys_requirements() + if len(missing_sys_reqs) > 0: + self.unit.status = BlockedStatus(" - ".join(missing_sys_reqs)) + return False + + # When a new unit joins, replica shards are automatically added to it. In order to prevent + # overloading the cluster, units must be started one at a time. So we defer starting + # opensearch until all shards in other units are in a "started" or "unassigned" state. + if not self.unit.is_leader() and self.peers_data.get(Scope.APP, "leader_ip"): + try: + busy_shards = ClusterState.busy_shards_by_unit( + self.opensearch, self.peers_data.get(Scope.APP, "leader_ip") + ) + if busy_shards: + message = WaitingForBusyShards.format( + " - ".join([f"{key}/{','.join(val)}" for key, val in busy_shards.items()]) + ) + self.unit.status = WaitingStatus(message) + return False + + self.status.clear(WaitingForBusyShards, pattern=Status.CheckPattern.Interpolated) + except OpenSearchHttpError: + # this means that the leader unit is not reachable (not started yet), + # meaning that it's a new cluster, so we can safely start the OpenSearch service + pass + + return True + + def _initialize_admin_user(self): + """Change default password of Admin user.""" + hashed_pwd, pwd = generate_hashed_password() + self.secrets.put(Scope.APP, "admin_password", pwd) + + self.opensearch.config.put( + "opensearch-security/internal_users.yml", + "admin", + { + "hash": hashed_pwd, + "reserved": True, # this protects this resource from being updated on the dashboard or rest api + "backend_roles": ["admin"], + "description": "Admin user", + }, + ) + + def _initialize_security_index(self, admin_secrets: Dict[str, any]): + """Run the security_admin script, it creates and initializes the opendistro_security index. + + IMPORTANT: must only run once per cluster, otherwise the index gets overrode + """ + args = [ + f"-cd {self.opensearch.paths.conf}/opensearch-security/", + f"-cn {self.app.name}-{self.model.name}", + f"-h {self.unit_ip}", + f"-cacert {self.opensearch.paths.certs}/root-ca.cert", + f"-cert {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.cert", + f"-key {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.key", + ] + + admin_key_pwd = admin_secrets.get("key-password", None) + if admin_key_pwd is not None: + args.append(f"-keypass {admin_key_pwd}") + + self.unit.status = MaintenanceStatus(SecurityIndexInitProgress) + self.opensearch.run_script( + "plugins/opensearch-security/tools/securityadmin.sh", " ".join(args) + ) + self.status.clear(SecurityIndexInitProgress) + + def _get_nodes(self) -> List[Node]: + """Fetch the list of nodes of the cluster, depending on the requester.""" + + def fetch() -> List[Node]: + """Fetches the list of nodes through HTTP.""" + host: Optional[str] = None + alt_hosts: Optional[List[str]] = None + + all_units_ips = units_ips(self, PEER).values() + if all_units_ips: + all_hosts = list(all_units_ips) + host = all_hosts.pop(0) # get first value + alt_hosts = all_hosts + + nodes: List[Node] = [] + if host is not None: + response = self.opensearch.request( + "GET", "/_nodes", host=host, alt_hosts=alt_hosts + ) + if "nodes" in response: + for obj in response["nodes"].values(): + nodes.append(Node(obj["name"], obj["roles"], obj["ip"])) + + return nodes + + try: + return fetch() + except OpenSearchHttpError: + if self.unit.is_leader() and not self.peers_data.get( + Scope.APP, "security_index_initialised" + ): + return [] + raise + + def _set_node_conf(self, nodes: List[Node]) -> None: + """Set the configuration of the current node / unit.""" + roles = ClusterTopology.suggest_roles(nodes, self.app.planned_units()) + + cm_names = ClusterTopology.get_cluster_managers_names(nodes) + cm_ips = ClusterTopology.get_cluster_managers_ips(nodes) + + contribute_to_bootstrap = False + if "cluster_manager" in roles: + cm_names.append(self.unit_name) + cm_ips.append(self.unit_ip) + + cms_in_bootstrap = self.peers_data.get(Scope.APP, "bootstrap_contributors_count", 0) + if cms_in_bootstrap < self.app.planned_units(): + contribute_to_bootstrap = True + + if self.unit.is_leader(): + self.peers_data.put( + Scope.APP, "bootstrap_contributors_count", cms_in_bootstrap + 1 + ) + + # indicates that this unit is part of the "initial cm nodes" + self.peers_data.put(Scope.UNIT, "bootstrap_contributor", True) + + self.opensearch_config.set_node( + self.app.name, + self.model.name, + self.unit_name, + roles, + cm_names, + cm_ips, + contribute_to_bootstrap, + ) + + def _cleanup_bootstrap_conf_if_applies(self) -> None: + """Remove some conf props in the CM nodes that contributed to the cluster bootstrapping.""" + self.opensearch_config.cleanup_bootstrap_conf() def on_allocation_exclusion_add_failed(self): """Callback for when the OpenSearch service fails stopping.""" @@ -78,73 +528,45 @@ def on_unassigned_shards(self, unassigned_shards: int): def append_allocation_exclusion_to_remove(self, unit_name) -> None: """Store a unit in the relation data bag, to be removed from the allocation exclusion.""" if not self.unit.is_leader(): - self.unit_peers_data["remove_from_allocation_exclusions"] = unit_name + self.peers_data.put(Scope.UNIT, "remove_from_allocation_exclusions", unit_name) return exclusions = set( - self.app_peers_data.get("remove_from_allocation_exclusions", "").split(",") + self.peers_data.get(Scope.APP, "remove_from_allocation_exclusions", "").split(",") ) exclusions.add(unit_name) - self.app_peers_data["remove_from_allocation_exclusions"] = ",".join(exclusions) + self.peers_data.put(Scope.APP, "remove_from_allocation_exclusions", ",".join(exclusions)) def remove_allocation_exclusions(self, exclusions: Set[str]) -> None: """Remove the allocation exclusions from the peer databag if existing.""" stored_exclusions = set( - self.app_peers_data.get("remove_from_allocation_exclusions", "").split(",") + self.peers_data.get(Scope.APP, "remove_from_allocation_exclusions", "").split(",") ) exclusions_to_keep = ",".join(stored_exclusions - exclusions) + scope = Scope.UNIT if self.unit.is_leader(): - self.app_peers_data["remove_from_allocation_exclusions"] = exclusions_to_keep - else: - self.unit_peers_data["remove_from_allocation_exclusions"] = exclusions_to_keep + scope = Scope.APP + + self.peers_data.put(scope, "remove_from_allocation_exclusions", exclusions_to_keep) def get_allocation_exclusions(self) -> str: """Retrieve the units that must be removed from the allocation exclusion.""" - return self.app_peers_data.get("to_remove_from_allocation_exclusion", "") + return self.peers_data.get(Scope.APP, "to_remove_from_allocation_exclusion", "") - def on_tls_conf_set( - self, event: CertificateAvailableEvent, scope: Scope, cert_type: CertType, renewal: bool - ) -> None: - """Called after certificate ready and stored on the corresponding scope databag.""" + @abstractmethod + def _store_tls_resources( + self, cert_type: CertType, secrets: Dict[str, any], override_admin: bool = True + ): + """Write certificates and keys on disk.""" pass - def on_tls_relation_broken(self): - """Called after certificates relation broken.""" + @abstractmethod + def _are_all_tls_resources_stored(self): + """Check if all TLS resources are stored on disk.""" pass - def clear_status( - self, status_message: str, pattern: StatusCheckPattern = StatusCheckPattern.Equal - ): - """Resets the unit status if it was previously blocked/maintenance with message.""" - condition: bool - if pattern == StatusCheckPattern.Equal: - condition = self.unit.status.message == status_message - elif pattern == StatusCheckPattern.Start: - condition = self.unit.status.message.startswith(status_message) - elif pattern == StatusCheckPattern.End: - condition = self.unit.status.message.endswith(status_message) - elif pattern == StatusCheckPattern.Interpolated: - condition = ( - re.fullmatch(status_message.replace("{}", "(?s:.*?)"), status_message) is not None - ) - else: - condition = status_message in self.unit.status.message - - if condition: - self.unit.status = ActiveStatus() - - @property - def app_peers_data(self) -> Dict[str, str]: - """Peer relation data object.""" - return self._get_relation_data(Scope.APP, PEER) - - @property - def unit_peers_data(self) -> Dict[str, str]: - """Peer relation data object.""" - return self._get_relation_data(Scope.UNIT, PEER) - @property def unit_ip(self) -> str: """IP address of the current unit.""" @@ -165,13 +587,3 @@ def alternative_host(self) -> str: """Return an alternative host (of another node) in case the current is offline.""" all_units_ips = units_ips(self, PEER) return random.choice(list(all_units_ips.values())) - - def _get_relation_data(self, scope: Scope, relation_name: str) -> Dict[str, str]: - """Relation data object.""" - relation = self.model.get_relation(relation_name) - if relation is None: - return {} - - relation_scope = self.app if scope == Scope.APP else self.unit - - return relation.data[relation_scope] diff --git a/lib/charms/opensearch/v0/opensearch_config.py b/lib/charms/opensearch/v0/opensearch_config.py index a2f01729e..67721a6cb 100644 --- a/lib/charms/opensearch/v0/opensearch_config.py +++ b/lib/charms/opensearch/v0/opensearch_config.py @@ -73,13 +73,13 @@ def set_node_tls_conf(self, cert_type: CertType, secrets: Dict[str, any]): self._opensearch.config.put( self.CONFIG_YML, f"plugins.security.ssl.{target_conf_layer}.pemcert_filepath", - f"{self._opensearch.paths.certs_relative}/{cert_type.val}.cert", + f"{self._opensearch.paths.certs_relative}/{cert_type}.cert", ) self._opensearch.config.put( self.CONFIG_YML, f"plugins.security.ssl.{target_conf_layer}.pemkey_filepath", - f"{self._opensearch.paths.certs_relative}/{cert_type.val}.key", + f"{self._opensearch.paths.certs_relative}/{cert_type}.key", ) self._opensearch.config.put( diff --git a/pyproject.toml b/pyproject.toml index 12d0028b7..e6436ccde 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ markers = [ # Formatting tools configuration [tool.black] line-length = 99 -target-version = ["py38"] +target-version = ["py310"] [tool.isort] profile = "black" diff --git a/requirements.txt b/requirements.txt index 6a124f71d..be08d89a0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ bcrypt==4.0.1 -cryptography==38.0.3 -jsonschema==4.17.0 -ops==1.5.3 +cryptography==38.0.4 +jsonschema==4.17.3 +ops==1.5.4 +overrides==7.3.1 requests==2.28.1 ruamel.yaml==0.17.21 tenacity==8.1.0 \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index c3645954c..3a9cde4b7 100755 --- a/src/charm.py +++ b/src/charm.py @@ -5,63 +5,18 @@ """Charmed Machine Operator for OpenSearch.""" import logging -from datetime import datetime from os.path import exists -from typing import Dict, List, Optional +from typing import Dict -from charms.opensearch.v0.constants_charm import ( - AdminUserInitProgress, - CertsExpirationError, - InstallError, - InstallProgress, - RequestUnitServiceOps, - SecurityIndexInitProgress, - ServiceIsStopping, - ServiceStartError, - ServiceStopFailed, - ServiceStopped, - TLSNotFullyConfigured, - TLSRelationBrokenError, - WaitingForBusyShards, - WaitingToStart, -) +from charms.opensearch.v0.constants_charm import InstallError, InstallProgress from charms.opensearch.v0.constants_tls import CertType -from charms.opensearch.v0.helper_cluster import ClusterState, ClusterTopology, Node -from charms.opensearch.v0.helper_databag import Scope -from charms.opensearch.v0.helper_networking import units_ips -from charms.opensearch.v0.helper_security import ( - cert_expiration_remaining_hours, - generate_hashed_password, - to_pkcs8, -) -from charms.opensearch.v0.opensearch_base_charm import ( - PEER, - SERVICE_MANAGER, - OpenSearchBaseCharm, - StatusCheckPattern, -) -from charms.opensearch.v0.opensearch_distro import ( - OpenSearchHttpError, - OpenSearchInstallError, - OpenSearchStartError, - OpenSearchStopError, -) -from charms.rolling_ops.v0.rollingops import RollingOpsManager -from charms.tls_certificates_interface.v1.tls_certificates import ( - CertificateAvailableEvent, -) -from ops.charm import ( - ActionEvent, - InstallEvent, - LeaderElectedEvent, - RelationChangedEvent, - RelationJoinedEvent, - StartEvent, - UpdateStatusEvent, -) -from ops.framework import EventBase +from charms.opensearch.v0.helper_security import to_pkcs8 +from charms.opensearch.v0.opensearch_base_charm import OpenSearchBaseCharm +from charms.opensearch.v0.opensearch_distro import OpenSearchInstallError +from ops.charm import InstallEvent from ops.main import main -from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus +from ops.model import BlockedStatus, MaintenanceStatus +from overrides import override from opensearch import OpenSearchTarball @@ -75,332 +30,17 @@ def __init__(self, *args): super().__init__(*args, distro=OpenSearchTarball) # OpenSearchSnap self.framework.observe(self.on.install, self._on_install) - self.framework.observe(self.on.leader_elected, self._on_leader_elected) - self.framework.observe(self.on.start, self._on_start) - - self.framework.observe(self.on[PEER].relation_joined, self._on_peer_relation_joined) - self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed) - - self.framework.observe(self.on.update_status, self._on_update_status) - - self.framework.observe(self.on.get_admin_secrets_action, self._on_get_admin_secrets_action) - - self.service_manager = RollingOpsManager( - self, relation=SERVICE_MANAGER, callback=self._start_opensearch - ) def _on_install(self, _: InstallEvent) -> None: """Handle the install event.""" self.unit.status = MaintenanceStatus(InstallProgress) try: self.opensearch.install() - self.clear_status(InstallProgress) + self.status.clear(InstallProgress) except OpenSearchInstallError: self.unit.status = BlockedStatus(InstallError) - def _on_leader_elected(self, _: LeaderElectedEvent): - """Handle leader election event.""" - if self.app_peers_data.get("security_index_initialised"): - return - - if not self.app_peers_data.get("admin_user_initialized"): - self.unit.status = MaintenanceStatus(AdminUserInitProgress) - self._initialize_admin_user() - self.app_peers_data["admin_user_initialized"] = "True" - self.clear_status(AdminUserInitProgress) - - def _on_start(self, event: StartEvent): - """Triggered when on start. Set the right node role.""" - if self.opensearch.is_started(): - if self.app_peers_data.get("security_index_initialised"): - # in the case where it was on WaitingToStart status, event got deferred - # and the service started in between, put status back to active - self.clear_status(WaitingToStart) - - # cleanup bootstrap conf in the node if existing - if self.unit_peers_data.get("bootstrap_contributor"): - self._cleanup_bootstrap_conf_if_applies() - - return - - if not self._is_tls_fully_configured(): - self.unit.status = BlockedStatus(TLSNotFullyConfigured) - event.defer() - return - - # configure clients auth - self.opensearch_config.set_client_auth() - - # request the start of opensearch - self.unit.status = WaitingStatus(RequestUnitServiceOps.format("start")) - self.on[self.service_manager.name].acquire_lock.emit(callback_override="_start_opensearch") - - def _on_peer_relation_joined(self, event: RelationJoinedEvent): - """New node joining the cluster.""" - current_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - - # In the case of the first units before TLS is initialized - if not current_secrets: - if not self.unit.is_leader(): - event.defer() - return - - # in the case the cluster was bootstrapped with multiple units at the same time - # and the certificates have not been generated yet - if not current_secrets.get("cert") or not current_secrets.get("chain"): - event.defer() - return - - # Store the "Admin" certificate, key and CA on the disk of the new unit - self._store_tls_resources(CertType.APP_ADMIN, current_secrets, override_admin=False) - - def _on_peer_relation_changed(self, event: RelationChangedEvent): - """Handle peer relation changes.""" - if self.unit.is_leader(): - data = event.relation.data.get(event.unit) - if data: - exclusions_to_remove = data.get("remove_from_allocation_exclusions") - if exclusions_to_remove: - self.append_allocation_exclusion_to_remove(exclusions_to_remove) - - if data.get("bootstrap_contributor"): - contributor_count = int( - self.app_peers_data.get("bootstrap_contributors_count", 0) - ) - self.app_peers_data["bootstrap_contributors_count"] = str( - contributor_count + 1 - ) - - # Restart node when cert renewal for the transport layer - if self.unit_peers_data.get("must_reboot_node") == "True": - try: - self.opensearch.restart() - del self.unit_peers_data["must_reboot_node"] - except OpenSearchStartError: - event.defer() - - def _on_update_status(self, _: UpdateStatusEvent): - """On update status event. - - We want to periodically check for 2 things: - 1- The system requirements are still met - 2- every 6 hours check if certs are expiring soon (in 7 days), - as a safeguard in case relation broken. As there will be data loss - without the user noticing in case the cert of the unit transport layer expires. - So we want to stop opensearch in that case, since it cannot be recovered from. - """ - # if there are missing system requirements defer - missing_sys_reqs = self.opensearch.missing_sys_requirements() - if len(missing_sys_reqs) > 0: - self.unit.status = BlockedStatus(" - ".join(missing_sys_reqs)) - return - - # If relation broken - leave - if self.model.get_relation("certificates") is not None: - return - - # if node already shutdown - leave - if not self.opensearch.is_node_up(): - return - - # See if the last check was made less than 6h ago, if yes - leave - date_format = "%Y-%m-%d %H:%M:%S" - last_cert_check = datetime.strptime( - self.unit_peers_data.get("certs_exp_checked_at", "1970-01-01 00:00"), date_format - ) - if (datetime.now() - last_cert_check).seconds < 6 * 3600: - return - - certs = { - CertType.UNIT_TRANSPORT: self.secrets.get_object(Scope.UNIT, CertType.UNIT_TRANSPORT)[ - "cert" - ], - CertType.UNIT_HTTP: self.secrets.get_object(Scope.UNIT, CertType.UNIT_HTTP)["cert"], - } - if self.unit.is_leader(): - certs[CertType.APP_ADMIN] = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN)[ - "cert" - ] - - # keep certificates that are expiring in less than 24h - for cert_type, cert in certs.items(): - hours = cert_expiration_remaining_hours(cert) - if hours > 24 * 7: - del certs[cert_type] - - if len(certs) > 0: - missing = [cert.val for cert in certs.keys()] - self.unit.status = BlockedStatus(CertsExpirationError.format(", ".join(missing))) - - # stop opensearch in case the Node-transport certificate expires. - if certs.get(CertType.UNIT_TRANSPORT) is not None: - self.opensearch.stop() - - self.unit_peers_data["certs_exp_checked_at"] = datetime.now().strftime(date_format) - - def _on_get_admin_secrets_action(self, event: ActionEvent): - """Return the password and cert chain for the admin user of the cluster.""" - password = self.secrets.get(Scope.APP, "admin_password") - - chain = "" - admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - if admin_secrets and admin_secrets.get("chain"): - chain = "\n".join(admin_secrets["chain"][::-1]) - - event.set_results({"password": password if password else "", "chain": chain}) - - def on_tls_conf_set( - self, event: CertificateAvailableEvent, scope: Scope, cert_type: CertType, renewal: bool - ): - """Called after certificate ready and stored on the corresponding scope databag. - - - Store the cert on the file system, on all nodes for APP certificates - - Update the corresponding yaml conf files - - Run the security admin script - """ - # Get the list of stored secrets for this cert - current_secrets = self.secrets.get_object(scope, cert_type.val) - - # In case of renewal of the unit transport layer cert - stop opensearch - should_restart = False - if renewal and cert_type == CertType.UNIT_TRANSPORT: - self.opensearch.stop() - should_restart = True - - # Store cert/key on disk - must happen after opensearch stop for transport certs renewal - self._store_tls_resources(cert_type, current_secrets) - - if scope == Scope.UNIT: - # node http or transport cert - self.opensearch_config.set_node_tls_conf(cert_type, current_secrets) - else: - # write the admin cert conf on all units, in case there is a leader loss + cert renewal - self.opensearch_config.set_admin_tls_conf(current_secrets) - - if should_restart: - self.unit_peers_data["must_reboot_node"] = "True" - - def on_tls_relation_broken(self): - """As long as all certificates are produced, we don't do anything.""" - if self._is_tls_fully_configured(): - return - - # Otherwise, we block. - self.unit.status = BlockedStatus(TLSRelationBrokenError) - self.opensearch.stop() - - def _is_tls_fully_configured(self) -> bool: - """Check if TLS fully configured meaning the admin user configured & 3 certs present.""" - # In case the initialisation of the admin user is not finished yet - if not self.app_peers_data.get("admin_user_initialized"): - return False - - admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - if not admin_secrets or not admin_secrets.get("cert") or not admin_secrets.get("chain"): - return False - - unit_transport_secrets = self.secrets.get_object(Scope.UNIT, CertType.UNIT_TRANSPORT.val) - if not unit_transport_secrets or not unit_transport_secrets.get("cert"): - return False - - unit_http_secrets = self.secrets.get_object(Scope.UNIT, CertType.UNIT_HTTP.val) - if not unit_http_secrets or not unit_http_secrets.get("cert"): - return False - - return self._are_all_tls_resources_stored() - - def _start_opensearch(self, event: EventBase) -> None: - """Start OpenSearch if all resources configured.""" - if not self._can_service_start(): - event.defer() - return - - try: - # Retrieve the nodes of the cluster, needed to configure this node - nodes = self._get_nodes() - except OpenSearchHttpError: - event.defer() - return - - # Set the configuration of the node - self._set_node_conf(nodes) - - try: - self.unit.status = BlockedStatus(WaitingToStart) - self.opensearch.start() - self.clear_status(WaitingToStart) - except OpenSearchStartError as e: - logger.debug(e) - self.unit.status = BlockedStatus(ServiceStartError) - event.defer() - return - - # initialize the security index if the admin certs are written on disk - if self.unit.is_leader(): - if self.app_peers_data.get("security_index_initialised") is None: - admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - self._initialize_security_index(admin_secrets) - self.app_peers_data["security_index_initialised"] = "True" - - self.app_peers_data["leader_ip"] = self.unit_ip - - # store the exclusions that previously failed to be stored when no units online - if self.app_peers_data.get("remove_from_allocation_exclusions"): - self.opensearch.remove_allocation_exclusions( - self.app_peers_data["remove_from_allocation_exclusions"] - ) - - # cleanup bootstrap conf in the node - if self.unit_peers_data.get("bootstrap_contributor"): - self._cleanup_bootstrap_conf_if_applies() - - def _stop_opensearch(self, event: EventBase) -> None: - """Stop OpenSearch if possible.""" - try: - self.unit.status = WaitingStatus(ServiceIsStopping) - self.opensearch.stop() - self.unit.status = WaitingStatus(ServiceStopped) - except OpenSearchStopError as e: - logger.debug(e) - self.unit.status = BlockedStatus(ServiceStopFailed) - event.defer() - - def _restart_opensearch(self, event: EventBase) -> None: - """Restart OpenSearch if possible.""" - self._stop_opensearch(event) - self._start_opensearch(event) - - def _can_service_start(self): - """Return if the opensearch service can start.""" - # if there are any missing system requirements leave - missing_sys_reqs = self.opensearch.missing_sys_requirements() - if len(missing_sys_reqs) > 0: - self.unit.status = BlockedStatus(" - ".join(missing_sys_reqs)) - return False - - # When a new unit joins, replica shards are automatically added to it. In order to prevent - # overloading the cluster, units must be started one at a time. So we defer starting - # opensearch until all shards in other units are in a "started" or "unassigned" state. - if not self.unit.is_leader() and self.app_peers_data.get("leader_ip"): - try: - busy_shards = ClusterState.busy_shards_by_unit( - self.opensearch, self.app_peers_data.get("leader_ip") - ) - if busy_shards: - message = WaitingForBusyShards.format( - " - ".join([f"{key}/{','.join(val)}" for key, val in busy_shards.items()]) - ) - self.unit.status = WaitingStatus(message) - return False - - self.clear_status(WaitingForBusyShards, pattern=StatusCheckPattern.Interpolated) - except OpenSearchHttpError: - # this means that the leader unit is not reachable (not started yet), - # meaning that it's a new cluster, so we can safely start the OpenSearch service - pass - - return True - + @override def _store_tls_resources( self, cert_type: CertType, secrets: Dict[str, any], override_admin: bool = True ): @@ -408,10 +48,10 @@ def _store_tls_resources( certs_dir = self.opensearch.paths.certs self.opensearch.write_file( - f"{certs_dir}/{cert_type.val}.key", + f"{certs_dir}/{cert_type}.key", to_pkcs8(secrets["key"], secrets.get("key-password")), ) - self.opensearch.write_file(f"{certs_dir}/{cert_type.val}.cert", secrets["cert"]) + self.opensearch.write_file(f"{certs_dir}/{cert_type}.cert", secrets["cert"]) self.opensearch.write_file(f"{certs_dir}/root-ca.cert", secrets["ca"], override=False) if cert_type == CertType.APP_ADMIN: @@ -421,124 +61,17 @@ def _store_tls_resources( override=override_admin, ) + @override def _are_all_tls_resources_stored(self): """Check if all TLS resources are stored on disk.""" certs_dir = self.opensearch.paths.certs for cert_type in [CertType.APP_ADMIN, CertType.UNIT_TRANSPORT, CertType.UNIT_HTTP]: for extension in ["key", "cert"]: - if not exists(f"{certs_dir}/{cert_type.val}.{extension}"): + if not exists(f"{certs_dir}/{cert_type}.{extension}"): return False return exists(f"{certs_dir}/chain.pem") and exists(f"{certs_dir}/root-ca.cert") - def _initialize_admin_user(self): - """Change default password of Admin user.""" - hashed_pwd, pwd = generate_hashed_password() - self.secrets.put(Scope.APP, "admin_password", pwd) - - self.opensearch.config.put( - "opensearch-security/internal_users.yml", - "admin", - { - "hash": hashed_pwd, - "reserved": True, # this protects this resource from being updated on the dashboard or rest api - "backend_roles": ["admin"], - "description": "Admin user", - }, - ) - - def _initialize_security_index(self, admin_secrets: Dict[str, any]): - """Run the security_admin script, it creates and initializes the opendistro_security index. - - IMPORTANT: must only run once per cluster, otherwise the index gets overrode - """ - args = [ - f"-cd {self.opensearch.paths.conf}/opensearch-security/", - f"-cn {self.app.name}-{self.model.name}", - f"-h {self.unit_ip}", - f"-cacert {self.opensearch.paths.certs}/root-ca.cert", - f"-cert {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.cert", - f"-key {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.key", - ] - - admin_key_pwd = admin_secrets.get("key-password", None) - if admin_key_pwd is not None: - args.append(f"-keypass {admin_key_pwd}") - - self.unit.status = MaintenanceStatus(SecurityIndexInitProgress) - self.opensearch.run_script( - "plugins/opensearch-security/tools/securityadmin.sh", " ".join(args) - ) - self.clear_status(SecurityIndexInitProgress) - - def _get_nodes(self) -> List[Node]: - """Fetch the list of nodes of the cluster, depending on the requester.""" - - def fetch() -> List[Node]: - """Fetches the list of nodes through HTTP.""" - host: Optional[str] = None - alt_hosts: Optional[List[str]] = None - - all_units_ips = units_ips(self, PEER).values() - if all_units_ips: - all_hosts = list(all_units_ips) - host = all_hosts.pop(0) # get first value - alt_hosts = all_hosts - - nodes: List[Node] = [] - if host is not None: - response = self.opensearch.request( - "GET", "/_nodes", host=host, alt_hosts=alt_hosts - ) - if "nodes" in response: - for obj in response["nodes"].values(): - nodes.append(Node(obj["name"], obj["roles"], obj["ip"])) - - return nodes - - try: - return fetch() - except OpenSearchHttpError: - if self.unit.is_leader() and not self.app_peers_data.get("security_index_initialised"): - return [] - raise - - def _set_node_conf(self, nodes: List[Node]) -> None: - """Set the configuration of the current node / unit.""" - roles = ClusterTopology.suggest_roles(nodes, self.app.planned_units()) - - cm_names = ClusterTopology.get_cluster_managers_names(nodes) - cm_ips = ClusterTopology.get_cluster_managers_ips(nodes) - - contribute_to_bootstrap = False - if "cluster_manager" in roles: - cm_names.append(self.unit_name) - cm_ips.append(self.unit_ip) - - cms_in_bootstrap = int(self.app_peers_data.get("bootstrap_contributors_count", 0)) - if cms_in_bootstrap < self.app.planned_units(): - contribute_to_bootstrap = True - - if self.unit.is_leader(): - self.app_peers_data["bootstrap_contributors_count"] = f"{cms_in_bootstrap + 1}" - - # indicates that this unit is part of the "initial cm nodes" - self.unit_peers_data["bootstrap_contributor"] = "True" - - self.opensearch_config.set_node( - self.app.name, - self.model.name, - self.unit_name, - roles, - cm_names, - cm_ips, - contribute_to_bootstrap, - ) - - def _cleanup_bootstrap_conf_if_applies(self) -> None: - """Remove some conf props in the CM nodes that contributed to the cluster bootstrapping.""" - self.opensearch_config.cleanup_bootstrap_conf() - if __name__ == "__main__": main(OpenSearchOperatorCharm) diff --git a/src/opensearch.py b/src/opensearch.py index 7a24ba55a..31b8e1bb5 100644 --- a/src/opensearch.py +++ b/src/opensearch.py @@ -23,6 +23,7 @@ ) from charms.operator_libs_linux.v1 import snap from charms.operator_libs_linux.v1.snap import SnapError +from overrides import override from tenacity import retry, stop_after_attempt, wait_exponential from utils import extract_tarball @@ -39,6 +40,7 @@ def __init__(self, charm, peer_relation: str): cache = snap.SnapCache() self._opensearch = cache["opensearch"] + @override def install(self): """Install opensearch from the snapcraft store.""" if self._opensearch.present: @@ -52,6 +54,7 @@ def install(self): self._run_cmd("snap connect opensearch:process-control") + @override def start(self): """Start the snap exposed "daemon" service.""" if not self._opensearch.present: @@ -69,6 +72,7 @@ def start(self): logger.error(f"Failed to start the opensearch.{self.SERVICE_NAME} service. \n{e}") raise OpenSearchStartError() + @override def _stop_service(self): """Stop the snap exposed "daemon" service.""" if not self._opensearch.present: @@ -80,6 +84,7 @@ def _stop_service(self): logger.error(f"Failed to stop the opensearch.{self.SERVICE_NAME} service. \n{e}") raise OpenSearchStopError() + @override def _build_paths(self) -> Paths: """Builds a Path object. @@ -108,6 +113,7 @@ def __init__(self, charm, peer_relation: str): wait=wait_exponential(multiplier=1, min=2, max=10), reraise=True, ) + @override def install(self): """Temporary (will be deleted later) - Download and Un-tar the opensearch distro.""" try: @@ -125,6 +131,7 @@ def install(self): extract_tarball(tarball_path, self.paths.home) self._create_systemd_unit() + @override def start(self): """Start opensearch as a Daemon.""" logger.debug("Starting opensearch.") @@ -147,6 +154,7 @@ def start(self): else: raise OpenSearchStartError() + @override def _stop_service(self): """Stop opensearch.""" self._run_cmd( @@ -164,6 +172,7 @@ def _stop_service(self): raise OpenSearchStopError() + @override def _build_paths(self) -> Paths: return Paths( home="/etc/opensearch", diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index feb44a78b..a295cce1a 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -2,153 +2,93 @@ # See LICENSE file for licensing details. # # Learn more about testing at: https://juju.is/docs/sdk/testing - -import unittest -from datetime import datetime, timedelta -from unittest.mock import patch +import tempfile +from os import listdir +from os.path import isfile, join +from unittest.mock import MagicMock, patch from charms.opensearch.v0.constants_tls import CertType -from charms.opensearch.v0.helper_databag import Scope -from charms.opensearch.v0.opensearch_base_charm import PEER, SERVICE_MANAGER -from charms.opensearch.v0.opensearch_distro import ( - OpenSearchHttpError, - OpenSearchInstallError, -) -from helpers import patch_network_get -from ops.model import ActiveStatus, BlockedStatus -from ops.testing import Harness - -from charm import OpenSearchOperatorCharm +from helpers import create_utf8_encoded_private_key +from unit.test_opensearch_base_charm import TestOpenSearchBaseCharm -@patch_network_get("1.1.1.1") -class TestCharm(unittest.TestCase): +class TestCharm(TestOpenSearchBaseCharm): @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution._create_directories") def setUp(self, _create_directories): - self.harness = Harness(OpenSearchOperatorCharm) - self.addCleanup(self.harness.cleanup) - self.harness.begin() - self.charm = self.harness.charm - self.rel_id = self.harness.add_relation(PEER, self.charm.app.name) - self.service_rel_id = self.harness.add_relation(SERVICE_MANAGER, self.charm.app.name) - - @patch("opensearch.OpenSearchTarball.install") - def test_on_install(self, install): - """Test the install event callback on success.""" - self.charm.on.install.emit() - install.assert_called_once() - - @patch("opensearch.OpenSearchTarball.install") - def test_on_install_error(self, install): - """Test the install event callback on error.""" - install.side_effect = OpenSearchInstallError() - self.charm.on.install.emit() - self.assertTrue(isinstance(self.harness.model.unit.status, BlockedStatus)) - - @patch("charm.OpenSearchOperatorCharm._initialize_admin_user") - def test_on_leader_elected(self, _initialize_admin_user): - """Test on leader elected event.""" - self.harness.set_leader(True) - self.charm.on.leader_elected.emit() - _initialize_admin_user.assert_called_once() - self.assertTrue(isinstance(self.harness.model.unit.status, ActiveStatus)) - - @patch("charm.OpenSearchOperatorCharm._initialize_admin_user") - def test_on_leader_elected_index_initialised(self, _initialize_admin_user): - # security_index_initialised - self.charm.app_peers_data["security_index_initialised"] = "True" - self.harness.set_leader(True) - self.charm.on.leader_elected.emit() - _initialize_admin_user.assert_not_called() - - # admin_user_initialized - del self.charm.app_peers_data["security_index_initialised"] - self.charm.app_peers_data["admin_user_initialized"] = "True" - self.charm.on.leader_elected.emit() - _initialize_admin_user.assert_not_called() - - @patch("opensearch.OpenSearchTarball.is_started") - @patch("charm.OpenSearchOperatorCharm._is_tls_fully_configured") - @patch("charms.opensearch.v0.opensearch_config.OpenSearchConfig.set_client_auth") - @patch("charm.OpenSearchOperatorCharm._get_nodes") - @patch("charm.OpenSearchOperatorCharm._set_node_conf") - @patch("charm.OpenSearchOperatorCharm._can_service_start") - @patch("opensearch.OpenSearchTarball.start") - @patch("charm.OpenSearchOperatorCharm._initialize_security_index") - @patch("charm.OpenSearchOperatorCharm._initialize_admin_user") - def test_on_start( - self, - _initialize_admin_user, - _initialize_security_index, - start, - _can_service_start, - _set_node_conf, - _get_nodes, - set_client_auth, - _is_tls_fully_configured, - is_started, - ): - """Test on start event.""" - # test when setup complete - is_started.return_value = True - self.charm.app_peers_data["security_index_initialised"] = "True" - self.charm.on.start.emit() - _is_tls_fully_configured.assert_not_called() - - # test when setup not complete - is_started.return_value = False - del self.charm.app_peers_data["security_index_initialised"] - _is_tls_fully_configured.return_value = False - self.charm.on.start.emit() - set_client_auth.assert_not_called() - - # when _get_nodes fails - _get_nodes.side_effect = OpenSearchHttpError() - self.charm.on.start.emit() - _set_node_conf.assert_not_called() - - # _get_nodes succeeds - _is_tls_fully_configured.return_value = True - _get_nodes.side_effect = None - _can_service_start.return_value = False - self.charm.on.start.emit() - _get_nodes.assert_not_called() - _set_node_conf.assert_not_called() - _initialize_security_index.assert_not_called() - - # initialisation of the security index - del self.charm.app_peers_data["security_index_initialised"] - _can_service_start.return_value = True - self.harness.set_leader(True) - self.charm.on.start.emit() - _get_nodes.assert_called() - _set_node_conf.assert_called() - start.assert_called_once() - self.assertEqual(self.charm.app_peers_data["security_index_initialised"], "True") - _initialize_security_index.assert_called_once() - - @patch("charms.opensearch.v0.helper_security.cert_expiration_remaining_hours") - @patch("opensearch.OpenSearchTarball.is_node_up") - @patch("ops.model.Model.get_relation") - @patch("opensearch.OpenSearchTarball.missing_sys_requirements") - def test_on_update_status( - self, missing_sys_requirements, get_relation, is_node_up, cert_expiration_remaining_hours - ): - """Test on update status.""" - # test missing sys requirements - missing_sys_requirements.return_value = ["ulimit -n not set"] - self.charm.on.update_status.emit() - self.assertTrue(isinstance(self.harness.model.unit.status, BlockedStatus)) - - # test when TLS relation is broken and cert is expiring soon - get_relation.return_value = None - is_node_up.return_value = True - self.charm.unit_peers_data["certs_exp_checked_at"] = ( - datetime.now() - timedelta(hours=7) - ).strftime("%Y-%m-%d %H:%M:%S") - self.charm.secrets.put_object( - Scope.UNIT, CertType.UNIT_TRANSPORT.val, {"cert": "transport"} - ) - cert_expiration_remaining_hours.return_value = 24 * 3 - self.charm.on.update_status.emit() - self.assertTrue(isinstance(self.harness.model.unit.status, BlockedStatus)) + super().setUp() + + def test_store_tls_resources(self): + """Test the storing of TLS resources.""" + self.opensearch.paths = MagicMock() + + with tempfile.TemporaryDirectory() as tmp_dir: + self.opensearch.paths.certs = tmp_dir + + self.charm._store_tls_resources( + CertType.UNIT_TRANSPORT, + {"ca": "ca", "cert": "cert_transport", "key": create_utf8_encoded_private_key()}, + ) + + stored_files = [f for f in listdir(tmp_dir) if isfile(join(tmp_dir, f))] + + t_prefix = CertType.UNIT_TRANSPORT.val + self.assertCountEqual( + stored_files, ["root-ca.cert", f"{t_prefix}.cert", f"{t_prefix}.key"] + ) + + self.charm._store_tls_resources( + CertType.APP_ADMIN, + { + "ca": "ca", + "cert": "cert_admin", + "chain": "chain", + "key": create_utf8_encoded_private_key(), + }, + ) + + stored_files = [f for f in listdir(tmp_dir) if isfile(join(tmp_dir, f))] + + a_prefix = CertType.APP_ADMIN.val + self.assertCountEqual( + stored_files, + [ + "root-ca.cert", + f"{a_prefix}.cert", + f"{a_prefix}.key", + "chain.pem", + f"{t_prefix}.cert", + f"{t_prefix}.key", + ], + ) + + def test_are_all_tls_resources_stored(self): + """Test if all TLS resources are successfully stored.""" + self.opensearch.paths = MagicMock() + + with tempfile.TemporaryDirectory() as tmp_dir: + self.opensearch.paths.certs = tmp_dir + + self.assertFalse(self.charm._are_all_tls_resources_stored()) + + self.charm._store_tls_resources( + CertType.UNIT_TRANSPORT, + {"ca": "ca", "cert": "cert_transport", "key": create_utf8_encoded_private_key()}, + ) + self.assertFalse(self.charm._are_all_tls_resources_stored()) + + self.charm._store_tls_resources( + CertType.APP_ADMIN, + { + "ca": "ca", + "cert": "cert_admin", + "chain": "chain", + "key": create_utf8_encoded_private_key(), + }, + ) + self.assertFalse(self.charm._are_all_tls_resources_stored()) + + self.charm._store_tls_resources( + CertType.UNIT_HTTP, + {"ca": "ca", "cert": "cert_http", "key": create_utf8_encoded_private_key()}, + ) + self.assertTrue(self.charm._are_all_tls_resources_stored()) diff --git a/tests/unit/test_helper_charm.py b/tests/unit/test_helper_charm.py new file mode 100644 index 000000000..842afb967 --- /dev/null +++ b/tests/unit/test_helper_charm.py @@ -0,0 +1,49 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Unit test for the helper_cluster library.""" + +import unittest +from unittest.mock import patch + +from charms.opensearch.v0.helper_charm import Status +from charms.opensearch.v0.opensearch_base_charm import PEER +from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus +from ops.testing import Harness + +from charm import OpenSearchOperatorCharm + + +class TestHelperDatabag(unittest.TestCase): + @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution._create_directories") + def setUp(self, _create_directories) -> None: + self.harness = Harness(OpenSearchOperatorCharm) + self.addCleanup(self.harness.cleanup) + self.harness.begin() + + self.charm = self.harness.charm + self.rel_id = self.harness.add_relation(PEER, self.charm.app.name) + self.status = self.charm.status + + def test_clear_status(self): + """Test clearing the charm status.""" + self.charm.unit.status = WaitingStatus("Status Message 1") + self.status.clear("Status Message 1", pattern=Status.CheckPattern.Equal) + self.assertEqual(self.charm.unit.status.name, "active") + + self.charm.unit.status = WaitingStatus("Status Message 2") + self.status.clear("Stat", pattern=Status.CheckPattern.Start) + self.assertEqual(self.charm.unit.status.name, "active") + + self.charm.unit.status = MaintenanceStatus("Status Message 3") + self.status.clear("ssage 3", pattern=Status.CheckPattern.End) + self.assertEqual(self.charm.unit.status.name, "active") + + self.charm.unit.status = BlockedStatus("Status Message 4") + self.status.clear("essage 4", pattern=Status.CheckPattern.Contain) + self.assertEqual(self.charm.unit.status.name, "active") + + message_template = "Message {} filled by {}." + self.charm.unit.status = BlockedStatus(message_template.format(5, "unit tests")) + self.status.clear(message_template, pattern=Status.CheckPattern.Interpolated) + self.assertEqual(self.charm.unit.status.name, "active") diff --git a/tests/unit/test_helper_databag.py b/tests/unit/test_helper_databag.py index 28fad21a5..875a38d74 100644 --- a/tests/unit/test_helper_databag.py +++ b/tests/unit/test_helper_databag.py @@ -23,6 +23,67 @@ def setUp(self, _create_directories) -> None: self.charm = self.harness.charm self.rel_id = self.harness.add_relation(PEER, self.charm.app.name) self.secret_store = self.charm.secrets + self.peers_data = self.charm.peers_data + + def test_typed_put_get(self): + """Test putting and getting typed data in/from the relation data store.""" + self.peers_data.put(Scope.APP, "bool-true", True) + self.assertTrue(self.peers_data.get(Scope.APP, "bool-true")) + self.assertTrue(self.peers_data.get(Scope.APP, "bool-true", auto_casting=False), "True") + + self.peers_data.put(Scope.APP, "bool-false", False) + self.assertFalse(self.peers_data.get(Scope.APP, "bool-false")) + self.assertTrue(self.peers_data.get(Scope.APP, "bool-false", auto_casting=False), "False") + + self.peers_data.put(Scope.UNIT, "int", 18) + self.assertEqual(self.peers_data.get(Scope.UNIT, "int"), 18) + self.assertEqual(self.peers_data.get(Scope.UNIT, "int", auto_casting=False), "18") + + self.peers_data.put(Scope.APP, "float", 2.6) + self.assertEqual(self.peers_data.get(Scope.APP, "float"), 2.6) + self.assertEqual(self.peers_data.get(Scope.APP, "float", auto_casting=False), "2.6") + + self.peers_data.put(Scope.APP, "str", "str-val") + self.assertEqual(self.peers_data.get(Scope.APP, "str"), "str-val") + self.assertEqual(self.peers_data.get(Scope.APP, "str", auto_casting=False), "str-val") + + def test_data_has(self): + """Test checking on the existence of a key.""" + self.peers_data.put(Scope.APP, "key1", "val1") + self.peers_data.put(Scope.UNIT, "key2", "val2") + + self.assertTrue(self.peers_data.has(Scope.APP, "key1")) + self.assertTrue(self.peers_data.has(Scope.UNIT, "key2")) + + self.assertFalse(self.peers_data.has(Scope.APP, "key2")) + self.assertFalse(self.peers_data.has(Scope.UNIT, "key1")) + + def test_get_all(self): + """Test the full retrieval of the store data.""" + self.assertCountEqual(self.peers_data.all(Scope.APP), {}) + self.assertCountEqual(self.peers_data.all(Scope.UNIT), {}) + + self.peers_data.put(Scope.APP, "str", "val-0") + self.peers_data.put(Scope.APP, "int", 10) + self.peers_data.put(Scope.UNIT, "str", "val-1") + self.peers_data.put(Scope.UNIT, "float", 2.8) + + self.assertCountEqual(self.peers_data.all(Scope.APP), {"str": "val-0", "int": "10"}) + self.assertCountEqual(self.peers_data.all(Scope.UNIT), {"str": "val-1", "float": "2.8"}) + + def test_typed_get_with_default(self): + """Test putting and getting typed data in/from the relation data store.""" + self.assertTrue(self.peers_data.get(Scope.APP, "bool-missing-true", default=True)) + self.assertFalse(self.peers_data.get(Scope.APP, "bool-missing-false", default=False)) + + self.assertEqual(self.peers_data.get(Scope.APP, "int-missing", 2), 2) + self.assertEqual(self.peers_data.get(Scope.UNIT, "float-missing", default=2.5), 2.5) + self.assertEqual(self.peers_data.get(Scope.APP, "str-missing", default="str"), "str") + + def test_get_null_without_default(self): + """Test fetching non-existent keys from the databag.""" + self.assertIsNone(self.peers_data.get(Scope.APP, "missing-key")) + self.assertIsNone(self.peers_data.get(Scope.UNIT, "missing-key")) def test_put_get(self): """Test putting and getting data of simple type in/from the secret store.""" diff --git a/tests/unit/test_helper_networking.py b/tests/unit/test_helper_networking.py index 2e74b9217..08838161e 100644 --- a/tests/unit/test_helper_networking.py +++ b/tests/unit/test_helper_networking.py @@ -4,6 +4,7 @@ """Unit test for the helper_cluster library.""" import unittest +import uuid from unittest.mock import patch from charms.opensearch.v0.helper_networking import ( @@ -66,4 +67,4 @@ def test_units_ips(self): def test_is_reachable(self): """Test if host is reachable.""" self.assertTrue(is_reachable("google.com", 80)) - self.assertFalse(is_reachable("googl.comodo", 80)) + self.assertFalse(is_reachable(uuid.uuid4().hex, 80)) diff --git a/tests/unit/test_opensearch_base_charm.py b/tests/unit/test_opensearch_base_charm.py index 9fbc372e2..2a87bda8e 100644 --- a/tests/unit/test_opensearch_base_charm.py +++ b/tests/unit/test_opensearch_base_charm.py @@ -4,38 +4,180 @@ """Unit test for the helper_cluster library.""" import unittest +from datetime import datetime, timedelta from unittest.mock import patch -from charms.opensearch.v0.opensearch_base_charm import PEER +from charms.opensearch.v0.constants_tls import CertType +from charms.opensearch.v0.helper_databag import Scope +from charms.opensearch.v0.opensearch_base_charm import PEER, SERVICE_MANAGER +from charms.opensearch.v0.opensearch_distro import ( + OpenSearchHttpError, + OpenSearchInstallError, +) +from ops.model import ActiveStatus, BlockedStatus from ops.testing import Harness from charm import OpenSearchOperatorCharm from tests.helpers import patch_network_get -class TestHelperDatabag(unittest.TestCase): - @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution._create_directories") +@patch_network_get("1.1.1.1") +class TestOpenSearchBaseCharm(unittest.TestCase): + + BASE_LIB_PATH = "charms.opensearch.v0" + BASE_CHARM_CLASS = f"{BASE_LIB_PATH}.opensearch_base_charm.OpenSearchBaseCharm" + OPENSEARCH_DISTRO = "" + + @patch(f"{BASE_LIB_PATH}.opensearch_distro.OpenSearchDistribution._create_directories") def setUp(self, _create_directories) -> None: self.harness = Harness(OpenSearchOperatorCharm) self.addCleanup(self.harness.cleanup) self.harness.begin() self.charm = self.harness.charm + self.opensearch = self.charm.opensearch + self.peers_data = self.charm.peers_data self.rel_id = self.harness.add_relation(PEER, self.charm.app.name) + self.service_rel_id = self.harness.add_relation(SERVICE_MANAGER, self.charm.app.name) + + self.OPENSEARCH_DISTRO = ( + f"{self.opensearch.__class__.__module__}.{self.opensearch.__class__.__name__}" + ) + + def test_on_install(self): + """Test the install event callback on success.""" + with patch(f"{self.OPENSEARCH_DISTRO}.install") as install: + self.charm.on.install.emit() + install.assert_called_once() + + def test_on_install_error(self): + """Test the install event callback on error.""" + with patch(f"{self.OPENSEARCH_DISTRO}.install") as install: + install.side_effect = OpenSearchInstallError() + self.charm.on.install.emit() + self.assertTrue(isinstance(self.harness.model.unit.status, BlockedStatus)) + + @patch(f"{BASE_CHARM_CLASS}._initialize_admin_user") + def test_on_leader_elected(self, _initialize_admin_user): + """Test on leader elected event.""" + self.harness.set_leader(True) + self.charm.on.leader_elected.emit() + _initialize_admin_user.assert_called_once() + self.assertTrue(isinstance(self.harness.model.unit.status, ActiveStatus)) + + @patch(f"{BASE_CHARM_CLASS}._initialize_admin_user") + def test_on_leader_elected_index_initialised(self, _initialize_admin_user): + # security_index_initialised + self.peers_data.put(Scope.APP, "security_index_initialised", True) + self.harness.set_leader(True) + self.charm.on.leader_elected.emit() + _initialize_admin_user.assert_not_called() + + # admin_user_initialized + self.peers_data.delete(Scope.APP, "security_index_initialised") + self.peers_data.put(Scope.APP, "admin_user_initialized", True) + self.charm.on.leader_elected.emit() + _initialize_admin_user.assert_not_called() + + @patch(f"{BASE_CHARM_CLASS}._is_tls_fully_configured") + @patch(f"{BASE_LIB_PATH}.opensearch_config.OpenSearchConfig.set_client_auth") + @patch(f"{BASE_CHARM_CLASS}._get_nodes") + @patch(f"{BASE_CHARM_CLASS}._set_node_conf") + @patch(f"{BASE_CHARM_CLASS}._can_service_start") + @patch(f"{BASE_CHARM_CLASS}._initialize_security_index") + @patch(f"{BASE_CHARM_CLASS}._initialize_admin_user") + def test_on_start( + self, + _initialize_admin_user, + _initialize_security_index, + _can_service_start, + _set_node_conf, + _get_nodes, + set_client_auth, + _is_tls_fully_configured, + ): + """Test on start event.""" + with patch(f"{self.OPENSEARCH_DISTRO}.is_started") as is_started: + # test when setup complete + is_started.return_value = True + self.peers_data.put(Scope.APP, "security_index_initialised", True) + self.charm.on.start.emit() + _is_tls_fully_configured.assert_not_called() + + # test when setup not complete + is_started.return_value = False + self.peers_data.delete(Scope.APP, "security_index_initialised") + _is_tls_fully_configured.return_value = False + self.charm.on.start.emit() + set_client_auth.assert_not_called() + + # when _get_nodes fails + _get_nodes.side_effect = OpenSearchHttpError() + self.charm.on.start.emit() + _set_node_conf.assert_not_called() + + # _get_nodes succeeds + _is_tls_fully_configured.return_value = True + _get_nodes.side_effect = None + _can_service_start.return_value = False + self.charm.on.start.emit() + _get_nodes.assert_not_called() + _set_node_conf.assert_not_called() + _initialize_security_index.assert_not_called() + + with patch(f"{self.OPENSEARCH_DISTRO}.start") as start: + # initialisation of the security index + self.peers_data.delete(Scope.APP, "security_index_initialised") + _can_service_start.return_value = True + self.harness.set_leader(True) + self.charm.on.start.emit() + _get_nodes.assert_called() + _set_node_conf.assert_called() + start.assert_called_once() + self.assertTrue(self.peers_data.get(Scope.APP, "security_index_initialised")) + _initialize_security_index.assert_called_once() + + @patch(f"{BASE_LIB_PATH}.helper_security.cert_expiration_remaining_hours") + @patch("ops.model.Model.get_relation") + def test_on_update_status(self, get_relation, cert_expiration_remaining_hours): + """Test on update status.""" + with patch( + f"{self.OPENSEARCH_DISTRO}.missing_sys_requirements" + ) as missing_sys_requirements: + # test missing sys requirements + missing_sys_requirements.return_value = ["ulimit -n not set"] + self.charm.on.update_status.emit() + self.assertTrue(isinstance(self.harness.model.unit.status, BlockedStatus)) + + with patch(f"{self.OPENSEARCH_DISTRO}.is_node_up") as is_node_up: + # test when TLS relation is broken and cert is expiring soon + get_relation.return_value = None + is_node_up.return_value = True + self.peers_data.put( + Scope.UNIT, + "certs_exp_checked_at", + (datetime.now() - timedelta(hours=7)).strftime("%Y-%m-%d %H:%M:%S"), + ) + self.charm.secrets.put_object( + Scope.UNIT, CertType.UNIT_TRANSPORT.val, {"cert": "transport"} + ) + cert_expiration_remaining_hours.return_value = 24 * 3 + self.charm.on.update_status.emit() + self.assertTrue(isinstance(self.harness.model.unit.status, BlockedStatus)) def test_app_peers_data(self): """Test getting data from the app relation data bag.""" - self.assertEqual(self.charm.app_peers_data, {}) + self.assertEqual(self.peers_data.all(Scope.APP), {}) - self.charm.app_peers_data["app-key"] = "app-val" - self.assertEqual(self.charm.app_peers_data["app-key"], "app-val") + self.peers_data.put(Scope.APP, "app-key", "app-val") + self.assertEqual(self.peers_data.get(Scope.APP, "app-key"), "app-val") def test_unit_peers_data(self): """Test getting data from the unit relation data bag.""" - self.assertEqual(self.charm.unit_peers_data, {}) + self.assertEqual(self.peers_data.all(Scope.UNIT), {}) - self.charm.app_peers_data["unit-key"] = "unit-val" - self.assertEqual(self.charm.app_peers_data["unit-key"], "unit-val") + self.peers_data.put(Scope.UNIT, "unit-key", "unit-val") + self.assertEqual(self.peers_data.get(Scope.UNIT, "unit-key"), "unit-val") @patch_network_get("1.1.1.1") def test_unit_ip(self):