From 76419594290b060e97cd5eeb39b7f00a1866e020 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sun, 28 Apr 2024 08:54:30 +0000 Subject: [PATCH] Integrate local recovery with remote store seeding during migration (#12922) Signed-off-by: Bhumika Saini (cherry picked from commit a8008e2f37ec7dca061fcb7b5f84c944dd4f1108) Signed-off-by: github-actions[bot] --- .../RemotePrimaryLocalRecoveryIT.java | 179 ++++++++++++++++++ .../org/opensearch/index/IndexService.java | 3 +- .../index/seqno/ReplicationTracker.java | 11 +- .../opensearch/index/shard/IndexShard.java | 5 +- .../opensearch/index/shard/StoreRecovery.java | 8 + .../index/translog/RemoteFsTranslog.java | 19 +- .../index/translog/TranslogConfig.java | 21 +- .../translog/TruncateTranslogAction.java | 3 +- .../index/engine/InternalEngineTests.java | 8 +- .../index/shard/RefreshListenersTests.java | 3 +- .../InternalTranslogManagerTests.java | 14 +- .../index/translog/LocalTranslogTests.java | 8 +- .../index/translog/RemoteFsTranslogTests.java | 20 +- .../translog/TranslogManagerTestCase.java | 9 +- .../index/engine/EngineTestCase.java | 21 +- 15 files changed, 285 insertions(+), 47 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java new file mode 100644 index 0000000000000..024fc68602a19 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java @@ -0,0 +1,179 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.util.FileSystemUtils; +import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.index.translog.RemoteTranslogStats; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase { + String indexName = "idx1"; + int numOfNodes = randomIntBetween(6, 9); + + /** + * Tests local recovery sanity in the happy path flow + */ + public void testLocalRecoveryRollingRestart() throws Exception { + triggerRollingRestartForRemoteMigration(0); + internalCluster().stopAllNodes(); + } + + /** + * Tests local recovery sanity during remote migration with a node restart in between + */ + public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception { + triggerRollingRestartForRemoteMigration(0); + + DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + DiscoveryNode nodeToRestart = (DiscoveryNode) discoveryNodes.getDataNodes().values().toArray()[randomIntBetween(0, numOfNodes - 4)]; + internalCluster().restartNode(nodeToRestart.getName()); + + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap(); + for (Map.Entry entry : shardStatsMap.entrySet()) { + ShardRouting shardRouting = entry.getKey(); + ShardStats shardStats = entry.getValue(); + if (nodeToRestart.equals(shardRouting.currentNodeId())) { + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + + assertBusy(() -> { + String shardPath = getShardLevelBlobPath( + client(), + indexName, + new BlobPath(), + String.valueOf(shardRouting.getId()), + SEGMENTS, + DATA + ).buildAsString(); + Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath); + List segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath)) + .filter(path -> path.getFileName().toString().contains("segments_")) + .map(path -> path.getFileName().toString()) + .collect(Collectors.toList()); + Set expectedUniqueSegmentsNFiles = segmentsNFilesInRepo.stream() + .map(fileName -> fileName.split(SEGMENT_NAME_UUID_SEPARATOR)[0]) + .collect(Collectors.toSet()); + assertEquals( + "Expected no duplicate segments_N files in remote but duplicates were found " + segmentsNFilesInRepo, + expectedUniqueSegmentsNFiles.size(), + segmentsNFilesInRepo.size() + ); + }, 90, TimeUnit.SECONDS); + } + + internalCluster().stopAllNodes(); + } + + /** + * Tests local recovery flow sanity in the happy path flow with replicas in place + */ + public void testLocalRecoveryFlowWithReplicas() throws Exception { + triggerRollingRestartForRemoteMigration(randomIntBetween(1, 2)); + internalCluster().stopAllNodes(); + } + + /** + * Helper method to run a rolling restart for migration to remote backed cluster + */ + private void triggerRollingRestartForRemoteMigration(int replicaCount) throws Exception { + internalCluster().startClusterManagerOnlyNodes(3); + internalCluster().startNodes(numOfNodes - 3); + + // create index + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)) + .build(); + createIndex(indexName, indexSettings); + ensureGreen(indexName); + indexBulk(indexName, randomIntBetween(100, 10000)); + refresh(indexName); + indexBulk(indexName, randomIntBetween(100, 10000)); + + initDocRepToRemoteMigration(); + + // rolling restart + final Settings remoteNodeAttributes = remoteStoreClusterSettings( + REPOSITORY_NAME, + segmentRepoPath, + REPOSITORY_2_NAME, + translogRepoPath + ); + internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { + // Update remote attributes + @Override + public Settings onNodeStopped(String nodeName) { + return remoteNodeAttributes; + } + }); + ensureStableCluster(numOfNodes); + ensureGreen(TimeValue.timeValueSeconds(90), indexName); + assertEquals(internalCluster().size(), numOfNodes); + + // Assert on remote uploads + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap(); + DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + shardStatsMap.forEach((shardRouting, shardStats) -> { + if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) { + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + }); + + // Assert on new remote uploads after seeding + indexBulk(indexName, randomIntBetween(100, 10000)); + refresh(indexName); + indexBulk(indexName, randomIntBetween(100, 10000)); + Map newShardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap(); + newShardStatsMap.forEach((shardRouting, shardStats) -> { + if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) { + RemoteSegmentStats prevRemoteSegmentStats = shardStatsMap.get(shardRouting) + .getStats() + .getSegments() + .getRemoteSegmentStats(); + RemoteSegmentStats newRemoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(newRemoteSegmentStats.getTotalUploadTime() > prevRemoteSegmentStats.getTotalUploadTime()); + assertTrue(newRemoteSegmentStats.getUploadBytesSucceeded() > prevRemoteSegmentStats.getUploadBytesSucceeded()); + + RemoteTranslogStats prevRemoteTranslogStats = shardStatsMap.get(shardRouting) + .getStats() + .getTranslog() + .getRemoteTranslogStats(); + RemoteTranslogStats newRemoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats(); + assertTrue(newRemoteTranslogStats.getUploadBytesSucceeded() > prevRemoteTranslogStats.getUploadBytesSucceeded()); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index a7b29314210df..14c6cecb1f847 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -500,13 +500,14 @@ public synchronized IndexShard createShard( if (this.indexSettings.isRemoteStoreEnabled()) { remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); } else { - if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) { + if (sourceNode == null || sourceNode.isRemoteStoreNode() == false) { if (routing.primary() == false) { throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId()); } logger.info("DocRep shard {} is migrating to remote", shardId); seedRemote = true; } + remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory( RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()), this.indexSettings.getUUID(), diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 85b8a447ebb8a..4e818d02fa0a9 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1108,12 +1108,11 @@ private ReplicationGroup calculateReplicationGroup() { } else { newVersion = replicationGroup.getVersion() + 1; } - assert indexSettings.isRemoteTranslogStoreEnabled() - // Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node - || (replicationGroup != null - && replicationGroup.getReplicationTargets() - .stream() - .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()))) + assert newVersion == 0 || indexSettings.isRemoteTranslogStoreEnabled() + // Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node + || replicationGroup.getReplicationTargets() + .stream() + .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId())) || checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated) : "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 994eefab7290a..8791f92f0f9ba 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -438,7 +438,7 @@ public IndexShard( logger.debug("state: [CREATED]"); this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); - this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId); + this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId, seedRemote); final String aId = shardRouting.allocationId().getId(); final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id()); this.pendingPrimaryTerm = primaryTerm; @@ -5012,7 +5012,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { shardPath().resolveTranslog(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings, - logger + logger, + shouldSeedRemoteStore() ); } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index e130f50e105d5..8d689e8769728 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -650,6 +650,14 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe indexShard.recoveryState().getIndex().setFileDetailsComplete(); } indexShard.openEngineAndRecoverFromTranslog(); + if (indexShard.shouldSeedRemoteStore()) { + indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> { + logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId()); + indexShard.refresh("remote store migration"); + }); + indexShard.waitForRemoteStoreSync(); + logger.info("Remote Store is now seeded via local recovery for {}", indexShard.shardId()); + } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index da905b9605dfd..bf0c0ede3cbe1 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -120,7 +120,7 @@ public RemoteFsTranslog( remoteStoreSettings ); try { - download(translogTransferManager, location, logger); + download(translogTransferManager, location, logger, config.shouldSeedRemote()); Checkpoint checkpoint = readCheckpoint(location); logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo); this.readers.addAll(recoverFromFiles(checkpoint)); @@ -168,7 +168,8 @@ public static void download( Path location, RemoteStorePathStrategy pathStrategy, RemoteStoreSettings remoteStoreSettings, - Logger logger + Logger logger, + boolean seedRemote ) throws IOException { assert repository instanceof BlobStoreRepository : String.format( Locale.ROOT, @@ -189,11 +190,12 @@ public static void download( pathStrategy, remoteStoreSettings ); - RemoteFsTranslog.download(translogTransferManager, location, logger); + RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote); logger.trace(remoteTranslogTransferTracker.toString()); } - static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { + static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote) + throws IOException { /* In Primary to Primary relocation , there can be concurrent upload and download of translog. While translog files are getting downloaded by new primary, it might hence be deleted by the primary @@ -206,7 +208,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat boolean success = false; long startTimeMs = System.currentTimeMillis(); try { - downloadOnce(translogTransferManager, location, logger); + downloadOnce(translogTransferManager, location, logger, seedRemote); success = true; return; } catch (FileNotFoundException | NoSuchFileException e) { @@ -220,7 +222,8 @@ static void download(TranslogTransferManager translogTransferManager, Path locat throw ex; } - private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { + private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote) + throws IOException { logger.debug("Downloading translog files from remote"); RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker(); long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded(); @@ -262,7 +265,9 @@ private static void downloadOnce(TranslogTransferManager translogTransferManager logger.debug("No translog files found on remote, checking local filesystem for cleanup"); if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) { final Checkpoint checkpoint = readCheckpoint(location); - if (isEmptyTranslog(checkpoint) == false) { + if (seedRemote) { + logger.debug("Remote migration ongoing. Retaining the translog on local, skipping clean-up"); + } else if (isEmptyTranslog(checkpoint) == false) { logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files"); // Creating empty translog will cleanup the older un-referenced tranlog files, we don't have to explicitly delete Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint); diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java b/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java index 2f00773075d41..f720f041b287c 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java @@ -59,6 +59,7 @@ public final class TranslogConfig { private final Path translogPath; private final ByteSizeValue bufferSize; private final String nodeId; + private final boolean seedRemote; /** * Creates a new TranslogConfig instance @@ -66,9 +67,17 @@ public final class TranslogConfig { * @param translogPath the path to use for the transaction log files * @param indexSettings the index settings used to set internal variables * @param bigArrays a bigArrays instance used for temporarily allocating write operations + * @param seedRemote boolean denoting whether remote store needs to be seeded as part of remote migration */ - public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) { - this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId); + public TranslogConfig( + ShardId shardId, + Path translogPath, + IndexSettings indexSettings, + BigArrays bigArrays, + String nodeId, + boolean seedRemote + ) { + this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId, seedRemote); } TranslogConfig( @@ -77,7 +86,8 @@ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSet IndexSettings indexSettings, BigArrays bigArrays, ByteSizeValue bufferSize, - String nodeId + String nodeId, + boolean seedRemote ) { this.bufferSize = bufferSize; this.indexSettings = indexSettings; @@ -85,6 +95,7 @@ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSet this.translogPath = translogPath; this.bigArrays = bigArrays; this.nodeId = nodeId; + this.seedRemote = seedRemote; } /** @@ -125,4 +136,8 @@ public ByteSizeValue getBufferSize() { public String getNodeId() { return nodeId; } + + public boolean shouldSeedRemote() { + return seedRemote; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java index 25fcdc614172a..cf6c9087777fe 100644 --- a/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java @@ -195,7 +195,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, - "" + "", + false ); long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id()); // We open translog to check for corruption, do not clean anything. diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 0cfa6e0dbe490..a1dbb6a8bf266 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -4049,7 +4049,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final Path badTranslogLog = createTempDir(); final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); Translog translog = new LocalTranslog( - new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED, @@ -4067,7 +4067,8 @@ public void testRecoverFromForeignTranslog() throws IOException { translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE, - "" + "", + false ); EngineConfig brokenConfig = new EngineConfig.Builder().shardId(shardId) @@ -7780,7 +7781,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { createTempDir(), config.getTranslogConfig().getIndexSettings(), config.getTranslogConfig().getBigArrays(), - "" + "", + false ); EngineConfig configWithWarmer = new EngineConfig.Builder().shardId(config.getShardId()) .threadPool(config.getThreadPool()) diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index 87aa1f20c5991..50d2e0e7bc6b9 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -134,7 +134,8 @@ public void setupListeners() throws Exception { createTempDir("translog"), indexSettings, BigArrays.NON_RECYCLING_INSTANCE, - "" + "", + false ); Engine.EventListener eventListener = new Engine.EventListener() { @Override diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index c098d11a3487f..c27d0367abf68 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -38,7 +38,7 @@ public void testRecoveryFromTranslog() throws IOException { LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -68,7 +68,7 @@ public void testRecoveryFromTranslog() throws IOException { translogManager.syncTranslog(); translogManager.close(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -117,7 +117,7 @@ public void testTranslogRollsGeneration() throws IOException { LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -147,7 +147,7 @@ public void testTranslogRollsGeneration() throws IOException { translogManager.syncTranslog(); translogManager.close(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -182,7 +182,7 @@ public void testTrimOperationsFromTranslog() throws IOException { LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -214,7 +214,7 @@ public void testTrimOperationsFromTranslog() throws IOException { translogManager.close(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -253,7 +253,7 @@ public void testTranslogSync() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); AtomicReference translogManagerAtomicReference = new AtomicReference<>(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), diff --git a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java index f725783c50e52..70eb99674cd20 100644 --- a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java @@ -291,7 +291,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting ); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings); - return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, ""); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, "", false); } private Location addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { @@ -1457,7 +1457,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { temp.getIndexSettings(), temp.getBigArrays(), new ByteSizeValue(1, ByteSizeUnit.KB), - "" + "", + false ); final Set persistedSeqNos = new HashSet<>(); @@ -1556,7 +1557,8 @@ public void testTranslogWriterFsyncedWithLocalTranslog() throws IOException { temp.getIndexSettings(), temp.getBigArrays(), new ByteSizeValue(1, ByteSizeUnit.KB), - "" + "", + false ); final Set persistedSeqNos = new HashSet<>(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 28979a3dc4f28..6bf35cc1eac9b 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -224,7 +224,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting // To simulate that the node is remote backed Settings nodeSettings = Settings.builder().put("node.attr.remote_store.translog.repository", "my-repo-1").build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings, nodeSettings); - return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, ""); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, "", false); } private BlobStoreRepository createRepository() { @@ -399,7 +399,8 @@ private TranslogConfig getConfig(int gensToKeep) { temp.getIndexSettings(), temp.getBigArrays(), new ByteSizeValue(1, ByteSizeUnit.KB), - "" + "", + false ); return config; } @@ -1561,7 +1562,8 @@ public void testTranslogWriterFsyncDisabledInRemoteFsTranslog() throws IOExcepti temp.getIndexSettings(), temp.getBigArrays(), new ByteSizeValue(1, ByteSizeUnit.KB), - "" + "", + false ); final Set persistedSeqNos = new HashSet<>(); @@ -1692,7 +1694,7 @@ public void testDownloadWithRetries() throws IOException { // Always File not found when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found")); TranslogTransferManager finalMockTransfer = mockTransfer; - assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger)); + assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger, false)); // File not found in first attempt . File found in second attempt. mockTransfer = mock(TranslogTransferManager.class); @@ -1713,7 +1715,7 @@ public void testDownloadWithRetries() throws IOException { }).when(mockTransfer).downloadTranslog(any(), any(), any()); // no exception thrown - RemoteFsTranslog.download(mockTransfer, location, logger); + RemoteFsTranslog.download(mockTransfer, location, logger, false); } // No translog data in local as well as remote, we skip creating empty translog @@ -1726,7 +1728,7 @@ public void testDownloadWithNoTranslogInLocalAndRemote() throws IOException { when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); Path[] filesBeforeDownload = FileSystemUtils.files(location); - RemoteFsTranslog.download(mockTransfer, location, logger); + RemoteFsTranslog.download(mockTransfer, location, logger, false); assertEquals(filesBeforeDownload, FileSystemUtils.files(location)); } @@ -1746,7 +1748,7 @@ public void testDownloadWithTranslogOnlyInLocal() throws IOException { Checkpoint existingCheckpoint = Translog.readCheckpoint(location); TranslogTransferManager finalMockTransfer = mockTransfer; - RemoteFsTranslog.download(finalMockTransfer, location, logger); + RemoteFsTranslog.download(finalMockTransfer, location, logger, false); Path[] filesPostDownload = FileSystemUtils.files(location); assertEquals(2, filesPostDownload.length); @@ -1782,11 +1784,11 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException { TranslogTransferManager finalMockTransfer = mockTransfer; // download first time will ensure creating empty translog - RemoteFsTranslog.download(finalMockTransfer, location, logger); + RemoteFsTranslog.download(finalMockTransfer, location, logger, false); Path[] filesPostFirstDownload = FileSystemUtils.files(location); // download on empty translog should be a no-op - RemoteFsTranslog.download(finalMockTransfer, location, logger); + RemoteFsTranslog.download(finalMockTransfer, location, logger, false); Path[] filesPostSecondDownload = FileSystemUtils.files(location); assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java index e17d2770f014a..a16e607bbf37a 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java @@ -74,7 +74,14 @@ protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOExc } protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""); + TranslogConfig translogConfig = new TranslogConfig( + shardId, + translogPath, + INDEX_SETTINGS, + BigArrays.NON_RECYCLING_INSTANCE, + "", + false + ); String translogUUID = Translog.createEmptyTranslog( translogPath, SequenceNumbers.NO_OPS_PERFORMED, diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 93ae935fba558..cbc3aaba6fb33 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -527,7 +527,14 @@ protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOExc } protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""); + TranslogConfig translogConfig = new TranslogConfig( + shardId, + translogPath, + INDEX_SETTINGS, + BigArrays.NON_RECYCLING_INSTANCE, + "", + false + ); String translogUUID = Translog.createEmptyTranslog( translogPath, SequenceNumbers.NO_OPS_PERFORMED, @@ -877,7 +884,8 @@ public EngineConfig config( translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, - "" + "", + false ); final List extRefreshListenerList = externalRefreshListener == null ? emptyList() @@ -946,7 +954,14 @@ protected EngineConfig config( .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .build() ); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, ""); + TranslogConfig translogConfig = new TranslogConfig( + shardId, + translogPath, + indexSettings, + BigArrays.NON_RECYCLING_INSTANCE, + "", + false + ); return new EngineConfig.Builder().shardId(config.getShardId()) .threadPool(config.getThreadPool()) .indexSettings(indexSettings)