From c62b81a34e314dc47c58b0ebdf26fdaa1521fd1d Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 18 Aug 2023 17:57:53 +0530 Subject: [PATCH] Skip upload of segments_N file Signed-off-by: Sachin Kale --- .../opensearch/index/shard/IndexShard.java | 88 +++++++------------ .../shard/RemoteStoreRefreshListener.java | 2 +- .../opensearch/index/shard/StoreRecovery.java | 2 +- .../recovery/PeerRecoveryTargetService.java | 2 +- .../RemoteStoreRefreshListenerTests.java | 8 +- 5 files changed, 35 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1d0184de9d93c..090888eff3f6f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -49,8 +49,6 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.BufferedChecksumIndexInput; -import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; @@ -2335,7 +2333,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) { - syncSegmentsFromRemoteSegmentStore(false, true); + syncSegmentsFromRemoteSegmentStore(false); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { if (syncFromRemote) { @@ -4596,7 +4594,7 @@ public void close() throws IOException { }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false, true); + syncSegmentsFromRemoteSegmentStore(false); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); @@ -4656,10 +4654,9 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { /** * Downloads segments from remote segment store. * @param overrideLocal flag to override local segment files with those in remote store - * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise * @throws IOException if exception occurs while reading segments from remote store */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.trace("Downloading segments from remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); @@ -4667,13 +4664,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re // are uploaded to the remote segment store. RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init(); + assert remoteSegmentMetadata != null : "RemoteSegmentMetadata should not be null"; + Map uploadedSegments = remoteDirectory .getSegmentsUploadedToRemoteStore() .entrySet() .stream() // if this is a refresh level sync, ignore any segments_n uploaded to the store, we will commit the received infos bytes // locally. - .filter(entry -> refreshLevelSegmentSync && entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) + .filter(entry -> entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); store.incRef(); remoteStore.incRef(); @@ -4693,24 +4692,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re storeDirectory = store.directory(); } copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal); - - if (refreshLevelSegmentSync && remoteSegmentMetadata != null) { - final SegmentInfos infosSnapshot = store.buildSegmentInfos( - remoteSegmentMetadata.getSegmentInfosBytes(), - remoteSegmentMetadata.getGeneration() - ); - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes. - // Extra segments will be wiped on engine open. - for (String file : List.of(store.directory().listAll())) { - if (file.startsWith(IndexFileNames.SEGMENTS)) { - store.deleteQuiet(file); - } - } - assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() - : "There should not be any segments file in the dir"; - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); - } + commitSegmentInfos(remoteSegmentMetadata); } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); } finally { @@ -4740,36 +4722,14 @@ public void syncSegmentsFromGivenRemoteSegmentStore( remoteDirectory.init(); remoteStore.incRef(); } - Map uploadedSegments = sourceRemoteDirectory - .initializeToSpecificCommit(primaryTerm, commitGeneration) - .getMetadata(); + RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificCommit(primaryTerm, commitGeneration); + Map uploadedSegments = remoteSegmentMetadata.getMetadata(); final Directory storeDirectory = store.directory(); store.incRef(); try { - String segmentsNFile = copySegmentFiles( - storeDirectory, - sourceRemoteDirectory, - remoteDirectory, - uploadedSegments, - overrideLocal - ); - if (segmentsNFile != null) { - try ( - ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( - storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT) - ) - ) { - SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration); - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - if (remoteStore != null) { - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); - } else { - store.directory().sync(infosSnapshot.files(true)); - store.directory().syncMetaData(); - } - } - } + copySegmentFiles(storeDirectory, sourceRemoteDirectory, remoteDirectory, uploadedSegments, overrideLocal); + commitSegmentInfos(remoteSegmentMetadata); } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); } finally { @@ -4780,7 +4740,25 @@ public void syncSegmentsFromGivenRemoteSegmentStore( } } - private String copySegmentFiles( + private void commitSegmentInfos(RemoteSegmentMetadata remoteSegmentMetadata) throws IOException { + final SegmentInfos infosSnapshot = store.buildSegmentInfos( + remoteSegmentMetadata.getSegmentInfosBytes(), + remoteSegmentMetadata.getGeneration() + ); + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes. + // Extra segments will be wiped on engine open. + for (String file : List.of(store.directory().listAll())) { + if (file.startsWith(IndexFileNames.SEGMENTS)) { + store.deleteQuiet(file); + } + } + assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() + : "There should not be any segments file in the dir"; + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + } + + private void copySegmentFiles( Directory storeDirectory, RemoteSegmentStoreDirectory sourceRemoteDirectory, RemoteSegmentStoreDirectory targetRemoteDirectory, @@ -4789,7 +4767,6 @@ private String copySegmentFiles( ) throws IOException { List downloadedSegments = new ArrayList<>(); List skippedSegments = new ArrayList<>(); - String segmentNFile = null; try { Set localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); if (overrideLocal) { @@ -4808,16 +4785,11 @@ private String copySegmentFiles( if (targetRemoteDirectory != null) { targetRemoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); } - if (file.startsWith(IndexFileNames.SEGMENTS)) { - assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file"; - segmentNFile = file; - } } } finally { logger.trace("Downloaded segments here: {}", downloadedSegments); logger.trace("Skipped download for segments here: {}", skippedSegments); } - return segmentNFile; } private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 2f0d11fb6a8b3..4bb8ef4a38481 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -221,7 +221,7 @@ private boolean syncSegments() { // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); - Collection localSegmentsPostRefresh = segmentInfos.files(true); + Collection localSegmentsPostRefresh = segmentInfos.files(false); // Create a map of file name to size and update the refresh segment tracker updateLocalSizeMapAndTracker(localSegmentsPostRefresh); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index b565ddd6c819a..3b1e65c4f75a4 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco remoteStore.incRef(); try { // Download segments from remote segment store - indexShard.syncSegmentsFromRemoteSegmentStore(true, true); + indexShard.syncSegmentsFromRemoteSegmentStore(true); if (store.directory().listAll().length == 0) { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 88b4cd063b8a6..df1589b3f29b9 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, true); + indexShard.syncSegmentsFromRemoteSegmentStore(false); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 83b07e986bcc5..9c1bcbeaac1c2 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -423,7 +423,7 @@ private void assertNoLag(RemoteSegmentTransferTracker tracker) { assertEquals(0, tracker.getTimeMsLag()); assertEquals(0, tracker.getRejectionCount()); assertEquals(tracker.getUploadBytesStarted(), tracker.getUploadBytesSucceeded()); - assertTrue(tracker.getUploadBytesStarted() > 0); + assertEquals(0, tracker.getUploadBytesStarted()); assertEquals(0, tracker.getUploadBytesFailed()); assertEquals(0, tracker.getInflightUploads()); assertEquals(tracker.getTotalUploadsStarted(), tracker.getTotalUploadsSucceeded()); @@ -557,16 +557,12 @@ public TestFilterDirectory(Directory in) { private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException { Map uploadedSegments = remoteSegmentStoreDirectory .getSegmentsUploadedToRemoteStore(); - String segmentsNFilename = null; try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); for (String file : segmentInfos.files(true)) { - if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) { + if (RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file) == false && file.startsWith(IndexFileNames.SEGMENTS) == false) { assertTrue(uploadedSegments.containsKey(file)); } - if (file.startsWith(IndexFileNames.SEGMENTS)) { - segmentsNFilename = file; - } } } }