From a38e7743ba47ae5505e25bfcccaf34f739cebbf4 Mon Sep 17 00:00:00 2001
From: phvalguima
Date: Thu, 14 Dec 2023 13:10:51 +0100
Subject: [PATCH] [DPE-2345][DPE-2451] Add restore action

Extends https://github.com/canonical/opensearch-operator/pull/135 and adds restore support.
This PR will override any conflicting index in case of restore. It also updates the
list-backups action to return its output in table format by default.

It adds three extra integration tests:
1) Snapshot / restore in the same cluster
2) Snapshot / remove / re-add relation / restore
3) Snapshot / redeploy cluster / restore
---
 actions.yaml                                |  18 ++
 lib/charms/opensearch/v0/helper_cluster.py  |  18 ++
 .../opensearch/v0/opensearch_backups.py     | 277 +++++++++++++++---
 tests/integration/ha/test_backups.py        | 194 +++++++++++-
 tests/unit/lib/test_backups.py              |  19 +-
 5 files changed, 462 insertions(+), 64 deletions(-)

diff --git a/actions.yaml b/actions.yaml
index f2a0d54ab..9b3c3607e 100644
--- a/actions.yaml
+++ b/actions.yaml
@@ -45,3 +45,21 @@ create-backup:
 list-backups:
   description: List available backup_ids in the S3 bucket and path provided by the S3 integrator charm.
+  params:
+    output:
+      type: string
+      default: "table"
+      description: |
+        Format in which the data should be returned. Possible values: table, json.
+        The json format returns more details, such as shard status and index names.
+
+restore:
+  description: Restore a database backup.
+    S3 credentials are retrieved from a relation with the S3 integrator charm.
+  params:
+    backup-id:
+      type: integer
+      description: |
+        A backup-id to identify the backup to restore. Format:
+  required:
+    - backup-id
diff --git a/lib/charms/opensearch/v0/helper_cluster.py b/lib/charms/opensearch/v0/helper_cluster.py
index 295da04c3..f22fe909e 100644
--- a/lib/charms/opensearch/v0/helper_cluster.py
+++ b/lib/charms/opensearch/v0/helper_cluster.py
@@ -255,6 +255,24 @@ def shards(
         """Get all shards of all indexes in the cluster."""
         return opensearch.request("GET", "/_cat/shards", host=host, alt_hosts=alt_hosts)
 
+    @staticmethod
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=2, max=10),
+        reraise=True,
+    )
+    def indices(
+        opensearch: OpenSearchDistribution,
+        host: Optional[str] = None,
+        alt_hosts: Optional[List[str]] = None,
+    ) -> Dict[str, Dict[str, str]]:
+        """Return a mapping of each index in the cluster to its health and status."""
+        response = opensearch.request("GET", "/_cat/indices", host=host, alt_hosts=alt_hosts)
+        idx = {}
+        for index in response:
+            idx[index["index"]] = {"health": index["health"], "status": index["status"]}
+        return idx
+
     @staticmethod
     def shards_by_state(
         opensearch: OpenSearchDistribution,
diff --git a/lib/charms/opensearch/v0/opensearch_backups.py b/lib/charms/opensearch/v0/opensearch_backups.py
index eb1ba198a..bfc2fd03b 100644
--- a/lib/charms/opensearch/v0/opensearch_backups.py
+++ b/lib/charms/opensearch/v0/opensearch_backups.py
@@ -10,31 +10,23 @@
 and responses. The OpenSearchBackupPlugin holds the configuration info. The classes
 together manages the events related to backup/restore cycles.
 
-The setup of s3-repository happens in two phases: (1) at credentials-changed event, where
-the backup configuration is made in opensearch.yml and the opensearch-keystore; (2) when
-the first action is requested and the actual registration of the repo takes place.
-
-That needs to be separated in two phases as the configuration itself will demand a restart,
-before configuring the actual snapshot repo is possible in OpenSearch.
-
-The removal of backup only reverses step (1), to avoid accidentally deleting the existing
-snapshots in the S3 repo.
-
+The removal of backup only reverses the API calls, to avoid accidentally deleting the
+existing snapshots in the S3 repo.
 
 The main class to interact with is the OpenSearchBackup. This class will observe the s3
 relation and backup-related actions.
 
-OpenSearchBackup finishes the config of the backup service once opensearch.yml has bee
-set/unset and a restart has been applied. That means, in the case s3 has been related,
+OpenSearchBackup finishes the config of the backup service once opensearch.yml has been
+set/unset and a restart has been applied. That means, in the case s3 has been related,
 this class will apply the new configuration to opensearch.yml and keystore, then issue a
 restart event. After the restart has been successful and if unit is leader: execute the
 API calls to setup the backup.
-
 A charm implementing this class must setup the following:
 
 --> metadata.yaml
     ...
+  s3-credentials:
     interface: s3
     limit: 1
@@ -42,11 +34,11 @@
 
 --> main charm file
     ...
+from charms.opensearch.v0.opensearch_backups import OpenSearchBackup
 
 class OpenSearchBaseCharm(CharmBase):
-
     def __init__(...):
         ...
         self.backup = OpenSearchBackup(self)
@@ -54,15 +46,17 @@ def __init__(...):
 
 import json
 import logging
-from typing import Any, Dict
+from typing import Any, Dict, List, Set, Tuple
 
 import requests
 from charms.data_platform_libs.v0.s3 import S3Requirer
+from charms.opensearch.v0.helper_cluster import ClusterState
 from charms.opensearch.v0.helper_enums import BaseStrEnum
 from charms.opensearch.v0.opensearch_exceptions import (
     OpenSearchError,
     OpenSearchHttpError,
 )
+from charms.opensearch.v0.opensearch_internal_data import Scope
 from charms.opensearch.v0.opensearch_plugins import (
     OpenSearchBackupPlugin,
     OpenSearchPluginEventScope,
@@ -72,7 +66,7 @@ def __init__(...):
 from ops.charm import ActionEvent
 from ops.framework import EventBase, Object
 from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, WaitingStatus
-from tenacity import RetryError
+from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed
 
 # The unique Charmhub library identifier, never change it
 LIBID = "d301deee4d2c4c1b8e30cd3df8034be2"
@@ -94,10 +88,17 @@ def __init__(...):
 
 S3_REPO_BASE_PATH = "/"
 
+INDICES_TO_EXCLUDE_AT_RESTORE = {
+    ".opendistro_security",
+    ".opensearch-observability",
+    ".opensearch-sap-log-types-config",
+    ".opensearch-sap-pre-packaged-rules-config",
+}
 REPO_NOT_CREATED_ERR = "repository type [s3] does not exist"
 REPO_NOT_ACCESS_ERR = f"[{S3_REPOSITORY}] path [{S3_REPO_BASE_PATH}] is not accessible"
 REPO_CREATING_ERR = "Could not determine repository generation from root blobs"
+RESTORE_OPEN_INDEX_WITH_SAME_NAME = "because an open index with same name already exists"
 
 
 class OpenSearchBackupError(OpenSearchError):
@@ -121,6 +122,9 @@ class BackupServiceState(BaseStrEnum):
     REPO_S3_UNREACHABLE = "repository s3 is unreachable"
     ILLEGAL_ARGUMENT = "request contained wrong argument"
     SNAPSHOT_MISSING = "snapshot not found"
+    SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED = (
+        "cannot restore, indices with same name are still open"
+    )
     SNAPSHOT_RESTORE_ERROR = "restore of snapshot failed"
     SNAPSHOT_IN_PROGRESS = "snapshot in progress"
     SNAPSHOT_PARTIALLY_TAKEN = "snapshot partial: at least one shard missing"
@@ -143,32 +147,176 @@ def __init__(self, charm: Object):
         )
         self.framework.observe(self.charm.on.create_backup_action, self._on_create_backup_action)
         self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups_action)
+        self.framework.observe(self.charm.on.restore_action, self._on_restore_backup_action)
 
     @property
     def _plugin_status(self):
         return self.charm.plugin_manager.get_plugin_status(OpenSearchBackupPlugin)
 
+    def _format_backup_list(self, backup_list: List[Tuple[str, str, str]]) -> str:
+        """Formats provided list of backups as a table."""
+        import math
+
+        backups = [
+            "{:<10s} | {:<12s} | {:s}".format(" backup-id ", "backup-type", "backup-status")
+        ]
+        backups.append("-" * len(backups[0]))
+        for backup_id, backup_type, backup_status in backup_list:
+            tab = " " * math.floor((10 - len(str(backup_id))) / 2)
+            backups.append(
+                "{:<10s} | {:<12s} | {:s}".format(f"{tab}{backup_id}", backup_type, backup_status)
+            )
+        return "\n".join(backups)
+
+    def _generate_backup_list_output(self, backups: Dict[str, Any]) -> str:
+        """Generates a list of backups in a formatted table.
+
+        List contains successful and failed backups in order of ascending time.
+
+        Raises:
+            OpenSearchError: if the list of backups errors
+        """
+        backup_list = []
+        for id, backup in backups.items():
+            state = self.get_snapshot_status(backup["state"])
+            backup_list.append(
+                (
+                    id,
+                    "physical",
+                    str(state) if state != BackupServiceState.SUCCESS else "finished",
+                )
+            )
+        return self._format_backup_list(backup_list)
+
     def _on_list_backups_action(self, event: ActionEvent) -> None:
         """Returns the list of available backups to the user."""
-        if not self._check_action_can_run(event):
-            event.fail("Failed: backup service is not configured yet")
-            return
         backups = {}
         try:
-            backups = self._fetch_snapshots()
-            event.set_results({"snapshots": (json.dumps(backups)).replace("_", "-")})
+            backups = self._list_backups()
         except OpenSearchError as e:
             event.fail(
                 f"List backups action failed - {str(e)} - check the application logs for the full stack trace."
             )
+            return
 
+        output_format = event.params.get("output", "table").lower()
+        if output_format == "json":
+            event.set_results({"backups": (json.dumps(backups)).replace("_", "-")})
+        elif output_format == "table":
+            event.set_results({"backups": self._generate_backup_list_output(backups)})
+        else:
+            event.fail("Failed: invalid output format, must be either json or table")
 
-    def _on_create_backup_action(self, event: ActionEvent) -> None:
-        """Creates a backup from the current cluster."""
-        if not self.charm.unit.is_leader():
-            event.fail("Failed: the action can be run only on leader unit")
+    def _restore_and_try_close_indices_if_needed(
+        self, backup_id: str
+    ) -> Tuple[Dict[str, Any], Set[str]]:
+        """Restores a backup and closes any conflicting indices that are still open.
+
+        Closing the conflicting indices is preferred over deleting them, as the restore
+        action as a whole may not succeed. This method returns the final output and the
+        set of closed indices. The calling method should also check the output for any
+        other error messages, as a more generic error (i.e. one not related to open
+        indices) is returned in the output just like a successful restore would be.
+        """
+        closed_idx = set()
+        output = {}
+        try:
+            for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(30)):
+                with attempt:
+                    # First, close any index that overlaps with the backup and is still open
+                    backup_indices = self._list_backups()[backup_id]["indices"]
+                    to_close = [
+                        index
+                        for index, state in ClusterState.indices(self.charm.opensearch).items()
+                        if (
+                            index in backup_indices
+                            and state["status"] != "close"
+                            and index not in INDICES_TO_EXCLUDE_AT_RESTORE
+                        )
+                    ]
+                    # Try closing each index
+                    for index in to_close:
+                        o = self._request("POST", f"{index}/_close")
+                        logger.debug(
+                            f"_restore_and_try_close_indices_if_needed: request closing {index} returned {o}"
+                        )
+                        closed_idx.add(index)
+
+                    output = self._request(
+                        "POST",
+                        f"_snapshot/{S3_REPOSITORY}/{backup_id}/_restore?wait_for_completion=false",
+                        payload={
+                            "indices": ",".join(
+                                [
+                                    f"-{idx}"
+                                    for idx in INDICES_TO_EXCLUDE_AT_RESTORE & set(backup_indices)
+                                ]
+                            ),
+                            "ignore_unavailable": False,
+                            "include_global_state": False,
+                            "include_aliases": True,
+                            "partial": False,
+                        },
+                    )
+                    logger.debug(
+                        f"_restore_and_try_close_indices_if_needed retrying, output is: {output}"
+                    )
+                    if (
+                        self.get_service_status(output)
+                        == BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED
+                    ):
+                        # Some indices, e.g. .opensearch-sap-log-types-config, do not show up
+                        # in _cat/indices but still block a restore operation.
+                        to_close = output["error"]["reason"].split("[")[2].split("]")[0]
+                        o = self._request("POST", f"{to_close}/_close")
+                        logger.debug(
+                            f"_restore_and_try_close_indices_if_needed: request closing {to_close} returned {o}"
+                        )
+                        closed_idx.add(to_close)
+                        # We still have non-closed indices, try again
+                        raise Exception()
+                    # If status returns success, ensure all indices have been updated;
+                    # otherwise, raise an assert error and retry
+                    shard_status = output["snapshot"]["shards"]
+                    assert shard_status["total"] == shard_status["successful"]
+        except RetryError:
+            logger.error("_restore_and_try_close_indices_if_needed: fail all retries")
+        return output, closed_idx
+
+    def _on_restore_backup_action(self, event: ActionEvent) -> None:  # noqa: C901
+        """Restores a backup to the current cluster."""
+        if not self._can_unit_perform_backup(event):
+            event.fail("Failed: backup service is not configured yet")
+            return
+
+        backup_id = str(event.params.get("backup-id"))
+
+        # Restore will try to close indices if there is a matching name.
+        # The goal is to leave the cluster in a running state, even if the restore fails.
+        # In case of failure, the restore action also returns the list of closed indices.
+        closed_idx = set()
+        try:
+            backups = self._list_backups()
+            if backup_id not in backups.keys():
+                event.fail(f"Failed: no backup-id {backup_id}, options available: {backups}")
+                return
+            backup_id_state = self.get_snapshot_status(backups[backup_id]["state"])
+            if backup_id_state != BackupServiceState.SUCCESS:
+                event.fail(
+                    f"Failed: backup-id {backup_id} was not successful, state is: {backup_id_state}"
+                )
+                return
+            output, closed_idx = self._restore_and_try_close_indices_if_needed(backup_id)
+            logger.debug(f"Restore action: received response: {output}")
+            logger.info(f"Restore action succeeded for backup_id {backup_id}")
+        except OpenSearchListBackupError as e:
+            event.fail(f"Failed: {e}, closed indices: {closed_idx}")
             return
+        status = self.get_service_status(output)
+        if status != BackupServiceState.SUCCESS:
+            event.fail(f"Failed with: {status}, closed indices: {closed_idx}")
+            return
+        event.set_results({"state": "successful restore!", "closed-indices": str(closed_idx)})
 
-        if not self._check_action_can_run(event):
+    def _on_create_backup_action(self, event: ActionEvent) -> None:
+        """Creates a backup from the current cluster."""
+        if not self._can_unit_perform_backup(event):
             event.fail("Failed: backup service is not configured yet")
             return
@@ -179,7 +327,7 @@ def _on_create_backup_action(self, event: ActionEvent) -> None:
             event.fail("Backup still in progress: aborting this request...")
             return
         # Increment by 1 the latest snapshot_id (set to 0 if no snapshot was previously made)
-        new_backup_id = int(max(self._fetch_snapshots().keys() or [0])) + 1
+        new_backup_id = int(max(self._list_backups().keys() or [0])) + 1
         logger.debug(
             f"Create backup action request id {new_backup_id} response is:"
             + self.get_service_status(
@@ -208,7 +356,7 @@ def _on_create_backup_action(self, event: ActionEvent) -> None:
             return
         event.set_results({"backup-id": new_backup_id, "status": "Backup is running."})
 
-    def _check_action_can_run(self, event: ActionEvent) -> bool:
+    def _can_unit_perform_backup(self, event: ActionEvent) -> bool:
         """Checks if the actions run from this unit can be executed or not.
 
         If not, then register the reason as a failure in the event and returns False.
@@ -229,11 +377,15 @@ def _check_action_can_run(self, event: ActionEvent) -> bool: return False return True - def _fetch_snapshots(self) -> Dict[int, str]: + def _list_backups(self) -> Dict[int, str]: """Returns a mapping of snapshot ids / state.""" response = self._request("GET", f"_snapshot/{S3_REPOSITORY}/_all") return { - snapshot["snapshot"]: snapshot["state"] for snapshot in response.get("snapshots", []) + snapshot["snapshot"]: { + "state": snapshot["state"], + "indices": snapshot.get("indices", []), + } + for snapshot in response.get("snapshots", []) } def is_backup_in_progress(self, backup_id=None) -> bool: @@ -258,7 +410,7 @@ def _on_s3_credentials_changed(self, event: EventBase) -> None: 3) If the plugin is not enabled, then defer the event 4) Send the API calls to setup the backup service """ - if not self._is_s3_fully_configured(): + if not self.can_use_s3_repository(): # Always check if a relation actually exists and if options are available # in this case, seems one of the conditions above is not yet present # abandon this restart event, as it will be called later once s3 configuration @@ -295,11 +447,9 @@ def _on_s3_credentials_changed(self, event: EventBase) -> None: if not self.charm.unit.is_leader(): # Plugin is configured locally for this unit. Now the leader proceed. - self.charm.status.clear(ActiveStatus()) + self.charm.status.set(ActiveStatus()) return - self.apply_api_config_if_needed() - self.charm.status.clear(ActiveStatus()) def apply_api_config_if_needed(self) -> None: """Runs the post restart routine and API calls needed to setup/disable backup. @@ -313,13 +463,12 @@ def apply_api_config_if_needed(self) -> None: # (2) run the request; and state = self._register_snapshot_repo() # (3) based on the response, set the message status - if state == BackupServiceState.SUCCESS: - self.charm.status.set(ActiveStatus("Backup service configured")) - else: + if state != BackupServiceState.SUCCESS: self.charm.status.set(BlockedStatus(f"Backup setup failed with {state}")) - self.charm.status.clear(ActiveStatus()) + else: + self.charm.status.set(ActiveStatus()) - def _on_s3_broken(self, event: EventBase) -> None: + def _on_s3_broken(self, event: EventBase) -> None: # noqa: C901 """Processes the broken s3 relation. It runs the reverse process of on_s3_change: @@ -327,10 +476,25 @@ def _on_s3_broken(self, event: EventBase) -> None: and defer this event. 
2) If leader, run API calls to signal disable is needed """ - if self.charm.model.get_relation(S3_RELATION).units: + if ( + self.charm.model.get_relation(S3_RELATION) + and self.charm.model.get_relation(S3_RELATION).units + ): # There are still members in the relation, defer until it is finished + # Make a relation-change in the peer relation, so it triggers this unit back + counter = self.charm.peers_data.get(Scope.UNIT, "s3_broken") + if counter: + self.charm.peers_data.put(Scope.UNIT, "s3_broken", counter + 1) + else: + self.charm.peers_data.put(Scope.UNIT, "s3_broken", 1) event.defer() return + + # Second part of this work-around + if self.charm.peers_data.get(Scope.UNIT, "s3_broken"): + # Now, we can delete it + self.charm.peers_data.delete(Scope.UNIT, "s3_broken") + self.charm.status.set(MaintenanceStatus("Disabling backup service...")) snapshot_status = self._check_snapshot_status() if snapshot_status in [ @@ -338,7 +502,9 @@ def _on_s3_broken(self, event: EventBase) -> None: ]: # 1) snapshot is either in progress or partially taken: block and defer this event self.charm.status.set( - BlockedStatus(f"Disabling backup not possible: {snapshot_status}") + MaintenanceStatus( + f"Disabling backup postponed until backup in progress: {snapshot_status}" + ) ) event.defer() return @@ -349,7 +515,8 @@ def _on_s3_broken(self, event: EventBase) -> None: "Snapshot has been partially taken, but not completed. Continuing with relation removal..." ) - # Run the check here, as we want all the units to keep deferring the event if needed. + # Run the check here, instead of the start of this hook, as we want all the + # units to keep deferring the event if needed. # That avoids a condition where we have: # 1) A long snapshot is taking place # 2) Relation is removed @@ -384,7 +551,7 @@ def _on_s3_broken(self, event: EventBase) -> None: self.charm.plugin_manager.reset_event_scope() return self.charm.plugin_manager.reset_event_scope() - self.charm.status.clear(ActiveStatus()) + self.charm.status.set(ActiveStatus()) def _execute_s3_broken_calls(self): """Executes the s3 broken API calls.""" @@ -402,9 +569,23 @@ def _check_snapshot_status(self) -> BackupServiceState: except (ValueError, OpenSearchHttpError, requests.HTTPError): return BackupServiceState.RESPONSE_FAILED_NETWORK + def _get_endpoint_protocol(self, endpoint: str) -> str: + """Returns the protocol based on the endpoint.""" + if endpoint.startswith("http://"): + return "http" + if endpoint.startswith("https://"): + return "https" + return "https" + def _register_snapshot_repo(self) -> BackupServiceState: """Registers the snapshot repo in the cluster.""" info = self.s3_client.get_s3_connection_info() + extra_settings = {} + if info.get("region"): + extra_settings["region"] = info.get("region") + if info.get("storage-class"): + extra_settings["storage_class"] = info.get("storage-class") + return self.get_service_status( self._request( "PUT", @@ -412,17 +593,17 @@ def _register_snapshot_repo(self) -> BackupServiceState: payload={ "type": "s3", "settings": { - "region": info.get("region"), "endpoint": info.get("endpoint"), - "protocol": "http", # TODO: roll back to https once we have the cert + "protocol": self._get_endpoint_protocol(info.get("endpoint")), "bucket": info["bucket"], "base_path": info.get("path", S3_REPO_BASE_PATH), + **extra_settings, }, }, ) ) - def _is_s3_fully_configured(self) -> bool: + def can_use_s3_repository(self) -> bool: """Checks if relation is set and all configs needed are present. 
The get_s3_connection_info() checks if the relation is present, and if yes, @@ -433,7 +614,7 @@ def _is_s3_fully_configured(self) -> bool: """ missing_s3_configs = [ config - for config in ["region", "bucket", "access-key", "secret-key"] + for config in ["bucket", "endpoint", "access-key", "secret-key"] if config not in self.s3_client.get_s3_connection_info() ] if missing_s3_configs: @@ -510,6 +691,8 @@ def get_service_status(self, response: Dict[str, Any]) -> BackupServiceState: # return BackupServiceState.ILLEGAL_ARGUMENT if type == "snapshot_missing_exception": return BackupServiceState.SNAPSHOT_MISSING + if type == "snapshot_restore_exception" and RESTORE_OPEN_INDEX_WITH_SAME_NAME in reason: + return BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED if type == "snapshot_restore_exception": return BackupServiceState.SNAPSHOT_RESTORE_ERROR if ( diff --git a/tests/integration/ha/test_backups.py b/tests/integration/ha/test_backups.py index ccb34b635..81219fde6 100644 --- a/tests/integration/ha/test_backups.py +++ b/tests/integration/ha/test_backups.py @@ -29,6 +29,7 @@ get_application_unit_ids_ips, get_leader_unit_id, get_leader_unit_ip, + http_request, run_action, ) from tests.integration.tls.test_tls import TLS_CERTIFICATES_APP_NAME @@ -37,6 +38,7 @@ S3_INTEGRATOR_NAME = "s3-integrator" +TEST_BACKUP_DOC_ID = 10 CLOUD_CONFIGS = { "aws": { "endpoint": "https://s3.amazonaws.com", @@ -97,6 +99,9 @@ # bucket_object.delete() +TEST_BACKUP_INDEX = "test_backup_index" + + @pytest.fixture() async def c_writes(ops_test: OpsTest): """Creates instance of the ContinuousWrites.""" @@ -200,13 +205,11 @@ async def test_backup_cluster( units = await get_application_unit_ids_ips(ops_test, app=app) leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) - # create index with r_shards = nodes - 1 - test_backup_index = "test_backup_index" - await create_index(ops_test, app, leader_unit_ip, test_backup_index, r_shards=len(units) - 1) + await create_index(ops_test, app, leader_unit_ip, TEST_BACKUP_INDEX, r_shards=len(units) - 1) # index document - doc_id = 10 - await index_doc(ops_test, app, leader_unit_ip, test_backup_index, doc_id) + doc_id = TEST_BACKUP_DOC_ID + await index_doc(ops_test, app, leader_unit_ip, TEST_BACKUP_INDEX, doc_id) # check that the doc can be retrieved from any node logger.info("Test backup index: searching") @@ -215,13 +218,13 @@ async def test_backup_cluster( ops_test, app, u_ip, - test_backup_index, + TEST_BACKUP_INDEX, query={"query": {"term": {"_id": doc_id}}}, preference="_only_local", ) # Validate the index and document are present assert len(docs) == 1 - assert docs[0]["_source"] == default_doc(test_backup_index, doc_id) + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) leader_id = await get_leader_unit_id(ops_test, app) @@ -230,16 +233,181 @@ async def test_backup_cluster( assert action.status == "completed" - list_backups = await run_action(ops_test, leader_id, "list-backups") + list_backups = await run_action(ops_test, leader_id, "list-backups", params={"output": "json"}) logger.info(f"list-backups output: {list_backups}") # Expected format: - # namespace(status='completed', response={'return-code': 0, 'snapshots': '{"1": "SUCCESS"}'}) + # namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'}) + backups = json.loads(list_backups.response["backups"]) assert list_backups.status == "completed" - assert len(json.loads(list_backups.response["snapshots"])) == int(action.response["backup-id"]) - assert ( - 
json.loads(list_backups.response["snapshots"])[action.response["backup-id"]] == "SUCCESS" - ) + assert len(backups.keys()) == int(action.response["backup-id"]) + assert backups[action.response["backup-id"]]["state"] == "SUCCESS" # continuous writes checks await assert_continuous_writes_consistency(ops_test, c_writes, app) + + +@pytest.mark.abort_on_fail +async def test_restore_cluster( + ops_test: OpsTest, +) -> None: + """Deletes the TEST_BACKUP_INDEX, restores the cluster and tries to search for index.""" + app = (await app_name(ops_test)) or APP_NAME + + units = await get_application_unit_ids_ips(ops_test, app=app) + leader_id = await get_leader_unit_id(ops_test, app) + leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) + + await http_request( + ops_test, + "DELETE", + f"https://{leader_unit_ip}:9200/{TEST_BACKUP_INDEX}", + app=app, + ) + + action = await run_action(ops_test, leader_id, "restore", params={"backup-id": 1}) + logger.info(f"restore output: {action}") + assert action.status == "completed" + + # index document + doc_id = TEST_BACKUP_DOC_ID + # check that the doc can be retrieved from any node + logger.info("Test backup index: searching") + for u_id, u_ip in units.items(): + docs = await search( + ops_test, + app, + u_ip, + TEST_BACKUP_INDEX, + query={"query": {"term": {"_id": doc_id}}}, + preference="_only_local", + ) + # Validate the index and document are present + assert len(docs) == 1 + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) + + +@pytest.mark.abort_on_fail +async def test_restore_cluster_after_app_destroyed(ops_test: OpsTest) -> None: + """Deletes the entire OpenSearch cluster and redeploys from scratch. + + Restores the backup and then checks if the same TEST_BACKUP_INDEX is there. + """ + app = (await app_name(ops_test)) or APP_NAME + await ops_test.model.remove_application(app, block_until_done=True) + app_num_units = int(os.environ.get("TEST_NUM_APP_UNITS", None) or 3) + my_charm = await ops_test.build_charm(".") + # Redeploy + await asyncio.gather( + ops_test.model.deploy(my_charm, num_units=app_num_units, series=SERIES), + ) + # Relate it to OpenSearch to set up TLS. 
+ await ops_test.model.relate(APP_NAME, TLS_CERTIFICATES_APP_NAME) + await ops_test.model.relate(APP_NAME, S3_INTEGRATOR_NAME) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1400, + idle_period=IDLE_PERIOD, + ) + + units = await get_application_unit_ids_ips(ops_test, app=app) + leader_id = await get_leader_unit_id(ops_test, app) + + action = await run_action(ops_test, leader_id, "restore", params={"backup-id": 1}) + logger.info(f"restore output: {action}") + assert action.status == "completed" + + # index document + doc_id = TEST_BACKUP_DOC_ID + # check that the doc can be retrieved from any node + logger.info("Test backup index: searching") + for u_id, u_ip in units.items(): + docs = await search( + ops_test, + app, + u_ip, + TEST_BACKUP_INDEX, + query={"query": {"term": {"_id": doc_id}}}, + preference="_only_local", + ) + # Validate the index and document are present + assert len(docs) == 1 + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) + + +@pytest.mark.abort_on_fail +async def test_remove_and_readd_s3_relation(ops_test: OpsTest) -> None: + """Removes and re-adds the s3-credentials relation to test backup and restore.""" + app = (await app_name(ops_test)) or APP_NAME + units = await get_application_unit_ids_ips(ops_test, app=app) + leader_id = await get_leader_unit_id(ops_test, app) + leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) + + logger.info("Remove s3-credentials relation") + # Remove relation + await ops_test.model.applications[APP_NAME].destroy_relation( + "s3-credentials", f"{S3_INTEGRATOR_NAME}:s3-credentials" + ) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1400, + idle_period=IDLE_PERIOD, + ) + + logger.info("Re-add s3-credentials relation") + await ops_test.model.relate(APP_NAME, S3_INTEGRATOR_NAME) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1400, + idle_period=IDLE_PERIOD, + ) + + leader_id = await get_leader_unit_id(ops_test, app) + + action = await run_action(ops_test, leader_id, "create-backup") + logger.info(f"create-backup output: {action}") + + assert action.status == "completed" + + list_backups = await run_action(ops_test, leader_id, "list-backups", params={"output": "json"}) + logger.info(f"list-backups output: {list_backups}") + + # Expected format: + # namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'}) + backups = json.loads(list_backups.response["backups"]) + assert list_backups.status == "completed" + assert len(backups.keys()) == int(action.response["backup-id"]) + assert backups[action.response["backup-id"]]["state"] == "SUCCESS" + + await http_request( + ops_test, + "DELETE", + f"https://{leader_unit_ip}:9200/{TEST_BACKUP_INDEX}", + app=app, + ) + + action = await run_action( + ops_test, leader_id, "restore", params={"backup-id": int(action.response["backup-id"])} + ) + logger.info(f"restore-backup output: {action}") + assert action.status == "completed" + + # index document + doc_id = TEST_BACKUP_DOC_ID + # check that the doc can be retrieved from any node + logger.info("Test backup index: searching") + for u_id, u_ip in units.items(): + docs = await search( + ops_test, + app, + u_ip, + TEST_BACKUP_INDEX, + query={"query": {"term": {"_id": doc_id}}}, + preference="_only_local", + ) + # Validate the index and document are present + assert len(docs) == 1 + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) diff --git a/tests/unit/lib/test_backups.py 
b/tests/unit/lib/test_backups.py index 5519a4365..f4019ada9 100644 --- a/tests/unit/lib/test_backups.py +++ b/tests/unit/lib/test_backups.py @@ -57,6 +57,12 @@ def setUp(self) -> None: self.harness.add_relation_unit(self.s3_rel_id, "s3-integrator/0") mock_pm_run.assert_not_called() + def test_get_endpoint_protocol(self) -> None: + """Tests the get_endpoint_protocol method.""" + assert self.charm.backup._get_endpoint_protocol("http://10.0.0.1:8000") == "http" + assert self.charm.backup._get_endpoint_protocol("https://10.0.0.2:8000") == "https" + assert self.charm.backup._get_endpoint_protocol("test.not-valid-url") == "https" + @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup.apply_api_config_if_needed") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager._apply_config") @@ -91,9 +97,10 @@ def test_00_update_relation_data(self, __, mock_apply_config, _, mock_status) -> ).__dict__ ) + @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._request") @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.request") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") - def test_01_apply_api_config_if_needed(self, mock_status, mock_request) -> None: + def test_01_apply_api_config_if_needed(self, mock_status, _, mock_request) -> None: """Tests the application of post-restart steps.""" self.harness.update_relation_data( self.s3_rel_id, @@ -110,15 +117,18 @@ def test_01_apply_api_config_if_needed(self, mock_status, mock_request) -> None: ) mock_status.return_value = PluginState.ENABLED self.charm.backup.apply_api_config_if_needed() - mock_request.called_once_with("GET", f"_snapshot/{S3_REPOSITORY}") - mock_request.called_once_with( + mock_request.assert_called_with( "PUT", f"_snapshot/{S3_REPOSITORY}", payload={ "type": "s3", "settings": { + "endpoint": "localhost", + "protocol": "https", "bucket": TEST_BUCKET_NAME, "base_path": TEST_BASE_PATH, + "region": "testing-region", + "storage_class": "storageclass", }, }, ) @@ -128,7 +138,7 @@ def test_01_apply_api_config_if_needed(self, mock_status, mock_request) -> None: @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.request") @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._execute_s3_broken_calls") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") - def test_99_relation_broken( + def test_20_relation_broken( self, mock_status, mock_execute_s3_broken_calls, @@ -144,6 +154,7 @@ def test_99_relation_broken( {"SUCCESS"}, ] mock_status.return_value = PluginState.ENABLED + self.harness.remove_relation_unit(self.s3_rel_id, "s3-integrator/0") self.harness.remove_relation(self.s3_rel_id) mock_request.called_once_with("GET", "/_snapshot/_status") mock_execute_s3_broken_calls.assert_called_once()
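
For reference, a minimal standalone sketch of the table layout produced by the new list-backups
"table" output. It mirrors the format strings of _format_backup_list() in this patch; the helper
name and the sample backup ids/states below are illustrative only, not taken from a real cluster.

    # Standalone sketch of the list-backups table format (illustrative data only).
    import math
    from typing import List, Tuple


    def format_backup_list(backup_list: List[Tuple[str, str, str]]) -> str:
        """Format (backup-id, backup-type, backup-status) tuples as an aligned table."""
        rows = ["{:<10s} | {:<12s} | {:s}".format(" backup-id ", "backup-type", "backup-status")]
        rows.append("-" * len(rows[0]))
        for backup_id, backup_type, backup_status in backup_list:
            tab = " " * math.floor((10 - len(str(backup_id))) / 2)
            rows.append(
                "{:<10s} | {:<12s} | {:s}".format(f"{tab}{backup_id}", backup_type, backup_status)
            )
        return "\n".join(rows)


    if __name__ == "__main__":
        print(format_backup_list([("1", "physical", "finished"), ("2", "physical", "snapshot in progress")]))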
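
For context on what the restore action drives under the hood, a rough sketch of the equivalent
REST sequence against a standalone OpenSearch node. The endpoint, repository name, index list
and backup id are placeholders, authentication is omitted, and certificate verification is
disabled only to keep the sketch short; the close and _restore calls and the payload match the
ones issued by _restore_and_try_close_indices_if_needed() above.

    # Rough sketch of the REST calls behind the restore flow (placeholder names).
    import requests

    OPENSEARCH = "https://localhost:9200"  # placeholder endpoint
    REPOSITORY = "s3-repository"           # placeholder snapshot repository name
    BACKUP_ID = "1"                        # placeholder backup id

    # 1) Close every open index that also exists in the snapshot, so the restore can replace it.
    for index in ["test_backup_index"]:  # placeholder list of overlapping indices
        requests.post(f"{OPENSEARCH}/{index}/_close", verify=False, timeout=30)

    # 2) Trigger the restore, excluding the cluster's own system indices.
    requests.post(
        f"{OPENSEARCH}/_snapshot/{REPOSITORY}/{BACKUP_ID}/_restore",
        params={"wait_for_completion": "false"},
        json={
            "indices": "-.opendistro_security,-.opensearch-observability",
            "ignore_unavailable": False,
            "include_global_state": False,
            "include_aliases": True,
            "partial": False,
        },
        verify=False,
        timeout=30,
    )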