diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java index e4dcd637ac448..97c44280f4429 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java @@ -64,7 +64,6 @@ protected Settings featureFlagSettings() { .build(); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191") public void testPrimaryRelocationWhileIndexing() throws Exception { super.testPrimaryRelocationWhileIndexing(); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 43b6f615e7002..c488127857ed5 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -29,8 +29,10 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.HashSet; import java.util.Locale; @@ -68,6 +70,7 @@ public class RemoteFsTranslog extends Translog { private final SetOnce olderPrimaryCleaned = new SetOnce<>(); private static final int REMOTE_DELETION_PERMITS = 2; + private static final int DOWNLOAD_RETRIES = 2; public static final String TRANSLOG = "translog"; // Semaphore used to allow only single remote generation to happen at a time @@ -161,7 +164,28 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t RemoteFsTranslog.download(translogTransferManager, location, logger); } - public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { + static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { + /* + In Primary to Primary relocation , there can be concurrent upload and download of translog. + While translog files are getting downloaded by new primary, it might hence be deleted by the primary + Hence we retry if tlog/ckp files are not found . + + This doesn't happen in last download , where it is ensured that older primary has stopped modifying tlog data. + */ + IOException ex = null; + for (int i = 0; i <= DOWNLOAD_RETRIES; i++) { + try { + downloadOnce(translogTransferManager, location, logger); + return; + } catch (FileNotFoundException | NoSuchFileException e) { + // continue till download retries + ex = e; + } + } + throw ex; + } + + static private void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { logger.trace("Downloading translog files from remote"); RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker(); long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index b69d065f56de0..de1b2990f0a50 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -45,6 +45,8 @@ import org.opensearch.index.seqno.LocalCheckpointTrackerTests; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.index.translog.transfer.TranslogTransferManager; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -59,12 +61,14 @@ import java.io.Closeable; import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -98,6 +102,10 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("ExtrasFS") @@ -1466,6 +1474,46 @@ public void testCloseIntoReader() throws IOException { } } + public void testDownloadWithRetries() throws IOException { + long generation = 1, primaryTerm = 1; + Path location = createTempDir(); + TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1); + Map generationToPrimaryTermMapper = new HashMap<>(); + generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm)); + translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + + TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + // Always File not found + when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found")); + TranslogTransferManager finalMockTransfer = mockTransfer; + assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger)); + + // File not found in first attempt . File found in second attempt. + mockTransfer = mock(TranslogTransferManager.class); + when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + String msg = "File not found"; + Exception toThrow = randomBoolean() ? new NoSuchFileException(msg) : new FileNotFoundException(msg); + when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(toThrow).thenReturn(true); + + AtomicLong downloadCounter = new AtomicLong(); + doAnswer(invocation -> { + if (downloadCounter.incrementAndGet() <= 1) { + throw new NoSuchFileException("File not found"); + } else if (downloadCounter.get() == 2) { + Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation))); + } + return true; + }).when(mockTransfer).downloadTranslog(any(), any(), any()); + + // no exception thrown + RemoteFsTranslog.download(mockTransfer, location, logger); + } + public class ThrowingBlobRepository extends FsRepository { private final Environment environment;