From 7f5487315322b6c1c1292dc49393c3522141253e Mon Sep 17 00:00:00 2001 From: Seung Yeon Joo Date: Tue, 29 Oct 2024 23:12:04 -0700 Subject: [PATCH] Convert-Index-To-Remote action added Signed-off-by: Seung Yeon Joo --- .../indexstatemanagement/ISMActionsParser.kt | 2 + .../action/ConvertIndexToRemoteAction.kt | 72 +++++ .../ConvertIndexToRemoteActionParser.kt | 44 +++ .../step/restore/AttemptRestoreStep.kt | 158 ++++++++++ .../step/restore/WaitForRestoreStep.kt | 78 +++++ .../validation/ActionValidation.kt | 1 + .../ValidateConvertIndexToRemote.kt | 76 +++++ .../mappings/opendistro-ism-config.json | 9 +- .../IndexManagementRestTestCase.kt | 2 +- .../indexstatemanagement/TestHelpers.kt | 5 + .../step/AttemptRestoreStepTests.kt | 161 ++++++++++ .../step/WaitForRestoreStepTests.kt | 285 ++++++++++++++++++ .../cached-opendistro-ism-config.json | 9 +- 13 files changed, 899 insertions(+), 3 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteActionParser.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/AttemptRestoreStep.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/WaitForRestoreStep.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateConvertIndexToRemote.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRestoreStepTests.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRestoreStepTests.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index a4c95b461..6b75abae2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -11,6 +11,7 @@ import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityActionParser @@ -52,6 +53,7 @@ class ISMActionsParser private constructor() { ShrinkActionParser(), SnapshotActionParser(), TransformActionParser(), + ConvertIndexToRemoteActionParser(), ) val customActionExtensionMap = mutableMapOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteAction.kt new file mode 100644 index 000000000..1e9840687 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteAction.kt @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.indexmanagement.indexstatemanagement.step.restore.AttemptRestoreStep +import org.opensearch.indexmanagement.indexstatemanagement.step.restore.WaitForRestoreStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext + +class ConvertIndexToRemoteAction( + val repository: String, + index: Int, +) : Action(name, index) { + + companion object { + const val name = "convert_index_to_remote" + const val REPOSITORY_FIELD = "repository" + + @JvmStatic + fun fromStreamInput(si: StreamInput): ConvertIndexToRemoteAction { + val repository = si.readString() + val index = si.readInt() + return ConvertIndexToRemoteAction(repository, index) + } + } + + private val attemptRestoreStep = AttemptRestoreStep(this) + private val waitForRestoreStep = WaitForRestoreStep() + + private val steps = listOf(attemptRestoreStep, waitForRestoreStep) + + @Suppress("ReturnCount") + override fun getStepToExecute(context: StepContext): Step { + // If stepMetaData is null, return the first step (attemptRestoreStep) + val stepMetaData = context.metadata.stepMetaData ?: return attemptRestoreStep + + // If the current step has completed, return the next step + if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) { + return when (stepMetaData.name) { + AttemptRestoreStep.name -> waitForRestoreStep + else -> attemptRestoreStep // Default to the first step + } + } + + // If the current step is not completed, continue executing it + return when (stepMetaData.name) { + AttemptRestoreStep.name -> attemptRestoreStep + else -> waitForRestoreStep + } + } + + override fun getSteps(): List = steps + + override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) { + builder.startObject(type) + builder.field(REPOSITORY_FIELD, repository) + builder.endObject() + } + + override fun populateAction(out: StreamOutput) { + out.writeString(repository) + out.writeInt(actionIndex) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteActionParser.kt new file mode 100644 index 000000000..975d5ddd6 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ConvertIndexToRemoteActionParser.kt @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParser.Token +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.REPOSITORY_FIELD +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser + +class ConvertIndexToRemoteActionParser : ActionParser() { + override fun fromStreamInput(sin: StreamInput): Action { + return ConvertIndexToRemoteAction.fromStreamInput(sin) + } + + override fun fromXContent(xcp: XContentParser, index: Int): Action { + var repository: String? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + REPOSITORY_FIELD -> repository = xcp.text() + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ConvertIndexToRemoteAction.") + } + } + + return ConvertIndexToRemoteAction( + repository = requireNotNull(repository) { "ConvertIndexToRemoteAction repository must be specified" }, + index = index, + ) + } + + override fun getActionType(): String { + return ConvertIndexToRemoteAction.name + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/AttemptRestoreStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/AttemptRestoreStep.kt new file mode 100644 index 000000000..2c96aefe5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/AttemptRestoreStep.kt @@ -0,0 +1,158 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.restore + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse +import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.snapshots.SnapshotException +import org.opensearch.snapshots.SnapshotState +import org.opensearch.transport.RemoteTransportException + +class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(name) { + + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + private var snapshotName: String? = null + + @Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod") + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + val repository = action.repository + + try { + val mutableInfo = mutableMapOf() + + // List snapshots matching the pattern + val getSnapshotsRequest = GetSnapshotsRequest() + .repository(repository) + .snapshots(arrayOf("$indexName*")) + .ignoreUnavailable(true) + .verbose(true) + + val getSnapshotsResponse: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil { + getSnapshots(getSnapshotsRequest, it) + } + val snapshots = getSnapshotsResponse.snapshots + if (snapshots.isNullOrEmpty()) { + val message = getFailedMessage(indexName, "No snapshots found matching pattern [$indexName*]") + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + return this + } + + val successfulSnapshots = snapshots.filter { it.state() == SnapshotState.SUCCESS } + + if (successfulSnapshots.isEmpty()) { + val message = getFailedMessage( + indexName, + "No successful snapshots found matching pattern [$indexName*]", + ) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + return this + } + + // Select the latest snapshot + val latestSnapshotInfo = successfulSnapshots.maxByOrNull { it.endTime() }!! + logger.info("Restoring snapshot info: $latestSnapshotInfo") + + // Use the snapshot name from the selected SnapshotInfo + snapshotName = latestSnapshotInfo.snapshotId().name + + // Proceed with the restore operation + val restoreSnapshotRequest = RestoreSnapshotRequest(repository, snapshotName) + .indices("*") + .storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT) + .renamePattern("^(.*)\$") + .renameReplacement("$1_remote") + .waitForCompletion(false) + val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil { + restoreSnapshot(restoreSnapshotRequest, it) + } + + when (response.status()) { + RestStatus.ACCEPTED, RestStatus.OK -> { + stepStatus = StepStatus.COMPLETED + mutableInfo["message"] = getSuccessMessage(indexName) + } + else -> { + val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}") + logger.warn("$message - $response") + stepStatus = StepStatus.FAILED + mutableInfo["message"] = message + mutableInfo["cause"] = response.toString() + } + } + info = mutableInfo.toMap() + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotException) { + handleRestoreException(indexName, cause) + } else { + handleException(indexName, cause as Exception) + } + } catch (e: SnapshotException) { + handleRestoreException(indexName, e) + } catch (e: Exception) { + handleException(indexName, e) + } + + return this + } + + private fun handleRestoreException(indexName: String, e: SnapshotException) { + val message = getFailedRestoreMessage(indexName) + logger.debug(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + private fun handleException(indexName: String, e: Exception) { + val message = getFailedMessage(indexName, e.message ?: "Unknown error") + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + val currentActionMetaData = currentMetadata.actionMetaData + return currentMetadata.copy( + actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)), + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info, + ) + } + + override fun isIdempotent(): Boolean = false + + companion object { + val validTopContextFields = setOf("index", "indexUuid") + const val name = "attempt_restore" + fun getFailedMessage(index: String, cause: String) = "Failed to start restore for [index=$index], cause: $cause" + fun getFailedRestoreMessage(index: String) = "Failed to start restore due to concurrent restore or snapshot in progress [index=$index]" + fun getSuccessMessage(index: String) = "Successfully started restore for [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/WaitForRestoreStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/WaitForRestoreStep.kt new file mode 100644 index 000000000..3a4fa1f65 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/restore/WaitForRestoreStep.kt @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.restore + +import org.apache.logging.log4j.LogManager +import org.opensearch.cluster.RestoreInProgress +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.transport.RemoteTransportException + +class WaitForRestoreStep : Step(name) { + + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + + try { + val clusterState = context.clusterService.state() + val restoreInProgress = clusterState.custom(RestoreInProgress.TYPE) + + val restoreOngoing = restoreInProgress?.let { rip -> + rip.any { entry -> + entry.indices().contains(indexName) + } + } ?: false + + if (restoreOngoing) { + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to getPendingMessage(indexName)) + } else { + // Restore is complete + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessMessage(indexName)) + } + } catch (e: Exception) { + handleException(indexName, e) + } + + return this + } + + private fun handleException(indexName: String, e: Exception) { + val message = getFailedMessage(indexName, e.message ?: "Unknown error") + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val cause = (e as? RemoteTransportException)?.cause + + val errorMessage = cause?.message ?: e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + return currentMetadata.copy( + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info, + ) + } + + override fun isIdempotent(): Boolean = true + + companion object { + const val name = "wait_for_restore" + fun getFailedMessage(index: String, cause: String) = "Failed to check restore status for [index=$index], cause: $cause" + fun getPendingMessage(index: String) = "Restore not complete for [index=$index], retrying..." + fun getSuccessMessage(index: String) = "Restore complete for [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt index e26f2d32e..04ff73026 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt @@ -30,6 +30,7 @@ class ActionValidation( "read_write" -> ValidateReadWrite(settings, clusterService, jvmService).execute(indexName) "replica_count" -> ValidateReplicaCount(settings, clusterService, jvmService).execute(indexName) "snapshot" -> ValidateSnapshot(settings, clusterService, jvmService).execute(indexName) + "convert_index_to_remote" -> ValidateConvertIndexToRemote(settings, clusterService, jvmService).execute(indexName) "transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName) "close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName) "index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateConvertIndexToRemote.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateConvertIndexToRemote.kt new file mode 100644 index 000000000..9d8071c26 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateConvertIndexToRemote.kt @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.validation + +import org.apache.logging.log4j.LogManager +import org.opensearch.cluster.metadata.MetadataCreateIndexService +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate +import org.opensearch.indexmanagement.util.OpenForTesting +import org.opensearch.indices.InvalidIndexNameException +import org.opensearch.monitor.jvm.JvmService + +@OpenForTesting +class ValidateConvertIndexToRemote( + settings: Settings, + clusterService: ClusterService, + jvmService: JvmService, +) : Validate(settings, clusterService, jvmService) { + + private val logger = LogManager.getLogger(javaClass) + + @Suppress("ReturnSuppressCount", "ReturnCount") + override fun execute(indexName: String): Validate { + // For restore action, check if index name is valid + if (!validIndexName(indexName)) { + validationStatus = ValidationStatus.FAILED + return this + } + + // Optionally, check if the index already exists + // Depending on your requirements, you may want to allow or disallow restoring over existing indices + if (indexExists(indexName)) { + val message = getIndexAlreadyExistsMessage(indexName) + logger.warn(message) + validationStatus = ValidationStatus.FAILED + validationMessage = message + return this + } + + validationMessage = getValidationPassedMessage(indexName) + return this + } + + private fun indexExists(indexName: String): Boolean { + val indexExists = clusterService.state().metadata.indices.containsKey(indexName) + return indexExists + } + + // Checks if the index name is valid according to OpenSearch naming conventions + private fun validIndexName(indexName: String): Boolean { + val exceptionGenerator: (String, String) -> RuntimeException = { name, reason -> + InvalidIndexNameException(name, reason) + } + try { + MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator) + } catch (e: Exception) { + val message = getIndexNotValidMessage(indexName) + logger.warn(message) + validationMessage = message + return false + } + return true + } + + @Suppress("TooManyFunctions") + companion object { + const val name = "validate_convert_index_to_remote" + fun getIndexAlreadyExistsMessage(index: String) = "Index [index=$index] already exists, cannot restore over existing index." + fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid for restore action." + fun getValidationPassedMessage(index: String) = "Restore action validation passed for [index=$index]" + } +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 4c138a267..5a683d808 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 21 + "schema_version": 22 }, "dynamic": "strict", "properties": { @@ -216,6 +216,13 @@ } } }, + "convert_index_to_remote": { + "properties": { + "repository": { + "type": "keyword" + } + } + }, "rollover": { "properties": { "min_size": { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index a55809379..b9b9dccb9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 21 + val configSchemaVersion = 22 val historySchemaVersion = 7 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 218ec4879..b728282e5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -17,6 +17,7 @@ import org.opensearch.indexmanagement.common.model.notification.Channel import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationAction import org.opensearch.indexmanagement.indexstatemanagement.action.CloseAction +import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeAction import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityAction @@ -280,6 +281,10 @@ fun randomSnapshotActionConfig(repository: String = "repo", snapshot: String = " return SnapshotAction(repository, snapshot, index = 0) } +fun randomRestoreActionConfig(repository: String = "repo"): ConvertIndexToRemoteAction { + return ConvertIndexToRemoteAction(repository, index = 0) +} + /** * Helper functions for creating a random Conditions object */ diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRestoreStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRestoreStepTests.kt new file mode 100644 index 000000000..99fe09154 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRestoreStepTests.kt @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse +import org.opensearch.client.AdminClient +import org.opensearch.client.Client +import org.opensearch.client.ClusterAdminClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.ClusterSettings +import org.opensearch.common.settings.Settings +import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.randomRestoreActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST +import org.opensearch.indexmanagement.indexstatemanagement.step.restore.AttemptRestoreStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.ingest.TestTemplateService.MockTemplateScript +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.script.TemplateScript +import org.opensearch.test.OpenSearchTestCase + +class AttemptRestoreStepTests : OpenSearchTestCase() { + + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val restoreAction = randomRestoreActionConfig("repo") + private val metadata = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + AttemptRestoreStep.name, + 1, + 0, + false, + 0, + null, + ActionProperties(snapshotName = "snapshot-name"), + ), + null, + null, + null, + ) + private val lockService: LockService = LockService(mock(), clusterService) + + @Before + fun setup() { + whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST))) + whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(MockTemplateScript.Factory("snapshot-pattern*")) + } + + fun `test restore failure with no matching snapshots`() { + val getSnapshotsResponse = GetSnapshotsResponse(emptyList()) + val client = getClient(getAdminClient(getClusterAdminClient(getSnapshotsResponse, null, null))) + + runBlocking { + val step = AttemptRestoreStep(restoreAction) + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + step.preExecute(logger, context).execute() + val updatedMetadata = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedMetadata.stepMetaData?.stepStatus, + ) + assertEquals( + "Did not get correct failure message", + "Failed to start restore for [index=test], cause: No snapshots found matching pattern [test*]", + updatedMetadata.info!!["message"], + ) + } + } + + fun `test restore exception`() { + val exception = IllegalArgumentException("example") + val getSnapshotsResponse = GetSnapshotsResponse(emptyList()) + val client = getClient(getAdminClient(getClusterAdminClient(getSnapshotsResponse, null, exception))) + + runBlocking { + val step = AttemptRestoreStep(restoreAction) + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + step.preExecute(logger, context).execute() + val updatedMetadata = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedMetadata.stepMetaData?.stepStatus, + ) + assertEquals( + "Did not get cause from exception", + "example", + updatedMetadata.info!!["cause"], + ) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + + private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient } + + private fun getClusterAdminClient( + getSnapshotsResponse: GetSnapshotsResponse?, + restoreSnapshotResponse: RestoreSnapshotResponse?, + exception: Exception?, + ): ClusterAdminClient { + return mock { + // Mock getSnapshots call + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + when { + exception != null -> listener.onFailure(exception) + getSnapshotsResponse != null -> listener.onResponse(getSnapshotsResponse) + else -> listener.onResponse(GetSnapshotsResponse(emptyList())) + } + }.whenever(this.mock).getSnapshots(any(), any()) + + // Mock restoreSnapshot call + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + when { + exception != null -> listener.onFailure(exception) + restoreSnapshotResponse != null -> listener.onResponse(restoreSnapshotResponse) + else -> listener.onResponse(mock()) + } + }.whenever(this.mock).restoreSnapshot(any(), any()) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRestoreStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRestoreStepTests.kt new file mode 100644 index 000000000..6bd841d6f --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForRestoreStepTests.kt @@ -0,0 +1,285 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse +import org.opensearch.client.AdminClient +import org.opensearch.client.Client +import org.opensearch.client.ClusterAdminClient +import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.RestoreInProgress +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.step.restore.WaitForRestoreStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.transport.RemoteTransportException + +class WaitForRestoreStepTests : OpenSearchTestCase() { + + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val lockService: LockService = LockService(mock(), clusterService) + + fun `test restore status states`() { + val restoreEntry: RestoreInProgress.Entry = mock() + whenever(restoreEntry.indices()).doReturn(listOf("test")) + val restoreInProgress = RestoreInProgress.Builder() + .add(restoreEntry) + .build() + val clusterState: ClusterState = mock() + whenever(clusterState.custom(RestoreInProgress.TYPE)).thenReturn(restoreInProgress) + whenever(clusterService.state()).thenReturn(clusterState) + val response: SnapshotsStatusResponse = mock() + val client = getClient(getAdminClient(getClusterAdminClient(response, null))) + + val metadata = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + WaitForRestoreStep.name, + 1, + 0, + false, + 0, + null, + ActionProperties(snapshotName = "snapshot-name"), + ), + null, + null, + null, + ) + runBlocking { + val step = WaitForRestoreStep() + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + step.preExecute(logger, context).execute() + val updatedMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not CONDITION_NOT_MET", + Step.StepStatus.CONDITION_NOT_MET, + updatedMetaData.stepMetaData?.stepStatus, + ) + assertEquals( + "Did not get restore in progress message", + WaitForRestoreStep.getPendingMessage("test"), + updatedMetaData.info!!["message"], + ) + } + + // Test SUCCESS state + whenever(restoreEntry.indices()).doReturn(listOf("other-test")) + runBlocking { + val step = WaitForRestoreStep() + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + step.preExecute(logger, context).execute() + val updatedMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not COMPLETED", + Step.StepStatus.COMPLETED, + updatedMetaData.stepMetaData?.stepStatus, + ) + assertEquals( + "Did not get restore completed message", + WaitForRestoreStep.getSuccessMessage("test"), + updatedMetaData.info!!["message"], + ) + } + } + + fun `test restore not in response list`() { + val client = getClient(getAdminClient(getClusterAdminClient(null, IllegalArgumentException("not used")))) + + val metadata = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + WaitForRestoreStep.name, + 1, + 0, + false, + 0, + null, + ActionProperties(snapshotName = "snapshot-name"), + ), + null, + null, + null, + ) + val restoreEntry: RestoreInProgress.Entry = mock() + whenever(restoreEntry.indices()).doReturn(listOf("not-test")) + val restoreInProgress = RestoreInProgress.Builder() + .add(restoreEntry) + .build() + val clusterState: ClusterState = mock() + whenever(clusterState.custom(RestoreInProgress.TYPE)).thenReturn(restoreInProgress) + whenever(clusterService.state()).thenReturn(clusterState) + + runBlocking { + val step = WaitForRestoreStep() + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + step.preExecute(logger, context).execute() + val updatedMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not COMPLETED", + Step.StepStatus.COMPLETED, + updatedMetaData.stepMetaData?.stepStatus, + ) + assertEquals( + "Did not get restore completed message", + WaitForRestoreStep.getSuccessMessage("test"), + updatedMetaData.info!!["message"], + ) + } + } + + fun `test restore exception`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getClusterAdminClient(null, exception))) + whenever(clusterService.state()).thenThrow(exception) + val metadata = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + WaitForRestoreStep.name, + 1, + 0, + false, + 0, + null, + ActionProperties(snapshotName = "snapshot-name"), + ), + null, + null, + null, + ) + + runBlocking { + val step = WaitForRestoreStep() + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + step.preExecute(logger, context).execute() + val updatedMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedMetaData.stepMetaData?.stepStatus, + ) + assertEquals( + "Did not get cause from exception", + "example", + updatedMetaData.info!!["cause"], + ) + } + } + + fun `test restore remote transport exception`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getClusterAdminClient(null, exception))) + whenever(clusterService.state()).thenThrow(exception) + val metadata = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + WaitForRestoreStep.name, + 1, + 0, + false, + 0, + null, + ActionProperties(snapshotName = "snapshot-name"), + ), + null, + null, + null, + ) + + runBlocking { + val step = WaitForRestoreStep() + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + step.preExecute(logger, context).execute() + val updatedMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedMetaData.stepMetaData?.stepStatus, + ) + assertEquals( + "Did not get cause from nested exception", + "nested", + updatedMetaData.info!!["cause"], + ) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient } + private fun getClusterAdminClient(response: SnapshotsStatusResponse?, exception: Exception?): ClusterAdminClient { + assertTrue("Must provide one and only one response or exception", (response != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (response != null) { + listener.onResponse(response) + } else { + listener.onFailure(exception) + } + }.whenever(this.mock).snapshotsStatus(any(), any()) + } + } +} diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 4c138a267..5a683d808 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 21 + "schema_version": 22 }, "dynamic": "strict", "properties": { @@ -216,6 +216,13 @@ } } }, + "convert_index_to_remote": { + "properties": { + "repository": { + "type": "keyword" + } + } + }, "rollover": { "properties": { "min_size": {