Skip to content

Commit

Permalink
Convert-Index-To-Remote action added
Browse files Browse the repository at this point in the history
Signed-off-by: Seung Yeon Joo <[email protected]>
  • Loading branch information
Seung Yeon Joo committed Oct 30, 2024
1 parent 5aaf114 commit 7f54873
Show file tree
Hide file tree
Showing 13 changed files with 899 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityActionParser
Expand Down Expand Up @@ -52,6 +53,7 @@ class ISMActionsParser private constructor() {
ShrinkActionParser(),
SnapshotActionParser(),
TransformActionParser(),
ConvertIndexToRemoteActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.AttemptRestoreStep
import org.opensearch.indexmanagement.indexstatemanagement.step.restore.WaitForRestoreStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

class ConvertIndexToRemoteAction(
val repository: String,
index: Int,
) : Action(name, index) {

companion object {
const val name = "convert_index_to_remote"
const val REPOSITORY_FIELD = "repository"

@JvmStatic
fun fromStreamInput(si: StreamInput): ConvertIndexToRemoteAction {
val repository = si.readString()
val index = si.readInt()
return ConvertIndexToRemoteAction(repository, index)
}
}

private val attemptRestoreStep = AttemptRestoreStep(this)
private val waitForRestoreStep = WaitForRestoreStep()

private val steps = listOf(attemptRestoreStep, waitForRestoreStep)

@Suppress("ReturnCount")
override fun getStepToExecute(context: StepContext): Step {
// If stepMetaData is null, return the first step (attemptRestoreStep)
val stepMetaData = context.metadata.stepMetaData ?: return attemptRestoreStep

// If the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptRestoreStep.name -> waitForRestoreStep
else -> attemptRestoreStep // Default to the first step
}
}

// If the current step is not completed, continue executing it
return when (stepMetaData.name) {
AttemptRestoreStep.name -> attemptRestoreStep
else -> waitForRestoreStep
}
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(REPOSITORY_FIELD, repository)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
out.writeString(repository)
out.writeInt(actionIndex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.REPOSITORY_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

class ConvertIndexToRemoteActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
return ConvertIndexToRemoteAction.fromStreamInput(sin)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var repository: String? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
REPOSITORY_FIELD -> repository = xcp.text()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ConvertIndexToRemoteAction.")
}
}

return ConvertIndexToRemoteAction(
repository = requireNotNull(repository) { "ConvertIndexToRemoteAction repository must be specified" },
index = index,
)
}

override fun getActionType(): String {
return ConvertIndexToRemoteAction.name
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.restore

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotException
import org.opensearch.snapshots.SnapshotState
import org.opensearch.transport.RemoteTransportException

class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var snapshotName: String? = null

@Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod")
override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
val repository = action.repository

try {
val mutableInfo = mutableMapOf<String, String>()

// List snapshots matching the pattern
val getSnapshotsRequest = GetSnapshotsRequest()
.repository(repository)
.snapshots(arrayOf("$indexName*"))
.ignoreUnavailable(true)
.verbose(true)

val getSnapshotsResponse: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil {
getSnapshots(getSnapshotsRequest, it)
}
val snapshots = getSnapshotsResponse.snapshots
if (snapshots.isNullOrEmpty()) {
val message = getFailedMessage(indexName, "No snapshots found matching pattern [$indexName*]")
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

val successfulSnapshots = snapshots.filter { it.state() == SnapshotState.SUCCESS }

if (successfulSnapshots.isEmpty()) {
val message = getFailedMessage(
indexName,
"No successful snapshots found matching pattern [$indexName*]",
)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}

// Select the latest snapshot
val latestSnapshotInfo = successfulSnapshots.maxByOrNull { it.endTime() }!!
logger.info("Restoring snapshot info: $latestSnapshotInfo")

// Use the snapshot name from the selected SnapshotInfo
snapshotName = latestSnapshotInfo.snapshotId().name

// Proceed with the restore operation
val restoreSnapshotRequest = RestoreSnapshotRequest(repository, snapshotName)
.indices("*")
.storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.renamePattern("^(.*)\$")
.renameReplacement("$1_remote")
.waitForCompletion(false)
val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil {
restoreSnapshot(restoreSnapshotRequest, it)
}

when (response.status()) {
RestStatus.ACCEPTED, RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = getSuccessMessage(indexName)
}
else -> {
val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}")
logger.warn("$message - $response")
stepStatus = StepStatus.FAILED
mutableInfo["message"] = message
mutableInfo["cause"] = response.toString()
}
}
info = mutableInfo.toMap()
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotException) {
handleRestoreException(indexName, cause)
} else {
handleException(indexName, cause as Exception)
}
} catch (e: SnapshotException) {
handleRestoreException(indexName, e)
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}

private fun handleRestoreException(indexName: String, e: SnapshotException) {
val message = getFailedRestoreMessage(indexName)
logger.debug(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf<String, Any>("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

private fun handleException(indexName: String, e: Exception) {
val message = getFailedMessage(indexName, e.message ?: "Unknown error")
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf<String, Any>("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetadata.actionMetaData
return currentMetadata.copy(
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}

override fun isIdempotent(): Boolean = false

companion object {
val validTopContextFields = setOf("index", "indexUuid")
const val name = "attempt_restore"
fun getFailedMessage(index: String, cause: String) = "Failed to start restore for [index=$index], cause: $cause"
fun getFailedRestoreMessage(index: String) = "Failed to start restore due to concurrent restore or snapshot in progress [index=$index]"
fun getSuccessMessage(index: String) = "Successfully started restore for [index=$index]"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.restore

import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.RestoreInProgress
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.transport.RemoteTransportException

class WaitForRestoreStep : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index

try {
val clusterState = context.clusterService.state()
val restoreInProgress = clusterState.custom<RestoreInProgress>(RestoreInProgress.TYPE)

val restoreOngoing = restoreInProgress?.let { rip ->
rip.any { entry ->
entry.indices().contains(indexName)
}
} ?: false

if (restoreOngoing) {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getPendingMessage(indexName))
} else {
// Restore is complete
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
}
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}

private fun handleException(indexName: String, e: Exception) {
val message = getFailedMessage(indexName, e.message ?: "Unknown error")
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val cause = (e as? RemoteTransportException)?.cause

val errorMessage = cause?.message ?: e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}

override fun isIdempotent(): Boolean = true

companion object {
const val name = "wait_for_restore"
fun getFailedMessage(index: String, cause: String) = "Failed to check restore status for [index=$index], cause: $cause"
fun getPendingMessage(index: String) = "Restore not complete for [index=$index], retrying..."
fun getSuccessMessage(index: String) = "Restore complete for [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ActionValidation(
"read_write" -> ValidateReadWrite(settings, clusterService, jvmService).execute(indexName)
"replica_count" -> ValidateReplicaCount(settings, clusterService, jvmService).execute(indexName)
"snapshot" -> ValidateSnapshot(settings, clusterService, jvmService).execute(indexName)
"convert_index_to_remote" -> ValidateConvertIndexToRemote(settings, clusterService, jvmService).execute(indexName)
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
Expand Down
Loading

0 comments on commit 7f54873

Please sign in to comment.