diff --git a/CHANGELOG.md b/CHANGELOG.md index f4e2c43a715f6..3e6e94fde859d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604)) -- Add PSA transport action for bulk async fetch of shards ([#5098](https://github.com/opensearch-project/OpenSearch/issues/5098)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 @@ -45,7 +44,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - Pass localNode info to all plugins on node start ([#7919](https://github.com/opensearch-project/OpenSearch/pull/7919)) -- Modified the existing async shard fetch transport action to use the helper functions added for bulk fetching ([#5098](https://github.com/opensearch-project/OpenSearch/issues/5098)) ### Deprecated ### Removed diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java b/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java index 4d06bfeb52f8f..37bd7b97123e7 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java @@ -21,12 +21,14 @@ * This class is responsible for fetching shard data from nodes. It is analogous to AsyncShardFetch class since it fetches * the data in asynchronous manner too. * @param + * + * @opensearch.internal */ public abstract class AsyncShardsFetchPerNode implements Releasable { /** * An action that lists the relevant shard data that needs to be fetched. */ public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(DiscoveryNode[] nodes, Map shardsIdMap, ActionListener listener); + void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener); } } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesBatchListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesBatchListGatewayStartedShards.java new file mode 100644 index 0000000000000..590e7e8fff4a2 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesBatchListGatewayStartedShards.java @@ -0,0 +1,287 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.gateway; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardInfo; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * This transport action is used to fetch all unassigned shard version from each node during primary allocation in {@link GatewayAllocator}. + * We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate + * shards after node or cluster restarts. + * + * @opensearch.internal + */ +public class TransportNodesBatchListGatewayStartedShards extends TransportNodesAction< + TransportNodesBatchListGatewayStartedShards.Request, + TransportNodesBatchListGatewayStartedShards.NodesGatewayStartedShards, + TransportNodesBatchListGatewayStartedShards.NodeRequest, + TransportNodesBatchListGatewayStartedShards.NodeGatewayStartedShardsBatch> + implements + AsyncShardsFetchPerNode.Lister< + TransportNodesBatchListGatewayStartedShards.NodesGatewayStartedShards, + TransportNodesBatchListGatewayStartedShards.NodeGatewayStartedShardsBatch> { + + public static final String ACTION_NAME = "internal:gateway/local/bulk_started_shards"; + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesGatewayStartedShards::new); + + private final Settings settings; + private final NodeEnvironment nodeEnv; + private final IndicesService indicesService; + private final NamedXContentRegistry namedXContentRegistry; + + @Inject + public TransportNodesBatchListGatewayStartedShards( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + NodeEnvironment env, + IndicesService indicesService, + NamedXContentRegistry namedXContentRegistry + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STARTED, + NodeGatewayStartedShardsBatch.class + ); + this.settings = settings; + this.nodeEnv = env; + this.indicesService = indicesService; + this.namedXContentRegistry = namedXContentRegistry; + } + + @Override + public void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener) { + execute(new Request(nodes, shardIdsWithCustomDataPath), listener); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected NodeGatewayStartedShardsBatch newNodeResponse(StreamInput in) throws IOException { + return new NodeGatewayStartedShardsBatch(in); + } + + @Override + protected NodesGatewayStartedShards newResponse( + Request request, + List responses, + List failures + ) { + return new NodesGatewayStartedShards(clusterService.getClusterName(), responses, failures); + } + + /** + * This function is similar to nodeoperation method of {@link TransportNodesListGatewayStartedShards} we loop over + * the shards here to fetch the shard result in bulk. + * + * @param request + * @return NodeGatewayStartedShardsBatch + */ + @Override + protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { + Map shardsOnNode = new HashMap<>(); + for (Map.Entry shardToCustomDataPathEntry : request.shardIdsWithCustomDataPath.entrySet()) { + final ShardId shardId = shardToCustomDataPathEntry.getKey(); + try { + final NodeGatewayStartedShardInfo nodeGatewayStartedShardInfo = TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode( + logger, + shardId, + namedXContentRegistry, + nodeEnv, + indicesService, + shardToCustomDataPathEntry.getValue(), + settings, + clusterService + ); + shardsOnNode.put(shardId, nodeGatewayStartedShardInfo); + } catch (Exception e) { + Exception shardInfoFetchException = new OpenSearchException("failed to load started shards", e); + shardsOnNode.put(shardId, new NodeGatewayStartedShardInfo( + null, false, null, shardInfoFetchException + )); + } + } + return new NodeGatewayStartedShardsBatch(clusterService.localNode(), shardsOnNode); + } + + /** + * The nodes request. + * + * @opensearch.internal + */ + public static class Request extends BaseNodesRequest { + private final Map shardIdsWithCustomDataPath; + + public Request(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public Request(DiscoveryNode[] nodes, Map shardIdStringMap) { + super(nodes); + this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdStringMap); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + + public Map getShardIdsMap() { + return shardIdsWithCustomDataPath; + } + } + + /** + * The nodes response. + * + * @opensearch.internal + */ + public static class NodesGatewayStartedShards extends BaseNodesResponse { + + public NodesGatewayStartedShards(StreamInput in) throws IOException { + super(in); + } + + public NodesGatewayStartedShards( + ClusterName clusterName, + List nodes, + List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeGatewayStartedShardsBatch::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } + + /** + * The request. + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + private final Map shardIdsWithCustomDataPath; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public NodeRequest(Request request) { + + this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsMap()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + + } + + public static class NodeGatewayStartedShardsBatch extends BaseNodeResponse { + private final Map nodeGatewayStartedShardsBatch; + + public Map getNodeGatewayStartedShardsBatch() { + return nodeGatewayStartedShardsBatch; + } + + + public NodeGatewayStartedShardsBatch(StreamInput in) throws IOException { + super(in); + this.nodeGatewayStartedShardsBatch = in.readMap(ShardId::new, NodeGatewayStartedShardInfo::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(nodeGatewayStartedShardsBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + + public NodeGatewayStartedShardsBatch(DiscoveryNode node, Map nodeGatewayStartedShardsBatch) { + super(node); + this.nodeGatewayStartedShardsBatch = nodeGatewayStartedShardsBatch; + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesBulkListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesBulkListGatewayStartedShards.java deleted file mode 100644 index 8403113dda8c1..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesBulkListGatewayStartedShards.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.gateway; - -import org.opensearch.OpenSearchException; -import org.opensearch.Version; -import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionType; -import org.opensearch.action.FailedNodeException; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.transport.TransportRequest; -import org.opensearch.action.support.nodes.BaseNodeResponse; -import org.opensearch.action.support.nodes.BaseNodesRequest; -import org.opensearch.action.support.nodes.BaseNodesResponse; -import org.opensearch.action.support.nodes.TransportNodesAction; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.settings.Settings; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.ShardId; -import org.opensearch.index.shard.ShardStateMetadata; -import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * This transport action is used to fetch all unassigned shard version from each node during primary allocation in {@link GatewayAllocator}. - * We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate - * shards after node or cluster restarts. - * - * @opensearch.internal - */ -public class TransportNodesBulkListGatewayStartedShards extends TransportNodesAction< - TransportNodesBulkListGatewayStartedShards.Request, - TransportNodesBulkListGatewayStartedShards.NodesGatewayStartedShards, - TransportNodesBulkListGatewayStartedShards.NodeRequest, - TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> - implements - AsyncShardsFetchPerNode.Lister< - TransportNodesBulkListGatewayStartedShards.NodesGatewayStartedShards, - TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> { - - public static final String ACTION_NAME = "internal:gateway/local/bulk_started_shards"; - public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesGatewayStartedShards::new); - - private final Settings settings; - private final NodeEnvironment nodeEnv; - private final IndicesService indicesService; - private final NamedXContentRegistry namedXContentRegistry; - - @Inject - public TransportNodesBulkListGatewayStartedShards( - Settings settings, - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - ActionFilters actionFilters, - NodeEnvironment env, - IndicesService indicesService, - NamedXContentRegistry namedXContentRegistry - ) { - super( - ACTION_NAME, - threadPool, - clusterService, - transportService, - actionFilters, - Request::new, - NodeRequest::new, - ThreadPool.Names.FETCH_SHARD_STARTED, - BulkOfNodeGatewayStartedShards.class - ); - this.settings = settings; - this.nodeEnv = env; - this.indicesService = indicesService; - this.namedXContentRegistry = namedXContentRegistry; - } - - @Override - public void list(DiscoveryNode[] nodes, Map shardsIdMap, ActionListener listener) { - execute(new Request(nodes, shardsIdMap), listener); - } - - @Override - protected NodeRequest newNodeRequest(Request request) { - return new NodeRequest(request); - } - - @Override - protected BulkOfNodeGatewayStartedShards newNodeResponse(StreamInput in) throws IOException { - return new BulkOfNodeGatewayStartedShards(in); - } - - @Override - protected NodesGatewayStartedShards newResponse( - Request request, - List responses, - List failures - ) { - return new NodesGatewayStartedShards(clusterService.getClusterName(), responses, failures); - } - - /** - * This function is similar to nodeoperation method of {@link TransportNodesListGatewayStartedShards} we loop over - * the shards here to fetch the shard result in bulk. - * - * @param request - * @return BulkOfNodeGatewayStartedShards - */ - @Override - protected BulkOfNodeGatewayStartedShards nodeOperation(NodeRequest request) { - Map shardsOnNode = new HashMap<>(); - for (Map.Entry shardToCustomDataPathEntry : request.shardIdsWithCustomDataPath.entrySet()) { - try { - final ShardId shardId = shardToCustomDataPathEntry.getKey(); - logger.trace("{} loading local shard state info", shardId); - ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( - logger, - namedXContentRegistry, - nodeEnv.availableShardPaths(shardId) - ); - if (shardStateMetadata != null) { - if (indicesService.getShardOrNull(shardId) == null) { - final String customDataPath = TransportNodesGatewayStartedShardHelper.getCustomDataPathForShard( - logger, - shardId, - shardToCustomDataPathEntry.getValue(), - settings, - clusterService - ); - // we don't have an open shard on the store, validate the files on disk are openable - Exception shardCorruptionException = TransportNodesGatewayStartedShardHelper.getShardCorruption( - logger, - nodeEnv, - shardId, - shardStateMetadata, - customDataPath - ); - if (shardCorruptionException != null) { - String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - shardsOnNode.put( - shardId, - new NodeGatewayStartedShards(allocationId, shardStateMetadata.primary, null, shardCorruptionException) - ); - continue; - } - } - logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); - String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - final IndexShard shard = indicesService.getShardOrNull(shardId); - shardsOnNode.put( - shardId, - new NodeGatewayStartedShards( - allocationId, - shardStateMetadata.primary, - shard != null ? shard.getLatestReplicationCheckpoint() : null - ) - ); - } else { - logger.trace("{} no local shard info found", shardId); - shardsOnNode.put(shardId, new NodeGatewayStartedShards(null, false, null)); - } - } catch (Exception e) { - throw new OpenSearchException("failed to load started shards", e); - } - } - return new BulkOfNodeGatewayStartedShards(clusterService.localNode(), shardsOnNode); - } - - /** - * The nodes request. - * - * @opensearch.internal - */ - public static class Request extends BaseNodesRequest { - private final Map shardIdsWithCustomDataPath; - - public Request(StreamInput in) throws IOException { - super(in); - shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); - } - - public Request(DiscoveryNode[] nodes, Map shardIdStringMap) { - super(nodes); - this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdStringMap); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); - } - - public Map getShardIdsMap() { - return shardIdsWithCustomDataPath; - } - } - - /** - * The nodes response. - * - * @opensearch.internal - */ - public static class NodesGatewayStartedShards extends BaseNodesResponse { - - public NodesGatewayStartedShards(StreamInput in) throws IOException { - super(in); - } - - public NodesGatewayStartedShards( - ClusterName clusterName, - List nodes, - List failures - ) { - super(clusterName, nodes, failures); - } - - @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readList(BulkOfNodeGatewayStartedShards::new); - } - - @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeList(nodes); - } - } - - /** - * The request. - * - * @opensearch.internal - */ - public static class NodeRequest extends TransportRequest { - - private final Map shardIdsWithCustomDataPath; - - public NodeRequest(StreamInput in) throws IOException { - super(in); - shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); - } - - public NodeRequest(Request request) { - - this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsMap()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); - } - - } - - /** - * The response as stored by TransportNodesListGatewayStartedShards(to maintain backward compatibility). - * - * @opensearch.internal - */ - public static class NodeGatewayStartedShards { - private final String allocationId; - private final boolean primary; - private final Exception storeException; - private final ReplicationCheckpoint replicationCheckpoint; - - public NodeGatewayStartedShards(StreamInput in) throws IOException { - allocationId = in.readOptionalString(); - primary = in.readBoolean(); - if (in.readBoolean()) { - storeException = in.readException(); - } else { - storeException = null; - } - if (in.getVersion().onOrAfter(Version.V_2_3_0) && in.readBoolean()) { - replicationCheckpoint = new ReplicationCheckpoint(in); - } else { - replicationCheckpoint = null; - } - } - - public NodeGatewayStartedShards(String allocationId, boolean primary, ReplicationCheckpoint replicationCheckpoint) { - this(allocationId, primary, replicationCheckpoint, null); - } - - public NodeGatewayStartedShards( - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - Exception storeException - ) { - this.allocationId = allocationId; - this.primary = primary; - this.replicationCheckpoint = replicationCheckpoint; - this.storeException = storeException; - } - - public String allocationId() { - return this.allocationId; - } - - public boolean primary() { - return this.primary; - } - - public ReplicationCheckpoint replicationCheckpoint() { - return this.replicationCheckpoint; - } - - public Exception storeException() { - return this.storeException; - } - - public void writeTo(StreamOutput out) throws IOException { - TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsWriteTo( - out, - allocationId, - primary, - storeException, - replicationCheckpoint - ); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - NodeGatewayStartedShards that = (NodeGatewayStartedShards) o; - - return primary == that.primary - && Objects.equals(allocationId, that.allocationId) - && Objects.equals(storeException, that.storeException) - && Objects.equals(replicationCheckpoint, that.replicationCheckpoint); - } - - @Override - public int hashCode() { - return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsHashCode( - allocationId, - primary, - storeException, - replicationCheckpoint - ); - } - - @Override - public String toString() { - return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsToString( - allocationId, - primary, - storeException, - replicationCheckpoint - ); - } - } - - public static class BulkOfNodeGatewayStartedShards extends BaseNodeResponse { - public Map getBulkOfNodeGatewayStartedShards() { - return bulkOfNodeGatewayStartedShards; - } - - private final Map bulkOfNodeGatewayStartedShards; - - public BulkOfNodeGatewayStartedShards(StreamInput in) throws IOException { - super(in); - this.bulkOfNodeGatewayStartedShards = in.readMap(ShardId::new, NodeGatewayStartedShards::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(bulkOfNodeGatewayStartedShards, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); - } - - public BulkOfNodeGatewayStartedShards(DiscoveryNode node, Map bulkOfNodeGatewayStartedShards) { - super(node); - this.bulkOfNodeGatewayStartedShards = bulkOfNodeGatewayStartedShards; - } - - } -} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java index f3fcaf29c00ad..b90a5cb4af699 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -14,176 +14,212 @@ import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.shard.ShardStateMetadata; import org.opensearch.index.store.Store; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; +import java.util.Objects; /** - * This class has the common code used in TransportNodesBulkListGatewayStartedShards and TransportNodesListGatewayStartedShards + * This class has the common code used in TransportNodesBatchListGatewayStartedShards and TransportNodesListGatewayStartedShards + * + * @opensearch.internal */ public class TransportNodesGatewayStartedShardHelper { /** - * Helper function for getting the data path of the shard that is used to look up information for this shard. - * If the dataPathInRequest passed to the method is not empty then same is returned. Else the custom data path is returned - * from the indexSettings fetched from the cluster state metadata for the specified shard. + * Class for storing the information about the shards fetched on the node. * - * @param logger - * @param shardId - * @param dataPathInRequest - * @param settings - * @param clusterService - * @return String + * @opensearch.internal */ - public static String getCustomDataPathForShard( - Logger logger, - ShardId shardId, - String dataPathInRequest, - Settings settings, - ClusterService clusterService - ) { - if (dataPathInRequest != null) return dataPathInRequest; - // TODO: Fallback for BWC with older OpenSearch versions. - // Remove once request.getCustomDataPath() always returns non-null - final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); - if (metadata != null) { - return new IndexSettings(metadata, settings).customDataPath(); - } else { - logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); - } - } + public static class NodeGatewayStartedShardInfo { + private final String allocationId; + private final boolean primary; + private final Exception storeException; + private final ReplicationCheckpoint replicationCheckpoint; - /** - * Helper function for checking if the shard file exists and is not corrupted. We return the specific exception if - * the shard file is corrupted. else null value is returned. - * - * @param logger - * @param nodeEnv - * @param shardId - * @param shardStateMetadata - * @param customDataPath - * @return Exception - */ - public static Exception getShardCorruption( - Logger logger, - NodeEnvironment nodeEnv, - ShardId shardId, - ShardStateMetadata shardStateMetadata, - String customDataPath - ) { - ShardPath shardPath = null; - try { - shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); - if (shardPath == null) { - throw new IllegalStateException(shardId + " no shard path found"); + public NodeGatewayStartedShardInfo(StreamInput in) throws IOException { + allocationId = in.readOptionalString(); + primary = in.readBoolean(); + if (in.readBoolean()) { + storeException = in.readException(); + } else { + storeException = null; + } + if (in.getVersion().onOrAfter(Version.V_2_3_0) && in.readBoolean()) { + replicationCheckpoint = new ReplicationCheckpoint(in); + } else { + replicationCheckpoint = null; } - Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); - } catch (Exception exception) { - final ShardPath finalShardPath = shardPath; - logger.trace( - () -> new ParameterizedMessage( - "{} can't open index for shard [{}] in path [{}]", - shardId, - shardStateMetadata, - (finalShardPath != null) ? finalShardPath.resolveIndex() : "" - ), - exception - ); - return exception; } - return null; - } - /** - * Helper function for getting the string representation of the NodeGatewayStartedShards object - * - * @param allocationId - * @param primary - * @param storeException - * @param replicationCheckpoint - * @return String - */ - public static String NodeGatewayStartedShardsToString( - String allocationId, - boolean primary, - Exception storeException, - ReplicationCheckpoint replicationCheckpoint - ) { - StringBuilder buf = new StringBuilder(); - buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary); - if (storeException != null) { - buf.append(",storeException=").append(storeException); + public NodeGatewayStartedShardInfo(String allocationId, boolean primary, ReplicationCheckpoint replicationCheckpoint) { + this(allocationId, primary, replicationCheckpoint, null); } - if (replicationCheckpoint != null) { - buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + + public NodeGatewayStartedShardInfo(String allocationId, boolean primary, ReplicationCheckpoint replicationCheckpoint, Exception storeException) { + this.allocationId = allocationId; + this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; + this.storeException = storeException; } - buf.append("]"); - return buf.toString(); - } - /** - * Helper function for computing the hashcode of the NodeGatewayStartedShards object - * - * @param allocationId - * @param primary - * @param storeException - * @param replicationCheckpoint - * @return int - */ - public static int NodeGatewayStartedShardsHashCode( - String allocationId, - boolean primary, - Exception storeException, - ReplicationCheckpoint replicationCheckpoint - ) { - int result = (allocationId != null ? allocationId.hashCode() : 0); - result = 31 * result + (primary ? 1 : 0); - result = 31 * result + (storeException != null ? storeException.hashCode() : 0); - result = 31 * result + (replicationCheckpoint != null ? replicationCheckpoint.hashCode() : 0); - return result; - } + public String allocationId() { + return this.allocationId; + } - /** - * Helper function for NodeGatewayStartedShardsWriteTo method - * - * @param out - * @param allocationId - * @param primary - * @param storeException - * @param replicationCheckpoint - * @throws IOException - */ - public static void NodeGatewayStartedShardsWriteTo( - StreamOutput out, - String allocationId, - boolean primary, - Exception storeException, - ReplicationCheckpoint replicationCheckpoint - ) throws IOException { - out.writeOptionalString(allocationId); - out.writeBoolean(primary); - if (storeException != null) { - out.writeBoolean(true); - out.writeException(storeException); - } else { - out.writeBoolean(false); + public boolean primary() { + return this.primary; } - if (out.getVersion().onOrAfter(Version.V_2_3_0)) { - if (replicationCheckpoint != null) { + + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + + public Exception storeException() { + return this.storeException; + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(allocationId); + out.writeBoolean(primary); + if (storeException != null) { out.writeBoolean(true); - replicationCheckpoint.writeTo(out); + out.writeException(storeException); } else { out.writeBoolean(false); } + if (out.getVersion().onOrAfter(Version.V_2_3_0)) { + if (replicationCheckpoint != null) { + out.writeBoolean(true); + replicationCheckpoint.writeTo(out); + } else { + out.writeBoolean(false); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NodeGatewayStartedShardInfo that = (NodeGatewayStartedShardInfo) o; + + return primary == that.primary && Objects.equals(allocationId, that.allocationId) && Objects.equals(storeException, that.storeException) && Objects.equals(replicationCheckpoint, that.replicationCheckpoint); + } + + @Override + public int hashCode() { + int result = (allocationId != null ? allocationId.hashCode() : 0); + result = 31 * result + (primary ? 1 : 0); + result = 31 * result + (storeException != null ? storeException.hashCode() : 0); + result = 31 * result + (replicationCheckpoint != null ? replicationCheckpoint.hashCode() : 0); + return result; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary); + if (storeException != null) { + buf.append(",storeException=").append(storeException); + } + if (replicationCheckpoint != null) { + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + } + buf.append("]"); + return buf.toString(); + } + } + + public static NodeGatewayStartedShardInfo getShardInfoOnLocalNode( + Logger logger, + final ShardId shardId, + NamedXContentRegistry namedXContentRegistry, + NodeEnvironment nodeEnv, + IndicesService indicesService, + String shardDataPathInRequest, + Settings settings, + ClusterService clusterService + ) throws IOException { + logger.trace("{} loading local shard state info", shardId); + ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( + logger, + namedXContentRegistry, + nodeEnv.availableShardPaths(shardId) + ); + if (shardStateMetadata != null) { + if (indicesService.getShardOrNull(shardId) == null + && shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) { + final String customDataPath; + if (shardDataPathInRequest != null) { + customDataPath = shardDataPathInRequest; + } else { + // TODO: Fallback for BWC with older OpenSearch versions. + // Remove once request.getCustomDataPath() always returns non-null + final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } + // we don't have an open shard on the store, validate the files on disk are openable + ShardPath shardPath = null; + try { + shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + throw new IllegalStateException(shardId + " no shard path found"); + } + Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); + } catch (Exception exception) { + final ShardPath finalShardPath = shardPath; + logger.trace( + () -> new ParameterizedMessage( + "{} can't open index for shard [{}] in path [{}]", + shardId, + shardStateMetadata, + (finalShardPath != null) ? finalShardPath.resolveIndex() : "" + ), + exception + ); + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + return new NodeGatewayStartedShardInfo( + allocationId, + shardStateMetadata.primary, + null, + exception + ); + } + } + + logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + final IndexShard shard = indicesService.getShardOrNull(shardId); + return new NodeGatewayStartedShardInfo( + allocationId, + shardStateMetadata.primary, + shard != null ? shard.getLatestReplicationCheckpoint() : null + ); } + logger.trace("{} no local shard info found", shardId); + return new NodeGatewayStartedShardInfo(null, false, null); } } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 90fda40da2349..f9d4ec24c40cc 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -33,7 +33,6 @@ package org.opensearch.gateway; import org.opensearch.OpenSearchException; -import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionType; import org.opensearch.action.FailedNodeException; @@ -52,9 +51,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.shard.IndexShard; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardInfo; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.shard.ShardStateMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; @@ -146,57 +144,26 @@ protected NodesGatewayStartedShards newResponse( protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { try { final ShardId shardId = request.getShardId(); - logger.trace("{} loading local shard state info", shardId); - ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( + final NodeGatewayStartedShardInfo nodeGatewayStartedShardInfo = TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode( logger, + shardId, namedXContentRegistry, - nodeEnv.availableShardPaths(request.shardId) + nodeEnv, + indicesService, + request.getCustomDataPath(), + settings, + clusterService + ); + return new NodeGatewayStartedShards( + clusterService.localNode(), + nodeGatewayStartedShardInfo ); - if (shardStateMetadata != null) { - if (indicesService.getShardOrNull(shardId) == null) { - final String customDataPath = TransportNodesGatewayStartedShardHelper.getCustomDataPathForShard( - logger, - shardId, - request.getCustomDataPath(), - settings, - clusterService - ); - // we don't have an open shard on the store, validate the files on disk are openable - Exception exception = TransportNodesGatewayStartedShardHelper.getShardCorruption( - logger, - nodeEnv, - shardId, - shardStateMetadata, - customDataPath - ); - if (exception != null) { - String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - return new NodeGatewayStartedShards( - clusterService.localNode(), - allocationId, - shardStateMetadata.primary, - null, - exception - ); - } - } - logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); - String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - final IndexShard shard = indicesService.getShardOrNull(shardId); - return new NodeGatewayStartedShards( - clusterService.localNode(), - allocationId, - shardStateMetadata.primary, - shard != null ? shard.getLatestReplicationCheckpoint() : null - ); - } - logger.trace("{} no local shard info found", shardId); - return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null); } catch (Exception e) { throw new OpenSearchException("failed to load started shards", e); } } + /** * The nodes request. * @@ -323,27 +290,17 @@ public String getCustomDataPath() { * @opensearch.internal */ public static class NodeGatewayStartedShards extends BaseNodeResponse { - private final String allocationId; - private final boolean primary; - private final Exception storeException; - private final ReplicationCheckpoint replicationCheckpoint; + private final NodeGatewayStartedShardInfo nodeGatewayStartedShardInfo; public NodeGatewayStartedShards(StreamInput in) throws IOException { super(in); - allocationId = in.readOptionalString(); - primary = in.readBoolean(); - if (in.readBoolean()) { - storeException = in.readException(); - } else { - storeException = null; - } - if (in.getVersion().onOrAfter(Version.V_2_3_0) && in.readBoolean()) { - replicationCheckpoint = new ReplicationCheckpoint(in); - } else { - replicationCheckpoint = null; - } + nodeGatewayStartedShardInfo = new NodeGatewayStartedShardInfo(in); } + public NodeGatewayStartedShards(DiscoveryNode localNode, NodeGatewayStartedShardInfo nodeGatewayStartedShardInfo){ + super(localNode); + this.nodeGatewayStartedShardInfo = nodeGatewayStartedShardInfo; + } public NodeGatewayStartedShards( DiscoveryNode node, String allocationId, @@ -361,75 +318,45 @@ public NodeGatewayStartedShards( Exception storeException ) { super(node); - this.allocationId = allocationId; - this.primary = primary; - this.replicationCheckpoint = replicationCheckpoint; - this.storeException = storeException; + nodeGatewayStartedShardInfo = new NodeGatewayStartedShardInfo(allocationId, primary, replicationCheckpoint, storeException); + } public String allocationId() { - return this.allocationId; + return this.nodeGatewayStartedShardInfo.allocationId(); } public boolean primary() { - return this.primary; + return this.nodeGatewayStartedShardInfo.primary(); } public ReplicationCheckpoint replicationCheckpoint() { - return this.replicationCheckpoint; + return this.nodeGatewayStartedShardInfo.replicationCheckpoint(); } public Exception storeException() { - return this.storeException; + return this.nodeGatewayStartedShardInfo.storeException(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsWriteTo( - out, - allocationId, - primary, - storeException, - replicationCheckpoint - ); + this.nodeGatewayStartedShardInfo.writeTo(out); } @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - NodeGatewayStartedShards that = (NodeGatewayStartedShards) o; - - return primary == that.primary - && Objects.equals(allocationId, that.allocationId) - && Objects.equals(storeException, that.storeException) - && Objects.equals(replicationCheckpoint, that.replicationCheckpoint); + return this.nodeGatewayStartedShardInfo.equals(o); } @Override public int hashCode() { - return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsHashCode( - allocationId, - primary, - storeException, - replicationCheckpoint - ); + return this.nodeGatewayStartedShardInfo.hashCode(); } @Override public String toString() { - return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsToString( - allocationId, - primary, - storeException, - replicationCheckpoint - ); + return this.nodeGatewayStartedShardInfo.toString(); } } }