From 9d58b8d91f81f53b6cbacba929496bc198418a05 Mon Sep 17 00:00:00 2001 From: Harish Bhakuni Date: Tue, 27 Jun 2023 11:01:58 -0700 Subject: [PATCH] [Snapshot Interop] Add Changes in Restore Snapshot Flow for remote store Interoperability (#6788) * [Snapshot Interop] Add Changes in Restore Snapshot Flow for remote store interoperability. --------- Signed-off-by: Harish Bhakuni Co-authored-by: Harish Bhakuni --- .../rest-api-spec/api/snapshot.restore.json | 4 + .../snapshots/RestoreSnapshotIT.java | 513 ++++++++++++++++++ .../restore/RestoreSnapshotRequest.java | 42 +- .../RestoreSnapshotRequestBuilder.java | 8 + .../cluster/routing/RecoverySource.java | 50 +- .../opensearch/index/shard/IndexShard.java | 199 +++++-- .../shard/RemoteStoreRefreshListener.java | 15 +- .../opensearch/index/shard/StoreRecovery.java | 98 +++- .../store/RemoteSegmentStoreDirectory.java | 19 + .../RemoteSegmentStoreDirectoryFactory.java | 5 + .../metadata/RemoteSegmentMetadata.java | 11 +- .../index/translog/RemoteFsTranslog.java | 14 + .../transfer/TranslogTransferManager.java | 5 + .../cluster/IndicesClusterStateService.java | 15 +- .../repositories/FilterRepository.java | 10 + .../opensearch/repositories/Repository.java | 17 + .../blobstore/BlobStoreRepository.java | 10 + .../opensearch/snapshots/RestoreService.java | 15 +- .../restore/RestoreSnapshotRequestTests.java | 4 + .../cluster/routing/RecoverySourceTests.java | 24 + .../index/shard/IndexShardTests.java | 77 ++- .../RemoteSegmentStoreDirectoryTests.java | 18 +- .../RemoteSegmentMetadataHandlerTests.java | 4 + .../RepositoriesServiceTests.java | 10 + .../blobstore/BlobStoreRepositoryTests.java | 92 +++- .../index/shard/IndexShardTestCase.java | 32 +- .../blobstore/BlobStoreTestUtil.java | 31 +- .../AbstractSnapshotIntegTestCase.java | 48 +- 28 files changed, 1303 insertions(+), 87 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.restore.json b/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.restore.json index 87ab8117ec489..07148c7d261f4 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.restore.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.restore.json @@ -42,6 +42,10 @@ "type":"boolean", "description":"Should this request wait until the operation has completed before returning", "default":false + }, + "source_remote_store_repository": { + "type":"string", + "description":"Remote Store Repository of Remote Store Indices" } }, "body":{ diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java index 8be14d1188db8..9f492bbaee01a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java @@ -33,29 +33,44 @@ package org.opensearch.snapshots; import org.opensearch.action.ActionFuture; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder; import org.opensearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; +import org.opensearch.client.Requests; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.index.IndexSettings; import org.opensearch.indices.InvalidIndexNameException; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestStatus; +import org.opensearch.test.InternalTestCluster; +import java.io.IOException; import java.nio.file.Path; import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Arrays; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -70,6 +85,8 @@ import static org.hamcrest.Matchers.nullValue; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY; import static org.opensearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING; import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.opensearch.index.query.QueryBuilders.matchQuery; @@ -81,6 +98,10 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertRequestBuilderThrows; public class RestoreSnapshotIT extends AbstractSnapshotIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(FeatureFlags.REMOTE_STORE, "true").build(); + } public void testParallelRestoreOperations() { String indexName1 = "testindex1"; @@ -152,6 +173,498 @@ public void testParallelRestoreOperations() { assertThat(client.prepareGet(restoredIndexName2, docId2).get().isExists(), equalTo(true)); } + public void testRestoreRemoteStoreIndicesWithRemoteTranslog() throws IOException, ExecutionException, InterruptedException { + testRestoreOperationsShallowCopyEnabled(true); + } + + public void testRestoreRemoteStoreIndicesWithoutRemoteTranslog() throws IOException, ExecutionException, InterruptedException { + testRestoreOperationsShallowCopyEnabled(false); + } + + public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnabled) throws IOException, ExecutionException, + InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startNode(); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-restore-snapshot-repo"; + String remoteStoreRepoName = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; + String snapshotName1 = "test-restore-snapshot1"; + String snapshotName2 = "test-restore-snapshot2"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + Path absolutePath2 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + logger.info("Remote Store Repo Path [{}]", absolutePath2); + String restoredIndexName1 = indexName1 + "-restored"; + String restoredIndexName1Seg = indexName1 + "-restored-seg"; + String restoredIndexName1Doc = indexName1 + "-restored-doc"; + String restoredIndexName2 = indexName2 + "-restored"; + + createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true)); + createRepository(remoteStoreRepoName, "fs", absolutePath2); + + Client client = client(); + Settings indexSettings = getIndexSettings(true, remoteTranslogEnabled, remoteStoreRepoName, 1, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(false, false, null, 1, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 5; + final int numDocsInIndex2 = 6; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + final String secondNode = internalCluster().startNode(); + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1, indexName2) + .get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false)); + CreateSnapshotResponse createSnapshotResponse2 = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName2) + .setWaitForCompletion(true) + .setIndices(indexName1, indexName2) + .get(); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat( + createSnapshotResponse2.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet(); + assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED); + indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + randomIntBetween(2, 5)); + ensureGreen(indexName1); + + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(false) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .get(); + RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName2) + .setWaitForCompletion(false) + .setIndices(indexName2) + .setRenamePattern(indexName2) + .setRenameReplacement(restoredIndexName2) + .get(); + assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED); + assertEquals(restoreSnapshotResponse2.status(), RestStatus.ACCEPTED); + ensureGreen(restoredIndexName1, restoredIndexName2); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1); + assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2); + + // deleting data for restoredIndexName1 and restoring from remote store. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1))); + ensureRed(restoredIndexName1); + assertAcked(client().admin().indices().prepareClose(restoredIndexName1)); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); + ensureYellowAndNoInitializingShards(restoredIndexName1); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1); + // indexing some new docs and validating + indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); + + // restore index as seg rep enabled with remote store and remote translog disabled + RestoreSnapshotResponse restoreSnapshotResponse3 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(false) + .setIgnoreIndexSettings(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1Seg) + .get(); + assertEquals(restoreSnapshotResponse3.status(), RestStatus.ACCEPTED); + ensureGreen(restoredIndexName1Seg); + + GetIndexResponse getIndexResponse = client().admin() + .indices() + .getIndex(new GetIndexRequest().indices(restoredIndexName1Seg).includeDefaults(true)) + .get(); + indexSettings = getIndexResponse.settings().get(restoredIndexName1Seg); + assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); + assertNull(indexSettings.get(SETTING_REMOTE_STORE_REPOSITORY, null)); + assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); + assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1); + // indexing some new docs and validating + indexDocuments(client, restoredIndexName1Seg, numDocsInIndex1, numDocsInIndex1 + 2); + ensureGreen(restoredIndexName1Seg); + assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1 + 2); + + // restore index as doc rep based from shallow copy snapshot + RestoreSnapshotResponse restoreSnapshotResponse4 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(false) + .setIgnoreIndexSettings( + IndexMetadata.SETTING_REMOTE_STORE_ENABLED, + IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, + IndexMetadata.SETTING_REPLICATION_TYPE + ) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1Doc) + .get(); + assertEquals(restoreSnapshotResponse4.status(), RestStatus.ACCEPTED); + ensureGreen(restoredIndexName1Doc); + + getIndexResponse = client().admin() + .indices() + .getIndex(new GetIndexRequest().indices(restoredIndexName1Doc).includeDefaults(true)) + .get(); + indexSettings = getIndexResponse.settings().get(restoredIndexName1Doc); + assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); + assertNull(indexSettings.get(SETTING_REMOTE_STORE_REPOSITORY, null)); + assertNull(indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); + assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1); + // indexing some new docs and validating + indexDocuments(client, restoredIndexName1Doc, numDocsInIndex1, numDocsInIndex1 + 2); + ensureGreen(restoredIndexName1Doc); + assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2); + } + + public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startNode(); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-restore-snapshot-repo"; + String remoteStoreRepoName = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; + String snapshotName1 = "test-restore-snapshot1"; + String snapshotName2 = "test-restore-snapshot2"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + Path absolutePath2 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + logger.info("Remote Store Repo Path [{}]", absolutePath2); + String restoredIndexName2 = indexName2 + "-restored"; + + boolean enableShallowCopy = randomBoolean(); + createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, enableShallowCopy)); + createRepository(remoteStoreRepoName, "fs", absolutePath2); + + Client client = client(); + Settings indexSettings = getIndexSettings(true, randomBoolean(), remoteStoreRepoName, 1, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(false, false, null, 1, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 5; + final int numDocsInIndex2 = 6; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + final String secondNode = internalCluster().startNode(); + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1, indexName2) + .get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false)); + CreateSnapshotResponse createSnapshotResponse2 = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName2) + .setWaitForCompletion(true) + .setIndices(indexName1, indexName2) + .get(); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat( + createSnapshotResponse2.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet(); + assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED); + indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + randomIntBetween(2, 5)); + ensureGreen(indexName1); + + assertAcked(client().admin().indices().prepareClose(indexName1)); + + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(false) + .setIndices(indexName1) + .get(); + RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName2) + .setWaitForCompletion(false) + .setIndices(indexName2) + .setRenamePattern(indexName2) + .setRenameReplacement(restoredIndexName2) + .get(); + assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED); + assertEquals(restoreSnapshotResponse2.status(), RestStatus.ACCEPTED); + ensureGreen(indexName1, restoredIndexName2); + assertDocsPresentInIndex(client, indexName1, numDocsInIndex1); + assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2); + + // deleting data for restoredIndexName1 and restoring from remote store. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName1))); + ensureRed(indexName1); + assertAcked(client().admin().indices().prepareClose(indexName1)); + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1), PlainActionFuture.newFuture()); + ensureYellowAndNoInitializingShards(indexName1); + ensureGreen(indexName1); + assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); + // indexing some new docs and validating + indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2); + ensureGreen(indexName1); + assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2); + } + + public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException { + internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startNode(); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-restore-snapshot-repo"; + String remoteStoreRepoName = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; + String remoteStoreRepo2Name = "test-rs-repo-2" + TEST_REMOTE_STORE_REPO_SUFFIX; + String snapshotName1 = "test-restore-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + Path absolutePath2 = randomRepoPath().toAbsolutePath(); + Path absolutePath3 = randomRepoPath().toAbsolutePath(); + String restoredIndexName1 = indexName1 + "-restored"; + + createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false)); + createRepository(remoteStoreRepoName, "fs", absolutePath2); + createRepository(remoteStoreRepo2Name, "fs", absolutePath3); + + Client client = client(); + Settings indexSettings = getIndexSettings(true, true, remoteStoreRepoName, 1, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(false, false, null, 1, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 5; + final int numDocsInIndex2 = 6; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + final String secondNode = internalCluster().startNode(); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1, indexName2) + .get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + Settings remoteStoreIndexSettings = Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo2Name) + .build(); + // restore index as a remote store index with different remote store repo + RestoreSnapshotResponse restoreSnapshotResponse = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(false) + .setIndexSettings(remoteStoreIndexSettings) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .get(); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1); + + // deleting data for restoredIndexName1 and restoring from remote store. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1))); + assertAcked(client().admin().indices().prepareClose(restoredIndexName1)); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); + ensureYellowAndNoInitializingShards(restoredIndexName1); + ensureGreen(restoredIndexName1); + // indexing some new docs and validating + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1); + indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); + } + + private Settings.Builder getIndexSettings( + boolean enableRemoteStore, + boolean enableRemoteTranslog, + String remoteStoreRepo, + int numOfShards, + int numOfReplicas + ) { + Settings.Builder settingsBuilder = Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas); + if (enableRemoteStore) { + settingsBuilder.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } + if (enableRemoteTranslog) { + settingsBuilder.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStoreRepo) + .build(); + } + return settingsBuilder; + } + + public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionException, InterruptedException { + String indexName1 = "testindex1"; + String snapshotRepoName = "test-restore-snapshot-repo"; + String remoteStoreRepoName = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX; + String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX; + String snapshotName1 = "test-restore-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + Path absolutePath2 = randomRepoPath().toAbsolutePath(); + Path absolutePath3 = randomRepoPath().toAbsolutePath(); + String[] pathTokens = absolutePath1.toString().split("/"); + String basePath = pathTokens[pathTokens.length - 1]; + Arrays.copyOf(pathTokens, pathTokens.length - 1); + Path location = PathUtils.get(String.join("/", pathTokens)); + pathTokens = absolutePath2.toString().split("/"); + String basePath2 = pathTokens[pathTokens.length - 1]; + Arrays.copyOf(pathTokens, pathTokens.length - 1); + Path location2 = PathUtils.get(String.join("/", pathTokens)); + logger.info("Path 1 [{}]", absolutePath1); + logger.info("Path 2 [{}]", absolutePath2); + logger.info("Path 3 [{}]", absolutePath3); + String restoredIndexName1 = indexName1 + "-restored"; + + createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true)); + createRepository(remoteStoreRepoName, "fs", absolutePath3); + + Client client = client(); + Settings indexSettings = Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepoName) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + createIndex(indexName1, indexSettings); + + int numDocsInIndex1 = randomIntBetween(2, 5); + indexDocuments(client, indexName1, numDocsInIndex1); + + ensureGreen(indexName1); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1) + .get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + createRepository(remoteStoreRepoName, "fs", absolutePath2); + + RestoreSnapshotResponse restoreSnapshotResponse = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .get(); + + assertTrue(restoreSnapshotResponse.getRestoreInfo().failedShards() > 0); + + ensureRed(restoredIndexName1); + + client().admin().indices().close(Requests.closeIndexRequest(restoredIndexName1)).get(); + createRepository(remoteStoreRepoNameUpdated, "fs", absolutePath3); + RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated) + .get(); + + assertTrue(restoreSnapshotResponse2.getRestoreInfo().failedShards() == 0); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1); + + // indexing some new docs and validating + indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); + } + + private void indexDocuments(Client client, String indexName, int numOfDocs) { + indexDocuments(client, indexName, 0, numOfDocs); + } + + private void indexDocuments(Client client, String indexName, int fromId, int toId) { + for (int i = fromId; i < toId; i++) { + String id = Integer.toString(i); + client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get(); + } + client.admin().indices().prepareFlush(indexName).get(); + } + + private void assertDocsPresentInIndex(Client client, String indexName, int numOfDocs) { + for (int i = 0; i < numOfDocs; i++) { + String id = Integer.toString(i); + logger.info("checking for index " + indexName + " with docId" + id); + assertTrue("doc with id" + id + " is not present for index " + indexName, client.prepareGet(indexName, id).get().isExists()); + } + } + public void testParallelRestoreOperationsFromSingleSnapshot() throws Exception { String indexName1 = "testindex1"; String indexName2 = "testindex2"; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java index e127b44116b7e..7ff16fcf6fc03 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java @@ -113,6 +113,8 @@ private static StorageType fromString(String string) { private Settings indexSettings = EMPTY_SETTINGS; private String[] ignoreIndexSettings = Strings.EMPTY_ARRAY; private StorageType storageType = StorageType.LOCAL; + @Nullable + private String sourceRemoteStoreRepository = null; @Nullable // if any snapshot UUID will do private String snapshotUuid; @@ -148,6 +150,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_7_0)) { storageType = in.readEnum(StorageType.class); } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + sourceRemoteStoreRepository = in.readOptionalString(); + } } @Override @@ -169,6 +174,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_7_0)) { out.writeEnum(storageType); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalString(sourceRemoteStoreRepository); + } } @Override @@ -521,6 +529,25 @@ public StorageType storageType() { return storageType; } + /** + * Sets Source Remote Store Repository for all the restored indices + * + * @param sourceRemoteStoreRepository name of the remote store repository that should be used for all restored indices. + */ + public RestoreSnapshotRequest setSourceRemoteStoreRepository(String sourceRemoteStoreRepository) { + this.sourceRemoteStoreRepository = sourceRemoteStoreRepository; + return this; + } + + /** + * Returns Source Remote Store Repository for all the restored indices + * + * @return source Remote Store Repository + */ + public String getSourceRemoteStoreRepository() { + return sourceRemoteStoreRepository; + } + /** * Parses restore definition * @@ -586,6 +613,12 @@ public RestoreSnapshotRequest source(Map source) { throw new IllegalArgumentException("malformed storage_type"); } + } else if (name.equals("source_remote_store_repository")) { + if (entry.getValue() instanceof String) { + setSourceRemoteStoreRepository((String) entry.getValue()); + } else { + throw new IllegalArgumentException("malformed source_remote_store_repository"); + } } else { if (IndicesOptions.isIndicesOptions(name) == false) { throw new IllegalArgumentException("Unknown parameter " + name); @@ -631,6 +664,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (storageType != null) { storageType.toXContent(builder); } + if (sourceRemoteStoreRepository != null) { + builder.field("source_remote_store_repository", sourceRemoteStoreRepository); + } builder.endObject(); return builder; } @@ -658,7 +694,8 @@ public boolean equals(Object o) { && Objects.equals(indexSettings, that.indexSettings) && Arrays.equals(ignoreIndexSettings, that.ignoreIndexSettings) && Objects.equals(snapshotUuid, that.snapshotUuid) - && Objects.equals(storageType, that.storageType); + && Objects.equals(storageType, that.storageType) + && Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository); } @Override @@ -675,7 +712,8 @@ public int hashCode() { includeAliases, indexSettings, snapshotUuid, - storageType + storageType, + sourceRemoteStoreRepository ); result = 31 * result + Arrays.hashCode(indices); result = 31 * result + Arrays.hashCode(ignoreIndexSettings); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestBuilder.java index 0104637a00035..d9cca536d1c41 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestBuilder.java @@ -256,4 +256,12 @@ public RestoreSnapshotRequestBuilder setStorageType(RestoreSnapshotRequest.Stora request.storageType(storageType); return this; } + + /** + * Sets the source remote store repository name + */ + public RestoreSnapshotRequestBuilder setSourceRemoteStoreRepository(String repositoryName) { + request.setSourceRemoteStoreRepository(repositoryName); + return this; + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java index ef86eb31e2817..1af4fbe8ffb45 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java @@ -34,6 +34,7 @@ import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; @@ -257,9 +258,11 @@ public static class SnapshotRecoverySource extends RecoverySource { private final IndexId index; private final Version version; private final boolean isSearchableSnapshot; + private final boolean remoteStoreIndexShallowCopy; + private final String sourceRemoteStoreRepository; public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) { - this(restoreUUID, snapshot, version, indexId, false); + this(restoreUUID, snapshot, version, indexId, false, false, null); } public SnapshotRecoverySource( @@ -267,13 +270,17 @@ public SnapshotRecoverySource( Snapshot snapshot, Version version, IndexId indexId, - boolean isSearchableSnapshot + boolean isSearchableSnapshot, + boolean remoteStoreIndexShallowCopy, + @Nullable String sourceRemoteStoreRepository ) { this.restoreUUID = restoreUUID; this.snapshot = Objects.requireNonNull(snapshot); this.version = Objects.requireNonNull(version); this.index = Objects.requireNonNull(indexId); this.isSearchableSnapshot = isSearchableSnapshot; + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; + this.sourceRemoteStoreRepository = sourceRemoteStoreRepository; } SnapshotRecoverySource(StreamInput in) throws IOException { @@ -286,6 +293,13 @@ public SnapshotRecoverySource( } else { isSearchableSnapshot = false; } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + remoteStoreIndexShallowCopy = in.readBoolean(); + sourceRemoteStoreRepository = in.readOptionalString(); + } else { + remoteStoreIndexShallowCopy = false; + sourceRemoteStoreRepository = null; + } } public String restoreUUID() { @@ -314,6 +328,14 @@ public boolean isSearchableSnapshot() { return isSearchableSnapshot; } + public String sourceRemoteStoreRepository() { + return sourceRemoteStoreRepository; + } + + public boolean remoteStoreIndexShallowCopy() { + return remoteStoreIndexShallowCopy; + } + @Override protected void writeAdditionalFields(StreamOutput out) throws IOException { out.writeString(restoreUUID); @@ -323,6 +345,10 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_7_0)) { out.writeBoolean(isSearchableSnapshot); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(remoteStoreIndexShallowCopy); + out.writeOptionalString(sourceRemoteStoreRepository); + } } @Override @@ -337,7 +363,9 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param .field("version", version.toString()) .field("index", index.getName()) .field("restoreUUID", restoreUUID) - .field("isSearchableSnapshot", isSearchableSnapshot); + .field("isSearchableSnapshot", isSearchableSnapshot) + .field("remoteStoreIndexShallowCopy", remoteStoreIndexShallowCopy) + .field("sourceRemoteStoreRepository", sourceRemoteStoreRepository); } @Override @@ -359,12 +387,24 @@ public boolean equals(Object o) { && snapshot.equals(that.snapshot) && index.equals(that.index) && version.equals(that.version) - && isSearchableSnapshot == that.isSearchableSnapshot; + && isSearchableSnapshot == that.isSearchableSnapshot + && remoteStoreIndexShallowCopy == that.remoteStoreIndexShallowCopy + && sourceRemoteStoreRepository != null + ? sourceRemoteStoreRepository.equals(that.sourceRemoteStoreRepository) + : that.sourceRemoteStoreRepository == null; } @Override public int hashCode() { - return Objects.hash(restoreUUID, snapshot, index, version, isSearchableSnapshot); + return Objects.hash( + restoreUUID, + snapshot, + index, + version, + isSearchableSnapshot, + remoteStoreIndexShallowCopy, + sourceRemoteStoreRepository + ); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9938d11caca13..d89d51c713d70 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -44,6 +44,7 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.ReferenceManager; @@ -1495,7 +1496,7 @@ public GatedCloseable acquireLastIndexCommitAndRefresh(boolean flus * @throws IOException if there is some failure in acquiring lock in remote store. */ public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException { - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard(); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory(); remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId); } @@ -1507,20 +1508,10 @@ public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long ge * @throws IOException if there is some failure in releasing lock in remote store. */ public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException { - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard(); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory(); remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId); } - private RemoteSegmentStoreDirectory getRemoteSegmentDirectoryForShard() { - FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); - assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory - : "Store.directory is not enclosing an instance of FilterDirectory"; - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); - assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; - return ((RemoteSegmentStoreDirectory) remoteDirectory); - } - public Optional getReplicationEngine() { if (getEngine() instanceof NRTReplicationEngine) { return Optional.of((NRTReplicationEngine) getEngine()); @@ -2290,7 +2281,24 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { getEngine().translogManager().skipTranslogRecovery(); } + public void openEngineAndSkipTranslogRecoveryFromSnapshot() throws IOException { + assert routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT : "not a snapshot recovery [" + + routingEntry() + + "]"; + recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); + maybeCheckIndex(); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + loadGlobalCheckpointToReplicationTracker(); + innerOpenEngineAndTranslog(replicationTracker, false); + getEngine().translogManager().skipTranslogRecovery(); + } + private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException { + innerOpenEngineAndTranslog(globalCheckpointSupplier, true); + } + + private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, boolean syncFromRemote) throws IOException { assert Thread.holdsLock(mutex) == false : "opening engine under mutex"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); @@ -2309,11 +2317,20 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t synchronized (engineMutex) { assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); - if (indexSettings.isRemoteStoreEnabled()) { + if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) { syncSegmentsFromRemoteSegmentStore(false, true, true); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { - syncRemoteTranslogAndUpdateGlobalCheckpoint(); + if (syncFromRemote) { + syncRemoteTranslogAndUpdateGlobalCheckpoint(); + } else { + // we will enter this block when we do not want to recover from remote translog. + // currently only during snapshot restore, we are coming into this block. + // here, as while initiliazing remote translog we cannot skip downloading translog files, + // so before that step, we are deleting the translog files present in remote store. + deleteTranslogFilesFromRemoteTranslog(); + + } } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); @@ -2605,6 +2622,22 @@ public void restoreFromRemoteStore(ActionListener listener) { storeRecovery.recoverFromRemoteStore(this, listener); } + public void restoreFromSnapshotAndRemoteStore( + Repository repository, + RepositoriesService repositoriesService, + ActionListener listener + ) { + try { + assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; + assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + + recoveryState.getRecoverySource(); + StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + public void restoreFromRepository(Repository repository, ActionListener listener) { try { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; @@ -3418,6 +3451,15 @@ public void startRecovery( final SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource(); if (recoverySource.isSearchableSnapshot()) { executeRecovery("from snapshot (remote)", recoveryState, recoveryListener, this::recoverFromStore); + } else if (recoverySource.remoteStoreIndexShallowCopy()) { + final String repo = recoverySource.snapshot().getRepository(); + executeRecovery( + "from snapshot and remote store", + recoveryState, + recoveryListener, + l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), repositoriesService, l) + ); + // indicesService.indexService(shardRouting.shardId().getIndex()).addMetadataListener(); } else { final String repo = recoverySource.snapshot().getRepository(); executeRecovery( @@ -4536,6 +4578,13 @@ private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException { loadGlobalCheckpointToReplicationTracker(); } + public void deleteTranslogFilesFromRemoteTranslog() throws IOException { + TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); + assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; + Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); + RemoteFsTranslog.cleanup(repository, shardId, getThreadPool()); + } + public void syncTranslogFilesFromRemoteTranslog() throws IOException { TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; @@ -4558,12 +4607,11 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that // are uploaded to the remote segment store. RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init(); - Map uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) + + Map uploadedSegments = remoteDirectory .getSegmentsUploadedToRemoteStore(); store.incRef(); remoteStore.incRef(); - List downloadedSegments = new ArrayList<>(); - List skippedSegments = new ArrayList<>(); try { final Directory storeDirectory; if (recoveryState.getStage() == RecoveryState.Stage.INDEX) { @@ -4580,18 +4628,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re storeDirectory = store.directory(); } Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); - for (String file : uploadedSegments.keySet()) { - long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); - if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { - if (localSegmentFiles.contains(file)) { - storeDirectory.deleteFile(file); - } - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); - downloadedSegments.add(file); - } else { - skippedSegments.add(file); - } - } + copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal); if (refreshLevelSegmentSync && remoteSegmentMetadata != null) { try ( @@ -4637,13 +4674,113 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); } finally { - logger.info("Downloaded segments: {}", downloadedSegments); - logger.info("Skipped download for segments: {}", skippedSegments); store.decRef(); remoteStore.decRef(); } } + /** + * Downloads segments from given remote segment store for a specific commit. + * @param overrideLocal flag to override local segment files with those in remote store + * @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments + * @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments + * @param commitGeneration commit generation at the time of commit operation for which we are syncing segments + * @throws IOException if exception occurs while reading segments from remote store + */ + public void syncSegmentsFromGivenRemoteSegmentStore( + boolean overrideLocal, + RemoteSegmentStoreDirectory sourceRemoteDirectory, + long primaryTerm, + long commitGeneration + ) throws IOException { + logger.info("Downloading segments from given remote segment store"); + RemoteSegmentStoreDirectory remoteDirectory = null; + if (remoteStore != null) { + remoteDirectory = getRemoteDirectory(); + remoteDirectory.init(); + remoteStore.incRef(); + } + Map uploadedSegments = sourceRemoteDirectory + .initializeToSpecificCommit(primaryTerm, commitGeneration) + .getMetadata(); + final Directory storeDirectory = store.directory(); + store.incRef(); + + try { + String segmentsNFile = copySegmentFiles( + storeDirectory, + sourceRemoteDirectory, + remoteDirectory, + uploadedSegments, + overrideLocal + ); + if (segmentsNFile != null) { + try ( + ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( + storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT) + ) + ) { + SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration); + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + if (remoteStore != null) { + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + } else { + store.directory().sync(infosSnapshot.files(true)); + store.directory().syncMetaData(); + } + } + } + } catch (IOException e) { + throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); + } finally { + store.decRef(); + if (remoteStore != null) { + remoteStore.decRef(); + } + } + } + + private String copySegmentFiles( + Directory storeDirectory, + RemoteSegmentStoreDirectory sourceRemoteDirectory, + RemoteSegmentStoreDirectory targetRemoteDirectory, + Map uploadedSegments, + boolean overrideLocal + ) throws IOException { + List downloadedSegments = new ArrayList<>(); + List skippedSegments = new ArrayList<>(); + String segmentNFile = null; + try { + Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); + if (overrideLocal) { + for (String file : localSegmentFiles) { + storeDirectory.deleteFile(file); + } + } + for (String file : uploadedSegments.keySet()) { + long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); + if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { + storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT); + storeDirectory.sync(Collections.singleton(file)); + downloadedSegments.add(file); + } else { + skippedSegments.add(file); + } + if (targetRemoteDirectory != null) { + targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + } + if (file.startsWith(IndexFileNames.SEGMENTS)) { + assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file"; + segmentNFile = file; + } + } + } finally { + logger.info("Downloaded segments here: {}", downloadedSegments); + logger.info("Skipped download for segments here: {}", skippedSegments); + } + return segmentNFile; + } + private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { if (checksum == CodecUtil.retrieveChecksum(indexInput)) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index d7f7373e83bd0..7cfaaafcadd39 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -29,6 +29,7 @@ import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.Scheduler; @@ -71,6 +72,8 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres */ private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 10_000; + private static final int INVALID_PRIMARY_TERM = -1; + /** * Exponential back off policy with max retry interval. */ @@ -118,15 +121,18 @@ public RemoteStoreRefreshListener( this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) .getDelegate()).getDelegate(); - this.primaryTerm = indexShard.getOperationPrimaryTerm(); localSegmentChecksumMap = new HashMap<>(); + RemoteSegmentMetadata remoteSegmentMetadata = null; if (indexShard.routingEntry().primary()) { try { - this.remoteDirectory.init(); + remoteSegmentMetadata = this.remoteDirectory.init(); } catch (IOException e) { logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } + // initializing primary term with the primary term of latest metadata in remote store. + // if no metadata is present, this value will be initilized with -1. + this.primaryTerm = remoteSegmentMetadata != null ? remoteSegmentMetadata.getPrimaryTerm() : INVALID_PRIMARY_TERM; this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; @@ -163,8 +169,9 @@ public void beforeRefresh() throws IOException {} */ @Override public void afterRefresh(boolean didRefresh) { - - if (didRefresh || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm() + || didRefresh + || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) { updateLocalRefreshTimeAndSeqNo(); try { indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index a81dc96ff1145..119524e8caf8a 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -59,14 +59,19 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import java.io.IOException; +import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -347,6 +352,72 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A } } + void recoverFromSnapshotAndRemoteStore( + final IndexShard indexShard, + Repository repository, + RepositoriesService repositoriesService, + ActionListener listener + ) { + try { + if (canRecover(indexShard)) { + indexShard.preRecovery(); + RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); + assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; + SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); + final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog(); + translogState.totalOperations(0); + translogState.totalOperationsOnStart(0); + indexShard.prepareForIndexRecovery(); + + RemoteStoreShardShallowCopySnapshot shallowCopyShardMetadata = repository.getRemoteStoreShallowCopyShardMetadata( + recoverySource.snapshot().getSnapshotId(), + recoverySource.index(), + shardId + ); + + long primaryTerm = shallowCopyShardMetadata.getPrimaryTerm(); + long commitGeneration = shallowCopyShardMetadata.getCommitGeneration(); + String indexUUID = shallowCopyShardMetadata.getIndexUUID(); + String remoteStoreRepository = ((SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource()) + .sourceRemoteStoreRepository(); + if (remoteStoreRepository == null) { + remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); + } + + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService); + RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( + remoteStoreRepository, + indexUUID, + String.valueOf(shardId.id()) + ); + indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration); + final Store store = indexShard.store(); + if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) { + bootstrap(indexShard, store); + } else { + bootstrapForSnapshot(indexShard, store); + } + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + writeEmptyRetentionLeasesFile(indexShard); + indexShard.recoveryState().getIndex().setFileDetailsComplete(); + if (indexShard.indexSettings.isRemoteStoreEnabled()) { + indexShard.openEngineAndSkipTranslogRecoveryFromSnapshot(); + } else { + indexShard.openEngineAndRecoverFromTranslog(); + } + indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); + indexShard.finalizeRecovery(); + indexShard.postRecovery("restore done"); + + listener.onResponse(true); + } else { + listener.onResponse(false); + } + } catch (Exception e) { + listener.onFailure(e); + } + } + private boolean canRecover(IndexShard indexShard) { if (indexShard.state() == IndexShardState.CLOSED) { // got closed on us, just ignore this recovery @@ -597,10 +668,18 @@ private void restore( } final ActionListener restoreListener = ActionListener.wrap(v -> { final Store store = indexShard.store(); - bootstrap(indexShard, store); + if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) { + bootstrap(indexShard, store); + } else { + bootstrapForSnapshot(indexShard, store); + } assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; writeEmptyRetentionLeasesFile(indexShard); - indexShard.openEngineAndRecoverFromTranslog(); + if (indexShard.indexSettings.isRemoteStoreEnabled()) { + indexShard.openEngineAndSkipTranslogRecoveryFromSnapshot(); + } else { + indexShard.openEngineAndRecoverFromTranslog(); + } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); @@ -644,6 +723,21 @@ private void restore( } } + private void bootstrapForSnapshot(final IndexShard indexShard, final Store store) throws IOException { + store.bootstrapNewHistory(); + final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), + shardId, + localCheckpoint, + indexShard.getPendingPrimaryTerm(), + translogUUID, + FileChannel::open + ); + } + private void bootstrap(final IndexShard indexShard, final Store store) throws IOException { store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 15c6fbea99148..addd8a24af9c5 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -126,6 +126,24 @@ public RemoteSegmentMetadata init() throws IOException { return remoteSegmentMetadata; } + /** + * Initializes the cache to a specific commit which keeps track of all the segment files uploaded to the + * remote segment store. + * this is currently used to restore snapshots, where we want to copy segment files from a given commit. + * TODO: check if we can return read only RemoteSegmentStoreDirectory object from here. + * @throws IOException if there were any failures in reading the metadata file + */ + public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration) throws IOException { + String metadataFile = getMetadataFileForCommit(primaryTerm, commitGeneration); + RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile); + if (remoteSegmentMetadata != null) { + this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata()); + } else { + this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(); + } + return remoteSegmentMetadata; + } + /** * Read the latest metadata file to get the list of segments uploaded to the remote segment store. * We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit. @@ -485,6 +503,7 @@ public void uploadMetadata( new RemoteSegmentMetadata( RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments), segmentInfoSnapshotByteArray, + primaryTerm, segmentInfosSnapshot.getGeneration() ) ); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 388f80ea3e480..03995d5913fb3 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -43,6 +43,11 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw String repositoryName = indexSettings.getRemoteStoreRepository(); String indexUUID = indexSettings.getIndex().getUUID(); String shardId = String.valueOf(path.getShardId().getId()); + + return newDirectory(repositoryName, indexUUID, shardId); + } + + public Directory newDirectory(String repositoryName, String indexUUID, String shardId) throws IOException { try (Repository repository = repositoriesService.get().repository(repositoryName)) { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath(); diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java index 2a84fbfb89c93..9a479346ff711 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java @@ -38,16 +38,19 @@ public class RemoteSegmentMetadata { private final byte[] segmentInfosBytes; + private final long primaryTerm; private final long generation; public RemoteSegmentMetadata( Map metadata, byte[] segmentInfosBytes, + long primaryTerm, long generation ) { this.metadata = metadata; this.segmentInfosBytes = segmentInfosBytes; this.generation = generation; + this.primaryTerm = primaryTerm; } /** @@ -66,6 +69,10 @@ public long getGeneration() { return generation; } + public long getPrimaryTerm() { + return primaryTerm; + } + /** * Generate {@code Map} from {@link RemoteSegmentMetadata} * @return {@code Map} @@ -93,6 +100,7 @@ public static Map f public void write(IndexOutput out) throws IOException { out.writeMapOfStrings(toMapOfStrings()); out.writeLong(generation); + out.writeLong(primaryTerm); out.writeLong(segmentInfosBytes.length); out.writeBytes(segmentInfosBytes, segmentInfosBytes.length); } @@ -100,9 +108,10 @@ public void write(IndexOutput out) throws IOException { public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException { Map metadata = indexInput.readMapOfStrings(); long generation = indexInput.readLong(); + long primaryTerm = indexInput.readLong(); int byteArraySize = (int) indexInput.readLong(); byte[] segmentInfosBytes = new byte[byteArraySize]; indexInput.readBytes(segmentInfosBytes, 0, byteArraySize); - return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, generation); + return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, primaryTerm, generation); } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 6ebb1bf7d2252..04057b581e8d9 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -423,6 +423,20 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() { } } + public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); + TranslogTransferManager translogTransferManager = buildTranslogTransferManager( + blobStoreRepository, + threadPool, + shardId, + fileTransferTracker + ); + // clean up all remote translog files + translogTransferManager.deleteTranslogFiles(); + } + protected void onDelete() { if (primaryModeSupplier.getAsBoolean() == false) { logger.trace("skipped delete translog"); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 58aca00d2e9d3..f6405bc9b5c82 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -361,6 +361,11 @@ public void onFailure(Exception e) { }); } + public void deleteTranslogFiles() throws IOException { + transferService.delete(remoteMetadataTransferPath); + transferService.delete(remoteDataTransferPath); + } + /** * Deletes list of translog files asynchronously using the {@code REMOTE_PURGE} threadpool. * diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 4a0fab82f9adc..e4b251914fa0b 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -107,6 +107,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.CLOSED; import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED; import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.FAILURE; @@ -544,7 +545,19 @@ private void createIndices(final ClusterState state) { AllocatedIndex indexService = null; try { - indexService = indicesService.createIndex(indexMetadata, builtInIndexListener, true); + List updatedIndexEventListeners = new ArrayList<>(builtInIndexListener); + if (entry.getValue().size() > 0 + && entry.getValue().get(0).recoverySource().getType() == Type.SNAPSHOT + && indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) { + final IndexEventListener refreshListenerAfterSnapshotRestore = new IndexEventListener() { + @Override + public void afterIndexShardStarted(IndexShard indexShard) { + indexShard.refresh("refresh to upload metadata to remote store"); + } + }; + updatedIndexEventListeners.add(refreshListenerAfterSnapshotRestore); + } + indexService = indicesService.createIndex(indexMetadata, updatedIndexEventListeners, true); if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh( state.nodes().getClusterManagerNode(), diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 88e14a4dff3a0..b108e2da1ab04 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -45,6 +45,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; @@ -219,6 +220,15 @@ public void restoreShard( in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState, listener); } + @Override + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + return in.getRemoteStoreShallowCopyShardMetadata(snapshotId, indexId, snapshotShardId); + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { return in.getShardSnapshotStatus(snapshotId, indexId, shardId); diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 045b7ad348a76..c08369b79452d 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -46,6 +46,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; @@ -304,6 +305,22 @@ void restoreShard( ActionListener listener ); + /** + * Returns Snapshot Shard Metadata for remote store interop enabled snapshot. + *

+ * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied. + * @param snapshotId snapshot id + * @param indexId id of the index in the repository from which the restore is occurring + * @param snapshotShardId shard id (in the snapshot) + */ + default RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + throw new UnsupportedOperationException(); + } + /** * Retrieve shard snapshot status for the stored snapshot * diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index c0d6f49a5ce0d..f04bf83c2f1d1 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2789,6 +2789,16 @@ public InputStream maybeRateLimitSnapshots(InputStream stream) { return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos); } + @Override + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + final BlobContainer container = shardContainer(indexId, snapshotShardId); + return loadShallowCopyShardSnapshot(container, snapshotId); + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId); diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 3d4b04889a5c9..bd162914e830e 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -450,12 +450,25 @@ public ClusterState execute(ClusterState currentState) { final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match( snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()) ); + final boolean isRemoteStoreShallowCopy = Boolean.TRUE.equals( + snapshotInfo.isRemoteStoreIndexShallowCopyEnabled() + ) && metadata.index(index).getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false); + if (isRemoteStoreShallowCopy && !currentState.getNodes().getMinNodeVersion().onOrAfter(Version.V_3_0_0)) { + throw new SnapshotRestoreException( + snapshot, + "cannot restore shallow copy snapshot for index [" + + index + + "] as some of the nodes in cluster have version less than 2.9" + ); + } final SnapshotRecoverySource recoverySource = new SnapshotRecoverySource( restoreUUID, snapshot, snapshotInfo.version(), repositoryData.resolveIndexId(index), - isSearchableSnapshot + isSearchableSnapshot, + isRemoteStoreShallowCopy, + request.getSourceRemoteStoreRepository() ); final Version minIndexCompatibilityVersion; if (isSearchableSnapshot && isSearchableSnapshotsExtendedCompatibilityEnabled()) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java index 737e7b2e4887b..bb55ac810ed09 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java @@ -112,6 +112,10 @@ private RestoreSnapshotRequest randomState(RestoreSnapshotRequest instance) { instance.snapshotUuid(randomBoolean() ? null : randomAlphaOfLength(10)); } + if (randomBoolean()) { + instance.setSourceRemoteStoreRepository(randomAlphaOfLengthBetween(5, 10)); + } + return instance; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java b/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java index e4aae52f41e68..a5c006362a20c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java @@ -36,6 +36,8 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.repositories.IndexId; +import org.opensearch.snapshots.Snapshot; +import org.opensearch.snapshots.SnapshotId; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -54,6 +56,28 @@ public void testSerialization() throws IOException { assertEquals(recoverySource, serializedRecoverySource); } + public void testSerializationSnapshotRecoverySource() throws IOException { + boolean isSearchableSnapshot = randomBoolean(); + boolean isRemoteStoreShallowCopyEnabled = randomBoolean(); + String sourceRemoteStoreRepo = "test-remote-repo"; + RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource( + UUIDs.randomBase64UUID(), + new Snapshot("repo", new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())), + Version.CURRENT, + new IndexId("some_index", UUIDs.randomBase64UUID(random())), + isSearchableSnapshot, + isRemoteStoreShallowCopyEnabled, + sourceRemoteStoreRepo + ); + BytesStreamOutput out = new BytesStreamOutput(); + recoverySource.writeTo(out); + RecoverySource serializedRecoverySource = RecoverySource.readFrom(out.bytes().streamInput()); + assertEquals(recoverySource.getType(), serializedRecoverySource.getType()); + assertEquals(recoverySource, serializedRecoverySource); + assertEquals(recoverySource.remoteStoreIndexShallowCopy(), isRemoteStoreShallowCopyEnabled); + assertEquals(recoverySource.isSearchableSnapshot(), isSearchableSnapshot); + } + public void testRecoverySourceTypeOrder() { assertEquals(RecoverySource.Type.EMPTY_STORE.ordinal(), 0); assertEquals(RecoverySource.Type.EXISTING_STORE.ordinal(), 1); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 1c0ebf17285f7..58527dbea5791 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; @@ -49,6 +50,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.junit.Assert; +import org.opensearch.common.io.PathUtils; import org.opensearch.core.Assertions; import org.opensearch.OpenSearchException; import org.opensearch.Version; @@ -127,6 +129,7 @@ import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreStats; import org.opensearch.index.store.StoreUtils; @@ -188,7 +191,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; - +import java.util.Collection; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -2797,6 +2800,78 @@ public void restoreShard( closeShards(target); } + public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException { + String remoteStorePath = createTempDir().toString(); + IndexShard source = newStartedShard( + true, + Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStorePath + "__test") + .build(), + new InternalEngineFactory() + ); + indexDoc(source, "_doc", "1"); + indexDoc(source, "_doc", "2"); + source.refresh("test"); + assertDocs(source, "1", "2"); + indexDoc(source, "_doc", "3"); + source.refresh("test"); + flushShard(source); + + indexDoc(source, "_doc", "5"); + source.refresh("test"); + + indexDoc(source, "_doc", "4"); + source.refresh("test"); + + long primaryTerm; + long commitGeneration; + try (GatedCloseable segmentInfosGatedCloseable = source.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + primaryTerm = source.getOperationPrimaryTerm(); + commitGeneration = segmentInfos.getGeneration(); + } + Collection lastCommitedSegmentsInSource = SegmentInfos.readLatestCommit(source.store().directory()).files(false); + + closeShards(source); + + RemoteSegmentStoreDirectory tempRemoteSegmentDirectory = createRemoteSegmentStoreDirectory( + source.shardId(), + PathUtils.get(remoteStorePath) + ); + + IndexShard target = newStartedShard( + true, + Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build(), + new InternalEngineFactory() + ); + ShardRouting routing = ShardRoutingHelper.initWithSameId( + target.routingEntry(), + RecoverySource.ExistingStoreRecoverySource.INSTANCE + ); + routing = ShardRoutingHelper.newWithRestoreSource(routing, new RecoverySource.EmptyStoreRecoverySource()); + + target = reinitShard(target, routing); + + target.syncSegmentsFromGivenRemoteSegmentStore(false, tempRemoteSegmentDirectory, primaryTerm, commitGeneration); + RemoteSegmentStoreDirectory remoteStoreDirectory = ((RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) target + .remoteStore() + .directory()).getDelegate()).getDelegate()); + Collection uploadFiles = remoteStoreDirectory.getSegmentsUploadedToRemoteStore().keySet(); + assertTrue(uploadFiles.containsAll(lastCommitedSegmentsInSource)); + assertTrue( + "Failed to sync all files to new shard", + List.of(target.store().directory().listAll()).containsAll(lastCommitedSegmentsInSource) + ); + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) target.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(target); + } + public void testRefreshLevelRestoreShardFromRemoteStore() throws IOException { testRestoreShardFromRemoteStore(false); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index fec9b04d6e371..3417e7b0aee04 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -228,7 +228,8 @@ private Map getDummyMetadata(String prefix, int commitGeneration * @return ByteArrayIndexInput: metadata file bytes with header and footer * @throws IOException IOException */ - private ByteArrayIndexInput createMetadataFileBytes(Map segmentFilesMap, long generation) throws IOException { + private ByteArrayIndexInput createMetadataFileBytes(Map segmentFilesMap, long generation, long primaryTerm) + throws IOException { ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput(); segmentInfos.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "", "")); byte[] byteArray = byteBuffersIndexOutput.toArrayCopy(); @@ -238,6 +239,7 @@ private ByteArrayIndexInput createMetadataFileBytes(Map segmentF CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION); indexOutput.writeMapOfStrings(segmentFilesMap); indexOutput.writeLong(generation); + indexOutput.writeLong(primaryTerm); indexOutput.writeLong(byteArray.length); indexOutput.writeBytes(byteArray, byteArray.length); CodecUtil.writeFooter(indexOutput); @@ -261,14 +263,14 @@ private Map> populateMetadata() throws IOException { ); when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1) + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1, 5) ); when(remoteMetadataDirectory.openInput("metadata__1__6__pqr", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__6__pqr"), 1) + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__6__pqr"), 1, 6) ); when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1), - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1) + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1, 2), + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1, 2) ); return metadataFilenameContentMapping; @@ -503,7 +505,7 @@ public void testGetSegmentsUploadedToRemoteStore() throws IOException { ); when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1) + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1, 5) ); assert (remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore(testPrimaryTerm, testGeneration).containsKey("segments_5")); @@ -577,7 +579,9 @@ public void testContainsFile() throws IOException { metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512"); metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024"); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata, 1)); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( + createMetadataFileBytes(metadata, 1, 5) + ); remoteSegmentStoreDirectory.init(); diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java index 3bf7781fb909f..cc0764a6700b1 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -61,6 +61,7 @@ public void testReadContentNoSegmentInfos() throws IOException { Map expectedOutput = getDummyData(); indexOutput.writeMapOfStrings(expectedOutput); indexOutput.writeLong(1234); + indexOutput.writeLong(1234); indexOutput.writeLong(0); indexOutput.writeBytes(new byte[0], 0); indexOutput.close(); @@ -77,6 +78,7 @@ public void testReadContentWithSegmentInfos() throws IOException { Map expectedOutput = getDummyData(); indexOutput.writeMapOfStrings(expectedOutput); indexOutput.writeLong(1234); + indexOutput.writeLong(1234); ByteBuffersIndexOutput segmentInfosOutput = new ByteBuffersIndexOutput(new ByteBuffersDataOutput(), "test", "resource"); segmentInfos.write(segmentInfosOutput); byte[] segmentInfosBytes = segmentInfosOutput.toArrayCopy(); @@ -103,6 +105,7 @@ public void testWriteContent() throws IOException { RemoteSegmentMetadata remoteSegmentMetadata = new RemoteSegmentMetadata( RemoteSegmentMetadata.fromMapOfStrings(expectedOutput), segmentInfosBytes, + 1234, 1234 ); remoteSegmentMetadataHandler.writeContent(indexOutput, remoteSegmentMetadata); @@ -113,6 +116,7 @@ public void testWriteContent() throws IOException { ); assertEquals(expectedOutput, metadata.toMapOfStrings()); assertEquals(1234, metadata.getGeneration()); + assertEquals(1234, metadata.getPrimaryTerm()); assertArrayEquals(segmentInfosBytes, metadata.getSegmentInfosBytes()); } diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 43d371bf5a187..f5295bead19a4 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -58,6 +58,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; @@ -342,6 +343,15 @@ public void restoreShard( } + @Override + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + return null; + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { return null; diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index 8e0ee6b16ed48..105ccef500ce8 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -50,7 +50,12 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.RemoteBufferedOutputDirectory; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; @@ -117,11 +122,7 @@ protected void assertSnapshotOrGenericThread() { @Override protected Settings nodeSettings() { - return Settings.builder() - .put(super.nodeSettings()) - .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") - .put(FeatureFlags.REMOTE_STORE, "true") - .build(); + return Settings.builder().put(super.nodeSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); } public void testRetrieveSnapshots() throws Exception { @@ -326,12 +327,89 @@ public void testRetrieveShallowCopySnapshotCase1() throws IOException { final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class); final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(snapshotRepositoryName); - List snapshotIds = OpenSearchBlobStoreRepositoryIntegTestCase.getRepositoryData(repository) - .getSnapshotIds() + RepositoryData repositoryData = OpenSearchBlobStoreRepositoryIntegTestCase.getRepositoryData(repository); + IndexId indexId = repositoryData.resolveIndexId(remoteStoreIndexName); + + List snapshotIds = repositoryData.getSnapshotIds() .stream() .sorted((s1, s2) -> s1.getName().compareTo(s2.getName())) .collect(Collectors.toList()); assertThat(snapshotIds, equalTo(originalSnapshots)); + + // shallow copy shard metadata - getRemoteStoreShallowCopyShardMetadata + RemoteStoreShardShallowCopySnapshot shardShallowCopySnapshot = repository.getRemoteStoreShallowCopyShardMetadata( + snapshotId2, + indexId, + new ShardId(remoteStoreIndexName, indexId.getId(), 0) + ); + assertEquals(shardShallowCopySnapshot.getRemoteStoreRepository(), remoteStoreRepositoryName); + } + + public void testGetRemoteStoreShallowCopyShardMetadata() throws IOException { + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + final Client client = client(); + final String snapshotRepositoryName = "test-repo"; + final String remoteStoreRepositoryName = "test-rs-repo"; + + logger.info("--> creating snapshot repository"); + + Settings snapshotRepoSettings = Settings.builder() + .put(node().settings()) + .put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + .build(); + createRepository(client, snapshotRepositoryName, snapshotRepoSettings); + + logger.info("--> creating remote store repository"); + Settings remoteStoreRepoSettings = Settings.builder() + .put(node().settings()) + .put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + .build(); + createRepository(client, remoteStoreRepositoryName, remoteStoreRepoSettings); + + logger.info("--> creating a remote store enabled index and indexing documents"); + final String remoteStoreIndexName = "test-rs-idx"; + Settings indexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepositoryName); + createIndex(remoteStoreIndexName, indexSettings); + indexDocuments(client, remoteStoreIndexName); + + logger.info("--> create remote index shallow snapshot"); + Settings snapshotRepoSettingsForShallowCopy = Settings.builder() + .put(snapshotRepoSettings) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE) + .build(); + updateRepository(client, snapshotRepositoryName, snapshotRepoSettingsForShallowCopy); + + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-2") + .setWaitForCompletion(true) + .setIndices(remoteStoreIndexName) + .get(); + final SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + String[] lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 1) : "there should be only one lock file, but found " + Arrays.toString(lockFiles); + assert lockFiles[0].endsWith(snapshotId.getUUID() + ".lock"); + + final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(snapshotRepositoryName); + RepositoryData repositoryData = OpenSearchBlobStoreRepositoryIntegTestCase.getRepositoryData(repository); + IndexSettings indexSetting = getIndexSettings(remoteStoreIndexName); + IndexId indexId = repositoryData.resolveIndexId(remoteStoreIndexName); + RemoteStoreShardShallowCopySnapshot shardShallowCopySnapshot = repository.getRemoteStoreShallowCopyShardMetadata( + snapshotId, + indexId, + new ShardId(remoteStoreIndexName, indexSetting.getUUID(), 0) + ); + assertEquals(shardShallowCopySnapshot.getRemoteStoreRepository(), remoteStoreRepositoryName); + assertEquals(shardShallowCopySnapshot.getIndexUUID(), indexSetting.getUUID()); + assertEquals(shardShallowCopySnapshot.getRepositoryBasePath(), ""); + } + + private IndexSettings getIndexSettings(String indexName) { + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService indexService = indicesService.indexService(resolveIndex(indexName)); + return indexService.getIndexSettings(); } // Validate Scenario remoteStoreShallowCopy Snapshot -> remoteStoreShallowCopy Snapshot diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 659f473403ec8..ea9e9342673db 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -67,6 +67,7 @@ import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.io.PathUtils; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -101,6 +102,9 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; @@ -574,7 +578,14 @@ protected IndexShard newShard( RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; if (indexSettings.isRemoteStoreEnabled()) { if (remoteStore == null) { - remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); + Path remoteStorePath; + String remoteStoreRepository = indexSettings.getRemoteStoreRepository(); + if (remoteStoreRepository != null && remoteStoreRepository.endsWith("__test")) { + remoteStorePath = PathUtils.get(remoteStoreRepository.replace("__test", "")); + } else { + remoteStorePath = createTempDir(); + } + remoteStore = createRemoteStore(remoteStorePath, routing, indexMetadata); } remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); } @@ -642,21 +653,30 @@ protected RepositoriesService createRepositoriesService() { protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException { Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build(); + ShardId shardId = shardRouting.shardId(); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = createRemoteSegmentStoreDirectory(shardId, path); + return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); + } - ShardId shardId = new ShardId("index", "_na_", 0); + protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId shardId, Path path) throws IOException { NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(path); ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, null); - return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); + RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( + new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) + ); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException { + return new RemoteDirectory(getBlobContainer(f)); + } + + protected BlobContainer getBlobContainer(Path f) throws IOException { FsBlobStore fsBlobStore = new FsBlobStore(1024, f, false); BlobPath blobPath = new BlobPath(); - BlobContainer fsBlobContainer = new FsBlobContainer(fsBlobStore, blobPath, f); - return new RemoteDirectory(fsBlobContainer); + return new FsBlobContainer(fsBlobStore, blobPath, f); } /** diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index 28660ba834a65..ad515f2405f1d 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -240,15 +240,18 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito final BlobContainer repoRoot = repository.blobContainer(); final Collection snapshotIds = repositoryData.getSnapshotIds(); final List expectedSnapshotUUIDs = snapshotIds.stream().map(SnapshotId::getUUID).collect(Collectors.toList()); - for (String prefix : new String[] { BlobStoreRepository.SNAPSHOT_PREFIX, BlobStoreRepository.METADATA_PREFIX }) { - final Collection foundSnapshotUUIDs = repoRoot.listBlobs() - .keySet() - .stream() - .filter(p -> p.startsWith(prefix)) - .map(p -> p.replace(prefix, "").replace(".dat", "")) - .collect(Collectors.toSet()); - assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY))); + Collection foundSnapshotUUIDs = new HashSet<>(); + for (String prefix : new String[] { BlobStoreRepository.SNAPSHOT_PREFIX, BlobStoreRepository.SHALLOW_SNAPSHOT_PREFIX }) { + foundSnapshotUUIDs.addAll( + repoRoot.listBlobs() + .keySet() + .stream() + .filter(p -> p.startsWith(prefix)) + .map(p -> p.replace(prefix, "").replace(".dat", "")) + .collect(Collectors.toSet()) + ); } + assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY))); final BlobContainer indicesContainer = repository.getBlobContainer().children().get("indices"); final Map indices; @@ -303,10 +306,16 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito .stream() .noneMatch(shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) { final Map shardPathContents = shardContainer.listBlobs(); - assertThat( - shardPathContents, - hasKey(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID())) + + assertTrue( + shardPathContents.containsKey( + String.format(Locale.ROOT, BlobStoreRepository.SHALLOW_SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) + ) + || shardPathContents.containsKey( + String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) + ) ); + assertThat( shardPathContents.keySet() .stream() diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 9933297aa1c96..ddf9f3e96b9b4 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -105,6 +105,7 @@ public abstract class AbstractSnapshotIntegTestCase extends OpenSearchIntegTestCase { + protected final static String TEST_REMOTE_STORE_REPO_SUFFIX = "__rs"; private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-"; // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough @@ -148,14 +149,19 @@ public void verifyNoLeakedListeners() throws Exception { @After public void assertRepoConsistency() { if (skipRepoConsistencyCheckReason == null) { - clusterAdmin().prepareGetRepositories().get().repositories().forEach(repositoryMetadata -> { - final String name = repositoryMetadata.name(); - if (repositoryMetadata.settings().getAsBoolean("readonly", false) == false) { - clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get(); - clusterAdmin().prepareCleanupRepository(name).get(); - } - BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); - }); + clusterAdmin().prepareGetRepositories() + .get() + .repositories() + .stream() + .filter(repositoryMetadata -> !repositoryMetadata.name().endsWith(TEST_REMOTE_STORE_REPO_SUFFIX)) + .forEach(repositoryMetadata -> { + final String name = repositoryMetadata.name(); + if (repositoryMetadata.settings().getAsBoolean("readonly", false) == false) { + clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get(); + clusterAdmin().prepareCleanupRepository(name).get(); + } + BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); + }); } else { logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason); } @@ -367,10 +373,36 @@ protected void createRepository(String repoName, String type, Settings.Builder s assertAcked(clusterAdmin().preparePutRepository(repoName).setType(type).setSettings(settings)); } + protected void updateRepository(String repoName, String type, Settings.Builder settings) { + logger.info("--> updating repository [{}] [{}]", repoName, type); + assertAcked(clusterAdmin().preparePutRepository(repoName).setType(type).setSettings(settings)); + } + protected void createRepository(String repoName, String type, Path location) { createRepository(repoName, type, Settings.builder().put("location", location)); } + protected Settings.Builder getRepositorySettings(Path location, boolean shallowCopyEnabled) { + Settings.Builder settingsBuilder = randomRepositorySettings(); + settingsBuilder.put("location", location); + if (shallowCopyEnabled) { + settingsBuilder.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true); + } + return settingsBuilder; + } + + protected Settings.Builder getRepositorySettings(Path location, String basePath, boolean shallowCopyEnabled) { + Settings.Builder settingsBuilder = randomRepositorySettings(); + settingsBuilder.put("location", location); + if (shallowCopyEnabled) { + settingsBuilder.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true); + } + if (basePath != null) { + settingsBuilder.put("base_path", basePath); + } + return settingsBuilder; + } + protected void createRepository(String repoName, String type) { createRepository(repoName, type, randomRepositorySettings()); }