From 8d6c34bd43a20e034079b98d7d1f59c71228de32 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Tue, 16 Jan 2024 13:12:17 +0530 Subject: [PATCH 1/4] Create a clone of local segements size map used for Remote Segment Stats until sync to remote completes Signed-off-by: bansvaru --- .../remote/RemoteSegmentTransferTracker.java | 3 +- .../shard/RemoteStoreRefreshListener.java | 28 +++++++++++-------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index fe9440813b94f..40b329f52dad4 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -306,7 +306,7 @@ public Map getLatestLocalFileNameLengthMap() { * @param segmentFiles list of local refreshed segment files * @param fileSizeFunction function is used to determine the file size in bytes */ - public void updateLatestLocalFileNameLengthMap( + public Map updateLatestLocalFileNameLengthMap( Collection segmentFiles, CheckedFunction fileSizeFunction ) { @@ -332,6 +332,7 @@ public void updateLatestLocalFileNameLengthMap( // Remove keys from the fileSizeMap that do not exist in the latest segment files latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); computeBytesLag(); + return Collections.unmodifiableMap(latestLocalFileNameLengthMap); } public void addToLatestUploadedFiles(String file) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index d96a7e7c95ecf..c77028cb0d470 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -215,7 +215,7 @@ private boolean syncSegments() { Collection localSegmentsPostRefresh = segmentInfos.files(true); // Create a map of file name to size and update the refresh segment tracker - updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh); CountDownLatch latch = new CountDownLatch(1); ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { @Override @@ -231,6 +231,7 @@ public void onResponse(Void unused) { refreshClockTimeMs, refreshSeqNo, lastRefreshedCheckpoint, + localSegmentsSizeMap, checkpoint ); // At this point since we have uploaded new segments, segment infos and segment metadata file, @@ -251,7 +252,7 @@ public void onFailure(Exception e) { }, latch); // Start the segments files upload - uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener); + uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener); latch.await(); } catch (EngineException e) { logger.warn("Exception while reading SegmentInfosSnapshot", e); @@ -295,10 +296,11 @@ private void onSuccessfulSegmentsSync( long refreshClockTimeMs, long refreshSeqNo, long lastRefreshedCheckpoint, + Map localFileSizeMap, ReplicationCheckpoint checkpoint ) { // Update latest uploaded segment files name in segment tracker - segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet()); + segmentTracker.setLatestUploadedFiles(localFileSizeMap.keySet()); // Update the remote refresh time and refresh seq no updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo); // Reset the backoffDelayIterator for the future failures @@ -371,7 +373,11 @@ void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos se } } - private void uploadNewSegments(Collection localSegmentsPostRefresh, ActionListener listener) { + private void uploadNewSegments( + Collection localSegmentsPostRefresh, + Map localSegmentsSizeMap, + ActionListener listener + ) { Collection filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList()); if (filteredFiles.size() == 0) { logger.debug("No new segments to upload in uploadNewSegments"); @@ -385,7 +391,7 @@ private void uploadNewSegments(Collection localSegmentsPostRefresh, Acti for (String src : filteredFiles) { // Initializing listener here to ensure that the stats increment operations are thread-safe - UploadListener statsListener = createUploadListener(); + UploadListener statsListener = createUploadListener(localSegmentsSizeMap); ActionListener aggregatedListener = ActionListener.wrap(resp -> { statsListener.onSuccess(src); batchUploadListener.onResponse(resp); @@ -445,8 +451,8 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo * * @param segmentFiles list of segment files that are part of the most recent local refresh. */ - private void updateLocalSizeMapAndTracker(Collection segmentFiles) { - segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); + private Map updateLocalSizeMapAndTracker(Collection segmentFiles) { + return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); } private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { @@ -522,21 +528,21 @@ private boolean isLocalOrSnapshotRecovery() { /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events */ - private UploadListener createUploadListener() { + private UploadListener createUploadListener(Map fileSizeMap) { return new UploadListener() { private long uploadStartTime = 0; @Override public void beforeUpload(String file) { // Start tracking the upload bytes started - segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesStarted(fileSizeMap.get(file)); uploadStartTime = System.currentTimeMillis(); } @Override public void onSuccess(String file) { // Track upload success - segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file)); segmentTracker.addToLatestUploadedFiles(file); segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime)); } @@ -544,7 +550,7 @@ public void onSuccess(String file) { @Override public void onFailure(String file) { // Track upload failure - segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesFailed(fileSizeMap.get(file)); segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime)); } }; From c257994380f7492f6ffcb93b2c12abbe7fc2e429 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Wed, 17 Jan 2024 16:46:58 +0530 Subject: [PATCH 2/4] fix bug in clone and update doc string Signed-off-by: bansvaru --- .../index/remote/RemoteSegmentTransferTracker.java | 7 ++++++- .../index/shard/RemoteStoreRefreshListener.java | 8 +++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 40b329f52dad4..92436a09a4e7e 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -301,10 +301,15 @@ public Map getLatestLocalFileNameLengthMap() { } /** - * Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files. + * Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. + * The method is given a function as an argument which is used for determining the file size (length in bytes). + * This method is also provided the collection of segment files which are the latest refresh local segment files. + * This method also removes the stale segment files from the map that are not part of the input segment files. * * @param segmentFiles list of local refreshed segment files * @param fileSizeFunction function is used to determine the file size in bytes + * + * @return updated map of local segment files and filesize */ public Map updateLatestLocalFileNameLengthMap( Collection segmentFiles, diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index c77028cb0d470..9d1e30f1dbb7a 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -41,6 +41,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -215,7 +216,8 @@ private boolean syncSegments() { Collection localSegmentsPostRefresh = segmentInfos.files(true); // Create a map of file name to size and update the refresh segment tracker - Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); CountDownLatch latch = new CountDownLatch(1); ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { @Override @@ -450,6 +452,8 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo * Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size. * * @param segmentFiles list of segment files that are part of the most recent local refresh. + * + * @return updated map of local segment files and filesize */ private Map updateLocalSizeMapAndTracker(Collection segmentFiles) { return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); @@ -527,6 +531,8 @@ private boolean isLocalOrSnapshotRecovery() { /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events + * + * @param fileSizeMap updated map of current snapshot of local segments to their sizes */ private UploadListener createUploadListener(Map fileSizeMap) { return new UploadListener() { From 78751610cc9e78aa9fb88eac5c3d2af3a3f01def Mon Sep 17 00:00:00 2001 From: bansvaru Date: Wed, 17 Jan 2024 17:22:25 +0530 Subject: [PATCH 3/4] fix spotless Signed-off-by: bansvaru --- .../opensearch/index/shard/RemoteStoreRefreshListener.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 9d1e30f1dbb7a..cb6d81a17222e 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -41,7 +41,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -216,7 +215,8 @@ private boolean syncSegments() { Collection localSegmentsPostRefresh = segmentInfos.files(true); // Create a map of file name to size and update the refresh segment tracker - Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet().stream() + Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet() + .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); CountDownLatch latch = new CountDownLatch(1); ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { From 4d0de06c774589467d2c44720160237fe6c01c20 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Wed, 31 Jan 2024 16:01:02 +0530 Subject: [PATCH 4/4] Empty Commit for build retry Signed-off-by: bansvaru