Skip to content

Commit

Permalink
Modify the psa transport class to reduce code duplication
Browse files Browse the repository at this point in the history
Signed-off-by: sudarshan baliga <[email protected]>
  • Loading branch information
sudarshan-baliga committed Jun 29, 2023
1 parent 0bfe583 commit ea48cb3
Show file tree
Hide file tree
Showing 6 changed files with 493 additions and 672 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604))
- Add PSA transport action for bulk async fetch of shards ([#5098](https://github.com/opensearch-project/OpenSearch/issues/5098))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down Expand Up @@ -45,7 +44,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Pass localNode info to all plugins on node start ([#7919](https://github.com/opensearch-project/OpenSearch/pull/7919))
- Modified the existing async shard fetch transport action to use the helper functions added for bulk fetching ([#5098](https://github.com/opensearch-project/OpenSearch/issues/5098))
### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
* This class is responsible for fetching shard data from nodes. It is analogous to AsyncShardFetch class since it fetches
* the data in asynchronous manner too.
* @param <T>
*
* @opensearch.internal
*/
public abstract class AsyncShardsFetchPerNode<T extends BaseNodeResponse> implements Releasable {
/**
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(DiscoveryNode[] nodes, Map<ShardId, String> shardsIdMap, ActionListener<NodesResponse> listener);
void list(DiscoveryNode[] nodes, Map<ShardId, String> shardIdsWithCustomDataPath, ActionListener<NodesResponse> listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.gateway;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionType;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardInfo;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* This transport action is used to fetch all unassigned shard version from each node during primary allocation in {@link GatewayAllocator}.
* We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate
* shards after node or cluster restarts.
*
* @opensearch.internal
*/
public class TransportNodesBatchListGatewayStartedShards extends TransportNodesAction<
TransportNodesBatchListGatewayStartedShards.Request,
TransportNodesBatchListGatewayStartedShards.NodesGatewayStartedShards,
TransportNodesBatchListGatewayStartedShards.NodeRequest,
TransportNodesBatchListGatewayStartedShards.NodeGatewayStartedShardsBatch>
implements
AsyncShardsFetchPerNode.Lister<
TransportNodesBatchListGatewayStartedShards.NodesGatewayStartedShards,
TransportNodesBatchListGatewayStartedShards.NodeGatewayStartedShardsBatch> {

public static final String ACTION_NAME = "internal:gateway/local/bulk_started_shards";
public static final ActionType<NodesGatewayStartedShards> TYPE = new ActionType<>(ACTION_NAME, NodesGatewayStartedShards::new);

private final Settings settings;
private final NodeEnvironment nodeEnv;
private final IndicesService indicesService;
private final NamedXContentRegistry namedXContentRegistry;

@Inject
public TransportNodesBatchListGatewayStartedShards(
Settings settings,
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
NodeEnvironment env,
IndicesService indicesService,
NamedXContentRegistry namedXContentRegistry
) {
super(
ACTION_NAME,
threadPool,
clusterService,
transportService,
actionFilters,
Request::new,
NodeRequest::new,
ThreadPool.Names.FETCH_SHARD_STARTED,
NodeGatewayStartedShardsBatch.class
);
this.settings = settings;
this.nodeEnv = env;
this.indicesService = indicesService;
this.namedXContentRegistry = namedXContentRegistry;
}

@Override
public void list(DiscoveryNode[] nodes, Map<ShardId, String> shardIdsWithCustomDataPath, ActionListener<NodesGatewayStartedShards> listener) {
execute(new Request(nodes, shardIdsWithCustomDataPath), listener);
}

@Override
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest(request);
}

@Override
protected NodeGatewayStartedShardsBatch newNodeResponse(StreamInput in) throws IOException {
return new NodeGatewayStartedShardsBatch(in);
}

@Override
protected NodesGatewayStartedShards newResponse(
Request request,
List<NodeGatewayStartedShardsBatch> responses,
List<FailedNodeException> failures
) {
return new NodesGatewayStartedShards(clusterService.getClusterName(), responses, failures);
}

/**
* This function is similar to nodeoperation method of {@link TransportNodesListGatewayStartedShards} we loop over
* the shards here to fetch the shard result in bulk.
*
* @param request
* @return NodeGatewayStartedShardsBatch
*/
@Override
protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) {
Map<ShardId, NodeGatewayStartedShardInfo> shardsOnNode = new HashMap<>();
for (Map.Entry<ShardId, String> shardToCustomDataPathEntry : request.shardIdsWithCustomDataPath.entrySet()) {
final ShardId shardId = shardToCustomDataPathEntry.getKey();
try {
final NodeGatewayStartedShardInfo nodeGatewayStartedShardInfo = TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode(
logger,
shardId,
namedXContentRegistry,
nodeEnv,
indicesService,
shardToCustomDataPathEntry.getValue(),
settings,
clusterService
);
shardsOnNode.put(shardId, nodeGatewayStartedShardInfo);
} catch (Exception e) {
Exception shardInfoFetchException = new OpenSearchException("failed to load started shards", e);
shardsOnNode.put(shardId, new NodeGatewayStartedShardInfo(
null, false, null, shardInfoFetchException
));
}
}
return new NodeGatewayStartedShardsBatch(clusterService.localNode(), shardsOnNode);
}

/**
* The nodes request.
*
* @opensearch.internal
*/
public static class Request extends BaseNodesRequest<Request> {
private final Map<ShardId, String> shardIdsWithCustomDataPath;

public Request(StreamInput in) throws IOException {
super(in);
shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString);
}

public Request(DiscoveryNode[] nodes, Map<ShardId, String> shardIdStringMap) {
super(nodes);
this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdStringMap);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString);
}

public Map<ShardId, String> getShardIdsMap() {
return shardIdsWithCustomDataPath;
}
}

/**
* The nodes response.
*
* @opensearch.internal
*/
public static class NodesGatewayStartedShards extends BaseNodesResponse<NodeGatewayStartedShardsBatch> {

public NodesGatewayStartedShards(StreamInput in) throws IOException {
super(in);
}

public NodesGatewayStartedShards(
ClusterName clusterName,
List<NodeGatewayStartedShardsBatch> nodes,
List<FailedNodeException> failures
) {
super(clusterName, nodes, failures);
}

@Override
protected List<NodeGatewayStartedShardsBatch> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeGatewayStartedShardsBatch::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<NodeGatewayStartedShardsBatch> nodes) throws IOException {
out.writeList(nodes);
}
}

/**
* The request.
*
* @opensearch.internal
*/
public static class NodeRequest extends TransportRequest {

private final Map<ShardId, String> shardIdsWithCustomDataPath;

public NodeRequest(StreamInput in) throws IOException {
super(in);
shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString);
}

public NodeRequest(Request request) {

this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsMap());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString);
}

}

public static class NodeGatewayStartedShardsBatch extends BaseNodeResponse {
private final Map<ShardId, NodeGatewayStartedShardInfo> nodeGatewayStartedShardsBatch;

public Map<ShardId, NodeGatewayStartedShardInfo> getNodeGatewayStartedShardsBatch() {
return nodeGatewayStartedShardsBatch;
}


public NodeGatewayStartedShardsBatch(StreamInput in) throws IOException {
super(in);
this.nodeGatewayStartedShardsBatch = in.readMap(ShardId::new, NodeGatewayStartedShardInfo::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(nodeGatewayStartedShardsBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
}

public NodeGatewayStartedShardsBatch(DiscoveryNode node, Map<ShardId, NodeGatewayStartedShardInfo> nodeGatewayStartedShardsBatch) {
super(node);
this.nodeGatewayStartedShardsBatch = nodeGatewayStartedShardsBatch;
}
}
}
Loading

0 comments on commit ea48cb3

Please sign in to comment.