Introduce FS Health HEALTHY threshold to fail stuck node #1167
Changes from 4 commits
FsHealthService.java:

@@ -57,6 +57,7 @@
 import java.nio.file.StandardOpenOption;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
@@ -78,18 +79,23 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
     private final NodeEnvironment nodeEnv;
     private final LongSupplier currentTimeMillisSupplier;
     private volatile Scheduler.Cancellable scheduledFuture;
+    private volatile TimeValue healthyTimeoutThreshold;
+    private final AtomicLong lastRunStartTimeMillis = new AtomicLong(Long.MIN_VALUE);

     @Nullable
     private volatile Set<Path> unhealthyPaths;

     public static final Setting<Boolean> ENABLED_SETTING =
         Setting.boolSetting("monitor.fs.health.enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic);
     public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING =
-        Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(120), TimeValue.timeValueMillis(1),
+        Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1),
             Setting.Property.NodeScope);
     public static final Setting<TimeValue> SLOW_PATH_LOGGING_THRESHOLD_SETTING =
         Setting.timeSetting("monitor.fs.health.slow_path_logging_threshold", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(1),
             Setting.Property.NodeScope, Setting.Property.Dynamic);
+    public static final Setting<TimeValue> HEALTHY_TIMEOUT_SETTING =
+        Setting.timeSetting("monitor.fs.health.healthy_timeout_threshold", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1),
+            Setting.Property.NodeScope, Setting.Property.Dynamic);
Comment on lines +98 to +100:

[Review comment] Given FS Healthcheck has deeper impacts, having retries of let's say …

[Reply] That would either delay the detection or increase the chances of false positives. Maybe we can extend support in future. Not expanding the scope of this PR.
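To make the trade-off in this exchange concrete, here is a small, self-contained sketch under assumed semantics (it is not code from this PR): with the defaults introduced above, the worst-case time to flag a stuck node is roughly one refresh interval plus the healthy timeout, and each hypothetical retry would add another timeout on top.

// Back-of-envelope sketch only; the exact detection latency depends on scheduler
// timing and is an assumption, not something this diff states.
public class DetectionLatencySketch {
    public static void main(String[] args) {
        long refreshIntervalMillis = 60_000L;  // monitor.fs.health.refresh_interval default in this diff
        long healthyTimeoutMillis = 60_000L;   // monitor.fs.health.healthy_timeout_threshold default in this diff
        for (int retries = 0; retries <= 2; retries++) {
            long worstCaseMillis = refreshIntervalMillis + (retries + 1) * healthyTimeoutMillis;
            System.out.println("retries=" + retries + " -> worst-case detection ~" + (worstCaseMillis / 1000) + "s");
        }
    }
}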
@@ -98,8 +104,10 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) {
         this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
         this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings);
         this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis;
+        this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings);
         this.nodeEnv = nodeEnv;
         clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold);
+        clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold);
         clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled);
     }
@@ -126,6 +134,10 @@ public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) {
         this.slowPathLoggingThreshold = slowPathLoggingThreshold;
     }

+    public void setHealthyTimeoutThreshold(TimeValue healthyTimeoutThreshold) {
+        this.healthyTimeoutThreshold = healthyTimeoutThreshold;
+    }
+
     @Override
     public StatusInfo getHealth() {
         StatusInfo statusInfo;
@@ -134,6 +146,9 @@ public StatusInfo getHealth() {
             statusInfo = new StatusInfo(HEALTHY, "health check disabled");
         } else if (brokenLock) {
             statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
+        } else if (lastRunStartTimeMillis.get() >= 0 && currentTimeMillisSupplier.getAsLong() -
+            lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis()) {
+            statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached");
         } else if (unhealthyPaths == null) {
             statusInfo = new StatusInfo(HEALTHY, "health check passed");
         } else {

(Bukhtawar marked a review conversation on the lines above as resolved.)
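The new branch above encodes a simple staleness rule. The following is a minimal, self-contained sketch of that rule with hypothetical names (not the PR's code): Long.MIN_VALUE means no monitor run has started yet, and the node is reported unhealthy only once a started run has been outstanding longer than the configured threshold.

import java.util.concurrent.atomic.AtomicLong;

public class StalenessRuleSketch {
    // Mirrors the idea of lastRunStartTimeMillis; MIN_VALUE = "never started".
    static final AtomicLong lastRunStartMillis = new AtomicLong(Long.MIN_VALUE);

    static boolean healthyThresholdBreached(long nowMillis, long healthyTimeoutMillis) {
        long started = lastRunStartMillis.get();
        // Only trip once a run has actually started and has been outstanding
        // for longer than the threshold.
        return started >= 0 && nowMillis - started > healthyTimeoutMillis;
    }

    public static void main(String[] args) {
        System.out.println(healthyThresholdBreached(61_000L, 60_000L)); // false: no run recorded yet
        lastRunStartMillis.set(0L);
        System.out.println(healthyThresholdBreached(61_000L, 60_000L)); // true: started 61s ago, threshold 60s
        System.out.println(healthyThresholdBreached(30_000L, 60_000L)); // false: within threshold
    }
}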
@@ -149,14 +164,15 @@ class FsHealthMonitor implements Runnable {
         static final String TEMP_FILE_NAME = ".opensearch_temp_file";
         private byte[] byteToWrite;

-        FsHealthMonitor(){
+        FsHealthMonitor() {
             this.byteToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8);
         }

         @Override
         public void run() {
             try {
                 if (enabled) {
+                    setLastRunStartTimeMillis();
                     monitorFSHealth();
                     logger.debug("health check succeeded");
                 }

@@ -205,5 +221,9 @@ private void monitorFSHealth() {
                 brokenLock = false;
             }
         }
+
+        private void setLastRunStartTimeMillis() {
+            lastRunStartTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
+        }
     }
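A note on the getAndUpdate call above: combining it with Math.max keeps the recorded start time from moving backwards if updates race. A minimal illustration of that pattern (hypothetical class, not from the PR):

import java.util.concurrent.atomic.AtomicLong;

public class MonotonicStampSketch {
    private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);

    // Math.max inside getAndUpdate guarantees the stored value never decreases,
    // even if two threads record out-of-order timestamps concurrently.
    void record(long nowMillis) {
        last.getAndUpdate(prev -> Math.max(prev, nowMillis));
    }

    public static void main(String[] args) {
        MonotonicStampSketch s = new MonotonicStampSketch();
        s.record(100L);
        s.record(90L);                    // late, out-of-order update is ignored
        System.out.println(s.last.get()); // 100
    }
}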
FsHealthService tests:
@@ -44,6 +44,7 @@
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.env.NodeEnvironment;
+import org.opensearch.monitor.StatusInfo;
 import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.test.MockLogAppender;
 import org.opensearch.test.junit.annotations.TestLogging;

@@ -63,6 +64,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

+import static org.hamcrest.Matchers.equalTo;
 import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
 import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
 import static org.opensearch.node.Node.NODE_NAME_SETTING;
@@ -170,7 +172,7 @@ public void testLoggingOnHungIO() throws Exception {
            }

            //disrupt file system
-            disruptFileSystemProvider.injectIOException.set(true);
+            disruptFileSystemProvider.injectIODelay.set(true);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
            assertBusy(mockAppender::assertAllExpectationsMatched);
@@ -182,6 +184,60 @@ public void testLoggingOnHungIO() throws Exception {
        }
    }

+    public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception {
+        long healthyTimeoutThreshold = randomLongBetween(500, 1000);
+        long refreshInterval = randomLongBetween(20, 50);
+        long slowLogThreshold = randomLongBetween(100, 200);
+        long delayBetweenChecks = 100;
+        final Settings settings = Settings.builder()
+            .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms")
+            .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms")
+            .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms")
+            .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // we need to verify exact time
+            .build();
+        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
+        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
+        FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, testThreadPool);
+        fileSystem = disruptFileSystemProvider.getFileSystem(null);
+        PathUtilsForTesting.installMock(fileSystem);
+        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        try (NodeEnvironment env = newNodeEnvironment()) {
+            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
+            logger.info("--> Initial health status prior to the first monitor run");
+            StatusInfo fsHealth = fsHealthService.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            logger.info("--> First monitor run");
+            fsHealthService.new FsHealthMonitor().run();
+            fsHealth = fsHealthService.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            logger.info("--> Disrupt file system");
+            disruptFileSystemProvider.injectIODelay.set(true);
+            final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env);
+            fsHealthSrvc.doStart();
+            assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + refreshInterval,
+                TimeUnit.MILLISECONDS));
+            fsHealth = fsHealthSrvc.getHealth();
+            assertEquals(UNHEALTHY, fsHealth.getStatus());
+            assertEquals("healthy threshold breached", fsHealth.getInfo());
+            int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount();
+            assertThat(disruptedPathCount, equalTo(1));
+            logger.info("--> Fix file system disruption");
+            disruptFileSystemProvider.injectIODelay.set(false);
+            assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == HEALTHY, delayBetweenChecks + (2 * refreshInterval),
+                TimeUnit.MILLISECONDS));
+            fsHealth = fsHealthSrvc.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount());
+            fsHealthSrvc.doStop();
+        } finally {
+            PathUtilsForTesting.teardown();
+            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
+        }
+    }
+
    public void testFailsHealthOnSinglePathFsyncFailure() throws IOException {
        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
        FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem);
@@ -347,18 +403,25 @@ public void force(boolean metaData) throws IOException {

    private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider {

-        AtomicBoolean injectIOException = new AtomicBoolean();
+        AtomicBoolean injectIODelay = new AtomicBoolean();

[Review comment] The delay is achieved by using Thread.sleep. You could use latching to make this more deterministic in tests.

[Reply] The test actually does latch; we are mimicking a stuck IO operation, which is achieved by causing the fsync operation to go into a sleep. I have however improved the mock to make it more deterministic based on your suggestion.

        AtomicInteger injectedPaths = new AtomicInteger();

        private final long delay;
        private final ThreadPool threadPool;
+        private static final long AWAIT_BUSY_THRESHOLD = 100L;

        FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) {
            super("disrupt_fs_health://", inner);
            this.delay = delay;
            this.threadPool = threadPool;
        }

+        FileSystemFsyncHungProvider(FileSystem inner, ThreadPool threadPool) {
+            super("disrupt_fs_health://", inner);
+            this.threadPool = threadPool;
+            this.delay = Long.MAX_VALUE;
+        }
+
        public int getInjectedPathCount(){
            return injectedPaths.get();
        }
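The reviewer's latch suggestion in the exchange above could look roughly like the following self-contained sketch (hypothetical names, not code from this PR): the faked fsync blocks on a CountDownLatch until the test releases it or a safety timeout expires.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchedFsyncSketch {
    private final CountDownLatch release = new CountDownLatch(1);

    // Called from the mocked FileChannel.force(): blocks until the test releases it
    // or the safety timeout expires.
    void hangUntilReleased(long maxWaitMillis) {
        try {
            release.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new AssertionError(e);
        }
    }

    // Called from the test once the simulated disruption should end.
    void releaseHang() {
        release.countDown();
    }
}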
@@ -368,17 +431,20 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
            return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
                @Override
                public void force(boolean metaData) throws IOException {
-                    if (injectIOException.get()) {
+                    if (injectIODelay.get()) {
                        if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
                            injectedPaths.incrementAndGet();
                            final long startTimeMillis = threadPool.relativeTimeInMillis();
+                            long timeInMillis = 1;
+                            long maxWaitTimeMillis = startTimeMillis + delay >= 0 ? startTimeMillis + delay : Long.MAX_VALUE; // long overflow
                            do {
                                try {
-                                    Thread.sleep(delay);
+                                    Thread.sleep(timeInMillis);
                                } catch (InterruptedException e) {
                                    throw new AssertionError(e);
                                }
-                            } while (threadPool.relativeTimeInMillis() <= startTimeMillis + delay);
+                                timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2);
+                            } while (threadPool.relativeTimeInMillis() <= maxWaitTimeMillis && injectIODelay.get());
                        }
                    }
                    super.force(metaData);
[Review comment] Why do we need to decrease the interval frequency here?

[Reply] We want the checks to be more frequent to be able to catch issues faster.
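As context for this exchange, the reworked mock above waits in small, capped, exponentially growing sleeps and re-checks the flag each time, so it reacts quickly once the disruption is cleared. A generic, self-contained sketch of that waiting pattern (hypothetical helper, not the PR's code):

import java.util.concurrent.atomic.AtomicBoolean;

public class CappedBackoffWaitSketch {
    static final long SLEEP_CAP_MILLIS = 100L; // analogous to AWAIT_BUSY_THRESHOLD in the test

    // Sleep in growing increments (1, 2, 4, ... capped at SLEEP_CAP_MILLIS) while the
    // condition holds, up to a maximum total wait, re-checking the flag each iteration.
    static void waitWhile(AtomicBoolean condition, long maxWaitMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        long sleepMillis = 1;
        while (condition.get() && System.currentTimeMillis() < deadline) {
            Thread.sleep(sleepMillis);
            sleepMillis = Math.min(SLEEP_CAP_MILLIS, sleepMillis * 2);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean stuck = new AtomicBoolean(true);
        new Thread(() -> {
            try { Thread.sleep(300); } catch (InterruptedException ignored) { }
            stuck.set(false); // clear the simulated disruption after 300 ms
        }).start();
        waitWhile(stuck, 5_000L);
        System.out.println("released, stuck=" + stuck.get());
    }
}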