Introduce FS Health HEALTHY threshold to fail stuck node #1167
Changes from 2 commits
FsHealthService.java

@@ -57,6 +57,7 @@
 import java.nio.file.StandardOpenOption;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
@@ -78,18 +79,23 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
     private final NodeEnvironment nodeEnv;
     private final LongSupplier currentTimeMillisSupplier;
     private volatile Scheduler.Cancellable scheduledFuture;
+    private volatile TimeValue healthyTimeoutThreshold;
+    private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);

     @Nullable
     private volatile Set<Path> unhealthyPaths;

     public static final Setting<Boolean> ENABLED_SETTING =
         Setting.boolSetting("monitor.fs.health.enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic);
     public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING =
-        Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(120), TimeValue.timeValueMillis(1),
+        Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1),
             Setting.Property.NodeScope);
     public static final Setting<TimeValue> SLOW_PATH_LOGGING_THRESHOLD_SETTING =
         Setting.timeSetting("monitor.fs.health.slow_path_logging_threshold", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(1),
             Setting.Property.NodeScope, Setting.Property.Dynamic);
+    public static final Setting<TimeValue> HEALTHY_TIMEOUT_SETTING =
+        Setting.timeSetting("monitor.fs.health.healthy_timeout_threshold", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1),
+            Setting.Property.NodeScope, Setting.Property.Dynamic);

Review comment on lines +98 to +100: Given FS Healthcheck has deeper impacts, having retries of, let's say, …

Reply: That would either delay the detection or increase the chances of false positives. Maybe we can extend support in the future; not expanding the scope of this PR.

     public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) {
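For orientation, here is a minimal sketch of how these setting keys might be supplied as node settings, mirroring the style the new test further down uses; the class name and the concrete values are illustrative only, not defaults or recommendations.

import org.opensearch.common.settings.Settings;

// Illustrative sketch only: node settings overriding the FS health monitor settings
// declared in the hunk above. The keys are taken from the diff; the values are examples.
class FsHealthSettingsExample {
    static Settings fsHealthSettings() {
        return Settings.builder()
            .put("monitor.fs.health.enabled", true)
            .put("monitor.fs.health.refresh_interval", "60s")
            .put("monitor.fs.health.slow_path_logging_threshold", "5s")
            .put("monitor.fs.health.healthy_timeout_threshold", "60s")
            .build();
    }
}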
@@ -98,8 +104,10 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, Threa
         this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
         this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings);
         this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis;
+        this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings);
         this.nodeEnv = nodeEnv;
         clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold);
+        clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold);
         clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled);
     }
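For context on the dynamic wiring, a rough sketch of how an updated cluster setting would flow into the newly registered consumer; this assumes the usual ClusterSettings.applySettings behavior and is not part of the diff.

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;

// Hypothetical sketch: once the consumer above is registered, applying updated settings
// invokes setHealthyTimeoutThreshold with the parsed TimeValue.
class DynamicUpdateSketch {
    static void bumpHealthyTimeout(ClusterSettings clusterSettings) {
        clusterSettings.applySettings(Settings.builder()
            .put("monitor.fs.health.healthy_timeout_threshold", "90s")
            .build());
    }
}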
@@ -126,6 +134,10 @@ public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) {
         this.slowPathLoggingThreshold = slowPathLoggingThreshold;
     }

+    public void setHealthyTimeoutThreshold(TimeValue healthyTimeoutThreshold) {
+        this.healthyTimeoutThreshold = healthyTimeoutThreshold;
+    }
+
     @Override
     public StatusInfo getHealth() {
         StatusInfo statusInfo;
@@ -134,6 +146,9 @@ public StatusInfo getHealth() {
             statusInfo = new StatusInfo(HEALTHY, "health check disabled");
         } else if (brokenLock) {
             statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
+        } else if (lastRunTimeMillis.get() > Long.MIN_VALUE && currentTimeMillisSupplier.getAsLong() -
+            lastRunTimeMillis.get() > refreshInterval.millis() + healthyTimeoutThreshold.millis()) {
+            statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached");

Review comment: This won't catch it if the first ever run of the health check does not complete.

Reply: Good point. Done!

Review comment: Can we simplify this condition by using the current run's start time, instead of relying on the last run time?

Reply: Makes sense. Good point.

         } else if (unhealthyPaths == null) {
             statusInfo = new StatusInfo(HEALTHY, "health check passed");
         } else {
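To make the second suggestion concrete, here is a rough, hypothetical sketch of tracking the start time of the in-flight check rather than the completion time of the previous run; none of these names exist in this PR, and the actual change may look different.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch, not the code in this PR: the monitor records when the current
// check started, and the health check flags it as stuck once that check has been
// running longer than the healthy timeout, without folding in the refresh interval.
class CurrentRunTimeoutSketch {
    private final AtomicLong currentRunStartMillis = new AtomicLong(Long.MIN_VALUE);
    private final LongSupplier nowMillis;
    private volatile long healthyTimeoutMillis;

    CurrentRunTimeoutSketch(LongSupplier nowMillis, long healthyTimeoutMillis) {
        this.nowMillis = nowMillis;
        this.healthyTimeoutMillis = healthyTimeoutMillis;
    }

    void onCheckStart() {
        currentRunStartMillis.set(nowMillis.getAsLong());
    }

    void onCheckEnd() {
        currentRunStartMillis.set(Long.MIN_VALUE); // no check in flight
    }

    boolean isStuck() {
        long start = currentRunStartMillis.get();
        return start > Long.MIN_VALUE && nowMillis.getAsLong() - start > healthyTimeoutMillis;
    }
}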
@@ -162,6 +177,8 @@ public void run() {
                 }
             } catch (Exception e) {
                 logger.error("health check failed", e);
+            } finally {
+                setLastRunTimeMillis();
             }
         }
@@ -205,5 +222,9 @@ private void monitorFSHealth() {
                 brokenLock = false;
             }
         }
+
+        private void setLastRunTimeMillis() {
+            lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
+        }
     }
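The finally block together with the Math.max update means the recorded timestamp only ever moves forward, even if updates arrive out of order. A tiny standalone demonstration of that update pattern (names and values are illustrative only):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: the monotonic "move forward or stay" update used above.
class MonotonicTimestampDemo {
    public static void main(String[] args) {
        AtomicLong lastRun = new AtomicLong(Long.MIN_VALUE);
        lastRun.getAndUpdate(l -> Math.max(l, 1_000L)); // first completion recorded
        lastRun.getAndUpdate(l -> Math.max(l, 400L));   // a stale, out-of-order update is ignored
        System.out.println(lastRun.get());              // prints 1000
    }
}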
FsHealthService tests

@@ -44,6 +44,7 @@
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.env.NodeEnvironment;
+import org.opensearch.monitor.StatusInfo;
 import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.test.MockLogAppender;
 import org.opensearch.test.junit.annotations.TestLogging;
@@ -63,6 +64,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
 import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
 import static org.opensearch.node.Node.NODE_NAME_SETTING;
@@ -170,7 +172,7 @@ public void testLoggingOnHungIO() throws Exception {
         }

         //disrupt file system
-        disruptFileSystemProvider.injectIOException.set(true);
+        disruptFileSystemProvider.injectIODelay.set(true);
         fsHealthService.new FsHealthMonitor().run();
         assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
         assertBusy(mockAppender::assertAllExpectationsMatched);

@@ -182,6 +184,60 @@
     }
 }
+
+    public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception {
+        long healthyTimeoutThreshold = randomLongBetween(500, 1000);
+        long refreshInterval = randomLongBetween(20, 50);
+        long slowLogThreshold = randomLongBetween(100, 200);
+        long fsFreezeDelay = randomLongBetween(healthyTimeoutThreshold + 1000, 2000);
+        final Settings settings = Settings.builder()
+            .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms")
+            .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms")
+            .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms")
+            .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // we need to verify exact time
+            .build();
+        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
+        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
+        FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem,
+            fsFreezeDelay, testThreadPool);
+        fileSystem = disruptFileSystemProvider.getFileSystem(null);
+        PathUtilsForTesting.installMock(fileSystem);
+        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        try (NodeEnvironment env = newNodeEnvironment()) {
+            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
+            logger.info("--> Initial health status prior to the first monitor run");
+            StatusInfo fsHealth = fsHealthService.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            logger.info("--> First monitor run");
+            fsHealthService.new FsHealthMonitor().run();
+            fsHealth = fsHealthService.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            logger.info("--> Disrupt file system");
+            disruptFileSystemProvider.injectIODelay.set(true);
+            fsHealthService.doStart();
+            assertTrue(waitUntil(() -> fsHealthService.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + refreshInterval,
+                TimeUnit.MILLISECONDS));
+            fsHealth = fsHealthService.getHealth();
+            assertEquals(UNHEALTHY, fsHealth.getStatus());
+            assertEquals("healthy threshold breached", fsHealth.getInfo());
+            int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount();
+            assertThat(1, greaterThanOrEqualTo(disruptedPathCount));
+            logger.info("--> Fix file system disruption");
+            disruptFileSystemProvider.injectIODelay.set(false);
+            assertTrue(waitUntil(() -> fsHealthService.getHealth().getStatus() == HEALTHY, (2 * fsFreezeDelay) + refreshInterval,
+                TimeUnit.MILLISECONDS));
+            fsHealth = fsHealthService.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount());
+            fsHealthService.doStop();
+        } finally {
+            PathUtilsForTesting.teardown();
+            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
+        }
+    }

     public void testFailsHealthOnSinglePathFsyncFailure() throws IOException {
         FileSystem fileSystem = PathUtils.getDefaultFileSystem();
         FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem);
@@ -347,7 +403,7 @@ public void force(boolean metaData) throws IOException {
     private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider {

-        AtomicBoolean injectIOException = new AtomicBoolean();
+        AtomicBoolean injectIODelay = new AtomicBoolean();

Review comment: The delay is achieved by using Thread.sleep. You could use latching to make this more deterministic in tests.

Reply: The test actually does use a latch; we are mimicking a stuck IO operation, which is achieved by causing the fsync operation to go into a sleep. I have, however, improved the mock to make it more deterministic based on your suggestion.

         AtomicInteger injectedPaths = new AtomicInteger();

         private final long delay;
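For reference, a latch-based hang along the lines the reviewer suggests might look roughly like the sketch below; this is a hypothetical alternative, not the FileSystemFsyncHungProvider used in this diff.

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical alternative to a Thread.sleep-based delay: the "stuck" fsync blocks on a
// latch that the test releases when it wants the hang to end, so timing is fully
// test-controlled rather than wall-clock based.
class LatchBackedHang {
    final AtomicBoolean injectIODelay = new AtomicBoolean();
    final CountDownLatch unblockFsync = new CountDownLatch(1);

    void maybeHang() throws IOException {
        if (injectIODelay.get()) {
            try {
                unblockFsync.await(); // hang here until the test calls unblockFsync.countDown()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while simulating a hung fsync", e);
            }
        }
    }
}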
@@ -368,7 +424,7 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
             return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
                 @Override
                 public void force(boolean metaData) throws IOException {
-                    if (injectIOException.get()) {
+                    if (injectIODelay.get()) {
                         if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
                             injectedPaths.incrementAndGet();
                             final long startTimeMillis = threadPool.relativeTimeInMillis();
Review comment: Why do we need to decrease the interval frequency here?

Reply: We want the checks to be more frequent so that issues are caught faster.
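To make that reasoning concrete, here is a rough, standalone illustration of the worst-case detection window implied by the stale check in getHealth(); the class name and the framing are mine, not the PR's.

// Rough illustration, not part of the PR: a stuck node is reported UNHEALTHY only after
// roughly refresh_interval + healthy_timeout_threshold has elapsed since the last
// completed check, so halving the default refresh interval shrinks that window.
public class FsHealthDetectionLatency {
    public static void main(String[] args) {
        long healthyTimeoutMillis = 60_000;   // monitor.fs.health.healthy_timeout_threshold default
        long oldRefreshMillis = 120_000;      // previous monitor.fs.health.refresh_interval default
        long newRefreshMillis = 60_000;       // new default in this change
        System.out.println("worst case before: " + (oldRefreshMillis + healthyTimeoutMillis) + " ms"); // 180000 ms
        System.out.println("worst case after:  " + (newRefreshMillis + healthyTimeoutMillis) + " ms"); // 120000 ms
    }
}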