Skip to content

Commit

Permalink
Merge branch 'main' into pointrange_optimization
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi authored Aug 30, 2024
2 parents aa60388 + 1e9fdb4 commit 2d3575b
Show file tree
Hide file tree
Showing 26 changed files with 1,286 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.settings;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;

/**
 * Checks that {@code index.number_of_search_only_replicas} is rejected as an unknown setting,
 * both on index creation and on a settings update, when the reader/writer split feature flag
 * is set to {@code false}.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {

    private static final String TEST_INDEX = "test_index";

    /**
     * Returns the inherited feature flag settings with the reader/writer split flag set to {@code false}.
     */
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder()
            .put(super.featureFlagSettings())
            .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.FALSE)
            .build();
    }

    /**
     * Creating an index that requests a search-only replica must fail with an "unknown setting" error.
     */
    public void testCreateFeatureFlagDisabled() {
        // Request the search replica setting itself. Putting the feature flag key into the index
        // settings (as this test previously did) fails with "unknown setting" for the flag key,
        // so the test would pass without ever exercising the search replica setting.
        // Segment replication is set so that the replication type cannot be the reason for rejection.
        Settings settings = Settings.builder()
            .put(indexSettings())
            .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
            .build();
        SettingsException settingsException = expectThrows(SettingsException.class, () -> createIndex(TEST_INDEX, settings));
        assertUnknownSearchReplicaSetting(settingsException);
    }

    /**
     * Updating an existing segment-replication index to add a search-only replica must fail with an
     * "unknown setting" error.
     */
    public void testUpdateFeatureFlagDisabled() {
        Settings settings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build();

        createIndex(TEST_INDEX, settings);
        SettingsException settingsException = expectThrows(SettingsException.class, () -> {
            client().admin()
                .indices()
                .prepareUpdateSettings(TEST_INDEX)
                .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
                .get();
        });
        assertUnknownSearchReplicaSetting(settingsException);
    }

    /**
     * Asserts that the failure reports an unknown setting and names the search replica setting key.
     */
    private static void assertUnknownSearchReplicaSetting(SettingsException settingsException) {
        String message = settingsException.getMessage();
        assertTrue("unexpected message: " + message, message.contains("unknown setting"));
        assertTrue("unexpected message: " + message, message.contains(SETTING_NUMBER_OF_SEARCH_REPLICAS));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.settings;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

/**
 * Integration tests for search-only replicas with the reader/writer split feature flag enabled.
 * Covers rejection of the setting on document-replication indices, behaviour when the node holding
 * the primary is stopped, and scaling the number of search replicas up and down.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase {

    private static final String TEST_INDEX = "test_index";

    private static final String EXPECTED_FAILURE_MESSAGE =
        "To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT";

    /**
     * Returns the inherited feature flag settings with the reader/writer split flag set to {@code true}.
     */
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
    }

    /**
     * Default index settings for these tests: one shard, no writer replicas, one search replica,
     * segment replication, and no delay before reallocating after a node leaves.
     */
    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
            // so that after we punt a node we can immediately try to reallocate after node left.
            .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0ms")
            .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build();
    }

    /**
     * Creating an index with a search replica and document replication must be rejected.
     */
    public void testCreateDocRepFails() {
        Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();

        IllegalArgumentException illegalArgumentException = expectThrows(
            IllegalArgumentException.class,
            () -> createIndex(TEST_INDEX, settings)
        );
        assertEquals(EXPECTED_FAILURE_MESSAGE, illegalArgumentException.getMessage());
    }

    /**
     * Adding a search replica to an existing document-replication index must be rejected.
     */
    public void testUpdateDocRepFails() {
        Settings settings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
            .build();
        // create succeeds
        createIndex(TEST_INDEX, settings);

        // update fails
        IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> {
            client().admin()
                .indices()
                .prepareUpdateSettings(TEST_INDEX)
                .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
                .get();
        });
        assertEquals(EXPECTED_FAILURE_MESSAGE, illegalArgumentException.getMessage());
    }

    /**
     * With one writer replica and one search replica, stopping the primary's node must leave the
     * search replica active while no writer replica remains active.
     */
    public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException {
        int numSearchReplicas = 1;
        int numWriterReplicas = 1;
        internalCluster().startClusterManagerOnlyNode();
        String primaryNodeName = internalCluster().startDataOnlyNode();
        createIndex(
            TEST_INDEX,
            Settings.builder()
                .put(indexSettings())
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
                .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
                .build()
        );
        ensureYellow(TEST_INDEX);
        // add 2 nodes for the replicas
        internalCluster().startDataOnlyNodes(2);
        ensureGreen(TEST_INDEX);

        // assert shards are on separate nodes & all active
        assertActiveShardCounts(numSearchReplicas, numWriterReplicas);

        // stop the primary and ensure search shard is not promoted:
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName));
        ensureYellowAndNoInitializingShards(TEST_INDEX);

        assertActiveShardCounts(numSearchReplicas, 0); // 1 repl is inactive that was promoted to primary
        // add back a node
        internalCluster().startDataOnlyNode();
        ensureGreen(TEST_INDEX);
    }

    /**
     * With only a search replica, stopping the primary's node turns the index red while the search
     * replica stays active and keeps serving local searches.
     */
    public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
        int numSearchReplicas = 1;
        int numWriterReplicas = 0;
        internalCluster().startClusterManagerOnlyNode();
        String primaryNodeName = internalCluster().startDataOnlyNode();
        createIndex(
            TEST_INDEX,
            Settings.builder()
                .put(indexSettings())
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
                .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
                .build()
        );
        ensureYellow(TEST_INDEX);
        client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
        // start a node for our search replica
        String replica = internalCluster().startDataOnlyNode();
        ensureGreen(TEST_INDEX);
        assertActiveSearchShards(numSearchReplicas);
        assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1);

        // stop the primary and ensure search shard is not promoted:
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName));
        ensureRed(TEST_INDEX);
        assertActiveSearchShards(numSearchReplicas);
        // while red our search shard is still searchable
        assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1);
    }

    /**
     * Scales the search replica count from 1 to 2 and then to 0 through settings updates, checking
     * the active search shard count after each step.
     */
    public void testSearchReplicaScaling() {
        internalCluster().startNodes(2);
        createIndex(TEST_INDEX);
        ensureGreen(TEST_INDEX);
        // assert settings
        Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata();
        int numSearchReplicas = Integer.parseInt(metadata.index(TEST_INDEX).getSettings().get(SETTING_NUMBER_OF_SEARCH_REPLICAS));
        assertEquals(1, numSearchReplicas);

        // assert cluster state & routing table
        assertActiveSearchShards(1);

        // Add another node and search replica
        internalCluster().startDataOnlyNode();
        client().admin()
            .indices()
            .prepareUpdateSettings(TEST_INDEX)
            .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2))
            .get();

        ensureGreen(TEST_INDEX);
        assertActiveSearchShards(2);

        // remove all search shards
        client().admin()
            .indices()
            .prepareUpdateSettings(TEST_INDEX)
            .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0))
            .get();
        ensureGreen(TEST_INDEX);
        assertActiveSearchShards(0);
    }

    /**
     * Helper to assert counts of active shards for each type, checked against both the index shard
     * routing table and the routing nodes of the current cluster state.
     *
     * @param expectedSearchReplicaCount expected number of active search-only replicas
     * @param expectedWriteReplicaCount  expected number of active writer replicas
     */
    private void assertActiveShardCounts(int expectedSearchReplicaCount, int expectedWriteReplicaCount) {
        // every active non-primary shard is either a search-only replica or a writer replica
        final int expectedActiveReplicaCount = expectedSearchReplicaCount + expectedWriteReplicaCount;

        // assert routing table
        IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
        assertEquals(expectedSearchReplicaCount, indexShardRoutingTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count());
        assertEquals(expectedWriteReplicaCount, indexShardRoutingTable.writerReplicas().stream().filter(ShardRouting::active).count());
        assertEquals(expectedActiveReplicaCount, indexShardRoutingTable.replicaShards().stream().filter(ShardRouting::active).count());

        // assert routing nodes
        ClusterState clusterState = getClusterState();
        assertEquals(expectedActiveReplicaCount, clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary()).size());
        assertEquals(expectedSearchReplicaCount, clusterState.getRoutingNodes().shards(r -> r.active() && r.isSearchOnly()).size());
        assertEquals(
            expectedWriteReplicaCount,
            clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary() && !r.isSearchOnly()).size()
        );
    }

    /**
     * Asserts the given number of active search-only replicas and zero active writer replicas.
     */
    private void assertActiveSearchShards(int expectedSearchReplicaCount) {
        assertActiveShardCounts(expectedSearchReplicaCount, 0);
    }

    /**
     * Returns the first shard routing table of the test index from the current cluster state.
     * Fails with a descriptive error instead of a bare {@code NoSuchElementException} when the
     * index has no shard routing table.
     */
    private IndexShardRoutingTable getIndexShardRoutingTable() {
        return getClusterState().routingTable()
            .index(TEST_INDEX)
            .shards()
            .values()
            .stream()
            .findFirst()
            .orElseThrow(() -> new AssertionError("no shard routing table found for index [" + TEST_INDEX + "]"));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
Expand Down Expand Up @@ -243,6 +244,22 @@ static Setting<Integer> buildNumberOfShardsSetting() {
Property.IndexScope
);

/**
* Setting to control the number of search only replicas for an index.
* A search only replica exists solely to perform read operations for a shard and is designed to achieve
* isolation from writers (primary shards). This means search only replicas are not primary eligible and do not
* have any direct communication with their primary. Search replicas require the use of Segment Replication on
* the index and poll their {@link SegmentReplicationSource} for updates.
* //TODO: Once physical isolation is introduced, reference the setting here.
*/
public static final String SETTING_NUMBER_OF_SEARCH_REPLICAS = "index.number_of_search_only_replicas";
/**
* Integer setting keyed by {@link #SETTING_NUMBER_OF_SEARCH_REPLICAS}, declared dynamic and index-scoped.
* NOTE(review): the two {@code 0} arguments are presumably the default value and the minimum value; confirm
* against {@code Setting.intSetting}.
*/
public static final Setting<Integer> INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING = Setting.intSetting(
SETTING_NUMBER_OF_SEARCH_REPLICAS,
0,
0,
Property.Dynamic,
Property.IndexScope
);

public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size";
public static final Setting<Integer> INDEX_ROUTING_PARTITION_SIZE_SETTING = Setting.intSetting(
SETTING_ROUTING_PARTITION_SIZE,
Expand Down Expand Up @@ -649,6 +666,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {

private final int numberOfShards;
private final int numberOfReplicas;
private final int numberOfSearchOnlyReplicas;

private final Index index;
private final long version;
Expand Down Expand Up @@ -702,6 +720,7 @@ private IndexMetadata(
final State state,
final int numberOfShards,
final int numberOfReplicas,
final int numberOfSearchOnlyReplicas,
final Settings settings,
final Map<String, MappingMetadata> mappings,
final Map<String, AliasMetadata> aliases,
Expand Down Expand Up @@ -735,7 +754,8 @@ private IndexMetadata(
this.state = state;
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + 1);
this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas;
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
this.settings = settings;
this.mappings = Collections.unmodifiableMap(mappings);
this.customData = Collections.unmodifiableMap(customData);
Expand Down Expand Up @@ -838,6 +858,10 @@ public int getNumberOfReplicas() {
return numberOfReplicas;
}

/**
 * Returns the number of search only replicas recorded in this index metadata.
 *
 * @return the value assigned to {@code numberOfSearchOnlyReplicas} at construction
 */
public int getNumberOfSearchOnlyReplicas() {
    return this.numberOfSearchOnlyReplicas;
}

/**
 * Returns the routing partition size held by this index metadata.
 *
 * @return the current value of {@code routingPartitionSize}
 */
public int getRoutingPartitionSize() {
    return this.routingPartitionSize;
}
Expand Down Expand Up @@ -1358,6 +1382,11 @@ public Builder numberOfReplicas(int numberOfReplicas) {
return this;
}

/**
 * Stores the given search replica count in this builder's settings under
 * {@code SETTING_NUMBER_OF_SEARCH_REPLICAS}, keeping all other settings already present.
 *
 * @param numberOfSearchReplicas the number of search replicas to record
 * @return this builder
 */
public Builder numberOfSearchReplicas(int numberOfSearchReplicas) {
    final Settings.Builder merged = Settings.builder().put(settings);
    merged.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas);
    settings = merged.build();
    return this;
}

public Builder routingPartitionSize(int routingPartitionSize) {
settings = Settings.builder().put(settings).put(SETTING_ROUTING_PARTITION_SIZE, routingPartitionSize).build();
return this;
Expand Down Expand Up @@ -1554,6 +1583,7 @@ public IndexMetadata build() {
throw new IllegalArgumentException("must specify number of replicas for index [" + index + "]");
}
final int numberOfReplicas = INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int numberOfSearchReplicas = INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(settings);

int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings);
if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) {
Expand Down Expand Up @@ -1649,6 +1679,7 @@ public IndexMetadata build() {
state,
numberOfShards,
numberOfReplicas,
numberOfSearchReplicas,
tmpSettings,
mappings,
tmpAliases,
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,24 @@ public Builder updateNumberOfReplicas(final int numberOfReplicas, final String[]
return this;
}

/**
 * Update the number of search replicas for the specified indices.
 * Indices are processed in order; each one is replaced in this builder with a copy of its
 * metadata carrying the new search replica count.
 *
 * @param numberOfSearchReplicas the number of search replicas
 * @param indices the indices to update the number of search replicas for
 * @return the builder
 * @throws IndexNotFoundException if one of the given indices is not present in this builder
 */
public Builder updateNumberOfSearchReplicas(final int numberOfSearchReplicas, final String[] indices) {
    for (final String indexName : indices) {
        final IndexMetadata current = this.indices.get(indexName);
        if (current == null) {
            throw new IndexNotFoundException(indexName);
        }
        final IndexMetadata.Builder updated = IndexMetadata.builder(current).numberOfSearchReplicas(numberOfSearchReplicas);
        put(updated);
    }
    return this;
}

public Builder coordinationMetadata(CoordinationMetadata coordinationMetadata) {
this.coordinationMetadata = coordinationMetadata;
return this;
Expand Down
Loading

0 comments on commit 2d3575b

Please sign in to comment.