diff --git a/CHANGELOG.md b/CHANGELOG.md index 60ca47d477c8c..fe50ddcfee88b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix flaky ResourceAwareTasksTests.testBasicTaskResourceTracking test ([#8993](https://github.com/opensearch-project/OpenSearch/pull/8993)) - Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403)) +- Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java index 39d1da19db923..83faa2a4d2f18 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -407,9 +407,21 @@ public void onNewInfo(ClusterInfo info) { if ((state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()) == false) && nodes.size() > 0 && nodesOverHighThreshold.size() == nodes.size()) { + logger.warn( + "Putting index create block on cluster as all nodes are breaching high disk watermark. " + + "Number of nodes above high watermark: {}.", + nodesOverHighThreshold.size() + ); setIndexCreateBlock(listener, true); } else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()) - && diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()) { + && diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled() + && nodesOverHighThreshold.size() < nodes.size()) { + logger.warn( + "Removing index create block on cluster as all nodes are no longer breaching high disk watermark. " + + "Number of nodes above high watermark: {}. Total numbers of nodes: {}.", + nodesOverHighThreshold.size(), + nodes.size() + ); setIndexCreateBlock(listener, false); } else { listener.onResponse(null); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 3f54387d39579..6ab57d10b05c1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -55,10 +55,12 @@ import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,6 +69,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -581,12 +584,16 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC ); advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed - assertSingleWarningMessage( - monitor, - aboveHighWatermark, + final List messages = new ArrayList<>(); + messages.add( "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + "the node is expected to continue to exceed the high disk watermark when these relocations are complete" ); + messages.add( + "Putting index create block on cluster as all nodes are breaching high disk watermark. " + + "Number of nodes above high watermark: 1." + ); + assertMultipleWarningMessages(monitor, aboveHighWatermark, messages); advanceTime.set(true); assertRepeatedWarningMessages( @@ -605,22 +612,11 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC relocatingShardSizeRef.set(-5L); advanceTime.set(true); - assertSingleInfoMessage( - monitor, - aboveHighWatermark, - "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " - + "the node is expected to be below the high disk watermark when these relocations are complete" - ); relocatingShardSizeRef.set(0L); timeSupplier.getAsLong(); // advance time long enough to do another reroute advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed - assertSingleWarningMessage( - monitor, - aboveHighWatermark, - "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " - + "the node is expected to continue to exceed the high disk watermark when these relocations are complete" - ); + assertMultipleWarningMessages(monitor, aboveHighWatermark, messages); advanceTime.set(true); assertRepeatedWarningMessages( @@ -722,6 +718,113 @@ protected void setIndexCreateBlock(ActionListener listener, boolean indexC assertTrue(countBlocksCalled.get() == 0); } + public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() { + AllocationService allocation = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.blocks.create_index.enabled", false) + .build() + ); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node2")) + .numberOfShards(1) + .numberOfReplicas(0) + ) + .put( + IndexMetadata.builder("test_1") + .settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1")) + .numberOfShards(1) + .numberOfReplicas(0) + ) + .put( + IndexMetadata.builder("test_2") + .settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1")) + .numberOfShards(1) + .numberOfReplicas(0) + ) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .addAsNew(metadata.index("test_1")) + .addAsNew(metadata.index("test_2")) + .build(); + + final ClusterState clusterState = applyStartedShardsUntilNoChange( + ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .blocks(ClusterBlocks.builder().addGlobalBlock(Metadata.CLUSTER_CREATE_INDEX_BLOCK).build()) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(), + allocation + ); + AtomicReference> indices = new AtomicReference<>(); + AtomicInteger countBlocksCalled = new AtomicInteger(); + AtomicInteger countUnblockBlocksCalled = new AtomicInteger(); + AtomicLong currentTime = new AtomicLong(); + Settings settings = Settings.builder().put(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), true).build(); + DiskThresholdMonitor monitor = new DiskThresholdMonitor( + settings, + () -> clusterState, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null, + currentTime::get, + (reason, priority, listener) -> { + listener.onResponse(null); + } + ) { + + @Override + protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener, boolean readOnly) { + assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); + assertTrue(readOnly); + listener.onResponse(null); + } + + @Override + protected void setIndexCreateBlock(ActionListener listener, boolean indexCreateBlock) { + if (indexCreateBlock == true) { + countBlocksCalled.set(countBlocksCalled.get() + 1); + } else { + countUnblockBlocksCalled.set(countUnblockBlocksCalled.get() + 1); + } + + listener.onResponse(null); + } + }; + + Map builder = new HashMap<>(); + + // Initially all the nodes are breaching high watermark and IndexCreateBlock is already present on the cluster. + // Since block is already present, DiskThresholdMonitor should not again try to apply block. + builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 9)); + builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 9)); + monitor.onNewInfo(clusterInfo(builder)); + // Since Block is already present and nodes are below high watermark so neither block nor unblock will be called. + assertEquals(countBlocksCalled.get(), 0); + assertEquals(countUnblockBlocksCalled.get(), 0); + + // Ensure DiskThresholdMonitor does not try to remove block in the next iteration if all nodes are breaching high watermark. + monitor.onNewInfo(clusterInfo(builder)); + assertEquals(countBlocksCalled.get(), 0); + assertEquals(countUnblockBlocksCalled.get(), 0); + + builder = new HashMap<>(); + + // If any node is no longer breaching high watermark, DiskThresholdMonitor should remove IndexCreateBlock. + builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 19)); + builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 1)); + // Need to add delay in current time to allow nodes to be removed high watermark list. + currentTime.addAndGet(randomLongBetween(60001, 120000)); + + monitor.onNewInfo(clusterInfo(builder)); + // Block will be removed if any nodes is no longer breaching high watermark. + assertEquals(countBlocksCalled.get(), 0); + assertEquals(countUnblockBlocksCalled.get(), 1); + } + private void assertNoLogging(DiskThresholdMonitor monitor, final Map diskUsages) throws IllegalAccessException { try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(DiskThresholdMonitor.class))) { mockAppender.addExpectation( @@ -756,10 +859,11 @@ private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, final M } } - private void assertSingleWarningMessage(DiskThresholdMonitor monitor, final Map diskUsages, String message) + private void assertMultipleWarningMessages(DiskThresholdMonitor monitor, final Map diskUsages, List messages) throws IllegalAccessException { - assertLogging(monitor, diskUsages, Level.WARN, message); - assertNoLogging(monitor, diskUsages); + for (int index = 0; index < messages.size(); index++) { + assertLogging(monitor, diskUsages, Level.WARN, messages.get(index)); + } } private void assertSingleInfoMessage(DiskThresholdMonitor monitor, final Map diskUsages, String message)