Skip to content

Commit

Permalink
[Remote Store] Add check for byte lag in time lag calculation (opense…
Browse files Browse the repository at this point in the history
…arch-project#12565)

Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
ashking94 authored and Aman Khare committed Mar 12, 2024
1 parent e11e8ac commit 303a346
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public long getRefreshSeqNoLag() {
}

public long getTimeMsLag() {
if (remoteRefreshTimeMs == localRefreshTimeMs) {
if (remoteRefreshTimeMs == localRefreshTimeMs || bytesLag == 0) {
return 0;
}
return currentTimeMsUsingSystemNanos() - remoteRefreshStartTimeMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos;
Expand Down Expand Up @@ -149,6 +150,7 @@ public void testComputeTimeLagOnUpdate() throws InterruptedException {
Thread.sleep(1);
transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());

transferTracker.updateLatestLocalFileNameLengthMap(List.of("test"), k -> 1L);
// Sleep for 100ms and then the lag should be within 100ms +/- 20ms
Thread.sleep(100);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - 100) <= 20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void testValidateSegmentUploadLag() throws InterruptedException {
while (currentTimeMsUsingSystemNanos() - localRefreshTimeMs <= 20 * avg) {
Thread.sleep((long) (4 * avg));
}
pressureTracker.updateLatestLocalFileNameLengthMap(List.of("test"), k -> 1L);
Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
String regex = "^rejected execution on primary shard:\\[index]\\[0] due to remote segments lagging behind "
+ "local segments.time_lag:[0-9]{2,3} ms dynamic_time_lag_threshold:95\\.0 ms$";
Expand Down

0 comments on commit 303a346

Please sign in to comment.