diff --git a/actions.yaml b/actions.yaml index f2a0d54ab..9b3c3607e 100644 --- a/actions.yaml +++ b/actions.yaml @@ -45,3 +45,21 @@ create-backup: list-backups: description: List available backup_ids in the S3 bucket and path provided by the S3 integrator charm. + params: + output: + type: string + default: "table" + description: | + Format in which the data should be returned. Possible values: table, json. + The json format includes more details, such as shard status and index names. + +restore: + description: Restore a database backup. + S3 credentials are retrieved from a relation with the S3 integrator charm. + params: + backup-id: + type: integer + description: | + A backup-id to identify the backup to restore. Format: + required: + - backup-id diff --git a/lib/charms/opensearch/v0/helper_cluster.py b/lib/charms/opensearch/v0/helper_cluster.py index 295da04c3..f22fe909e 100644 --- a/lib/charms/opensearch/v0/helper_cluster.py +++ b/lib/charms/opensearch/v0/helper_cluster.py @@ -255,6 +255,24 @@ def shards( """Get all shards of all indexes in the cluster.""" return opensearch.request("GET", "/_cat/shards", host=host, alt_hosts=alt_hosts) + @staticmethod + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=2, max=10), + reraise=True, + ) + def indices( + opensearch: OpenSearchDistribution, + host: Optional[str] = None, + alt_hosts: Optional[List[str]] = None, + ) -> Dict[str, Dict[str, str]]: + """Get all indices in the cluster, with their health and status.""" + idx = {} + response = opensearch.request("GET", "/_cat/indices", host=host, alt_hosts=alt_hosts) + for index in response: + idx[index["index"]] = {"health": index["health"], "status": index["status"]} + return idx + @staticmethod def shards_by_state( opensearch: OpenSearchDistribution, diff --git a/lib/charms/opensearch/v0/opensearch_backups.py b/lib/charms/opensearch/v0/opensearch_backups.py index eb1ba198a..bfc2fd03b 100644 --- a/lib/charms/opensearch/v0/opensearch_backups.py +++ b/lib/charms/opensearch/v0/opensearch_backups.py @@ -10,31 +10,23 @@ and responses. The OpenSearchBackupPlugin holds the configuration info. The classes together manages the events related to backup/restore cycles. -The setup of s3-repository happens in two phases: (1) at credentials-changed event, where -the backup configuration is made in opensearch.yml and the opensearch-keystore; (2) when -the first action is requested and the actual registration of the repo takes place. - -That needs to be separated in two phases as the configuration itself will demand a restart, -before configuring the actual snapshot repo is possible in OpenSearch. - -The removal of backup only reverses step (1), to avoid accidentally deleting the existing -snapshots in the S3 repo. - +The removal of backup only reverses the API calls, to avoid accidentally deleting the +existing snapshots in the S3 repo. The main class to interact with is the OpenSearchBackup. This class will observe the s3 relation and backup-related actions. -OpenSearchBackup finishes the config of the backup service once opensearch.yml has bee -set/unset and a restart has been applied. That means, in the case s3 has been related, +OpenSearchBackup finishes the config of the backup service once opensearch.yml has been +set/unset and a restart has been applied. That means, in the case s3 has been related, this class will apply the new configuration to opensearch.yml and keystore, then issue a restart event.
After the restart has been successful and if unit is leader: execute the API calls to setup the backup. - A charm implementing this class must setup the following: --> metadata.yaml ... + s3-credentials: interface: s3 limit: 1 @@ -42,11 +34,11 @@ --> main charm file ... + from charms.opensearch.v0.opensearch_backups import OpenSearchBackup class OpenSearchBaseCharm(CharmBase): - def __init__(...): ... self.backup = OpenSearchBackup(self) @@ -54,15 +46,17 @@ def __init__(...): import json import logging -from typing import Any, Dict +from typing import Any, Dict, List, Set, Tuple import requests from charms.data_platform_libs.v0.s3 import S3Requirer +from charms.opensearch.v0.helper_cluster import ClusterState from charms.opensearch.v0.helper_enums import BaseStrEnum from charms.opensearch.v0.opensearch_exceptions import ( OpenSearchError, OpenSearchHttpError, ) +from charms.opensearch.v0.opensearch_internal_data import Scope from charms.opensearch.v0.opensearch_plugins import ( OpenSearchBackupPlugin, OpenSearchPluginEventScope, @@ -72,7 +66,7 @@ def __init__(...): from ops.charm import ActionEvent from ops.framework import EventBase, Object from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, WaitingStatus -from tenacity import RetryError +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed # The unique Charmhub library identifier, never change it LIBID = "d301deee4d2c4c1b8e30cd3df8034be2" @@ -94,10 +88,17 @@ def __init__(...): S3_REPO_BASE_PATH = "/" +INDICES_TO_EXCLUDE_AT_RESTORE = { + ".opendistro_security", + ".opensearch-observability", + ".opensearch-sap-log-types-config", + ".opensearch-sap-pre-packaged-rules-config", +} REPO_NOT_CREATED_ERR = "repository type [s3] does not exist" REPO_NOT_ACCESS_ERR = f"[{S3_REPOSITORY}] path [{S3_REPO_BASE_PATH}] is not accessible" REPO_CREATING_ERR = "Could not determine repository generation from root blobs" +RESTORE_OPEN_INDEX_WITH_SAME_NAME = "because an open index with same name already exists" class OpenSearchBackupError(OpenSearchError): @@ -121,6 +122,9 @@ class BackupServiceState(BaseStrEnum): REPO_S3_UNREACHABLE = "repository s3 is unreachable" ILLEGAL_ARGUMENT = "request contained wrong argument" SNAPSHOT_MISSING = "snapshot not found" + SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED = ( + "cannot restore, indices with same name are still open" + ) SNAPSHOT_RESTORE_ERROR = "restore of snapshot failed" SNAPSHOT_IN_PROGRESS = "snapshot in progress" SNAPSHOT_PARTIALLY_TAKEN = "snapshot partial: at least one shard missing" @@ -143,32 +147,176 @@ def __init__(self, charm: Object): ) self.framework.observe(self.charm.on.create_backup_action, self._on_create_backup_action) self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups_action) + self.framework.observe(self.charm.on.restore_action, self._on_restore_backup_action) @property def _plugin_status(self): return self.charm.plugin_manager.get_plugin_status(OpenSearchBackupPlugin) + def _format_backup_list(self, backup_list: List[Tuple[Any]]) -> str: + """Formats provided list of backups as a table.""" + backups = [ + "{:<10s} | {:<12s} | {:s}".format(" backup-id ", "backup-type", "backup-status") + ] + backups.append("-" * len(backups[0])) + + import math + + for backup_id, backup_type, backup_status in backup_list: + tab = " " * math.floor((10 - len(str(backup_id))) / 2) + backups.append( + "{:<10s} | {:<12s} | {:s}".format(f"{tab}{backup_id}", backup_type, backup_status) + ) + return "\n".join(backups) + + def 
_generate_backup_list_output(self, backups: Dict[str, Any]) -> str: + """Generates a list of backups in a formatted table. + + List contains successful and failed backups in order of ascending time. + + Raises: + OpenSearchError: if the list of backups errors + """ + backup_list = [] + for id, backup in backups.items(): + state = self.get_snapshot_status(backup["state"]) + backup_list.append( + ( + id, + "physical", + str(state) if state != BackupServiceState.SUCCESS else "finished", + ) + ) + return self._format_backup_list(backup_list) + def _on_list_backups_action(self, event: ActionEvent) -> None: """Returns the list of available backups to the user.""" - if not self._check_action_can_run(event): - event.fail("Failed: backup service is not configured yet") - return backups = {} try: - backups = self._fetch_snapshots() - event.set_results({"snapshots": (json.dumps(backups)).replace("_", "-")}) + backups = self._list_backups() except OpenSearchError as e: event.fail( f"List backups action failed - {str(e)} - check the application logs for the full stack trace." ) + return + if event.params.get("output").lower() == "json": + event.set_results({"backups": (json.dumps(backups)).replace("_", "-")}) + elif event.params.get("output").lower() == "table": + event.set_results({"backups": self._generate_backup_list_output(backups)}) + else: + event.fail("Failed: invalid output format, must be either json or table") - def _on_create_backup_action(self, event: ActionEvent) -> None: - """Creates a backup from the current cluster.""" - if not self.charm.unit.is_leader(): - event.fail("Failed: the action can be run only on leader unit") + def _restore_and_try_close_indices_if_needed( + self, backup_id: int + ) -> Tuple[Dict[str, Any], Set[str]]: + """Restores the backup, closing any overlapping indices that are still open. + + Closing the overlapping indices is preferable, as otherwise the restore would fail as a whole. + This method returns the final output and the set of closed indices. The calling method + should still check the output for other error messages, as a more generic error + (i.e. one not related to open indices) passes through just like a successful restore. + """ + closed_idx = set() + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(wait=30)): + with attempt: + # First, check for overlapping indices + backup_indices = self._list_backups()[backup_id]["indices"] + to_close = [ + index + for index, state in ClusterState.indices(self.charm.opensearch).items() + if ( + index in backup_indices + and state["status"] != "close" + and index not in INDICES_TO_EXCLUDE_AT_RESTORE + ) + ] + # Try closing each overlapping index + for index in to_close: + o = self._request("POST", f"{index}/_close") + logger.debug( + f"_restore_and_try_close_indices_if_needed: request closing {index} returned {o}" + ) + closed_idx.add(index) + + output = self._request( + "POST", + f"_snapshot/{S3_REPOSITORY}/{backup_id}/_restore?wait_for_completion=false", + payload={ + "indices": ",".join( + [ + f"-{idx}" + for idx in INDICES_TO_EXCLUDE_AT_RESTORE & set(backup_indices) + ] + ), + "ignore_unavailable": False, + "include_global_state": False, + "include_aliases": True, + "partial": False, + }, + ) + logger.debug( + f"_restore_and_try_close_indices_if_needed retrying, output is: {output}" + ) + if ( + self.get_service_status(output) + == BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED + ): + # The .opensearch-sap-log-types-config index, for example, does not show up + # in _cat/indices but still blocks a restore operation.
+ idx_to_close = output["error"]["reason"].split("[")[2].split("]")[0] + o = self._request("POST", f"{idx_to_close}/_close") + logger.debug( + f"_restore_and_try_close_indices_if_needed: request closing {idx_to_close} returned {o}" + ) + closed_idx.add(idx_to_close) + # We still have non-closed indices, try again + raise Exception("open indices remaining, retrying restore") + # If status returns success, ensure all indices have been updated + # otherwise, raise an assert error and retry + shard_status = output["snapshot"]["shards"] + assert shard_status["total"] == shard_status["successful"] + except RetryError: + logger.error("_restore_and_try_close_indices_if_needed: fail all retries") + return output, closed_idx + + def _on_restore_backup_action(self, event: ActionEvent) -> None: # noqa: C901 + """Restores a backup to the current cluster.""" + if not self._can_unit_perform_backup(event): + event.fail("Failed: backup service is not configured yet") + return + + backup_id = str(event.params.get("backup-id")) + + # Restore will try to close indices if there is a matching name. + # The goal is to leave the cluster in a running state, even if the restore fails. + # In case of failure, the restore action must return the list of closed indices + closed_idx = set() + try: + backups = self._list_backups() + if backup_id not in backups.keys(): + event.fail(f"Failed: no backup-id {backup_id}, options available: {list(backups.keys())}") + return + backup_id_state = self.get_snapshot_status(backups[backup_id]["state"]) + if backup_id_state != BackupServiceState.SUCCESS: + event.fail( + f"Failed: backup-id {backup_id} was not successful, state is: {backup_id_state}" + ) + return + output, closed_idx = self._restore_and_try_close_indices_if_needed(backup_id) + logger.debug(f"Restore action: received response: {output}") + logger.info(f"Restore action succeeded for backup_id {backup_id}") + except OpenSearchListBackupError as e: + event.fail(f"Failed: {e}, closed indices: {closed_idx}") return + if self.get_service_status(output) != BackupServiceState.SUCCESS: + status = str(self.get_service_status(output)) + event.fail(f"Failed with: {status}, closed indices: {closed_idx}") + return + event.set_results({"state": "successful restore!", "closed-indices": str(closed_idx)}) - if not self._check_action_can_run(event): + def _on_create_backup_action(self, event: ActionEvent) -> None: + """Creates a backup from the current cluster.""" + if not self._can_unit_perform_backup(event): event.fail("Failed: backup service is not configured yet") return @@ -179,7 +327,7 @@ def _on_create_backup_action(self, event: ActionEvent) -> None: event.fail("Backup still in progress: aborting this request...") return # Increment by 1 the latest snapshot_id (set to 0 if no snapshot was previously made) - new_backup_id = int(max(self._fetch_snapshots().keys() or [0])) + 1 + new_backup_id = int(max(self._list_backups().keys() or [0])) + 1 logger.debug( f"Create backup action request id {new_backup_id} response is:" + self.get_service_status( @@ -208,7 +356,7 @@ def _on_create_backup_action(self, event: ActionEvent) -> None: return event.set_results({"backup-id": new_backup_id, "status": "Backup is running."}) - def _check_action_can_run(self, event: ActionEvent) -> bool: + def _can_unit_perform_backup(self, event: ActionEvent) -> bool: """Checks if the actions run from this unit can be executed or not. If not, then register the reason as a failure in the event and returns False.
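# For reference, a minimal standalone sketch of the close-then-restore sequence that
# _restore_and_try_close_indices_if_needed implements above, written directly against the
# OpenSearch REST API. The endpoint, repository name and the omission of authentication
# below are illustrative assumptions, not part of this change.
import requests

HOST = "https://localhost:9200"      # assumed cluster endpoint
REPO = "s3-repository"               # assumed snapshot repository name
EXCLUDE = {".opendistro_security"}   # system indices that should never be restored


def restore_snapshot(backup_id: str) -> dict:
    """Close open indices that overlap with the snapshot, then trigger the restore."""
    snap = requests.get(f"{HOST}/_snapshot/{REPO}/{backup_id}", verify=False).json()
    backup_indices = set(snap["snapshots"][0]["indices"])

    # Close every open index that the snapshot would overwrite.
    for idx in requests.get(f"{HOST}/_cat/indices?format=json", verify=False).json():
        if idx["index"] in backup_indices and idx["status"] != "close":
            requests.post(f"{HOST}/{idx['index']}/_close", verify=False)

    # Restore the snapshot, excluding the protected system indices when present.
    payload = {"ignore_unavailable": False, "include_global_state": False, "partial": False}
    excluded = EXCLUDE & backup_indices
    if excluded:
        payload["indices"] = ",".join(f"-{name}" for name in excluded)
    return requests.post(
        f"{HOST}/_snapshot/{REPO}/{backup_id}/_restore?wait_for_completion=false",
        json=payload,
        verify=False,
    ).json()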
@@ -229,11 +377,15 @@ def _check_action_can_run(self, event: ActionEvent) -> bool: return False return True - def _fetch_snapshots(self) -> Dict[int, str]: + def _list_backups(self) -> Dict[int, Dict[str, Any]]: - """Returns a mapping of snapshot ids / state.""" + """Returns a mapping of snapshot ids to their state and indices.""" response = self._request("GET", f"_snapshot/{S3_REPOSITORY}/_all") return { - snapshot["snapshot"]: snapshot["state"] for snapshot in response.get("snapshots", []) + snapshot["snapshot"]: { + "state": snapshot["state"], + "indices": snapshot.get("indices", []), + } + for snapshot in response.get("snapshots", []) } def is_backup_in_progress(self, backup_id=None) -> bool: @@ -258,7 +410,7 @@ def _on_s3_credentials_changed(self, event: EventBase) -> None: 3) If the plugin is not enabled, then defer the event 4) Send the API calls to setup the backup service """ - if not self._is_s3_fully_configured(): + if not self.can_use_s3_repository(): # Always check if a relation actually exists and if options are available # in this case, seems one of the conditions above is not yet present # abandon this restart event, as it will be called later once s3 configuration @@ -295,11 +447,9 @@ def _on_s3_credentials_changed(self, event: EventBase) -> None: if not self.charm.unit.is_leader(): # Plugin is configured locally for this unit. Now the leader proceed. - self.charm.status.clear(ActiveStatus()) + self.charm.status.set(ActiveStatus()) return - self.apply_api_config_if_needed() - self.charm.status.clear(ActiveStatus()) def apply_api_config_if_needed(self) -> None: """Runs the post restart routine and API calls needed to setup/disable backup. @@ -313,13 +463,12 @@ def apply_api_config_if_needed(self) -> None: # (2) run the request; and state = self._register_snapshot_repo() # (3) based on the response, set the message status - if state == BackupServiceState.SUCCESS: - self.charm.status.set(ActiveStatus("Backup service configured")) - else: + if state != BackupServiceState.SUCCESS: self.charm.status.set(BlockedStatus(f"Backup setup failed with {state}")) - self.charm.status.clear(ActiveStatus()) + else: + self.charm.status.set(ActiveStatus()) - def _on_s3_broken(self, event: EventBase) -> None: + def _on_s3_broken(self, event: EventBase) -> None: # noqa: C901 """Processes the broken s3 relation. It runs the reverse process of on_s3_change: 1) Check if the cluster is currently taking a snapshot, if yes, set status as blocked and defer this event.
2) If leader, run API calls to signal disable is needed """ - if self.charm.model.get_relation(S3_RELATION).units: + if ( + self.charm.model.get_relation(S3_RELATION) + and self.charm.model.get_relation(S3_RELATION).units + ): # There are still members in the relation, defer until it is finished + # Make a relation-change in the peer relation, so it triggers this unit back + counter = self.charm.peers_data.get(Scope.UNIT, "s3_broken") + if counter: + self.charm.peers_data.put(Scope.UNIT, "s3_broken", counter + 1) + else: + self.charm.peers_data.put(Scope.UNIT, "s3_broken", 1) event.defer() return + + # Second part of this work-around + if self.charm.peers_data.get(Scope.UNIT, "s3_broken"): + # Now, we can delete it + self.charm.peers_data.delete(Scope.UNIT, "s3_broken") + self.charm.status.set(MaintenanceStatus("Disabling backup service...")) snapshot_status = self._check_snapshot_status() if snapshot_status in [ @@ -338,7 +502,9 @@ def _on_s3_broken(self, event: EventBase) -> None: ]: # 1) snapshot is either in progress or partially taken: block and defer this event self.charm.status.set( - BlockedStatus(f"Disabling backup not possible: {snapshot_status}") + MaintenanceStatus( + f"Disabling backup postponed until backup in progress: {snapshot_status}" + ) ) event.defer() return @@ -349,7 +515,8 @@ def _on_s3_broken(self, event: EventBase) -> None: "Snapshot has been partially taken, but not completed. Continuing with relation removal..." ) - # Run the check here, as we want all the units to keep deferring the event if needed. + # Run the check here, instead of the start of this hook, as we want all the + # units to keep deferring the event if needed. # That avoids a condition where we have: # 1) A long snapshot is taking place # 2) Relation is removed @@ -384,7 +551,7 @@ def _on_s3_broken(self, event: EventBase) -> None: self.charm.plugin_manager.reset_event_scope() return self.charm.plugin_manager.reset_event_scope() - self.charm.status.clear(ActiveStatus()) + self.charm.status.set(ActiveStatus()) def _execute_s3_broken_calls(self): """Executes the s3 broken API calls.""" @@ -402,9 +569,23 @@ def _check_snapshot_status(self) -> BackupServiceState: except (ValueError, OpenSearchHttpError, requests.HTTPError): return BackupServiceState.RESPONSE_FAILED_NETWORK + def _get_endpoint_protocol(self, endpoint: str) -> str: + """Returns the protocol based on the endpoint.""" + if endpoint.startswith("http://"): + return "http" + if endpoint.startswith("https://"): + return "https" + return "https" + def _register_snapshot_repo(self) -> BackupServiceState: """Registers the snapshot repo in the cluster.""" info = self.s3_client.get_s3_connection_info() + extra_settings = {} + if info.get("region"): + extra_settings["region"] = info.get("region") + if info.get("storage-class"): + extra_settings["storage_class"] = info.get("storage-class") + return self.get_service_status( self._request( "PUT", @@ -412,17 +593,17 @@ def _register_snapshot_repo(self) -> BackupServiceState: payload={ "type": "s3", "settings": { - "region": info.get("region"), "endpoint": info.get("endpoint"), - "protocol": "http", # TODO: roll back to https once we have the cert + "protocol": self._get_endpoint_protocol(info.get("endpoint")), "bucket": info["bucket"], "base_path": info.get("path", S3_REPO_BASE_PATH), + **extra_settings, }, }, ) ) - def _is_s3_fully_configured(self) -> bool: + def can_use_s3_repository(self) -> bool: """Checks if relation is set and all configs needed are present. 
The get_s3_connection_info() checks if the relation is present, and if yes, @@ -433,7 +614,7 @@ def _is_s3_fully_configured(self) -> bool: """ missing_s3_configs = [ config - for config in ["region", "bucket", "access-key", "secret-key"] + for config in ["bucket", "endpoint", "access-key", "secret-key"] if config not in self.s3_client.get_s3_connection_info() ] if missing_s3_configs: @@ -510,6 +691,8 @@ def get_service_status(self, response: Dict[str, Any]) -> BackupServiceState: # return BackupServiceState.ILLEGAL_ARGUMENT if type == "snapshot_missing_exception": return BackupServiceState.SNAPSHOT_MISSING + if type == "snapshot_restore_exception" and RESTORE_OPEN_INDEX_WITH_SAME_NAME in reason: + return BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED if type == "snapshot_restore_exception": return BackupServiceState.SNAPSHOT_RESTORE_ERROR if ( diff --git a/tests/integration/ha/test_backups.py b/tests/integration/ha/test_backups.py index ccb34b635..81219fde6 100644 --- a/tests/integration/ha/test_backups.py +++ b/tests/integration/ha/test_backups.py @@ -29,6 +29,7 @@ get_application_unit_ids_ips, get_leader_unit_id, get_leader_unit_ip, + http_request, run_action, ) from tests.integration.tls.test_tls import TLS_CERTIFICATES_APP_NAME @@ -37,6 +38,7 @@ S3_INTEGRATOR_NAME = "s3-integrator" +TEST_BACKUP_DOC_ID = 10 CLOUD_CONFIGS = { "aws": { "endpoint": "https://s3.amazonaws.com", @@ -97,6 +99,9 @@ # bucket_object.delete() +TEST_BACKUP_INDEX = "test_backup_index" + + @pytest.fixture() async def c_writes(ops_test: OpsTest): """Creates instance of the ContinuousWrites.""" @@ -200,13 +205,11 @@ async def test_backup_cluster( units = await get_application_unit_ids_ips(ops_test, app=app) leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) - # create index with r_shards = nodes - 1 - test_backup_index = "test_backup_index" - await create_index(ops_test, app, leader_unit_ip, test_backup_index, r_shards=len(units) - 1) + await create_index(ops_test, app, leader_unit_ip, TEST_BACKUP_INDEX, r_shards=len(units) - 1) # index document - doc_id = 10 - await index_doc(ops_test, app, leader_unit_ip, test_backup_index, doc_id) + doc_id = TEST_BACKUP_DOC_ID + await index_doc(ops_test, app, leader_unit_ip, TEST_BACKUP_INDEX, doc_id) # check that the doc can be retrieved from any node logger.info("Test backup index: searching") @@ -215,13 +218,13 @@ async def test_backup_cluster( ops_test, app, u_ip, - test_backup_index, + TEST_BACKUP_INDEX, query={"query": {"term": {"_id": doc_id}}}, preference="_only_local", ) # Validate the index and document are present assert len(docs) == 1 - assert docs[0]["_source"] == default_doc(test_backup_index, doc_id) + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) leader_id = await get_leader_unit_id(ops_test, app) @@ -230,16 +233,181 @@ async def test_backup_cluster( assert action.status == "completed" - list_backups = await run_action(ops_test, leader_id, "list-backups") + list_backups = await run_action(ops_test, leader_id, "list-backups", params={"output": "json"}) logger.info(f"list-backups output: {list_backups}") # Expected format: - # namespace(status='completed', response={'return-code': 0, 'snapshots': '{"1": "SUCCESS"}'}) + # namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'}) + backups = json.loads(list_backups.response["backups"]) assert list_backups.status == "completed" - assert len(json.loads(list_backups.response["snapshots"])) == int(action.response["backup-id"]) - assert ( - 
json.loads(list_backups.response["snapshots"])[action.response["backup-id"]] == "SUCCESS" - ) + assert len(backups.keys()) == int(action.response["backup-id"]) + assert backups[action.response["backup-id"]]["state"] == "SUCCESS" # continuous writes checks await assert_continuous_writes_consistency(ops_test, c_writes, app) + + +@pytest.mark.abort_on_fail +async def test_restore_cluster( + ops_test: OpsTest, +) -> None: + """Deletes the TEST_BACKUP_INDEX, restores the cluster and tries to search for index.""" + app = (await app_name(ops_test)) or APP_NAME + + units = await get_application_unit_ids_ips(ops_test, app=app) + leader_id = await get_leader_unit_id(ops_test, app) + leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) + + await http_request( + ops_test, + "DELETE", + f"https://{leader_unit_ip}:9200/{TEST_BACKUP_INDEX}", + app=app, + ) + + action = await run_action(ops_test, leader_id, "restore", params={"backup-id": 1}) + logger.info(f"restore output: {action}") + assert action.status == "completed" + + # index document + doc_id = TEST_BACKUP_DOC_ID + # check that the doc can be retrieved from any node + logger.info("Test backup index: searching") + for u_id, u_ip in units.items(): + docs = await search( + ops_test, + app, + u_ip, + TEST_BACKUP_INDEX, + query={"query": {"term": {"_id": doc_id}}}, + preference="_only_local", + ) + # Validate the index and document are present + assert len(docs) == 1 + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) + + +@pytest.mark.abort_on_fail +async def test_restore_cluster_after_app_destroyed(ops_test: OpsTest) -> None: + """Deletes the entire OpenSearch cluster and redeploys from scratch. + + Restores the backup and then checks if the same TEST_BACKUP_INDEX is there. + """ + app = (await app_name(ops_test)) or APP_NAME + await ops_test.model.remove_application(app, block_until_done=True) + app_num_units = int(os.environ.get("TEST_NUM_APP_UNITS", None) or 3) + my_charm = await ops_test.build_charm(".") + # Redeploy + await asyncio.gather( + ops_test.model.deploy(my_charm, num_units=app_num_units, series=SERIES), + ) + # Relate it to OpenSearch to set up TLS. 
+ await ops_test.model.relate(APP_NAME, TLS_CERTIFICATES_APP_NAME) + await ops_test.model.relate(APP_NAME, S3_INTEGRATOR_NAME) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1400, + idle_period=IDLE_PERIOD, + ) + + units = await get_application_unit_ids_ips(ops_test, app=app) + leader_id = await get_leader_unit_id(ops_test, app) + + action = await run_action(ops_test, leader_id, "restore", params={"backup-id": 1}) + logger.info(f"restore output: {action}") + assert action.status == "completed" + + # index document + doc_id = TEST_BACKUP_DOC_ID + # check that the doc can be retrieved from any node + logger.info("Test backup index: searching") + for u_id, u_ip in units.items(): + docs = await search( + ops_test, + app, + u_ip, + TEST_BACKUP_INDEX, + query={"query": {"term": {"_id": doc_id}}}, + preference="_only_local", + ) + # Validate the index and document are present + assert len(docs) == 1 + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) + + +@pytest.mark.abort_on_fail +async def test_remove_and_readd_s3_relation(ops_test: OpsTest) -> None: + """Removes and re-adds the s3-credentials relation to test backup and restore.""" + app = (await app_name(ops_test)) or APP_NAME + units = await get_application_unit_ids_ips(ops_test, app=app) + leader_id = await get_leader_unit_id(ops_test, app) + leader_unit_ip = await get_leader_unit_ip(ops_test, app=app) + + logger.info("Remove s3-credentials relation") + # Remove relation + await ops_test.model.applications[APP_NAME].destroy_relation( + "s3-credentials", f"{S3_INTEGRATOR_NAME}:s3-credentials" + ) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1400, + idle_period=IDLE_PERIOD, + ) + + logger.info("Re-add s3-credentials relation") + await ops_test.model.relate(APP_NAME, S3_INTEGRATOR_NAME) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1400, + idle_period=IDLE_PERIOD, + ) + + leader_id = await get_leader_unit_id(ops_test, app) + + action = await run_action(ops_test, leader_id, "create-backup") + logger.info(f"create-backup output: {action}") + + assert action.status == "completed" + + list_backups = await run_action(ops_test, leader_id, "list-backups", params={"output": "json"}) + logger.info(f"list-backups output: {list_backups}") + + # Expected format: + # namespace(status='completed', response={'return-code': 0, 'backups': '{"1": ...}'}) + backups = json.loads(list_backups.response["backups"]) + assert list_backups.status == "completed" + assert len(backups.keys()) == int(action.response["backup-id"]) + assert backups[action.response["backup-id"]]["state"] == "SUCCESS" + + await http_request( + ops_test, + "DELETE", + f"https://{leader_unit_ip}:9200/{TEST_BACKUP_INDEX}", + app=app, + ) + + action = await run_action( + ops_test, leader_id, "restore", params={"backup-id": int(action.response["backup-id"])} + ) + logger.info(f"restore-backup output: {action}") + assert action.status == "completed" + + # index document + doc_id = TEST_BACKUP_DOC_ID + # check that the doc can be retrieved from any node + logger.info("Test backup index: searching") + for u_id, u_ip in units.items(): + docs = await search( + ops_test, + app, + u_ip, + TEST_BACKUP_INDEX, + query={"query": {"term": {"_id": doc_id}}}, + preference="_only_local", + ) + # Validate the index and document are present + assert len(docs) == 1 + assert docs[0]["_source"] == default_doc(TEST_BACKUP_INDEX, doc_id) diff --git a/tests/unit/lib/test_backups.py 
b/tests/unit/lib/test_backups.py index 5519a4365..f4019ada9 100644 --- a/tests/unit/lib/test_backups.py +++ b/tests/unit/lib/test_backups.py @@ -57,6 +57,12 @@ def setUp(self) -> None: self.harness.add_relation_unit(self.s3_rel_id, "s3-integrator/0") mock_pm_run.assert_not_called() + def test_get_endpoint_protocol(self) -> None: + """Tests the get_endpoint_protocol method.""" + assert self.charm.backup._get_endpoint_protocol("http://10.0.0.1:8000") == "http" + assert self.charm.backup._get_endpoint_protocol("https://10.0.0.2:8000") == "https" + assert self.charm.backup._get_endpoint_protocol("test.not-valid-url") == "https" + @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup.apply_api_config_if_needed") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager._apply_config") @@ -91,9 +97,10 @@ def test_00_update_relation_data(self, __, mock_apply_config, _, mock_status) -> ).__dict__ ) + @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._request") @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.request") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") - def test_01_apply_api_config_if_needed(self, mock_status, mock_request) -> None: + def test_01_apply_api_config_if_needed(self, mock_status, _, mock_request) -> None: """Tests the application of post-restart steps.""" self.harness.update_relation_data( self.s3_rel_id, @@ -110,15 +117,18 @@ def test_01_apply_api_config_if_needed(self, mock_status, mock_request) -> None: ) mock_status.return_value = PluginState.ENABLED self.charm.backup.apply_api_config_if_needed() - mock_request.called_once_with("GET", f"_snapshot/{S3_REPOSITORY}") - mock_request.called_once_with( + mock_request.assert_called_with( "PUT", f"_snapshot/{S3_REPOSITORY}", payload={ "type": "s3", "settings": { + "endpoint": "localhost", + "protocol": "https", "bucket": TEST_BUCKET_NAME, "base_path": TEST_BASE_PATH, + "region": "testing-region", + "storage_class": "storageclass", }, }, ) @@ -128,7 +138,7 @@ def test_01_apply_api_config_if_needed(self, mock_status, mock_request) -> None: @patch("charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.request") @patch("charms.opensearch.v0.opensearch_backups.OpenSearchBackup._execute_s3_broken_calls") @patch("charms.opensearch.v0.opensearch_plugin_manager.OpenSearchPluginManager.status") - def test_99_relation_broken( + def test_20_relation_broken( self, mock_status, mock_execute_s3_broken_calls, @@ -144,6 +154,7 @@ def test_99_relation_broken( {"SUCCESS"}, ] mock_status.return_value = PluginState.ENABLED + self.harness.remove_relation_unit(self.s3_rel_id, "s3-integrator/0") self.harness.remove_relation(self.s3_rel_id) mock_request.called_once_with("GET", "/_snapshot/_status") mock_execute_s3_broken_calls.assert_called_once()
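# Illustrative only: the mapping shape the new _list_backups() helper returns and the table
# rendered from it by _generate_backup_list_output()/_format_backup_list(); the backup ids
# and index names below are made-up example values.
backups = {
    "1": {"state": "SUCCESS", "indices": ["test_backup_index"]},
    "2": {"state": "SUCCESS", "indices": ["test_backup_index"]},
}
# With output=table, the list-backups action would then return roughly:
#
#  backup-id  | backup-type  | backup-status
# ------------------------------------------
#     1       | physical     | finished
#     2       | physical     | finished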