diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index b767ffff05e3a..78441f74f6b4f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -37,6 +37,7 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -59,6 +60,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; @@ -77,7 +79,7 @@ public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return Arrays.asList(MockTransportService.TestPlugin.class, MockFsRepositoryPlugin.class); } @Override @@ -789,4 +791,72 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep docs + moreDocs + uncommittedOps ); } + + // Test local only translog files which are not uploaded to remote store (no metadata present in remote) + // Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE. + public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception { + clusterSettingsSuppliedByTest = true; + + // Overriding settings to use AsyncMultiStreamBlobContainer + Settings settings = Settings.builder() + .put(super.nodeSettings(1)) + .put( + remoteStoreClusterSettings( + REPOSITORY_NAME, + segmentRepoPath, + MockFsRepositoryPlugin.TYPE, + REPOSITORY_2_NAME, + translogRepoPath, + MockFsRepositoryPlugin.TYPE + ) + ) + .build(); + + internalCluster().startClusterManagerOnlyNode(settings); + String dataNode = internalCluster().startDataOnlyNode(settings); + + // 1. Create index with 0 replica + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + // 2. Index docs + int searchableDocs = 0; + for (int i = 0; i < randomIntBetween(1, 5); i++) { + indexBulk(INDEX_NAME, 15); + refresh(INDEX_NAME); + searchableDocs += 15; + } + indexBulk(INDEX_NAME, 15); + + assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs); + + // 3. Delete metadata from remote translog + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + + String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA).buildAsString(); + Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath); + + try (Stream files = Files.list(translogMetaDataPath)) { + files.forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + // Ignore + } + }); + } + + internalCluster().restartNode(dataNode); + + ensureGreen(INDEX_NAME); + + assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs); + indexBulk(INDEX_NAME, 15); + refresh(INDEX_NAME); + assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 67799f0465c29..da905b9605dfd 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -21,6 +21,7 @@ import org.opensearch.core.util.FileSystemUtils; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; import org.opensearch.index.translog.transfer.TransferSnapshot; @@ -219,7 +220,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat throw ex; } - static private void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { + private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { logger.debug("Downloading translog files from remote"); RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker(); long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded(); @@ -254,10 +255,32 @@ static private void downloadOnce(TranslogTransferManager translogTransferManager location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())), location.resolve(Translog.CHECKPOINT_FILE_NAME) ); + } else { + // When code flow reaches this block, it means we don't have any translog files uploaded to remote store. + // If local filesystem contains empty translog or no translog, we don't do anything. + // If local filesystem contains non-empty translog, we clean up these files and create empty translog. + logger.debug("No translog files found on remote, checking local filesystem for cleanup"); + if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) { + final Checkpoint checkpoint = readCheckpoint(location); + if (isEmptyTranslog(checkpoint) == false) { + logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files"); + // Creating empty translog will cleanup the older un-referenced tranlog files, we don't have to explicitly delete + Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint); + } else { + logger.debug("Empty translog on local, skipping clean-up"); + } + } } logger.debug("downloadOnce execution completed"); } + private static boolean isEmptyTranslog(Checkpoint checkpoint) { + return checkpoint.generation == checkpoint.minTranslogGeneration + && checkpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED + && checkpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED + && checkpoint.numOps == 0; + } + public static TranslogTransferManager buildTranslogTransferManager( BlobStoreRepository blobStoreRepository, ThreadPool threadPool, diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 7c50ed6ecd58f..c653605f8fa10 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2011,17 +2011,47 @@ public static String createEmptyTranslog( final long primaryTerm, @Nullable final String translogUUID, @Nullable final ChannelFactory factory + ) throws IOException { + return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, translogUUID, factory, 1); + } + + public static String createEmptyTranslog(final Path location, final ShardId shardId, Checkpoint checkpoint) throws IOException { + final Path highestGenTranslogFile = location.resolve(getFilename(checkpoint.generation)); + final TranslogHeader translogHeader; + try (FileChannel channel = FileChannel.open(highestGenTranslogFile, StandardOpenOption.READ)) { + translogHeader = TranslogHeader.read(highestGenTranslogFile, channel); + } + final String translogUUID = translogHeader.getTranslogUUID(); + final long primaryTerm = translogHeader.getPrimaryTerm(); + final ChannelFactory channelFactory = FileChannel::open; + return Translog.createEmptyTranslog( + location, + shardId, + SequenceNumbers.NO_OPS_PERFORMED, + primaryTerm, + translogUUID, + channelFactory, + checkpoint.generation + 1 + ); + } + + public static String createEmptyTranslog( + final Path location, + final ShardId shardId, + final long initialGlobalCheckpoint, + final long primaryTerm, + @Nullable final String translogUUID, + @Nullable final ChannelFactory factory, + final long generation ) throws IOException { IOUtils.rm(location); Files.createDirectories(location); - final long generation = 1L; - final long minTranslogGeneration = 1L; final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open; final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID(); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); final Path translogFile = location.resolve(getFilename(generation)); - final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, generation); Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); final TranslogWriter writer = TranslogWriter.create( @@ -2031,7 +2061,7 @@ public static String createEmptyTranslog( translogFile, channelFactory, EMPTY_TRANSLOG_BUFFER_SIZE, - minTranslogGeneration, + generation, initialGlobalCheckpoint, () -> { throw new UnsupportedOperationException(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 0e3854f65135f..28979a3dc4f28 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -1716,6 +1716,82 @@ public void testDownloadWithRetries() throws IOException { RemoteFsTranslog.download(mockTransfer, location, logger); } + // No translog data in local as well as remote, we skip creating empty translog + public void testDownloadWithNoTranslogInLocalAndRemote() throws IOException { + Path location = createTempDir(); + + TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata()).thenReturn(null); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + Path[] filesBeforeDownload = FileSystemUtils.files(location); + RemoteFsTranslog.download(mockTransfer, location, logger); + assertEquals(filesBeforeDownload, FileSystemUtils.files(location)); + } + + // No translog data in remote but non-empty translog is present in local. In this case, we delete all the files + // from local file system and create empty translog + public void testDownloadWithTranslogOnlyInLocal() throws IOException { + TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata()).thenReturn(null); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + Path location = createTempDir(); + for (Path file : FileSystemUtils.files(translogDir)) { + Files.copy(file, location.resolve(file.getFileName())); + } + + Checkpoint existingCheckpoint = Translog.readCheckpoint(location); + + TranslogTransferManager finalMockTransfer = mockTransfer; + RemoteFsTranslog.download(finalMockTransfer, location, logger); + + Path[] filesPostDownload = FileSystemUtils.files(location); + assertEquals(2, filesPostDownload.length); + assertTrue( + filesPostDownload[0].getFileName().toString().contains("translog.ckp") + || filesPostDownload[1].getFileName().toString().contains("translog.ckp") + ); + + Checkpoint newEmptyTranslogCheckpoint = Translog.readCheckpoint(location); + // Verify that the new checkpoint points to empty translog + assertTrue( + newEmptyTranslogCheckpoint.generation == newEmptyTranslogCheckpoint.minTranslogGeneration + && newEmptyTranslogCheckpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED + && newEmptyTranslogCheckpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED + && newEmptyTranslogCheckpoint.numOps == 0 + ); + assertTrue(newEmptyTranslogCheckpoint.generation > existingCheckpoint.generation); + assertEquals(newEmptyTranslogCheckpoint.globalCheckpoint, existingCheckpoint.globalCheckpoint); + } + + // No translog data in remote and empty translog in local. We skip creating another empty translog + public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException { + TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata()).thenReturn(null); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + Path location = createTempDir(); + for (Path file : FileSystemUtils.files(translogDir)) { + Files.copy(file, location.resolve(file.getFileName())); + } + + TranslogTransferManager finalMockTransfer = mockTransfer; + + // download first time will ensure creating empty translog + RemoteFsTranslog.download(finalMockTransfer, location, logger); + Path[] filesPostFirstDownload = FileSystemUtils.files(location); + + // download on empty translog should be a no-op + RemoteFsTranslog.download(finalMockTransfer, location, logger); + Path[] filesPostSecondDownload = FileSystemUtils.files(location); + + assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload); + } + public class ThrowingBlobRepository extends FsRepository { private final Environment environment;