Skip to content

Commit

Permalink
[Metadata Immutability] Change different indices lookup objects from …
Browse files Browse the repository at this point in the history
…array type to lists

Changed the arrays to immutable List instances, added new versions of the getters which returns List instances.

Resolves opensearch-project#8647

Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]>
  • Loading branch information
akolarkunnu committed Jul 11, 2024
1 parent 2d8c68c commit 045617f
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/))
- Deprecate CamelCase `PathHierarchy` tokenizer name in favor to lowercase `path_hierarchy` ([#10894](https://github.com/opensearch-project/OpenSearch/pull/10894))
- Breaking change: Do not request "search_pipelines" metrics by default in NodesInfoRequest ([#12497](https://github.com/opensearch-project/OpenSearch/pull/12497))
- Breaking change: Modify the utility APIs in the Metadata to get different indices ([#14557](https://github.com/opensearch-project/OpenSearch/pull/14557))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;
Expand Down Expand Up @@ -215,13 +216,13 @@ public ClusterHealthResponse(StreamInput in) throws IOException {
}

/** needed for plugins BWC */
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
public ClusterHealthResponse(String clusterName, Set<String> concreteIndices, ClusterState clusterState) {
this(clusterName, concreteIndices, clusterState, -1, -1, -1, TimeValue.timeValueHours(0));
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
Set<String> concreteIndices,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
Expand All @@ -242,7 +243,7 @@ public ClusterHealthResponse(
String clusterName,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
Set<String> concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
Expand All @@ -71,6 +70,8 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -490,7 +491,7 @@ private ClusterHealthResponse clusterHealth(
logger.trace("Calculating health based on state version [{}]", clusterState.version());
}

String[] concreteIndices;
Set<String> concreteIndices;
if (request.level().equals(ClusterHealthRequest.Level.AWARENESS_ATTRIBUTES)) {
String awarenessAttribute = request.getAwarenessAttribute();
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
Expand All @@ -508,12 +509,12 @@ private ClusterHealthResponse clusterHealth(
}

try {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
concreteIndices = Set.of(indexNameExpressionResolver.concreteIndexNames(clusterState, request));
} catch (IndexNotFoundException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
Collections.emptySet(),
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ protected void clusterManagerOperation(Request request, ClusterState state, Acti
}
ClusterStateHealth streamHealth = new ClusterStateHealth(
state,
dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new)
dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toSet())
);
dataStreamInfos.add(new Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Cluster state health information
Expand Down Expand Up @@ -80,9 +81,9 @@ public ClusterStateHealth(final ClusterState clusterState) {
* Creates a new <code>ClusterStateHealth</code> instance considering the current cluster state and the provided index names.
*
* @param clusterState The current cluster state. Must not be null.
* @param concreteIndices An array of index names to consider. Must not be null but may be empty.
* @param concreteIndices A set of index names to consider. Must not be null but may be empty.
*/
public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices) {
public ClusterStateHealth(final ClusterState clusterState, final Set<String> concreteIndices) {
numberOfNodes = clusterState.nodes().getSize();
numberOfDataNodes = clusterState.nodes().getDataNodes().size();
hasDiscoveredClusterManager = clusterState.nodes().getClusterManagerNodeId() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ public Map<String, Set<String>> resolveSearchRoutingAllIndices(Metadata metadata
if (routing != null) {
Set<String> r = Sets.newHashSet(Strings.splitStringByCommaToArray(routing));
Map<String, Set<String>> routings = new HashMap<>();
String[] concreteIndices = metadata.getConcreteAllIndices();
Set<String> concreteIndices = metadata.getConcreteAllIndices();
for (String index : concreteIndices) {
routings.put(index, r);
}
Expand Down Expand Up @@ -746,7 +746,7 @@ static boolean isExplicitAllPattern(Collection<String> aliasesOrIndices) {
*/
boolean isPatternMatchingAllIndices(Metadata metadata, String[] indicesOrAliases, String[] concreteIndices) {
// if we end up matching on all indices, check, if its a wildcard parameter, or a "-something" structure
if (concreteIndices.length == metadata.getConcreteAllIndices().length && indicesOrAliases.length > 0) {
if (concreteIndices.length == metadata.getConcreteAllIndices().size() && indicesOrAliases.length > 0) {

// we might have something like /-test1,+test1 that would identify all indices
// or something like /-test1 with test1 index missing and IndicesOptions.lenient()
Expand Down Expand Up @@ -1182,17 +1182,17 @@ private boolean isEmptyOrTrivialWildcard(List<String> expressions) {

private static List<String> resolveEmptyOrTrivialWildcard(IndicesOptions options, Metadata metadata) {
if (options.expandWildcardsOpen() && options.expandWildcardsClosed() && options.expandWildcardsHidden()) {
return Arrays.asList(metadata.getConcreteAllIndices());
return List.copyOf(metadata.getConcreteAllIndices());
} else if (options.expandWildcardsOpen() && options.expandWildcardsClosed()) {
return Arrays.asList(metadata.getConcreteVisibleIndices());
return metadata.getConcreteVisibleIndices();
} else if (options.expandWildcardsOpen() && options.expandWildcardsHidden()) {
return Arrays.asList(metadata.getConcreteAllOpenIndices());
return metadata.getConcreteAllOpenIndices();
} else if (options.expandWildcardsOpen()) {
return Arrays.asList(metadata.getConcreteVisibleOpenIndices());
return metadata.getConcreteVisibleOpenIndices();
} else if (options.expandWildcardsClosed() && options.expandWildcardsHidden()) {
return Arrays.asList(metadata.getConcreteAllClosedIndices());
return metadata.getConcreteAllClosedIndices();
} else if (options.expandWildcardsClosed()) {
return Arrays.asList(metadata.getConcreteVisibleClosedIndices());
return metadata.getConcreteVisibleClosedIndices();
} else {
return Collections.emptyList();
}
Expand Down
83 changes: 36 additions & 47 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
private final int totalOpenIndexShards;

private final String[] allIndices;
private final String[] visibleIndices;
private final String[] allOpenIndices;
private final String[] visibleOpenIndices;
private final String[] allClosedIndices;
private final String[] visibleClosedIndices;
private final Set<String> allIndices;
private final List<String> visibleIndices;
private final List<String> allOpenIndices;
private final List<String> visibleOpenIndices;
private final List<String> allClosedIndices;
private final List<String> visibleClosedIndices;

private final SortedMap<String, IndexAbstraction> indicesLookup;

Expand All @@ -291,12 +291,12 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
final Map<String, IndexMetadata> indices,
final Map<String, IndexTemplateMetadata> templates,
final Map<String, Custom> customs,
String[] allIndices,
String[] visibleIndices,
String[] allOpenIndices,
String[] visibleOpenIndices,
String[] allClosedIndices,
String[] visibleClosedIndices,
Set<String> allIndices,
List<String> visibleIndices,
List<String> allOpenIndices,
List<String> visibleOpenIndices,
List<String> allClosedIndices,
List<String> visibleClosedIndices,
SortedMap<String, IndexAbstraction> indicesLookup
) {
this.clusterUUID = clusterUUID;
Expand All @@ -321,12 +321,12 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
this.totalNumberOfShards = totalNumberOfShards;
this.totalOpenIndexShards = totalOpenIndexShards;

this.allIndices = allIndices;
this.visibleIndices = visibleIndices;
this.allOpenIndices = allOpenIndices;
this.visibleOpenIndices = visibleOpenIndices;
this.allClosedIndices = allClosedIndices;
this.visibleClosedIndices = visibleClosedIndices;
this.allIndices = Collections.unmodifiableSet(allIndices);
this.visibleIndices = Collections.unmodifiableList(visibleIndices);
this.allOpenIndices = Collections.unmodifiableList(allOpenIndices);
this.visibleOpenIndices = Collections.unmodifiableList(visibleOpenIndices);
this.allClosedIndices = Collections.unmodifiableList(allClosedIndices);
this.visibleClosedIndices = Collections.unmodifiableList(visibleClosedIndices);
this.indicesLookup = indicesLookup;
}

Expand Down Expand Up @@ -603,42 +603,42 @@ private static String mergePaths(String path, String field) {
/**
* Returns all the concrete indices.
*/
public String[] getConcreteAllIndices() {
public Set<String> getConcreteAllIndices() {
return allIndices;
}

/**
* Returns all the concrete indices that are not hidden.
*/
public String[] getConcreteVisibleIndices() {
public List<String> getConcreteVisibleIndices() {
return visibleIndices;
}

/**
* Returns all of the concrete indices that are open.
*/
public String[] getConcreteAllOpenIndices() {
public List<String> getConcreteAllOpenIndices() {
return allOpenIndices;
}

/**
* Returns all of the concrete indices that are open and not hidden.
*/
public String[] getConcreteVisibleOpenIndices() {
public List<String> getConcreteVisibleOpenIndices() {
return visibleOpenIndices;
}

/**
* Returns all of the concrete indices that are closed.
*/
public String[] getConcreteAllClosedIndices() {
public List<String> getConcreteAllClosedIndices() {
return allClosedIndices;
}

/**
* Returns all of the concrete indices that are closed and not hidden.
*/
public String[] getConcreteVisibleClosedIndices() {
public List<String> getConcreteVisibleClosedIndices() {
return visibleClosedIndices;
}

Expand Down Expand Up @@ -1576,12 +1576,12 @@ protected Metadata buildMetadataWithPreviousIndicesLookups() {
indices,
templates,
customs,
Arrays.copyOf(previousMetadata.allIndices, previousMetadata.allIndices.length),
Arrays.copyOf(previousMetadata.visibleIndices, previousMetadata.visibleIndices.length),
Arrays.copyOf(previousMetadata.allOpenIndices, previousMetadata.allOpenIndices.length),
Arrays.copyOf(previousMetadata.visibleOpenIndices, previousMetadata.visibleOpenIndices.length),
Arrays.copyOf(previousMetadata.allClosedIndices, previousMetadata.allClosedIndices.length),
Arrays.copyOf(previousMetadata.visibleClosedIndices, previousMetadata.visibleClosedIndices.length),
previousMetadata.allIndices,
previousMetadata.visibleIndices,
previousMetadata.allOpenIndices,
previousMetadata.visibleOpenIndices,
previousMetadata.allClosedIndices,
previousMetadata.visibleClosedIndices,
Collections.unmodifiableSortedMap(previousMetadata.indicesLookup)
);
}
Expand Down Expand Up @@ -1677,17 +1677,6 @@ protected Metadata buildMetadataWithRecomputedIndicesLookups() {

validateDataStreams(indicesLookup, (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE));

// build all concrete indices arrays:
// TODO: I think we can remove these arrays. it isn't worth the effort, for operations on all indices.
// When doing an operation across all indices, most of the time is spent on actually going to all shards and
// do the required operations, the bottleneck isn't resolving expressions into concrete indices.
String[] allIndicesArray = allIndices.toArray(Strings.EMPTY_ARRAY);
String[] visibleIndicesArray = visibleIndices.toArray(Strings.EMPTY_ARRAY);
String[] allOpenIndicesArray = allOpenIndices.toArray(Strings.EMPTY_ARRAY);
String[] visibleOpenIndicesArray = visibleOpenIndices.toArray(Strings.EMPTY_ARRAY);
String[] allClosedIndicesArray = allClosedIndices.toArray(Strings.EMPTY_ARRAY);
String[] visibleClosedIndicesArray = visibleClosedIndices.toArray(Strings.EMPTY_ARRAY);

return new Metadata(
clusterUUID,
clusterUUIDCommitted,
Expand All @@ -1699,12 +1688,12 @@ protected Metadata buildMetadataWithRecomputedIndicesLookups() {
indices,
templates,
customs,
allIndicesArray,
visibleIndicesArray,
allOpenIndicesArray,
visibleOpenIndicesArray,
allClosedIndicesArray,
visibleClosedIndicesArray,
allIndices,
visibleIndices,
allOpenIndices,
visibleOpenIndices,
allClosedIndices,
visibleClosedIndices,
indicesLookup
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public RemoteRestoreResult restore(
}
} else {
List<String> filteredIndices = filterIndices(
List.of(currentState.metadata().getConcreteAllIndices()),
List.copyOf(currentState.metadata().getConcreteAllIndices()),
indexNames,
IndicesOptions.fromOptions(true, true, true, true)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -93,7 +94,7 @@ public void testClusterHealth() throws IOException {
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
ClusterHealthResponse clusterHealth = new ClusterHealthResponse(
"bla",
new String[] { Metadata.ALL },
Set.of(Metadata.ALL),
clusterState,
pendingTasks,
inFlight,
Expand Down Expand Up @@ -121,7 +122,7 @@ public void testClusterHealthVerifyClusterManagerNodeDiscovery() throws IOExcept
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
ClusterHealthResponse clusterHealth = new ClusterHealthResponse(
"bla",
new String[] { Metadata.ALL },
Set.of(Metadata.ALL),
clusterState,
pendingTasks,
inFlight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

import static org.hamcrest.core.IsEqual.equalTo;

public class TransportClusterHealthActionTests extends OpenSearchTestCase {

public void testWaitForInitializingShards() throws Exception {
final String[] indices = { "test" };
final Set<String> indices = Set.of("test");
final ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForNoInitializingShards(true);
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 0);
Expand All @@ -76,7 +77,7 @@ public void testWaitForInitializingShards() throws Exception {
}

public void testWaitForAllShards() {
final String[] indices = { "test" };
final Set<String> indices = Set.of("test");
final ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForActiveShards(ActiveShardCount.ALL);

Expand Down
Loading

0 comments on commit 045617f

Please sign in to comment.