From edcbdafb20e2c0afae36ca45f4ad66ad8ed12ed4 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Date: Thu, 23 May 2024 15:24:28 +0530 Subject: [PATCH] [Remote Store] Upload translog checkpoint as object metadata to translog (#13637) (#13795) * Upload translog checkpoint as object metadata to translog Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> --- CHANGELOG.md | 1 + .../repositories/s3/S3BlobContainer.java | 6 +- .../repositories/s3/S3BlobStore.java | 5 + .../indices/create/RemoteCloneIndexIT.java | 3 + .../indices/create/RemoteShrinkIndexIT.java | 9 + .../indices/create/RemoteSplitIndexIT.java | 2 + .../remote/RemoteClusterStateServiceIT.java | 6 + .../RemoteMigrationIndexMetadataUpdateIT.java | 1 + .../remotestore/BaseRemoteStoreRestoreIT.java | 7 +- .../remotestore/PrimaryTermValidationIT.java | 3 +- .../RemoteStoreBaseIntegTestCase.java | 47 +++- .../RemoteStoreClusterStateRestoreIT.java | 6 + .../remotestore/RemoteStoreForceMergeIT.java | 5 +- .../opensearch/remotestore/RemoteStoreIT.java | 26 +- .../RemoteStoreRepositoryRegistrationIT.java | 4 +- .../remotestore/RemoteStoreStatsIT.java | 2 +- .../RemoteStoreUploadIndexPathIT.java | 1 + .../MockFsMetadataSupportedBlobContainer.java | 92 +++++++ .../MockFsMetadataSupportedBlobStore.java | 44 +++ .../MockFsMetadataSupportedRepository.java | 51 ++++ ...ckFsMetadataSupportedRepositoryPlugin.java | 38 +++ .../cluster/metadata/IndexMetadata.java | 1 + .../metadata/MetadataCreateIndexService.java | 33 ++- .../allocation/IndexMetadataUpdater.java | 2 +- .../common/blobstore/BlobContainer.java | 6 +- .../common/blobstore/BlobStore.java | 7 + ...sult.java => InputStreamWithMetadata.java} | 12 +- .../common/settings/ClusterSettings.java | 3 +- .../org/opensearch/index/IndexSettings.java | 7 + .../RemoteMigrationIndexMetadataUpdater.java | 22 +- .../RemoteStoreCustomMetadataResolver.java | 74 ++++++ .../RemoteStorePathStrategyResolver.java | 44 --- .../index/remote/RemoteStoreUtils.java | 32 ++- .../opensearch/index/shard/IndexShard.java | 12 +- .../index/translog/RemoteFsTranslog.java | 31 ++- .../transfer/BlobStoreTransferService.java | 41 ++- .../index/translog/transfer/FileSnapshot.java | 10 + .../translog/transfer/TransferService.java | 6 +- .../translog/transfer/TransferSnapshot.java | 7 + .../TranslogCheckpointTransferSnapshot.java | 10 + .../transfer/TranslogTransferManager.java | 104 ++++++-- .../indices/RemoteStoreSettings.java | 24 ++ .../main/java/org/opensearch/node/Node.java | 3 +- .../opensearch/snapshots/RestoreService.java | 2 +- .../MetadataRolloverServiceTests.java | 9 +- .../MetadataCreateIndexServiceTests.java | 62 +++-- .../MetadataIndexTemplateServiceTests.java | 10 +- ...oteMigrationIndexMetadataUpdaterTests.java | 13 +- ...emoteStoreCustomMetadataResolverTests.java | 223 ++++++++++++++++ .../RemoteStorePathStrategyResolverTests.java | 148 ----------- .../index/remote/RemoteStoreUtilsTests.java | 57 ++++ .../BlobStoreTransferServiceTests.java | 40 +++ .../TranslogTransferManagerTests.java | 251 +++++++++++++----- .../indices/cluster/ClusterStateChanges.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../test/OpenSearchIntegTestCase.java | 1 + 56 files changed, 1287 insertions(+), 385 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java rename server/src/main/java/org/opensearch/common/blobstore/{FetchBlobResult.java => InputStreamWithMetadata.java} (74%) create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java delete mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java delete mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyResolverTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index cbbfc18a7c9b2..d52efd755b892 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423)) - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) - Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293)) +- [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index acf0c5e83a17b..b489a3cc85037 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -78,7 +78,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; -import org.opensearch.common.blobstore.FetchBlobResult; +import org.opensearch.common.blobstore.InputStreamWithMetadata; import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -143,9 +143,9 @@ public boolean blobExists(String blobName) { @ExperimentalApi @Override - public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException { + public InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException { S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName)); - return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata()); + return new InputStreamWithMetadata(s3RetryingInputStream, s3RetryingInputStream.getMetadata()); } @Override diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index de815f9202f44..f688be9216b8f 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -244,6 +244,11 @@ public Map> extendedStats() { return extendedStats; } + @Override + public boolean isBlobMetadataEnabled() { + return true; + } + public ObjectCannedACL getCannedACL() { return cannedACL; } diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java index 4be049c9a9109..a1122f279c7e4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java @@ -52,6 +52,7 @@ import org.opensearch.index.query.TermsQueryBuilder; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.VersionUtils; import java.util.concurrent.ExecutionException; @@ -60,6 +61,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase { @Override @@ -139,6 +141,7 @@ public void testCreateCloneIndex() { } public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException { + asyncUploadMockFsRepo = false; Version version = VersionUtils.randomIndexCompatibleVersion(random()); int numPrimaryShards = 1; prepareCreate("source").setSettings( diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java index 282eb9c6ad95e..cd19a0ee1ff77 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java @@ -48,7 +48,9 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.VersionUtils; +import org.junit.Before; import java.util.Arrays; import java.util.Map; @@ -61,12 +63,18 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteShrinkIndexIT extends RemoteStoreBaseIntegTestCase { @Override protected boolean forbidPrivateIndexSettings() { return false; } + @Before + public void setup() { + asyncUploadMockFsRepo = false; + } + public Settings indexSettings() { return Settings.builder() .put(super.indexSettings()) @@ -84,6 +92,7 @@ public void testCreateShrinkIndexToN() { int[] shardSplits = randomFrom(possibleShardSplits); assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]); assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]); + internalCluster().ensureAtLeastNumDataNodes(2); prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get(); for (int i = 0; i < 20; i++) { diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java index dd4252d24f314..dc3c8793a93f6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java @@ -67,6 +67,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.VersionUtils; import java.io.IOException; @@ -86,6 +87,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteSplitIndexIT extends RemoteStoreBaseIntegTestCase { @Override diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index dfde1b958882c..07377caaba0bc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -18,6 +18,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; import java.nio.charset.StandardCharsets; import java.util.Base64; @@ -32,6 +33,11 @@ public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { private static String INDEX_NAME = "test-index"; + @Before + public void setup() { + asyncUploadMockFsRepo = false; + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java index 45679598dc551..53f4ef3fe281f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java @@ -512,5 +512,6 @@ private void assertCustomIndexMetadata(String index) { logger.info("---> Asserting custom index metadata"); IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index); assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)); + assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY)); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java index d29dacb001434..280fd13f0fdcf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java @@ -11,15 +11,18 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.common.settings.Settings; import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; -import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { static final String INDEX_NAME = "remote-store-test-idx-1"; static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2"; @@ -39,7 +42,7 @@ public Settings indexSettings(int shards, int replicas) { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); } protected void restore(String... indices) { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java index e14a4062f7775..6b94e638a6876 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java @@ -30,7 +30,6 @@ import org.junit.Before; import java.nio.file.Path; -import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -50,7 +49,7 @@ public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); } @Before diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index d7ad0daa43524..740aee69f7d80 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -36,6 +36,9 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; @@ -48,6 +51,7 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -55,6 +59,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -74,6 +79,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected Path segmentRepoPath; protected Path translogRepoPath; protected boolean clusterSettingsSuppliedByTest = false; + protected boolean asyncUploadMockFsRepo = randomBoolean(); + private boolean metadataSupportedType = randomBoolean(); private final List documentKeys = List.of( randomAlphaOfLength(5), randomAlphaOfLength(5), @@ -129,6 +136,19 @@ protected Map indexData(int numberOfIterations, boolean invokeFlus return indexingStats; } + @Override + protected Collection> nodePlugins() { + if (!clusterSettingsSuppliedByTest && asyncUploadMockFsRepo) { + if (metadataSupportedType) { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsMetadataSupportedRepositoryPlugin.class)) + .collect(Collectors.toList()); + } else { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList()); + } + } + return super.nodePlugins(); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { if (segmentRepoPath == null || translogRepoPath == null) { @@ -138,10 +158,27 @@ protected Settings nodeSettings(int nodeOrdinal) { if (clusterSettingsSuppliedByTest) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); } else { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) - .build(); + if (asyncUploadMockFsRepo) { + String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : MockFsRepositoryPlugin.TYPE; + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put( + remoteStoreClusterSettings( + REPOSITORY_NAME, + segmentRepoPath, + repoType, + REPOSITORY_2_NAME, + translogRepoPath, + repoType + ) + ) + .build(); + } else { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) + .build(); + } } } @@ -221,6 +258,8 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFiel @After public void teardown() { clusterSettingsSuppliedByTest = false; + asyncUploadMockFsRepo = randomBoolean(); + metadataSupportedType = randomBoolean(); assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME); assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME); clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java index 3f90732f1f13d..8f27c25c56667 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java @@ -24,6 +24,7 @@ import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; import java.io.IOException; import java.nio.file.Files; @@ -47,6 +48,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT { + @Before + public void setup() { + asyncUploadMockFsRepo = false; + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java index 0bcde4b44c734..d957dda1ba04f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java @@ -19,11 +19,12 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -37,7 +38,7 @@ public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); } @Override diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index ca0ae3ca9a700..7721b18a4fe6b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -38,7 +38,6 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; -import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -48,7 +47,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -56,6 +54,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; @@ -81,7 +80,7 @@ public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class, MockFsRepositoryPlugin.class); + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); } @Override @@ -797,25 +796,8 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep // Test local only translog files which are not uploaded to remote store (no metadata present in remote) // Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE. public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception { - clusterSettingsSuppliedByTest = true; - - // Overriding settings to use AsyncMultiStreamBlobContainer - Settings settings = Settings.builder() - .put(super.nodeSettings(1)) - .put( - remoteStoreClusterSettings( - REPOSITORY_NAME, - segmentRepoPath, - MockFsRepositoryPlugin.TYPE, - REPOSITORY_2_NAME, - translogRepoPath, - MockFsRepositoryPlugin.TYPE - ) - ) - .build(); - - internalCluster().startClusterManagerOnlyNode(settings); - String dataNode = internalCluster().startDataOnlyNode(settings); + internalCluster().startClusterManagerOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); // 1. Create index with 0 replica createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java index ef2dcf3217df6..b0827dcfe4892 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java @@ -25,11 +25,11 @@ import org.opensearch.test.transport.MockTransportService; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; @@ -38,7 +38,7 @@ public class RemoteStoreRepositoryRegistrationIT extends RemoteStoreBaseIntegTes @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); } public void testSingleNodeClusterRepositoryRegistration() throws Exception { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index cb5e2e911705b..7715e19ef349d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -54,7 +54,7 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); } public void setup() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java index 9b30dacfced13..44c02dbb6d611 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java @@ -41,6 +41,7 @@ protected Settings nodeSettings(int nodeOrdinal) { * wherever not required. */ public void testRemoteIndexPathFileCreation() throws ExecutionException, InterruptedException, IOException { + asyncUploadMockFsRepo = false; String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNodes(2); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java new file mode 100644 index 0000000000000..109a884ff6c5d --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.translogmetadata.mocks; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.InputStreamWithMetadata; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.remotestore.multipart.mocks.MockFsAsyncBlobContainer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +public class MockFsMetadataSupportedBlobContainer extends MockFsAsyncBlobContainer { + + private static String CHECKPOINT_FILE_DATA_KEY = "ckp-data"; + + public MockFsMetadataSupportedBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) { + super(blobStore, blobPath, path, triggerDataIntegrityFailure); + } + + @Override + public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { + // If the upload writeContext have a non-null metadata, we store the metadata content as translog.ckp file. + if (writeContext.getMetadata() != null) { + String base64String = writeContext.getMetadata().get(CHECKPOINT_FILE_DATA_KEY); + byte[] decodedBytes = Base64.getDecoder().decode(base64String); + ByteArrayInputStream inputStream = new ByteArrayInputStream(decodedBytes); + int length = decodedBytes.length; + String ckpFileName = getCheckpointFileName(writeContext.getFileName()); + writeBlob(ckpFileName, inputStream, length, true); + } + super.asyncBlobUpload(writeContext, completionListener); + } + + // This is utility to get the translog.ckp file name for a given translog.tlog file. + private String getCheckpointFileName(String translogFileName) { + if (!translogFileName.endsWith(".tlog")) { + throw new IllegalArgumentException("Invalid translog file name format: " + translogFileName); + } + + int dotIndex = translogFileName.lastIndexOf('.'); + String baseName = translogFileName.substring(0, dotIndex); + return baseName + ".ckp"; + } + + public static String convertToBase64(InputStream inputStream) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + byte[] buffer = new byte[128]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, bytesRead); + totalBytesRead += bytesRead; + if (totalBytesRead > 1024) { + // We enforce a limit of 1KB on the size of the checkpoint file. + throw new AssertionError("Input stream exceeds 1KB limit"); + } + } + + byte[] bytes = byteArrayOutputStream.toByteArray(); + return Base64.getEncoder().encodeToString(bytes); + } + } + + // during readBlobWithMetadata call we separately download translog.ckp file and return it as metadata. + @Override + public InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException { + String ckpFileName = getCheckpointFileName(blobName); + InputStream inputStream = readBlob(blobName); + try (InputStream ckpInputStream = readBlob(ckpFileName)) { + String ckpString = convertToBase64(ckpInputStream); + Map metadata = new HashMap<>(); + metadata.put(CHECKPOINT_FILE_DATA_KEY, ckpString); + return new InputStreamWithMetadata(inputStream, metadata); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java new file mode 100644 index 0000000000000..89dd91c8222ac --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.translogmetadata.mocks; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobStore; + +import java.io.IOException; +import java.nio.file.Path; + +public class MockFsMetadataSupportedBlobStore extends FsBlobStore { + + private final boolean triggerDataIntegrityFailure; + + public MockFsMetadataSupportedBlobStore(int bufferSizeInBytes, Path path, boolean readonly, boolean triggerDataIntegrityFailure) + throws IOException { + super(bufferSizeInBytes, path, readonly); + this.triggerDataIntegrityFailure = triggerDataIntegrityFailure; + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + try { + return new MockFsMetadataSupportedBlobContainer(this, path, buildAndCreate(path), triggerDataIntegrityFailure); + } catch (IOException ex) { + throw new OpenSearchException("failed to create blob container", ex); + } + } + + // Make MockFs metadata supported + @Override + public boolean isBlobMetadataEnabled() { + return true; + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java new file mode 100644 index 0000000000000..333fba413ce4e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.translogmetadata.mocks; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.fs.FsRepository; + +public class MockFsMetadataSupportedRepository extends FsRepository { + + public static Setting TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting( + "mock_fs_repository.trigger_data_integrity_failure", + false + ); + + private final boolean triggerDataIntegrityFailure; + + public MockFsMetadataSupportedRepository( + RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + triggerDataIntegrityFailure = TRIGGER_DATA_INTEGRITY_FAILURE.get(metadata.settings()); + } + + @Override + protected BlobStore createBlobStore() throws Exception { + FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore(); + return new MockFsMetadataSupportedBlobStore( + fsBlobStore.bufferSizeInBytes(), + fsBlobStore.path(), + isReadOnly(), + triggerDataIntegrityFailure + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java new file mode 100644 index 0000000000000..71ae652a6b23d --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.translogmetadata.mocks; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.RepositoryPlugin; +import org.opensearch.repositories.Repository; + +import java.util.Collections; +import java.util.Map; + +public class MockFsMetadataSupportedRepositoryPlugin extends Plugin implements RepositoryPlugin { + + public static final String TYPE_MD = "fs_metadata_supported_repository"; + + @Override + public Map getRepositories( + Environment env, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + return Collections.singletonMap( + "fs_metadata_supported_repository", + metadata -> new MockFsMetadataSupportedRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) + ); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index b8cb9e684fcec..fdf8c7a74df68 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -636,6 +636,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_SYSTEM = "system"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store"; + public static final String TRANSLOG_METADATA_KEY = "translog_metadata"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index d42d647aad80f..7829d42b803ef 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -90,10 +90,10 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.MapperService.MergeReason; import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.remote.RemoteStoreCustomMetadataResolver; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStorePathStrategy; -import org.opensearch.index.remote.RemoteStorePathStrategyResolver; import org.opensearch.index.shard.IndexSettingProvider; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndexCreationException; @@ -105,6 +105,7 @@ import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -178,7 +179,7 @@ public class MetadataCreateIndexService { private AwarenessReplicaBalance awarenessReplicaBalance; @Nullable - private final RemoteStorePathStrategyResolver remoteStorePathStrategyResolver; + private final RemoteStoreCustomMetadataResolver remoteStoreCustomMetadataResolver; public MetadataCreateIndexService( final Settings settings, @@ -194,7 +195,8 @@ public MetadataCreateIndexService( final SystemIndices systemIndices, final boolean forbidPrivateIndexSettings, final AwarenessReplicaBalance awarenessReplicaBalance, - final RemoteStoreSettings remoteStoreSettings + final RemoteStoreSettings remoteStoreSettings, + final Supplier repositoriesServiceSupplier ) { this.settings = settings; this.clusterService = clusterService; @@ -213,8 +215,8 @@ public MetadataCreateIndexService( // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true); Supplier minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion(); - remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings) - ? new RemoteStorePathStrategyResolver(remoteStoreSettings, minNodeVersionSupplier) + remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings) + ? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier, repositoriesServiceSupplier, settings) : null; } @@ -563,7 +565,7 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata( tmpImdBuilder.setRoutingNumShards(routingNumShards); tmpImdBuilder.settings(indexSettings); tmpImdBuilder.system(isSystem); - addRemoteStorePathStrategyInCustomData(tmpImdBuilder, true); + addRemoteStoreCustomMetadata(tmpImdBuilder, true); // Set up everything, now locally create the index to see that things are ok, and apply IndexMetadata tempMetadata = tmpImdBuilder.build(); @@ -573,13 +575,13 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata( } /** - * Adds the remote store path type information in custom data of index metadata. + * Adds the 1) remote store path type 2) ckp as translog metadata information in custom data of index metadata. * * @param tmpImdBuilder index metadata builder. * @param assertNullOldType flag to verify that the old remote store path type is null */ - public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) { - if (remoteStorePathStrategyResolver == null) { + public void addRemoteStoreCustomMetadata(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) { + if (remoteStoreCustomMetadataResolver == null) { return; } // It is possible that remote custom data exists already. In such cases, we need to only update the path type @@ -587,14 +589,21 @@ public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdB Map existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); assert assertNullOldType == false || Objects.isNull(existingCustomData); - // Determine the path type for use using the remoteStorePathResolver. - RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get(); Map remoteCustomData = new HashMap<>(); + + // Determine if the ckp would be stored as translog metadata + boolean isTranslogMetadataEnabled = remoteStoreCustomMetadataResolver.isTranslogMetadataEnabled(); + remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(isTranslogMetadataEnabled)); + + // Determine the path type for use using the remoteStorePathResolver. + RemoteStorePathStrategy newPathStrategy = remoteStoreCustomMetadataResolver.getPathStrategy(); remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name()); if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) { remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name()); } - logger.trace(() -> new ParameterizedMessage("Added newStrategy={}, replaced oldStrategy={}", remoteCustomData, existingCustomData)); + logger.trace( + () -> new ParameterizedMessage("Added newCustomData={}, replaced oldCustomData={}", remoteCustomData, existingCustomData) + ); tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java index ddcccd597e894..2431f57a6a1f9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -176,7 +176,7 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable, oldMetadata.settings(), logger ); - migrationImdUpdater.maybeUpdateRemoteStorePathStrategy(indexMetadataBuilder, index.getName()); + migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName()); migrationImdUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, index.getName()); } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 4f5f8d4b1ef5f..a2e4199029ef4 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -80,16 +80,16 @@ public interface BlobContainer { InputStream readBlob(String blobName) throws IOException; /** - * Creates a new {@link FetchBlobResult} for the given blob name. + * Creates a new {@link InputStreamWithMetadata} for the given blob name. * * @param blobName * The name of the blob to get an {@link InputStream} for. - * @return The {@link FetchBlobResult} of the blob. + * @return The {@link InputStreamWithMetadata} of the blob. * @throws NoSuchFileException if the blob does not exist * @throws IOException if the blob can not be read. */ @ExperimentalApi - default FetchBlobResult readBlobWithMetadata(String blobName) throws IOException { + default InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException { throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet"); }; diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java index 8ce8ec8e01abe..406ccc6aa4a18 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java @@ -71,6 +71,13 @@ default Map> extendedStats() { */ default void reload(RepositoryMetadata repositoryMetadata) {} + /** + * Returns a boolean indicating if blobStore has object metadata support enabled + */ + default boolean isBlobMetadataEnabled() { + return false; + } + /** * Metrics for BlobStore interactions */ diff --git a/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java b/server/src/main/java/org/opensearch/common/blobstore/InputStreamWithMetadata.java similarity index 74% rename from server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java rename to server/src/main/java/org/opensearch/common/blobstore/InputStreamWithMetadata.java index 55aca771b586c..aa307e260e033 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java +++ b/server/src/main/java/org/opensearch/common/blobstore/InputStreamWithMetadata.java @@ -10,6 +10,8 @@ import org.opensearch.common.annotation.ExperimentalApi; +import java.io.Closeable; +import java.io.IOException; import java.io.InputStream; import java.util.Map; @@ -20,7 +22,7 @@ * @opensearch.experimental */ @ExperimentalApi -public class FetchBlobResult { +public class InputStreamWithMetadata implements Closeable { /** * Downloaded blob InputStream @@ -40,9 +42,15 @@ public Map getMetadata() { return metadata; } - public FetchBlobResult(InputStream inputStream, Map metadata) { + public InputStreamWithMetadata(InputStream inputStream, Map metadata) { this.inputStream = inputStream; this.metadata = metadata; } + @Override + public void close() throws IOException { + if (inputStream != null) { + inputStream.close(); + } + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5da15caaa8777..72a7df6ecc189 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -741,7 +741,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, - RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS + RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS, + RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA ) ) ); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 3636521f181d4..a7be456e3996f 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -763,6 +763,7 @@ public static IndexMergePolicy fromString(String text) { private final boolean widenIndexSortType; private final boolean assignedOnRemoteNode; private final RemoteStorePathStrategy remoteStorePathStrategy; + private final boolean isTranslogMetadataEnabled; /** * The maximum age of a retention lease before it is considered expired. @@ -988,6 +989,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings()); remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata); + isTranslogMetadataEnabled = RemoteStoreUtils.determineTranslogMetadataEnabled(indexMetadata); + setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING)); setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING)); @@ -1911,4 +1914,8 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo public RemoteStorePathStrategy getRemoteStorePathStrategy() { return remoteStorePathStrategy; } + + public boolean isTranslogMetadataEnabled() { + return isTranslogMetadataEnabled; + } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java index 761fa20ea64e5..cc51fcd2f18f6 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java @@ -28,7 +28,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; -import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategyDuringMigration; +import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomMetadataDuringMigration; import static org.opensearch.index.remote.RemoteStoreUtils.getRemoteStoreRepoName; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -118,7 +118,7 @@ private boolean needsRemoteIndexSettingsUpdate( } /** - * Updates the remote store path strategy metadata for the index when it is migrating to remote. + * Updates the remote store custom metadata for the index when it is migrating to remote. * This is run during state change of each shard copy when the cluster is in `MIXED` mode and the direction of migration is `REMOTE_STORE` * Should not interfere with docrep functionality even if the index is in docrep nodes since this metadata * is not used anywhere in the docrep flow @@ -127,20 +127,20 @@ private boolean needsRemoteIndexSettingsUpdate( * @param indexMetadataBuilder Mutated {@link IndexMetadata.Builder} having the previous state updates * @param index index name */ - public void maybeUpdateRemoteStorePathStrategy(IndexMetadata.Builder indexMetadataBuilder, String index) { - if (indexHasRemotePathMetadata(indexMetadata) == false) { - logger.info("Adding remote store path strategy for index [{}] during migration", index); + public void maybeUpdateRemoteStoreCustomMetadata(IndexMetadata.Builder indexMetadataBuilder, String index) { + if (indexHasRemoteCustomMetadata(indexMetadata) == false) { + logger.info("Adding remote store custom data for index [{}] during migration", index); indexMetadataBuilder.putCustom( REMOTE_STORE_CUSTOM_KEY, - determineRemoteStorePathStrategyDuringMigration(clusterSettings, discoveryNodes) + determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes) ); } else { - logger.debug("Index {} already has remote store path strategy", index); + logger.debug("Index {} already has remote store custom data", index); } } public static boolean indexHasAllRemoteStoreRelatedMetadata(IndexMetadata indexMetadata) { - return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemotePathMetadata(indexMetadata); + return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemoteCustomMetadata(indexMetadata); } /** @@ -167,9 +167,11 @@ public static boolean indexHasRemoteStoreSettings(Settings indexSettings) { * @param indexMetadata Current index metadata * @return true if all above conditions match. false otherwise */ - public static boolean indexHasRemotePathMetadata(IndexMetadata indexMetadata) { + public static boolean indexHasRemoteCustomMetadata(IndexMetadata indexMetadata) { Map customMetadata = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY); - return Objects.nonNull(customMetadata) && Objects.nonNull(customMetadata.get(PathType.NAME)); + return Objects.nonNull(customMetadata) + && (Objects.nonNull(customMetadata.get(PathType.NAME)) + || Objects.nonNull(customMetadata.get(IndexMetadata.TRANSLOG_METADATA_KEY))); } public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, String segmentRepository, String translogRepository) { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java new file mode 100644 index 0000000000000..e8a0dda5a699e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.Version; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.util.function.Supplier; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo; + +/** + * Determines the {@link RemoteStorePathStrategy} at the time of index metadata creation. + * + * @opensearch.internal + */ +@ExperimentalApi +public class RemoteStoreCustomMetadataResolver { + + private final RemoteStoreSettings remoteStoreSettings; + private final Supplier minNodeVersionSupplier; + private final Supplier repositoriesServiceSupplier; + private final Settings settings; + + public RemoteStoreCustomMetadataResolver( + RemoteStoreSettings remoteStoreSettings, + Supplier minNodeVersionSupplier, + Supplier repositoriesServiceSupplier, + Settings settings + ) { + this.remoteStoreSettings = remoteStoreSettings; + this.minNodeVersionSupplier = minNodeVersionSupplier; + this.repositoriesServiceSupplier = repositoriesServiceSupplier; + this.settings = settings; + } + + public RemoteStorePathStrategy getPathStrategy() { + PathType pathType; + PathHashAlgorithm pathHashAlgorithm; + // Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it. + pathType = Version.V_2_14_0.compareTo(minNodeVersionSupplier.get()) <= 0 ? remoteStoreSettings.getPathType() : PathType.FIXED; + // If the path type is fixed, hash algorithm is not applicable. + pathHashAlgorithm = pathType == PathType.FIXED ? null : remoteStoreSettings.getPathHashAlgorithm(); + return new RemoteStorePathStrategy(pathType, pathHashAlgorithm); + } + + public boolean isTranslogMetadataEnabled() { + Repository repository; + try { + repository = repositoriesServiceSupplier.get().repository(getRemoteStoreTranslogRepo(settings)); + } catch (RepositoryMissingException ex) { + throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex); + } + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + return Version.V_2_15_0.compareTo(minNodeVersionSupplier.get()) <= 0 + && remoteStoreSettings.isTranslogMetadataEnabled() + && blobStoreRepository.blobStore().isBlobMetadataEnabled(); + } + +} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java deleted file mode 100644 index 178de406ed681..0000000000000 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.remote; - -import org.opensearch.Version; -import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; -import org.opensearch.index.remote.RemoteStoreEnums.PathType; -import org.opensearch.indices.RemoteStoreSettings; - -import java.util.function.Supplier; - -/** - * Determines the {@link RemoteStorePathStrategy} at the time of index metadata creation. - * - * @opensearch.internal - */ -@ExperimentalApi -public class RemoteStorePathStrategyResolver { - - private final RemoteStoreSettings remoteStoreSettings; - private final Supplier minNodeVersionSupplier; - - public RemoteStorePathStrategyResolver(RemoteStoreSettings remoteStoreSettings, Supplier minNodeVersionSupplier) { - this.remoteStoreSettings = remoteStoreSettings; - this.minNodeVersionSupplier = minNodeVersionSupplier; - } - - public RemoteStorePathStrategy get() { - PathType pathType; - PathHashAlgorithm pathHashAlgorithm; - // Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it. - pathType = Version.V_2_14_0.compareTo(minNodeVersionSupplier.get()) <= 0 ? remoteStoreSettings.getPathType() : PathType.FIXED; - // If the path type is fixed, hash algorithm is not applicable. - pathHashAlgorithm = pathType == PathType.FIXED ? null : remoteStoreSettings.getPathHashAlgorithm(); - return new RemoteStorePathStrategy(pathType, pathHashAlgorithm); - } -} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 27b1b88034573..9a9de6c819424 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -32,6 +32,7 @@ import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA; /** * Utils for remote store @@ -181,25 +182,50 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta return new RemoteStorePathStrategy(RemoteStoreEnums.PathType.FIXED); } + /** + * Determines if translog file object metadata can be used to store checkpoint file data. + */ + public static boolean determineTranslogMetadataEnabled(IndexMetadata indexMetadata) { + Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); + assert remoteCustomData == null || remoteCustomData.containsKey(IndexMetadata.TRANSLOG_METADATA_KEY); + if (remoteCustomData != null && remoteCustomData.containsKey(IndexMetadata.TRANSLOG_METADATA_KEY)) { + return Boolean.parseBoolean(remoteCustomData.get(IndexMetadata.TRANSLOG_METADATA_KEY)); + } + return false; + } + /** * Generates the remote store path type information to be added to custom data of index metadata during migration * * @param clusterSettings Current Cluster settings from {@link ClusterState} - * @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state + * @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state * @return {@link Map} to be added as custom data in index metadata */ - public static Map determineRemoteStorePathStrategyDuringMigration( + public static Map determineRemoteStoreCustomMetadataDuringMigration( Settings clusterSettings, DiscoveryNodes discoveryNodes ) { + Map remoteCustomData = new HashMap<>(); Version minNodeVersion = discoveryNodes.getMinNodeVersion(); + + // TODO: During the document replication to a remote store migration, there should be a check to determine if the registered + // translog blobstore supports custom metadata or not. + // Currently, the blobStoreMetadataEnabled flag is set to false because the integration tests run on the local file system, which + // does not support custom metadata. + // https://github.com/opensearch-project/OpenSearch/issues/13745 + boolean blobStoreMetadataEnabled = false; + boolean translogMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0 + && CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.get(clusterSettings) + && blobStoreMetadataEnabled; + + remoteCustomData.put(IndexMetadata.TRANSLOG_METADATA_KEY, Boolean.toString(translogMetadata)); + RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0 ? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings) : RemoteStoreEnums.PathType.FIXED; RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm = pathType == RemoteStoreEnums.PathType.FIXED ? null : CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.get(clusterSettings); - Map remoteCustomData = new HashMap<>(); remoteCustomData.put(RemoteStoreEnums.PathType.NAME, pathType.name()); if (Objects.nonNull(pathHashAlgorithm)) { remoteCustomData.put(RemoteStoreEnums.PathHashAlgorithm.NAME, pathHashAlgorithm.name()); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 81663016409e4..fbec820882a51 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4989,7 +4989,14 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException { TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); - RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings); + RemoteFsTranslog.cleanup( + repository, + shardId, + getThreadPool(), + indexSettings.getRemoteStorePathStrategy(), + remoteStoreSettings, + indexSettings().isTranslogMetadataEnabled() + ); } /* @@ -5014,7 +5021,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings, logger, - shouldSeedRemoteStore() + shouldSeedRemoteStore(), + indexSettings().isTranslogMetadataEnabled() ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index c7f756957076c..67549c86b7dd2 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -91,6 +91,7 @@ public class RemoteFsTranslog extends Translog { private static final int SYNC_PERMIT = 1; private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); private final AtomicBoolean pauseSync = new AtomicBoolean(false); + private final boolean isTranslogMetadataEnabled; public RemoteFsTranslog( TranslogConfig config, @@ -110,6 +111,7 @@ public RemoteFsTranslog( this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); + isTranslogMetadataEnabled = indexSettings().isTranslogMetadataEnabled(); this.translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, @@ -117,7 +119,8 @@ public RemoteFsTranslog( fileTransferTracker, remoteTranslogTransferTracker, indexSettings().getRemoteStorePathStrategy(), - remoteStoreSettings + remoteStoreSettings, + isTranslogMetadataEnabled ); try { download(translogTransferManager, location, logger, config.shouldSeedRemote()); @@ -169,7 +172,8 @@ public static void download( RemoteStorePathStrategy pathStrategy, RemoteStoreSettings remoteStoreSettings, Logger logger, - boolean seedRemote + boolean seedRemote, + boolean isTranslogMetadataEnabled ) throws IOException { assert repository instanceof BlobStoreRepository : String.format( Locale.ROOT, @@ -188,7 +192,8 @@ public static void download( fileTransferTracker, remoteTranslogTransferTracker, pathStrategy, - remoteStoreSettings + remoteStoreSettings, + isTranslogMetadataEnabled ); RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote); logger.trace(remoteTranslogTransferTracker.toString()); @@ -293,7 +298,8 @@ public static TranslogTransferManager buildTranslogTransferManager( FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker tracker, RemoteStorePathStrategy pathStrategy, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + boolean isTranslogMetadataEnabled ) { assert Objects.nonNull(pathStrategy); String indexUUID = shardId.getIndex().getUUID(); @@ -315,7 +321,16 @@ public static TranslogTransferManager buildTranslogTransferManager( .build(); BlobPath mdPath = pathStrategy.generatePath(mdPathInput); BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool); - return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings); + return new TranslogTransferManager( + shardId, + transferService, + dataPath, + mdPath, + fileTransferTracker, + tracker, + remoteStoreSettings, + isTranslogMetadataEnabled + ); } @Override @@ -592,7 +607,8 @@ public static void cleanup( ShardId shardId, ThreadPool threadPool, RemoteStorePathStrategy pathStrategy, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + boolean isTranslogMetadataEnabled ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; @@ -607,7 +623,8 @@ public static void cleanup( fileTransferTracker, remoteTranslogTransferTracker, pathStrategy, - remoteStoreSettings + remoteStoreSettings, + isTranslogMetadataEnabled ); // clean up all remote translog files translogTransferManager.deleteTranslogFiles(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index bec2d78d9af62..704f6419da60a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -18,7 +18,7 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.blobstore.FetchBlobResult; +import org.opensearch.common.blobstore.InputStreamWithMetadata; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; @@ -28,16 +28,20 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; +import java.util.Base64; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; +import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; /** * Service that handles remote transfer of translog and checkpoint files @@ -104,6 +108,30 @@ public void uploadBlobs( } + // Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file. + static Map buildTransferFileMetadata(InputStream metadataInputStream) throws IOException { + Map metadata = new HashMap<>(); + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + byte[] buffer = new byte[128]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = metadataInputStream.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, bytesRead); + totalBytesRead += bytesRead; + if (totalBytesRead > 1024) { + // We enforce a limit of 1KB on the size of the checkpoint file. + throw new IOException("Input stream exceeds 1KB limit"); + } + } + + byte[] bytes = byteArrayOutputStream.toByteArray(); + String metadataString = Base64.getEncoder().encodeToString(bytes); + metadata.put(CHECKPOINT_FILE_DATA_KEY, metadataString); + } + return metadata; + } + private void uploadBlob( TransferFileSnapshot fileSnapshot, ActionListener listener, @@ -113,6 +141,11 @@ private void uploadBlob( try { ChannelFactory channelFactory = FileChannel::open; + Map metadata = null; + if (fileSnapshot.getMetadataFileInputStream() != null) { + metadata = buildTransferFileMetadata(fileSnapshot.getMetadataFileInputStream()); + } + long contentLength; try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { contentLength = channel.size(); @@ -130,7 +163,8 @@ private void uploadBlob( writePriority, (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), Objects.requireNonNull(fileSnapshot.getChecksum()), - remoteIntegrityEnabled + remoteIntegrityEnabled, + metadata ); ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); @@ -168,7 +202,8 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I @Override @ExperimentalApi - public FetchBlobResult downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + public InputStreamWithMetadata downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + assert blobStore.isBlobMetadataEnabled(); return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index dcec94edd694f..86f042af0584b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -108,6 +108,8 @@ public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; private Long checksum; + @Nullable + private InputStream metadataFileInputStream; public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { super(path); @@ -128,6 +130,14 @@ public long getPrimaryTerm() { return primaryTerm; } + public void setMetadataFileInputStream(InputStream inputStream) { + this.metadataFileInputStream = inputStream; + } + + public InputStream getMetadataFileInputStream() { + return metadataFileInputStream; + } + @Override public int hashCode() { return Objects.hash(primaryTerm, super.hashCode()); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 0894ebf500ebd..0ff983739438b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -11,7 +11,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.FetchBlobResult; +import org.opensearch.common.blobstore.InputStreamWithMetadata; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -131,11 +131,11 @@ void uploadBlobs( * * @param path the remote path from where download should be made * @param fileName the name of the file - * @return {@link FetchBlobResult} of the remote file + * @return {@link InputStreamWithMetadata} of the remote file * @throws IOException the exception while reading the data */ @ExperimentalApi - FetchBlobResult downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; + InputStreamWithMetadata downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java index ef34fd31a296b..6dcdc8f8cf44a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -12,6 +12,7 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import java.io.IOException; import java.util.Set; /** @@ -39,4 +40,10 @@ public interface TransferSnapshot { * @return the translog transfer metadata */ TranslogTransferMetadata getTranslogTransferMetadata(); + + /** + * The snapshot of the translog generational files having checkpoint file inputStream as metadata + * @return the set of translog files having checkpoint file inputStream as metadata. + */ + Set getTranslogFileSnapshotWithMetadata() throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index fb78731246a07..ae007c0c33e1e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -64,6 +64,16 @@ public Set getTranslogFileSnapshots() { return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); } + @Override + public Set getTranslogFileSnapshotWithMetadata() throws IOException { + for (Tuple tuple : translogCheckpointFileInfoTupleSet) { + TransferFileSnapshot translogFileSnapshot = tuple.v1(); + TransferFileSnapshot checkpointFileSnapshot = tuple.v2(); + translogFileSnapshot.setMetadataFileInputStream(checkpointFileSnapshot.inputStream()); + } + return getTranslogFileSnapshots(); + } + @Override public TranslogTransferMetadata getTranslogTransferMetadata() { return new TranslogTransferMetadata( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 47638f44fd6fc..1cc39cdf442e2 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -16,6 +16,7 @@ import org.opensearch.common.SetOnce; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.InputStreamWithMetadata; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -36,6 +37,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -63,6 +65,9 @@ public class TranslogTransferManager { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private final RemoteStoreSettings remoteStoreSettings; private static final int METADATA_FILES_TO_FETCH = 10; + // Flag to include checkpoint file data as translog file metadata during upload/download + private final boolean isTranslogMetadataEnabled; + final static String CHECKPOINT_FILE_DATA_KEY = "ckp-data"; private final Logger logger; @@ -79,7 +84,8 @@ public TranslogTransferManager( BlobPath remoteMetadataTransferPath, FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker remoteTranslogTransferTracker, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + boolean isTranslogMetadataEnabled ) { this.shardId = shardId; this.transferService = transferService; @@ -89,6 +95,7 @@ public TranslogTransferManager( this.logger = Loggers.getLogger(getClass(), shardId); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; this.remoteStoreSettings = remoteStoreSettings; + this.isTranslogMetadataEnabled = isTranslogMetadataEnabled; } public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() { @@ -110,8 +117,12 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); try { - toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); - toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + if (isTranslogMetadataEnabled) { + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshotWithMetadata())); + } else { + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); + toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + } if (toUpload.isEmpty()) { logger.trace("Nothing to upload for transfer"); return true; @@ -236,30 +247,78 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca generation, location ); - // Download Checkpoint file from remote to local FS String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); - // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); + if (isTranslogMetadataEnabled == false) { + // Download Checkpoint file, translog file from remote to local FS + downloadToFS(ckpFileName, location, primaryTerm, false); + downloadToFS(translogFilename, location, primaryTerm, false); + } else { + // Download translog.tlog file with object metadata from remote to local FS + Map metadata = downloadToFS(translogFilename, location, primaryTerm, true); + try { + assert metadata != null && !metadata.isEmpty() && metadata.containsKey(CHECKPOINT_FILE_DATA_KEY); + recoverCkpFileUsingMetadata(metadata, location, generation, translogFilename); + } catch (Exception e) { + throw new IOException("Failed to recover checkpoint file from remote", e); + } + } return true; } - private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { + /** + * Process the provided metadata and tries to recover translog.ckp file to the FS. + */ + private void recoverCkpFileUsingMetadata(Map metadata, Path location, String generation, String fileName) + throws IOException { + + String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); + Path filePath = location.resolve(ckpFileName); + // Here, we always override the existing file if present. + deleteFileIfExists(filePath); + + String ckpDataBase64 = metadata.get(CHECKPOINT_FILE_DATA_KEY); + if (ckpDataBase64 == null) { + logger.error("Error processing metadata for translog file: {}", fileName); + throw new IllegalStateException( + "Checkpoint file data key " + CHECKPOINT_FILE_DATA_KEY + " is expected but not found in metadata for file: " + fileName + ); + } + byte[] ckpFileBytes = Base64.getDecoder().decode(ckpDataBase64); + Files.write(filePath, ckpFileBytes); + } + + private Map downloadToFS(String fileName, Path location, String primaryTerm, boolean withMetadata) throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. // We need to change this logic when we introduce incremental download - if (Files.exists(filePath)) { - Files.delete(filePath); - } + deleteFileIfExists(filePath); + Map metadata = null; boolean downloadStatus = false; long bytesToRead = 0, downloadStartTime = System.nanoTime(); - try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { - // Capture number of bytes for stats before reading - bytesToRead = inputStream.available(); - Files.copy(inputStream, filePath); - downloadStatus = true; + try { + if (withMetadata) { + try ( + InputStreamWithMetadata inputStreamWithMetadata = transferService.downloadBlobWithMetadata( + remoteDataTransferPath.add(primaryTerm), + fileName + ) + ) { + InputStream inputStream = inputStreamWithMetadata.getInputStream(); + metadata = inputStreamWithMetadata.getMetadata(); + + bytesToRead = inputStream.available(); + Files.copy(inputStream, filePath); + downloadStatus = true; + } + } else { + try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { + bytesToRead = inputStream.available(); + Files.copy(inputStream, filePath); + downloadStatus = true; + } + } } finally { remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); if (downloadStatus) { @@ -269,6 +328,13 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync fileTransferTracker.add(fileName, true); + return metadata; + } + + private void deleteFileIfExists(Path filePath) throws IOException { + if (Files.exists(filePath)) { + Files.delete(filePath); + } } public TranslogTransferMetadata readMetadata() throws IOException { @@ -391,7 +457,11 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna // Add .ckp and .tlog file to translog file list which is located in basePath/ String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFileName = Translog.getFilename(generation); - translogFiles.addAll(List.of(ckpFileName, translogFileName)); + if (isTranslogMetadataEnabled == false) { + translogFiles.addAll(List.of(ckpFileName, translogFileName)); + } else { + translogFiles.add(translogFileName); + } }); // Delete the translog and checkpoint files asynchronously deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion); diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 2e45888e099e7..836af2a14637f 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -80,6 +80,18 @@ public class RemoteStoreSettings { Property.Dynamic ); + /** + * This setting is used to disable uploading translog.ckp file as metadata to translog.tlog. This setting is effective only for + * repositories that supports metadata read and write with metadata and is applicable for only remote store enabled clusters. + */ + @ExperimentalApi + public static final Setting CLUSTER_REMOTE_STORE_TRANSLOG_METADATA = Setting.boolSetting( + "cluster.remote_store.index.translog.translog_metadata", + true, + Property.NodeScope, + Property.Dynamic + ); + /** * This setting is used to set the remote store blob store path hash algorithm strategy. This setting is effective only for * remote store enabled cluster. This setting will come to effect if the {@link #CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING} @@ -111,6 +123,7 @@ public class RemoteStoreSettings { private volatile RemoteStoreEnums.PathType pathType; private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm; private volatile int maxRemoteTranslogReaders; + private volatile boolean isTranslogMetadataEnabled; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -134,6 +147,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { pathType = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, this::setPathType); + isTranslogMetadataEnabled = clusterSettings.get(CLUSTER_REMOTE_STORE_TRANSLOG_METADATA); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_TRANSLOG_METADATA, this::setTranslogMetadataEnabled); + pathHashAlgorithm = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setPathHashAlgorithm); @@ -180,6 +196,14 @@ private void setPathType(RemoteStoreEnums.PathType pathType) { this.pathType = pathType; } + private void setTranslogMetadataEnabled(boolean isTranslogMetadataEnabled) { + this.isTranslogMetadataEnabled = isTranslogMetadataEnabled; + } + + public boolean isTranslogMetadataEnabled() { + return isTranslogMetadataEnabled; + } + private void setPathHashAlgorithm(RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm) { this.pathHashAlgorithm = pathHashAlgorithm; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6f90ebd8ad188..4a702ec0234ef 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -888,7 +888,8 @@ protected Node( systemIndices, forbidPrivateIndexSettings, awarenessReplicaBalance, - remoteStoreSettings + remoteStoreSettings, + repositoriesServiceReference::get ); pluginsService.filterPlugins(Plugin.class) .forEach( diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 0ac28ff4e6d8e..6884cbdf753ae 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -484,7 +484,7 @@ public ClusterState execute(ClusterState currentState) { .put(snapshotIndexMetadata.getSettings()) .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) ); - createIndexService.addRemoteStorePathStrategyInCustomData(indexMdBuilder, false); + createIndexService.addRemoteStoreCustomMetadata(indexMdBuilder, false); shardLimitValidator.validateShardLimit( renamedIndexName, snapshotIndexMetadata.getSettings(), diff --git a/server/src/test/java/org/opensearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/opensearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index 2fefbed6b2b64..88dac571ba861 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -740,7 +740,8 @@ public void testRolloverClusterState() throws Exception { systemIndices, false, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + null ); MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService( clusterService, @@ -879,7 +880,8 @@ public void testRolloverClusterStateForDataStream() throws Exception { systemIndices, false, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + null ); MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService( clusterService, @@ -1058,7 +1060,8 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception { new SystemIndices(emptyMap()), false, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + null ); MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService( clusterService, diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index ac8fcfa487b22..de1d422d6016b 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -57,6 +57,7 @@ import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; @@ -88,6 +89,8 @@ import org.opensearch.indices.SystemIndices; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; @@ -117,6 +120,7 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -157,6 +161,7 @@ import static org.opensearch.node.Node.NODE_ATTRIBUTES; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.hamcrest.Matchers.containsString; @@ -177,6 +182,9 @@ public class MetadataCreateIndexServiceTests extends OpenSearchTestCase { private CreateIndexClusterStateUpdateRequest request; private QueryShardContext queryShardContext; private ClusterSettings clusterSettings; + private IndicesService indicesServices; + private RepositoriesService repositoriesService; + private Supplier repositoriesServiceSupplier; private static final String segmentRepositoryNameAttributeKey = NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; private static final String translogRepositoryNameAttributeKey = NODE_ATTRIBUTES.getKey() @@ -189,6 +197,9 @@ public class MetadataCreateIndexServiceTests extends OpenSearchTestCase { public void setup() throws Exception { super.setUp(); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + indicesServices = mock(IndicesService.class); + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); } @Before @@ -703,7 +714,7 @@ public void testValidateIndexName() throws Exception { MetadataCreateIndexService checkerService = new MetadataCreateIndexService( Settings.EMPTY, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -714,7 +725,8 @@ public void testValidateIndexName() throws Exception { new SystemIndices(Collections.emptyMap()), false, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); validateIndexName( checkerService, @@ -790,7 +802,7 @@ public void testValidateDotIndex() { MetadataCreateIndexService checkerService = new MetadataCreateIndexService( Settings.EMPTY, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -801,7 +813,8 @@ public void testValidateDotIndex() { new SystemIndices(Collections.singletonMap("foo", systemIndexDescriptors)), false, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); // Check deprecations assertFalse(checkerService.validateDotIndex(".test2", false)); @@ -1216,7 +1229,7 @@ public void testvalidateIndexSettings() { MetadataCreateIndexService checkerService = new MetadataCreateIndexService( settings, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -1227,7 +1240,8 @@ public void testvalidateIndexSettings() { new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(settings, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); List validationErrors = checkerService.getIndexSettingsValidationErrors(settings, false, Optional.empty()); @@ -1336,7 +1350,7 @@ public void testClusterForceReplicationTypeInValidateIndexSettings() { final MetadataCreateIndexService checkerService = new MetadataCreateIndexService( forceClusterSettingEnabled, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -1347,7 +1361,8 @@ public void testClusterForceReplicationTypeInValidateIndexSettings() { new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(forceClusterSettingEnabled, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); // Use DOCUMENT replication type setting for index creation final Settings indexSettings = Settings.builder().put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build(); @@ -1462,7 +1477,7 @@ public void testRemoteStoreDisabledByUserIndexSettings() { MetadataCreateIndexService checkerService = new MetadataCreateIndexService( Settings.EMPTY, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -1473,7 +1488,8 @@ public void testRemoteStoreDisabledByUserIndexSettings() { new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); final List validationErrors = checkerService.getIndexSettingsValidationErrors( @@ -1497,7 +1513,7 @@ public void testRemoteStoreOverrideSegmentRepoIndexSettings() { MetadataCreateIndexService checkerService = new MetadataCreateIndexService( Settings.EMPTY, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -1508,7 +1524,8 @@ public void testRemoteStoreOverrideSegmentRepoIndexSettings() { new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); final List validationErrors = checkerService.getIndexSettingsValidationErrors( @@ -1537,7 +1554,7 @@ public void testRemoteStoreOverrideTranslogRepoIndexSettings() { MetadataCreateIndexService checkerService = new MetadataCreateIndexService( Settings.EMPTY, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -1548,7 +1565,8 @@ public void testRemoteStoreOverrideTranslogRepoIndexSettings() { new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); final List validationErrors = checkerService.getIndexSettingsValidationErrors( @@ -1755,10 +1773,16 @@ private IndexMetadata testRemoteCustomData(boolean remoteStoreEnabled, PathType RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); ThreadPool threadPool = new TestThreadPool(getTestName()); + BlobStoreRepository repositoryMock = mock(BlobStoreRepository.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + when(repositoriesService.repository(getRemoteStoreTranslogRepo(settings))).thenReturn(repositoryMock); + BlobStore blobStoreMock = mock(BlobStore.class); + when(repositoryMock.blobStore()).thenReturn(blobStoreMock); + when(blobStoreMock.isBlobMetadataEnabled()).thenReturn(randomBoolean()); MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService( settings, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -1769,7 +1793,8 @@ private IndexMetadata testRemoteCustomData(boolean remoteStoreEnabled, PathType new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(settings, clusterService.getClusterSettings()), - remoteStoreSettings + remoteStoreSettings, + repositoriesServiceSupplier ); CreateIndexClusterStateUpdateRequest request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); Settings indexSettings = Settings.builder() @@ -1882,7 +1907,7 @@ public void testIndexLifecycleNameSetting() { MetadataCreateIndexService checkerService = new MetadataCreateIndexService( Settings.EMPTY, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), @@ -1893,7 +1918,8 @@ public void testIndexLifecycleNameSetting() { new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + repositoriesServiceSupplier ); final List validationErrors = checkerService.getIndexSettingsValidationErrors(ilnSetting, true, Optional.empty()); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java index 2cb345144b220..b64cf0ed172af 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java @@ -56,8 +56,10 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.DefaultRemoteStoreSettings; import org.opensearch.indices.IndexTemplateMissingException; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.InvalidIndexTemplateException; import org.opensearch.indices.SystemIndices; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; @@ -2038,10 +2040,13 @@ private static List putTemplate(NamedXContentRegistry xContentRegistr when(clusterService.state()).thenReturn(clusterState); when(clusterService.getSettings()).thenReturn(settings); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + IndicesService indicesServices = mock(IndicesService.class); + RepositoriesService repositoriesService = mock(RepositoriesService.class); + // when(indicesServices.getRepositoriesServiceSupplier()).thenReturn(() -> repositoriesService); MetadataCreateIndexService createIndexService = new MetadataCreateIndexService( Settings.EMPTY, clusterService, - null, + indicesServices, null, null, createTestShardLimitService(randomIntBetween(1, 1000), false), @@ -2052,7 +2057,8 @@ private static List putTemplate(NamedXContentRegistry xContentRegistr new SystemIndices(Collections.emptyMap()), true, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + null ); MetadataIndexTemplateService service = new MetadataIndexTemplateService( clusterService, diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java index d8220c93e4eeb..a2cd3f6b803e0 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java @@ -163,7 +163,7 @@ public void testMaybeUpdateRemoteStorePathStrategyExecutes() { .build(), logger ); - migrationIndexMetadataUpdater.maybeUpdateRemoteStorePathStrategy(builder, indexName); + migrationIndexMetadataUpdater.maybeUpdateRemoteStoreCustomMetadata(builder, indexName); assertCustomPathMetadataIsPresent(builder.build()); } @@ -186,7 +186,7 @@ public void testMaybeUpdateRemoteStorePathStrategyDoesNotExecute() { logger ); - migrationIndexMetadataUpdater.maybeUpdateRemoteStorePathStrategy(builder, indexName); + migrationIndexMetadataUpdater.maybeUpdateRemoteStoreCustomMetadata(builder, indexName); assertCustomPathMetadataIsPresent(builder.build()); } @@ -298,7 +298,14 @@ public static Metadata createIndexMetadataWithRemoteStoreSettings(String indexNa ) .putCustom( REMOTE_STORE_CUSTOM_KEY, - Map.of(RemoteStoreEnums.PathType.NAME, "dummy", RemoteStoreEnums.PathHashAlgorithm.NAME, "dummy") + Map.of( + RemoteStoreEnums.PathType.NAME, + "dummy", + RemoteStoreEnums.PathHashAlgorithm.NAME, + "dummy", + IndexMetadata.TRANSLOG_METADATA_KEY, + "dummy" + ) ) .build(); return Metadata.builder().put(indexMetadata).build(); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java new file mode 100644 index 0000000000000..7e702ad3773e8 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java @@ -0,0 +1,223 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.Version; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; + +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteStoreCustomMetadataResolverTests extends OpenSearchTestCase { + + RepositoriesService repositoriesService = mock(RepositoriesService.class); + Settings settings = Settings.EMPTY; + + public void testGetPathStrategyMinVersionOlder() { + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values())).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver( + remoteStoreSettings, + () -> Version.V_2_13_0, + () -> repositoriesService, + settings + ); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); + assertNull(resolver.getPathStrategy().getHashAlgorithm()); + } + + public void testGetPathStrategyMinVersionNewer() { + PathType pathType = randomFrom(PathType.values()); + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), pathType).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver( + remoteStoreSettings, + () -> Version.V_2_14_0, + () -> repositoriesService, + settings + ); + assertEquals(pathType, resolver.getPathStrategy().getType()); + if (pathType.requiresHashAlgorithm()) { + assertNotNull(resolver.getPathStrategy().getHashAlgorithm()); + } else { + assertNull(resolver.getPathStrategy().getHashAlgorithm()); + } + } + + public void testGetPathStrategyStrategy() { + // FIXED type + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver( + remoteStoreSettings, + () -> Version.V_2_14_0, + () -> repositoriesService, + settings + ); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); + + // FIXED type with hash algorithm + settings = Settings.builder() + .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED) + .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), randomFrom(PathHashAlgorithm.values())) + .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0, () -> repositoriesService, settings); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); + + // HASHED_PREFIX type with FNV_1A_COMPOSITE + settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0, () -> repositoriesService, settings); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); + + // HASHED_PREFIX type with FNV_1A_COMPOSITE + settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0, () -> repositoriesService, settings); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); + + // HASHED_PREFIX type with FNV_1A_BASE64 + settings = Settings.builder() + .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX) + .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) + .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0, () -> repositoriesService, settings); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); + + // HASHED_PREFIX type with FNV_1A_BASE64 + settings = Settings.builder() + .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX) + .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) + .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0, () -> repositoriesService, settings); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); + } + + public void testGetPathStrategyStrategyWithDynamicUpdate() { + + // Default value + Settings settings = Settings.builder().build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver( + remoteStoreSettings, + () -> Version.V_2_14_0, + () -> repositoriesService, + settings + ); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); + assertNull(resolver.getPathStrategy().getHashAlgorithm()); + + // Set HASHED_PREFIX with default hash algorithm + clusterSettings.applySettings( + Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build() + ); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); + + // Set HASHED_PREFIX with FNV_1A_BASE64 hash algorithm + clusterSettings.applySettings( + Settings.builder() + .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX) + .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) + .build() + ); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); + + // Set HASHED_INFIX with default hash algorithm + clusterSettings.applySettings( + Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_INFIX).build() + ); + assertEquals(PathType.HASHED_INFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); + + // Set HASHED_INFIX with FNV_1A_BASE64 hash algorithm + clusterSettings.applySettings( + Settings.builder() + .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_INFIX) + .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) + .build() + ); + assertEquals(PathType.HASHED_INFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); + } + + public void testTranslogMetadataAllowedTrueWithMinVersionNewer() { + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), true).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + BlobStoreRepository repositoryMock = mock(BlobStoreRepository.class); + when(repositoriesService.repository(getRemoteStoreTranslogRepo(settings))).thenReturn(repositoryMock); + BlobStore blobStoreMock = mock(BlobStore.class); + when(repositoryMock.blobStore()).thenReturn(blobStoreMock); + when(blobStoreMock.isBlobMetadataEnabled()).thenReturn(true); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver( + remoteStoreSettings, + () -> Version.CURRENT, + () -> repositoriesService, + settings + ); + assertTrue(resolver.isTranslogMetadataEnabled()); + } + + public void testTranslogMetadataAllowedFalseWithMinVersionNewer() { + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver( + remoteStoreSettings, + () -> Version.CURRENT, + () -> repositoriesService, + settings + ); + assertFalse(resolver.isTranslogMetadataEnabled()); + } + + public void testTranslogMetadataAllowedMinVersionOlder() { + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean()).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver( + remoteStoreSettings, + () -> Version.V_2_14_0, + () -> repositoriesService, + settings + ); + assertFalse(resolver.isTranslogMetadataEnabled()); + } + +} diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyResolverTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyResolverTests.java deleted file mode 100644 index de61c902bf13e..0000000000000 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyResolverTests.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.remote; - -import org.opensearch.Version; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; -import org.opensearch.index.remote.RemoteStoreEnums.PathType; -import org.opensearch.indices.RemoteStoreSettings; -import org.opensearch.test.OpenSearchTestCase; - -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; - -public class RemoteStorePathStrategyResolverTests extends OpenSearchTestCase { - - public void testGetMinVersionOlder() { - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values())).build(); - ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_13_0); - assertEquals(PathType.FIXED, resolver.get().getType()); - assertNull(resolver.get().getHashAlgorithm()); - } - - public void testGetMinVersionNewer() { - PathType pathType = randomFrom(PathType.values()); - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), pathType).build(); - ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(pathType, resolver.get().getType()); - if (pathType.requiresHashAlgorithm()) { - assertNotNull(resolver.get().getHashAlgorithm()); - } else { - assertNull(resolver.get().getHashAlgorithm()); - } - } - - public void testGetStrategy() { - // FIXED type - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED).build(); - ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.FIXED, resolver.get().getType()); - - // FIXED type with hash algorithm - settings = Settings.builder() - .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED) - .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), randomFrom(PathHashAlgorithm.values())) - .build(); - clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.FIXED, resolver.get().getType()); - - // HASHED_PREFIX type with FNV_1A_COMPOSITE - settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build(); - clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); - - // HASHED_PREFIX type with FNV_1A_COMPOSITE - settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build(); - clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); - - // HASHED_PREFIX type with FNV_1A_BASE64 - settings = Settings.builder() - .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX) - .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) - .build(); - clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); - - // HASHED_PREFIX type with FNV_1A_BASE64 - settings = Settings.builder() - .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX) - .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) - .build(); - clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); - } - - public void testGetStrategyWithDynamicUpdate() { - - // Default value - Settings settings = Settings.builder().build(); - ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.FIXED, resolver.get().getType()); - assertNull(resolver.get().getHashAlgorithm()); - - // Set HASHED_PREFIX with default hash algorithm - clusterSettings.applySettings( - Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build() - ); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); - - // Set HASHED_PREFIX with FNV_1A_BASE64 hash algorithm - clusterSettings.applySettings( - Settings.builder() - .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX) - .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) - .build() - ); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); - - // Set HASHED_INFIX with default hash algorithm - clusterSettings.applySettings( - Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_INFIX).build() - ); - assertEquals(PathType.HASHED_INFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); - - // Set HASHED_INFIX with FNV_1A_BASE64 hash algorithm - clusterSettings.applySettings( - Settings.builder() - .put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_INFIX) - .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) - .build() - ); - assertEquals(PathType.HASHED_INFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); - } -} diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java index c1fc0cdaa0d3b..59c3d3dccdd0f 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -8,9 +8,13 @@ package org.opensearch.index.remote; +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; @@ -26,7 +30,9 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.opensearch.cluster.metadata.IndexMetadata.REMOTE_STORE_CUSTOM_KEY; import static org.opensearch.index.remote.RemoteStoreUtils.URL_BASE64_CHARSET; +import static org.opensearch.index.remote.RemoteStoreUtils.determineTranslogMetadataEnabled; import static org.opensearch.index.remote.RemoteStoreUtils.longToCompositeBase64AndBinaryEncoding; import static org.opensearch.index.remote.RemoteStoreUtils.longToUrlBase64; import static org.opensearch.index.remote.RemoteStoreUtils.urlBase64ToLong; @@ -40,6 +46,7 @@ public class RemoteStoreUtilsTests extends OpenSearchTestCase { private static Map BASE64_CHARSET_IDX_MAP; + private static String index = "test-index"; static { Map charToIndexMap = new HashMap<>(); @@ -341,4 +348,54 @@ static long compositeUrlBase64BinaryEncodingToLong(String encodedValue) { String binaryString = base64PartBinary + encodedValue.substring(1); return new BigInteger(binaryString, 2).longValue(); } + + public void testDeterdetermineTranslogMetadataEnabledWhenTrue() { + Metadata metadata = createIndexMetadataWithRemoteStoreSettings(index, 1); + IndexMetadata indexMetadata = metadata.index(index); + assertTrue(determineTranslogMetadataEnabled(indexMetadata)); + } + + public void testDeterdetermineTranslogMetadataEnabledWhenFalse() { + Metadata metadata = createIndexMetadataWithRemoteStoreSettings(index, 0); + IndexMetadata indexMetadata = metadata.index(index); + assertFalse(determineTranslogMetadataEnabled(indexMetadata)); + } + + public void testDeterdetermineTranslogMetadataEnabledWhenKeyNotFound() { + Metadata metadata = createIndexMetadataWithRemoteStoreSettings(index, 2); + IndexMetadata indexMetadata = metadata.index(index); + assertThrows(AssertionError.class, () -> determineTranslogMetadataEnabled(indexMetadata)); + } + + private static Metadata createIndexMetadataWithRemoteStoreSettings(String indexName, int option) { + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); + indexMetadata.settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "dummy-tlog-repo") + .put(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.getKey(), "dummy-segment-repo") + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), "SEGMENT") + .build() + ).putCustom(REMOTE_STORE_CUSTOM_KEY, getCustomDataMap(option)).build(); + return Metadata.builder().put(indexMetadata).build(); + } + + private static Map getCustomDataMap(int option) { + if (option > 1) { + return Map.of(); + } + String value = (option == 1) ? "true" : "false"; + return Map.of( + RemoteStoreEnums.PathType.NAME, + "dummy", + RemoteStoreEnums.PathHashAlgorithm.NAME, + "dummy", + IndexMetadata.TRANSLOG_METADATA_KEY, + value + ); + } + } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index e4f5a454b15f6..3ed90c0837663 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -26,14 +26,21 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Base64; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; + public class BlobStoreTransferServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; @@ -144,4 +151,37 @@ private Environment createEnvironment() { .build() ); } + + public void testBuildTransferFileMetadata_EmptyInputStream() throws IOException { + InputStream emptyInputStream = new ByteArrayInputStream(new byte[0]); + Map metadata = BlobStoreTransferService.buildTransferFileMetadata(emptyInputStream); + assertTrue(metadata.containsKey(CHECKPOINT_FILE_DATA_KEY)); + assertEquals("", metadata.get(CHECKPOINT_FILE_DATA_KEY)); + } + + public void testBuildTransferFileMetadata_NonEmptyInputStream() throws IOException { + String inputData = "This is a test input stream."; + InputStream inputStream = new ByteArrayInputStream(inputData.getBytes(StandardCharsets.UTF_8)); + Map metadata = BlobStoreTransferService.buildTransferFileMetadata(inputStream); + assertTrue(metadata.containsKey(CHECKPOINT_FILE_DATA_KEY)); + String expectedBase64String = Base64.getEncoder().encodeToString(inputData.getBytes(StandardCharsets.UTF_8)); + assertEquals(expectedBase64String, metadata.get(CHECKPOINT_FILE_DATA_KEY)); + } + + public void testBuildTransferFileMetadata_InputStreamExceedsLimit() { + byte[] largeData = new byte[1025]; // 1025 bytes, exceeding the 1KB limit + InputStream largeInputStream = new ByteArrayInputStream(largeData); + IOException exception = assertThrows(IOException.class, () -> BlobStoreTransferService.buildTransferFileMetadata(largeInputStream)); + assertEquals(exception.getMessage(), "Input stream exceeds 1KB limit"); + } + + public void testBuildTransferFileMetadata_SmallInputStreamOptimization() throws IOException { + String inputData = "Small input"; + InputStream inputStream = new ByteArrayInputStream(inputData.getBytes(StandardCharsets.UTF_8)); + Map metadata = BlobStoreTransferService.buildTransferFileMetadata(inputStream); + assertTrue(metadata.containsKey(CHECKPOINT_FILE_DATA_KEY)); + String expectedBase64String = Base64.getEncoder().encodeToString(inputData.getBytes(StandardCharsets.UTF_8)); + assertEquals(expectedBase64String, metadata.get(CHECKPOINT_FILE_DATA_KEY)); + } + } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 8b3fc6651a505..c6f9838ad2d52 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -15,6 +15,7 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.InputStreamWithMetadata; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.collect.Tuple; @@ -41,9 +42,12 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -53,15 +57,17 @@ import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; +import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY; import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.anySet; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -81,6 +87,7 @@ public class TranslogTransferManagerTests extends OpenSearchTestCase { FileTransferTracker tracker; TranslogTransferManager translogTransferManager; long delayForBlobDownload; + boolean isTranslogMetadataEnabled; @Override public void setUp() throws Exception { @@ -97,6 +104,7 @@ public void setUp() throws Exception { tlogBytes = "Hello Translog".getBytes(StandardCharsets.UTF_8); ckpBytes = "Hello Checkpoint".getBytes(StandardCharsets.UTF_8); tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0), remoteTranslogTransferTracker); + isTranslogMetadataEnabled = false; translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -104,7 +112,8 @@ public void setUp() throws Exception { remoteBaseTransferPath.add(METADATA.getName()), tracker, remoteTranslogTransferTracker, - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled ); delayForBlobDownload = 1; @@ -170,7 +179,8 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { remoteBaseTransferPath.add(METADATA.getName()), fileTransferTracker, remoteTranslogTransferTracker, - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -222,7 +232,8 @@ public void testTransferSnapshotOnUploadTimeout() throws Exception { remoteBaseTransferPath.add(METADATA.getName()), fileTransferTracker, remoteTranslogTransferTracker, - remoteStoreSettings + remoteStoreSettings, + isTranslogMetadataEnabled ); SetOnce exception = new SetOnce<>(); translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -265,7 +276,8 @@ public void testTransferSnapshotOnThreadInterrupt() throws Exception { remoteBaseTransferPath.add(METADATA.getName()), fileTransferTracker, remoteTranslogTransferTracker, - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled ); SetOnce exception = new SetOnce<>(); @@ -297,64 +309,66 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { uploadThread.get().interrupt(); } - private TransferSnapshot createTransferSnapshot() { - return new TransferSnapshot() { - @Override - public Set getCheckpointFileSnapshots() { - try { - return Set.of( - new CheckpointFileSnapshot( - primaryTerm, - generation, - minTranslogGeneration, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX), - null - ), - new CheckpointFileSnapshot( - primaryTerm, - generation, - minTranslogGeneration, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX), - null - ) - ); - } catch (IOException e) { - throw new AssertionError("Failed to create temp file", e); + private TransferSnapshot createTransferSnapshot() throws IOException { + try { + CheckpointFileSnapshot checkpointFileSnapshot1 = new CheckpointFileSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX), + null + ); + CheckpointFileSnapshot checkpointFileSnapshot2 = new CheckpointFileSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX), + null + ); + TranslogFileSnapshot translogFileSnapshot1 = new TranslogFileSnapshot( + primaryTerm, + generation, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX), + null + ); + TranslogFileSnapshot translogFileSnapshot2 = new TranslogFileSnapshot( + primaryTerm, + generation - 1, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX), + null + ); + + return new TransferSnapshot() { + @Override + public Set getCheckpointFileSnapshots() { + return Set.of(checkpointFileSnapshot1, checkpointFileSnapshot2); } - } - @Override - public Set getTranslogFileSnapshots() { - try { - return Set.of( - new TranslogFileSnapshot( - primaryTerm, - generation, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX), - null - ), - new TranslogFileSnapshot( - primaryTerm, - generation - 1, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX), - null - ) - ); - } catch (IOException e) { - throw new AssertionError("Failed to create temp file", e); + @Override + public Set getTranslogFileSnapshots() { + return Set.of(translogFileSnapshot1, translogFileSnapshot2); } - } - @Override - public TranslogTransferMetadata getTranslogTransferMetadata() { - return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); - } + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); + } - @Override - public String toString() { - return "test-to-string"; - } - }; + @Override + public Set getTranslogFileSnapshotWithMetadata() throws IOException { + translogFileSnapshot1.setMetadataFileInputStream(checkpointFileSnapshot1.inputStream()); + translogFileSnapshot2.setMetadataFileInputStream(checkpointFileSnapshot2.inputStream()); + return Set.of(translogFileSnapshot1, translogFileSnapshot2); + } + + @Override + public String toString() { + return "test-to-string"; + } + }; + } catch (Exception e) { + throw new IOException("Failed to create transfer snapshot"); + } } public void testReadMetadataNoFile() throws IOException { @@ -502,7 +516,8 @@ public void testDeleteTranslogSuccess() throws Exception { remoteBaseTransferPath.add(METADATA.getName()), tracker, remoteTranslogTransferTracker, - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -567,7 +582,8 @@ public void testDeleteTranslogFailure() throws Exception { remoteBaseTransferPath.add(METADATA.getName()), tracker, remoteTranslogTransferTracker, - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled ); String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; tracker.add(translogFile, true); @@ -624,4 +640,117 @@ public void testMetadataConflict() throws InterruptedException { assertThrows(RuntimeException.class, translogTransferManager::readMetadata); } + + // tests for cases when ckp is stored as translog metadata. + public void testTransferSnapshotWithTranslogMetadata() throws Exception { + AtomicInteger fileTransferSucceeded = new AtomicInteger(); + AtomicInteger fileTransferFailed = new AtomicInteger(); + AtomicInteger translogTransferSucceeded = new AtomicInteger(); + AtomicInteger translogTransferFailed = new AtomicInteger(); + + isTranslogMetadataEnabled = true; + + doNothing().when(transferService) + .uploadBlob( + any(TransferFileSnapshot.class), + Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm))), + any(WritePriority.class) + ); + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + Set transferFileSnapshots = (Set) invocationOnMock.getArguments()[0]; + transferFileSnapshots.forEach(transferFileSnapshot -> { + assertNotNull(transferFileSnapshot.getMetadataFileInputStream()); + listener.onResponse(transferFileSnapshot); + }); + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + + FileTransferTracker fileTransferTracker = new FileTransferTracker( + new ShardId("index", "indexUUid", 0), + remoteTranslogTransferTracker + ) { + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + super.onSuccess(fileSnapshot); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + super.onFailure(fileSnapshot, e); + } + + }; + + translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + fileTransferTracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled + ); + + assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) { + translogTransferSucceeded.incrementAndGet(); + } + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + translogTransferFailed.incrementAndGet(); + } + })); + assertEquals(2, fileTransferSucceeded.get()); + assertEquals(0, fileTransferFailed.get()); + assertEquals(1, translogTransferSucceeded.get()); + assertEquals(0, translogTransferFailed.get()); + assertEquals(2, fileTransferTracker.allUploaded().size()); + } + + public void testDownloadTranslogWithMetadata() throws IOException { + isTranslogMetadataEnabled = true; + translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + tracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + isTranslogMetadataEnabled + ); + Path location = createTempDir(); + assertFalse(Files.exists(location.resolve("translog-23.tlog"))); + assertFalse(Files.exists(location.resolve("translog-23.ckp"))); + mockDownloadBlobWithMetadataResponse(); + translogTransferManager.downloadTranslog("12", "23", location); + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.tlog")); + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); + verify(transferService, times(1)).downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog")); + assertTrue(Files.exists(location.resolve("translog-23.tlog"))); + assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + assertTlogCkpDownloadStatsWithMetadata(); + } + + private void mockDownloadBlobWithMetadataResponse() throws IOException { + Map metadata = new HashMap<>(); + String ckpDataString = Base64.getEncoder().encodeToString(ckpBytes); + metadata.put(CHECKPOINT_FILE_DATA_KEY, ckpDataString); + when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new InputStreamWithMetadata(new ByteArrayInputStream(tlogBytes), metadata); + }); + } + + private void assertTlogCkpDownloadStatsWithMetadata() { + assertEquals(tlogBytes.length, remoteTranslogTransferTracker.getDownloadBytesSucceeded()); + // Expect delay for both tlog and ckp file + assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= delayForBlobDownload); + } } diff --git a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java index 69282c13a3f37..b6ca632d4b49a 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java @@ -314,7 +314,8 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m systemIndices, true, awarenessReplicaBalance, - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + null ); transportCloseIndexAction = new TransportCloseIndexAction( diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 95a343f3b4025..3d31fa4df2455 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2164,7 +2164,8 @@ public void onFailure(final Exception e) { systemIndices, false, new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()), - DefaultRemoteStoreSettings.INSTANCE + DefaultRemoteStoreSettings.INSTANCE, + null ); actions.put( CreateIndexAction.INSTANCE, diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 5fa23a7b0827a..055bcd159efc3 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2659,6 +2659,7 @@ private static Settings buildRemoteStoreNodeAttributes( .put(segmentRepoSettingsAttributeKeyPrefix + "chunk_size", 200, ByteSizeUnit.BYTES); } settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values())); + settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean()); return settings.build(); }