From 6ac635e7f32f281473685d69dbcce1a65d372e2d Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 24 May 2023 10:21:52 -0700 Subject: [PATCH 1/2] Segment Replication - Allow shard idle when there are no replicas an index. Signed-off-by: Marc Handalian --- .../opensearch/index/shard/IndexShard.java | 5 +- .../SegmentReplicationIndexShardTests.java | 79 +++++++++++++------ 2 files changed, 58 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e368ee82b1084..0b009714cf0df 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4224,10 +4224,11 @@ public boolean scheduledRefresh() { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false - && indexSettings.isSegRepEnabled() == false - // Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only + // Indices with segrep enabled will never wait on a refresh and ignore shard idle unless there are no replicas. Primary + // shards push out new segments only // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving // a new set of segments. + && (indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0) && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0c859c5f6a64a..c2056c8bd83be 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -220,37 +220,68 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } public void testIgnoreShardIdle() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + try (ReplicationGroup shards = createGroup(1, updatedSettings, new NRTReplicationEngineFactory())) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - primary.refresh("test"); + final int numDocs = shards.indexDocs(randomIntBetween(1, 10)); + // ensure search idle conditions are met. + assertTrue(primary.isSearchIdle()); + assertTrue(replica.isSearchIdle()); + + // invoke scheduledRefresh, returns true if refresh is immediately invoked. + assertTrue(primary.scheduledRefresh()); + // replica would always return false here as there is no indexed doc to refresh on. + assertFalse(replica.scheduledRefresh()); + + // assert there is no pending refresh + assertFalse(primary.hasRefreshPending()); + assertFalse(replica.hasRefreshPending()); + shards.refresh("test"); replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs); + } + } - primary.scheduledRefresh(); - replica.scheduledRefresh(); - - primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); - replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); - - // Update the search_idle setting, this will put both shards into search idle. - Settings updatedSettings = Settings.builder() - .put(settings) - .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) - .build(); - primary.indexSettings().getScopedSettings().applySettings(updatedSettings); - replica.indexSettings().getScopedSettings().applySettings(updatedSettings); - - primary.scheduledRefresh(); - replica.scheduledRefresh(); + public void testShardIdle_Docrep() throws Exception { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + .build(); + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + final int numDocs = shards.indexDocs(randomIntBetween(1, 10)); + // ensure search idle conditions are met. + assertTrue(primary.isSearchIdle()); + assertTrue(replica.isSearchIdle()); + assertFalse(primary.scheduledRefresh()); + assertFalse(replica.scheduledRefresh()); + assertTrue(primary.hasRefreshPending()); + assertTrue(replica.hasRefreshPending()); + shards.refresh("test"); + shards.assertAllEqual(numDocs); + } + } - // Shards without segrep will register a new RefreshListener on the engine and return true when registered, - // assert with segrep enabled that awaitShardSearchActive does not register a listener. - primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); - replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + public void testShardIdleWithNoReplicas() throws Exception { + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + try (ReplicationGroup shards = createGroup(0, updatedSettings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + shards.indexDocs(randomIntBetween(1, 10)); + // ensure search idle conditions are met. + assertTrue(primary.isSearchIdle()); + assertFalse(primary.scheduledRefresh()); + assertTrue(primary.hasRefreshPending()); } } From 50b03b316d42d842068e10d3ab8413019eb9569a Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 19 Jun 2023 09:56:36 -0700 Subject: [PATCH 2/2] Add validation to index.search.idle.after when segment replication is enabled. This change updates search idle setting to not be configurable on segrep indices with replicas. Signed-off-by: Marc Handalian --- .../action/search/TransportSearchIT.java | 29 ++++++++++++++++ .../org/opensearch/index/IndexSettings.java | 33 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java index 895d7ebea88b6..afc0fca8e41da 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java @@ -61,6 +61,7 @@ import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPlugin; import org.opensearch.rest.RestStatus; @@ -370,6 +371,34 @@ public void testSearchIdle() throws Exception { }); } + public void testSearchIdle_SegmentReplication() { + // fail with replica count > 0 + int numOfReplicas = 1; + internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1); + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500))); + + expectThrows(IllegalArgumentException.class, () -> { + assertAcked(prepareCreate("test").setSettings(settings).setMapping("created_date", "type=date,format=yyyy-MM-dd")); + }); + + // setting allowed with 0 replicas. + settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); + assertAcked(prepareCreate("test").setSettings(settings).setMapping("created_date", "type=date,format=yyyy-MM-dd")); + + // add a replica + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + } + public void testCircuitBreakerReduceFail() throws Exception { int numShards = randomIntBetween(1, 10); indexSomeDocs("test", numShards, numShards * 3); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 9c6613495ba80..b186f56ba8577 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -54,8 +54,10 @@ import org.opensearch.search.pipeline.SearchPipelineService; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -70,6 +72,7 @@ import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING; import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION; +import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; /** * This class encapsulates all index level settings and handles settings updates. @@ -122,6 +125,36 @@ public final class IndexSettings { "index.search.idle.after", TimeValue.timeValueSeconds(30), TimeValue.timeValueMinutes(0), + new Setting.Validator<>() { + + @Override + public void validate(TimeValue value) { + + } + + @Override + public void validate(TimeValue value, Map, Object> settings, boolean isPresent) { + if (isPresent) { + final Object clusterSettingReplicationType = settings.get(CLUSTER_REPLICATION_TYPE_SETTING); + final Object replicationType = settings.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING); + int numReplicas = (int) settings.get(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING); + if ((ReplicationType.SEGMENT.equals(clusterSettingReplicationType) || ReplicationType.SEGMENT.equals(replicationType)) + && numReplicas > 0) { + throw new IllegalArgumentException("Shard idle is disabled for indices with replicas using Segment Replication"); + } + } + } + + @Override + public Iterator> settings() { + final List> settings = List.of( + CLUSTER_REPLICATION_TYPE_SETTING, + IndexMetadata.INDEX_REPLICATION_TYPE_SETTING, + IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING + ); + return settings.iterator(); + } + }, Property.IndexScope, Property.Dynamic );