From 88f885ba06993165bbbb8c02692cef761242b8e2 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sat, 28 Aug 2021 20:05:54 +0530 Subject: [PATCH 01/10] Introduce FS Health HEALTHY threshold to fail stuck node Signed-off-by: Bukhtawar Khan --- .../common/settings/ClusterSettings.java | 1 + .../monitor/fs/FsHealthService.java | 26 +++++++- .../monitor/fs/FsHealthServiceTests.java | 62 ++++++++++++++++++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fdd48fe0ee2af..446b3bb0bb59b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -579,6 +579,7 @@ public void apply(Settings value, Settings current, Settings previous) { FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, + FsHealthService.HEALTHY_TIMEOUT_SETTING, TransportMainAction.OVERRIDE_MAIN_RESPONSE_VERSION, IndexingPressure.MAX_INDEXING_BYTES))); diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 5c5108a47e080..4f6400cfe541e 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -57,6 +57,7 @@ import java.nio.file.StandardOpenOption; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -78,6 +79,8 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH private final NodeEnvironment nodeEnv; private final LongSupplier currentTimeMillisSupplier; private volatile Scheduler.Cancellable scheduledFuture; + private volatile TimeValue healthyTimeoutThreshold; + private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); @Nullable private volatile Set unhealthyPaths; @@ -85,11 +88,14 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH public static final Setting ENABLED_SETTING = Setting.boolSetting("monitor.fs.health.enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting REFRESH_INTERVAL_SETTING = - Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(120), TimeValue.timeValueMillis(1), + Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); public static final Setting SLOW_PATH_LOGGING_THRESHOLD_SETTING = Setting.timeSetting("monitor.fs.health.slow_path_logging_threshold", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(1), Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting HEALTHY_TIMEOUT_SETTING = + Setting.timeSetting("monitor.fs.health.healthy_timeout_threshold", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1), + Setting.Property.NodeScope, Setting.Property.Dynamic); public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) { @@ -98,8 +104,10 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, Threa this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings); this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis; + this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings); this.nodeEnv = nodeEnv; clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold); + clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold); clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled); } @@ -126,14 +134,22 @@ public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) { this.slowPathLoggingThreshold = slowPathLoggingThreshold; } + public void setHealthyTimeoutThreshold(TimeValue healthyTimeoutThreshold) { + this.healthyTimeoutThreshold = healthyTimeoutThreshold; + } + @Override public StatusInfo getHealth() { StatusInfo statusInfo; Set unhealthyPaths = this.unhealthyPaths; - if (enabled == false) { + if (enabled == false ) { statusInfo = new StatusInfo(HEALTHY, "health check disabled"); } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); + } else if (lastRunTimeMillis.get() > Long.MIN_VALUE && currentTimeMillisSupplier.getAsLong() - + lastRunTimeMillis.get() > refreshInterval.millis() + healthyTimeoutThreshold.millis()) { + logger.info("healthy threshold breached"); + statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); } else if (unhealthyPaths == null) { statusInfo = new StatusInfo(HEALTHY, "health check passed"); } else { @@ -173,6 +189,7 @@ private void monitorFSHealth() { } catch (IllegalStateException e) { logger.error("health check failed", e); brokenLock = true; + setLastRunTimeMillis(); return; } @@ -203,7 +220,12 @@ private void monitorFSHealth() { } unhealthyPaths = currentUnhealthyPaths; brokenLock = false; + setLastRunTimeMillis(); } } + + private void setLastRunTimeMillis() { + lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong())); + } } diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index aefdde554b3c7..4cc6ba5b3aba8 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -44,6 +44,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.env.NodeEnvironment; +import org.opensearch.monitor.StatusInfo; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; @@ -63,6 +64,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.opensearch.node.Node.NODE_NAME_SETTING; @@ -170,7 +172,7 @@ public void testLoggingOnHungIO() throws Exception { } //disrupt file system - disruptFileSystemProvider.injectIOException.set(true); + disruptFileSystemProvider.injectIODelay.set(true); fsHealthService.new FsHealthMonitor().run(); assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount()); assertBusy(mockAppender::assertAllExpectationsMatched); @@ -182,6 +184,60 @@ public void testLoggingOnHungIO() throws Exception { } } + public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { + long healthyTimeoutThreshold = randomLongBetween(500, 1000); + long refreshInterval = randomLongBetween(20, 50); + long slowLogThreshold = randomLongBetween(100, 200); + long fsFreezeDelay = randomLongBetween(healthyTimeoutThreshold + 1000 , 2000); + final Settings settings = Settings.builder() + .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms") + .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms") + .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms") + .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0)//we need to verify exact time + .build(); + FileSystem fileSystem = PathUtils.getDefaultFileSystem(); + TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); + FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, + fsFreezeDelay, testThreadPool); + fileSystem = disruptFileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + try (NodeEnvironment env = newNodeEnvironment()) { + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + logger.info("--> Initial health status prior to the first monitor run"); + StatusInfo fsHealth = fsHealthService.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + logger.info("--> First monitor run"); + fsHealthService.new FsHealthMonitor().run(); + fsHealth = fsHealthService.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + logger.info("--> Disrupt file system"); + disruptFileSystemProvider.injectIODelay.set(true); + fsHealthService.doStart(); + assertTrue(waitUntil(() -> fsHealthService.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + refreshInterval, + TimeUnit.MILLISECONDS)); + fsHealth = fsHealthService.getHealth(); + assertEquals(UNHEALTHY, fsHealth.getStatus()); + assertEquals("healthy threshold breached", fsHealth.getInfo()); + int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount(); + assertThat(1, greaterThanOrEqualTo(disruptedPathCount)); + logger.info("--> Fix file system disruption"); + disruptFileSystemProvider.injectIODelay.set(false); + assertTrue(waitUntil(() -> fsHealthService.getHealth().getStatus() == HEALTHY, (2 * fsFreezeDelay) + refreshInterval, + TimeUnit.MILLISECONDS)); + fsHealth = fsHealthService.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount()); + fsHealthService.doStop(); + } finally { + PathUtilsForTesting.teardown(); + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + public void testFailsHealthOnSinglePathFsyncFailure() throws IOException { FileSystem fileSystem = PathUtils.getDefaultFileSystem(); FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem); @@ -347,7 +403,7 @@ public void force(boolean metaData) throws IOException { private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider { - AtomicBoolean injectIOException = new AtomicBoolean(); + AtomicBoolean injectIODelay = new AtomicBoolean(); AtomicInteger injectedPaths = new AtomicInteger(); private final long delay; @@ -368,7 +424,7 @@ public FileChannel newFileChannel(Path path, Set options, return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { @Override public void force(boolean metaData) throws IOException { - if (injectIOException.get()) { + if (injectIODelay.get()) { if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) { injectedPaths.incrementAndGet(); final long startTimeMillis = threadPool.relativeTimeInMillis(); From d1789a529a5be582aefa60737c6f808370e66242 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Mon, 30 Aug 2021 11:19:06 +0530 Subject: [PATCH 02/10] Minor fix up Signed-off-by: Bukhtawar Khan --- .../java/org/opensearch/monitor/fs/FsHealthService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 4f6400cfe541e..c2490c80b477d 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -142,13 +142,12 @@ public void setHealthyTimeoutThreshold(TimeValue healthyTimeoutThreshold) { public StatusInfo getHealth() { StatusInfo statusInfo; Set unhealthyPaths = this.unhealthyPaths; - if (enabled == false ) { + if (enabled == false) { statusInfo = new StatusInfo(HEALTHY, "health check disabled"); } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); } else if (lastRunTimeMillis.get() > Long.MIN_VALUE && currentTimeMillisSupplier.getAsLong() - lastRunTimeMillis.get() > refreshInterval.millis() + healthyTimeoutThreshold.millis()) { - logger.info("healthy threshold breached"); statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); } else if (unhealthyPaths == null) { statusInfo = new StatusInfo(HEALTHY, "health check passed"); @@ -178,6 +177,8 @@ public void run() { } } catch (Exception e) { logger.error("health check failed", e); + } finally { + setLastRunTimeMillis(); } } @@ -189,7 +190,6 @@ private void monitorFSHealth() { } catch (IllegalStateException e) { logger.error("health check failed", e); brokenLock = true; - setLastRunTimeMillis(); return; } @@ -220,7 +220,6 @@ private void monitorFSHealth() { } unhealthyPaths = currentUnhealthyPaths; brokenLock = false; - setLastRunTimeMillis(); } } From c7c909c406d48c5d7de7ba9593377cbab0dbf6e8 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Mon, 30 Aug 2021 23:06:43 +0530 Subject: [PATCH 03/10] Review comments Signed-off-by: Bukhtawar Khan --- .../monitor/fs/FsHealthService.java | 5 +-- .../monitor/fs/FsHealthServiceTests.java | 34 ++++++++++++------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index c2490c80b477d..edecdc616efb2 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -146,7 +146,7 @@ public StatusInfo getHealth() { statusInfo = new StatusInfo(HEALTHY, "health check disabled"); } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); - } else if (lastRunTimeMillis.get() > Long.MIN_VALUE && currentTimeMillisSupplier.getAsLong() - + } else if (lastRunTimeMillis.get() >= 0 && currentTimeMillisSupplier.getAsLong() - lastRunTimeMillis.get() > refreshInterval.millis() + healthyTimeoutThreshold.millis()) { statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); } else if (unhealthyPaths == null) { @@ -164,8 +164,9 @@ class FsHealthMonitor implements Runnable { static final String TEMP_FILE_NAME = ".opensearch_temp_file"; private byte[] byteToWrite; - FsHealthMonitor(){ + FsHealthMonitor() { this.byteToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8); + setLastRunTimeMillis(); } @Override diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index 4cc6ba5b3aba8..e9c22db589d0a 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -64,7 +64,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.opensearch.node.Node.NODE_NAME_SETTING; @@ -197,8 +197,7 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { .build(); FileSystem fileSystem = PathUtils.getDefaultFileSystem(); TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); - FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, - fsFreezeDelay, testThreadPool); + FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, testThreadPool); fileSystem = disruptFileSystemProvider.getFileSystem(null); PathUtilsForTesting.installMock(fileSystem); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -215,23 +214,24 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { assertEquals("health check passed", fsHealth.getInfo()); logger.info("--> Disrupt file system"); disruptFileSystemProvider.injectIODelay.set(true); - fsHealthService.doStart(); - assertTrue(waitUntil(() -> fsHealthService.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + refreshInterval, + final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthSrvc.doStart(); + assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + refreshInterval, TimeUnit.MILLISECONDS)); - fsHealth = fsHealthService.getHealth(); + fsHealth = fsHealthSrvc.getHealth(); assertEquals(UNHEALTHY, fsHealth.getStatus()); assertEquals("healthy threshold breached", fsHealth.getInfo()); int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount(); - assertThat(1, greaterThanOrEqualTo(disruptedPathCount)); + assertThat(disruptedPathCount, equalTo(1)); logger.info("--> Fix file system disruption"); disruptFileSystemProvider.injectIODelay.set(false); - assertTrue(waitUntil(() -> fsHealthService.getHealth().getStatus() == HEALTHY, (2 * fsFreezeDelay) + refreshInterval, + assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == HEALTHY, (2 * fsFreezeDelay) + refreshInterval, TimeUnit.MILLISECONDS)); - fsHealth = fsHealthService.getHealth(); + fsHealth = fsHealthSrvc.getHealth(); assertEquals(HEALTHY, fsHealth.getStatus()); assertEquals("health check passed", fsHealth.getInfo()); assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount()); - fsHealthService.doStop(); + fsHealthSrvc.doStop(); } finally { PathUtilsForTesting.teardown(); ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); @@ -408,6 +408,7 @@ private static class FileSystemFsyncHungProvider extends FilterFileSystemProvide private final long delay; private final ThreadPool threadPool; + private static final long AWAIT_BUSY_THRESHOLD = 100L; FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) { super("disrupt_fs_health://", inner); @@ -415,6 +416,12 @@ private static class FileSystemFsyncHungProvider extends FilterFileSystemProvide this.threadPool = threadPool; } + FileSystemFsyncHungProvider(FileSystem inner, ThreadPool threadPool) { + super("disrupt_fs_health://", inner); + this.threadPool = threadPool; + this.delay = Long.MAX_VALUE; + } + public int getInjectedPathCount(){ return injectedPaths.get(); } @@ -428,13 +435,16 @@ public void force(boolean metaData) throws IOException { if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) { injectedPaths.incrementAndGet(); final long startTimeMillis = threadPool.relativeTimeInMillis(); + long timeInMillis = 1; + long maxWaitTimeMillis = startTimeMillis + delay >= 0 ? startTimeMillis + delay : Long.MAX_VALUE;//long overflow do { try { - Thread.sleep(delay); + Thread.sleep(timeInMillis); } catch (InterruptedException e) { throw new AssertionError(e); } - } while (threadPool.relativeTimeInMillis() <= startTimeMillis + delay); + timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2); + } while (threadPool.relativeTimeInMillis() <= maxWaitTimeMillis && injectIODelay.get()); } } super.force(metaData); From bec0b330ba431d41163a20ba4a1b4344add9ddba Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 31 Aug 2021 00:03:48 +0530 Subject: [PATCH 04/10] Review comments Signed-off-by: Bukhtawar Khan --- .../org/opensearch/monitor/fs/FsHealthService.java | 14 ++++++-------- .../monitor/fs/FsHealthServiceTests.java | 4 ++-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index edecdc616efb2..06b40799180e8 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -80,7 +80,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH private final LongSupplier currentTimeMillisSupplier; private volatile Scheduler.Cancellable scheduledFuture; private volatile TimeValue healthyTimeoutThreshold; - private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); + private final AtomicLong lastRunStartTimeMillis = new AtomicLong(Long.MIN_VALUE); @Nullable private volatile Set unhealthyPaths; @@ -146,8 +146,8 @@ public StatusInfo getHealth() { statusInfo = new StatusInfo(HEALTHY, "health check disabled"); } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); - } else if (lastRunTimeMillis.get() >= 0 && currentTimeMillisSupplier.getAsLong() - - lastRunTimeMillis.get() > refreshInterval.millis() + healthyTimeoutThreshold.millis()) { + } else if (lastRunStartTimeMillis.get() >= 0 && currentTimeMillisSupplier.getAsLong() - + lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis()) { statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); } else if (unhealthyPaths == null) { statusInfo = new StatusInfo(HEALTHY, "health check passed"); @@ -166,20 +166,18 @@ class FsHealthMonitor implements Runnable { FsHealthMonitor() { this.byteToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8); - setLastRunTimeMillis(); } @Override public void run() { try { if (enabled) { + setLastRunStartTimeMillis(); monitorFSHealth(); logger.debug("health check succeeded"); } } catch (Exception e) { logger.error("health check failed", e); - } finally { - setLastRunTimeMillis(); } } @@ -224,8 +222,8 @@ private void monitorFSHealth() { } } - private void setLastRunTimeMillis() { - lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong())); + private void setLastRunStartTimeMillis() { + lastRunStartTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong())); } } diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index e9c22db589d0a..03a06238e89a8 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -188,7 +188,7 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { long healthyTimeoutThreshold = randomLongBetween(500, 1000); long refreshInterval = randomLongBetween(20, 50); long slowLogThreshold = randomLongBetween(100, 200); - long fsFreezeDelay = randomLongBetween(healthyTimeoutThreshold + 1000 , 2000); + long delayBetweenChecks = 100; final Settings settings = Settings.builder() .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms") .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms") @@ -225,7 +225,7 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { assertThat(disruptedPathCount, equalTo(1)); logger.info("--> Fix file system disruption"); disruptFileSystemProvider.injectIODelay.set(false); - assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == HEALTHY, (2 * fsFreezeDelay) + refreshInterval, + assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == HEALTHY, delayBetweenChecks + (2 * refreshInterval), TimeUnit.MILLISECONDS)); fsHealth = fsHealthSrvc.getHealth(); assertEquals(HEALTHY, fsHealth.getStatus()); From fc17516954956a27f273dbc23a0f8b126ef0cea9 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 31 Aug 2021 11:37:29 +0530 Subject: [PATCH 05/10] Increasing refresh interval Signed-off-by: Bukhtawar Khan --- .../java/org/opensearch/monitor/fs/FsHealthServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index 03a06238e89a8..1c8a6a419a750 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -186,7 +186,7 @@ public void testLoggingOnHungIO() throws Exception { public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { long healthyTimeoutThreshold = randomLongBetween(500, 1000); - long refreshInterval = randomLongBetween(20, 50); + long refreshInterval = randomLongBetween(50, 100); long slowLogThreshold = randomLongBetween(100, 200); long delayBetweenChecks = 100; final Settings settings = Settings.builder() From 739a1da9b1f014c595ab9c717126d81e2292cc85 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 31 Aug 2021 22:32:18 +0530 Subject: [PATCH 06/10] Add refresh interval checks Signed-off-by: Bukhtawar Khan --- .../main/java/org/opensearch/monitor/fs/FsHealthService.java | 2 +- .../java/org/opensearch/monitor/fs/FsHealthServiceTests.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 06b40799180e8..1d3b93db156b2 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -147,7 +147,7 @@ public StatusInfo getHealth() { } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); } else if (lastRunStartTimeMillis.get() >= 0 && currentTimeMillisSupplier.getAsLong() - - lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis()) { + lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis() + refreshInterval.millis()) { statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); } else if (unhealthyPaths == null) { statusInfo = new StatusInfo(HEALTHY, "health check passed"); diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index 1c8a6a419a750..b1a960c0e6710 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -186,7 +186,7 @@ public void testLoggingOnHungIO() throws Exception { public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { long healthyTimeoutThreshold = randomLongBetween(500, 1000); - long refreshInterval = randomLongBetween(50, 100); + long refreshInterval = randomLongBetween(500, 1000); long slowLogThreshold = randomLongBetween(100, 200); long delayBetweenChecks = 100; final Settings settings = Settings.builder() @@ -216,7 +216,7 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { disruptFileSystemProvider.injectIODelay.set(true); final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env); fsHealthSrvc.doStart(); - assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + refreshInterval, + assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + (2 *refreshInterval), TimeUnit.MILLISECONDS)); fsHealth = fsHealthSrvc.getHealth(); assertEquals(UNHEALTHY, fsHealth.getStatus()); From 7987ad90594b63b7f57be33046bbaeb8faa4f84c Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Wed, 1 Sep 2021 01:29:26 +0530 Subject: [PATCH 07/10] Enabling check in progress Signed-off-by: Bukhtawar Khan --- .../monitor/fs/FsHealthService.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 1d3b93db156b2..0e827a88cabe9 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -57,6 +57,7 @@ import java.nio.file.StandardOpenOption; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -81,6 +82,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH private volatile Scheduler.Cancellable scheduledFuture; private volatile TimeValue healthyTimeoutThreshold; private final AtomicLong lastRunStartTimeMillis = new AtomicLong(Long.MIN_VALUE); + private final AtomicBoolean checkInProgress = new AtomicBoolean(); @Nullable private volatile Set unhealthyPaths; @@ -146,8 +148,8 @@ public StatusInfo getHealth() { statusInfo = new StatusInfo(HEALTHY, "health check disabled"); } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); - } else if (lastRunStartTimeMillis.get() >= 0 && currentTimeMillisSupplier.getAsLong() - - lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis() + refreshInterval.millis()) { + } else if (checkInProgress.get() && currentTimeMillisSupplier.getAsLong() - + lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis()) { statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); } else if (unhealthyPaths == null) { statusInfo = new StatusInfo(HEALTHY, "health check passed"); @@ -170,14 +172,22 @@ class FsHealthMonitor implements Runnable { @Override public void run() { + boolean checkEnabled = enabled; try { - if (enabled) { - setLastRunStartTimeMillis(); - monitorFSHealth(); - logger.debug("health check succeeded"); + if (checkEnabled) { + if (checkInProgress.compareAndSet(false, true)) { + setLastRunStartTimeMillis(); + monitorFSHealth(); + logger.debug("health check succeeded"); + } } } catch (Exception e) { logger.error("health check failed", e); + } finally { + if (checkEnabled) { + boolean completed = checkInProgress.compareAndSet(true, false); + assert completed; + } } } From 4164b107acd6dc399caf1e2c16bab64e2e270323 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Wed, 1 Sep 2021 01:32:34 +0530 Subject: [PATCH 08/10] Minor fix up Signed-off-by: Bukhtawar Khan --- .../org/opensearch/monitor/fs/FsHealthService.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 0e827a88cabe9..834e40022b620 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -174,12 +174,10 @@ class FsHealthMonitor implements Runnable { public void run() { boolean checkEnabled = enabled; try { - if (checkEnabled) { - if (checkInProgress.compareAndSet(false, true)) { - setLastRunStartTimeMillis(); - monitorFSHealth(); - logger.debug("health check succeeded"); - } + if (checkEnabled && checkInProgress.compareAndSet(false, true)) { + setLastRunStartTimeMillis(); + monitorFSHealth(); + logger.debug("health check succeeded"); } } catch (Exception e) { logger.error("health check failed", e); From 53e7b0e236e65cd47dc66c30fb3a93b317c04e1a Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Wed, 1 Sep 2021 09:35:01 +0530 Subject: [PATCH 09/10] Minor fix up Signed-off-by: Bukhtawar Khan --- .../main/java/org/opensearch/monitor/fs/FsHealthService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 834e40022b620..39888e16ac5ff 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -174,8 +174,10 @@ class FsHealthMonitor implements Runnable { public void run() { boolean checkEnabled = enabled; try { - if (checkEnabled && checkInProgress.compareAndSet(false, true)) { + if (checkEnabled) { setLastRunStartTimeMillis(); + boolean started = checkInProgress.compareAndSet(false, true); + assert started; monitorFSHealth(); logger.debug("health check succeeded"); } From 0e4c57afc861adea32d88b63af2630c4996f8e7c Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 5 Sep 2021 15:42:39 +0530 Subject: [PATCH 10/10] Changes to mark node unhealthy if the timeout is breached Signed-off-by: Bukhtawar Khan --- .../java/org/opensearch/monitor/fs/FsHealthService.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 39888e16ac5ff..7bf1b3207aca6 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -218,6 +218,14 @@ private void monitorFSHealth() { logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]", path, elapsedTime, slowPathLoggingThreshold); } + if (elapsedTime > healthyTimeoutThreshold.millis()) { + logger.error("health check of [{}] failed, took [{}ms] which is above the healthy threshold of [{}]", + path, elapsedTime, healthyTimeoutThreshold); + if (currentUnhealthyPaths == null) { + currentUnhealthyPaths = new HashSet<>(1); + } + currentUnhealthyPaths.add(path); + } } } catch (Exception ex) { logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex);