
Add ShardBatchCache to support caching for TransportNodesListGatewayStartedShardsBatch #12504

Merged Apr 11, 2024 · 37 commits (showing changes from 35 commits)

Commits
e87bfdd
Add PrimaryShardBatchAllocator to take allocation decisions for a bat…
Jul 27, 2023
6c38303
Add unit tests and refactor PrimaryShardBatchAllocator
Sep 21, 2023
1a68940
Throw exception for single shard calls
Dec 4, 2023
dc7eb43
Modify according to transport PRs.
Dec 13, 2023
e0c4943
Use List instead of Set
Feb 27, 2024
4468818
Add ShardBatchCache to handle the responses of batch transport calls
Feb 29, 2024
3bd85a4
Handle shard failures by triggering a reroute
Mar 1, 2024
bb7d560
Send unhandled exception as it is and send null for known exceptions
Mar 1, 2024
dab9112
Move common functionalities to BaseShardResponse
Mar 1, 2024
d8c5a8d
Add unit test for ShardBatchCache implementation
Mar 5, 2024
483d75f
Move ShardBatchCache as inner class of AsyncShardBatchFetch
Mar 11, 2024
afffde6
Merge remote-tracking branch 'origin/main' into shard-batch-cache
Mar 11, 2024
2902967
Use failed shards data only when all nodes fetching is done
Mar 11, 2024
d26f8b2
Modify description to use existing Transport
Mar 12, 2024
e1217c6
Fix DCO
Mar 14, 2024
fe1ed1e
Merge remote-tracking branch 'origin/main' into shard-batch-cache
Mar 14, 2024
986cea9
Merge remote-tracking branch 'origin/main' into async-shard-fetch-bat…
Mar 14, 2024
aa3f82d
Move GatewayShardStarted in helper class to avoid dependency on Trans…
Mar 15, 2024
b227828
Merge remote-tracking branch 'origin/main' into async-shard-fetch-bat…
Mar 15, 2024
e0cff1a
spotless apply
Mar 15, 2024
6112f4b
Add java doc on new class
Mar 15, 2024
e798d7a
Remove code duplication
Mar 15, 2024
f702715
Merge remote-tracking branch 'origin/main' into async-shard-fetch-bat…
Mar 16, 2024
d372c54
rename class to older style
Mar 18, 2024
557e0f8
Merge remote-tracking branch 'origin/main' into shard-batch-cache
Mar 18, 2024
20f6a27
Merge remote-tracking branch 'origin/async-shard-fetch-batching' into…
Mar 18, 2024
4c75c49
Move BaseShardResponse methods to respective transport classes
Mar 18, 2024
a893cb8
Remove PSBA and test files, modify exception handling
Mar 19, 2024
3becfc2
Merge remote-tracking branch 'origin/main' into shard-batch-cache
Mar 19, 2024
e07ddc8
spotless apply
Mar 19, 2024
f8093b1
java doc
Mar 19, 2024
ba6cbb4
Use single exception parameter, modify exception handling in batch mode
Apr 2, 2024
4246256
Add factory class to create new response objects
Apr 4, 2024
c0ed643
Merge remote-tracking branch 'origin/main' into shard-batch-cache
Apr 4, 2024
d61477f
Save non empty response in else condition
Apr 8, 2024
65229fc
Cleanup java doc and review comments changes
Apr 9, 2024
08763ed
Mark clearShard method as synchronized
Apr 10, 2024
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ public static Map<ShardId, ShardAttributes> prepareRequestMap(String[] indices,
);
for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) {
final ShardId shardId = new ShardId(index, shardIdNum);
shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
shardIdShardAttributesMap.put(shardId, new ShardAttributes(customDataPath));
}
}
return shardIdShardAttributesMap;
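The hunk above drops the `ShardId` argument from the `ShardAttributes` constructor, since the map key already identifies the shard. A minimal sketch of how `prepareRequestMap` builds that map, using simplified stand-in types (`ShardKey` and `Attrs` are illustrative, not the OpenSearch classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Stand-in for ShardId: an (index, shard number) pair with value equality,
// so it can serve as a HashMap key.
final class ShardKey {
    final String index;
    final int id;
    ShardKey(String index, int id) { this.index = index; this.id = id; }
    @Override public boolean equals(Object o) {
        return o instanceof ShardKey && ((ShardKey) o).index.equals(index) && ((ShardKey) o).id == id;
    }
    @Override public int hashCode() { return Objects.hash(index, id); }
}

// Stand-in for ShardAttributes: after this PR it carries only the custom
// data path; the shard id lives solely in the map key.
final class Attrs {
    final String customDataPath;
    Attrs(String customDataPath) { this.customDataPath = customDataPath; }
}

public class PrepareRequestMapSketch {
    static Map<ShardKey, Attrs> prepareRequestMap(String[] indices, int primaryShardCount) {
        Map<ShardKey, Attrs> map = new HashMap<>();
        for (String index : indices) {
            for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) {
                // One entry per primary shard of every index in the batch.
                map.put(new ShardKey(index, shardIdNum), new Attrs("/custom/path"));
            }
        }
        return map;
    }
}
```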
@@ -0,0 +1,239 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;

import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Implementation of AsyncShardFetch with batching support. The fetch itself is executed by the base class
 * {@link AsyncShardFetch}; only the functionality specific to a batch lives here. In a batch, failed shards
 * must be cleaned up, and a reroute is then triggered so they are handled in the next run. This separation
 * also confines the extra generic type V, which is only needed for batch
 * transport actions like {@link TransportNodesListGatewayStartedShardsBatch}.
 *
 * @param <T> Response type of the transport action.
 * @param <V> Data type of shard level response.
 *
 * @opensearch.internal
 */
public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extends AsyncShardFetch<T> {

@SuppressWarnings("unchecked")
AsyncShardBatchFetch(
Logger logger,
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
String batchId,
Class<V> clazz,
V emptyResponse,
Function<V, Boolean> isEmptyResponse,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(
logger,
type,
shardAttributesMap,
action,
batchId,
new ShardBatchCache<>(
logger,
type,
shardAttributesMap,
"BatchID=[" + batchId + "]",
clazz,
emptyResponse,
isEmptyResponse,
responseFactory
)
);
}

/**
 * Remove a shard from the cache that maintains the full batch of shards. This is needed to clear a shard
 * once it has been assigned or has failed.
 *
 * @param shardId shardId to be removed from the batch.
 */
public void clearShard(ShardId shardId) {
this.shardAttributesMap.remove(shardId);
this.cache.deleteShard(shardId);
}

/**
 * Cache implementation for transport actions that return data for a batch of shards in the response.
 * Stores node-level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
 * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} using a memory-efficient
 * caching approach.
 *
 * @param <T> Response type of transport action.
 * @param <V> Data type of shard level response.
 */
static class ShardBatchCache<T extends BaseNodeResponse, V> extends AsyncShardFetchCache<T> {
private final Map<String, NodeEntry<V>> cache;
private final Map<ShardId, Integer> shardIdToArray;
private final int batchSize;
private final Class<V> shardResponseClass;
private final ShardBatchResponseFactory<T, V> responseFactory;
private final V emptyResponse;
private final Function<V, Boolean> isEmpty;
private final Logger logger;

public ShardBatchCache(
Logger logger,
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
String logKey,
Class<V> clazz,
V emptyResponse,
Function<V, Boolean> isEmptyResponse,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(Loggers.getLogger(logger, "_" + logKey), type);
this.batchSize = shardAttributesMap.size();
this.isEmpty = isEmptyResponse;
cache = new HashMap<>();
shardIdToArray = new HashMap<>();
fillShardIdKeys(shardAttributesMap.keySet());
this.shardResponseClass = clazz;
this.emptyResponse = emptyResponse;
this.logger = logger;
this.responseFactory = responseFactory;
}

@Override
public Map<String, ? extends BaseNodeEntry> getCache() {
return cache;
}

@Override
public void deleteShard(ShardId shardId) {
if (shardIdToArray.containsKey(shardId)) {
Integer shardIdIndex = shardIdToArray.remove(shardId);
for (String nodeId : cache.keySet()) {
cache.get(nodeId).clearShard(shardIdIndex);
}
}
}

@Override
public void initData(DiscoveryNode node) {
cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, isEmpty));
}

/**
 * Put the response received from data nodes into the cache.
 * Extracts the shard-level data from the batch, filters out shards that returned failures,
 * then stores the remaining data at the node level and marks fetching as done.
 *
 * @param node node from which we got the response.
 * @param response shard metadata coming from node.
 */
@Override
public void putData(DiscoveryNode node, T response) {
NodeEntry<V> nodeEntry = cache.get(node.getId());
Map<ShardId, V> batchResponse = responseFactory.getShardBatchData(response);
nodeEntry.doneFetching(batchResponse, shardIdToArray);
}

@Override
public T getData(DiscoveryNode node) {
return this.responseFactory.getNewResponse(node, getBatchData(cache.get(node.getId())));
}

private HashMap<ShardId, V> getBatchData(NodeEntry<V> nodeEntry) {
V[] nodeShardEntries = nodeEntry.getData();
boolean[] emptyResponses = nodeEntry.getEmptyShardResponse();
HashMap<ShardId, V> shardData = new HashMap<>();
for (Map.Entry<ShardId, Integer> shardIdIndex : shardIdToArray.entrySet()) {
ShardId shardId = shardIdIndex.getKey();
Integer arrIndex = shardIdIndex.getValue();
if (emptyResponses[arrIndex]) {
shardData.put(shardId, emptyResponse);
} else if (nodeShardEntries[arrIndex] != null) {
// ignore null responses here
shardData.put(shardId, nodeShardEntries[arrIndex]);
}
}
return shardData;
}

private void fillShardIdKeys(Set<ShardId> shardIds) {
int shardIdIndex = 0;
for (ShardId shardId : shardIds) {
this.shardIdToArray.putIfAbsent(shardId, shardIdIndex++);
}
}

/**
 * A node entry, holding the state of the fetched data for all shards in the batch
 * for a given node.
 */
static class NodeEntry<V> extends BaseNodeEntry {
private final V[] shardData;
private final boolean[] emptyShardResponse; // We cannot rely on null entries in the shardData array,
// because a null entry means that entry should be ignored. Empty responses, on the other hand, are
// actually needed in the allocation/explain API response. So instead of storing the full empty response
// object in the cache, it's cheaper to store a boolean and create that object on the fly just before
// decision-making.
private final Function<V, Boolean> isEmpty;

NodeEntry(String nodeId, Class<V> clazz, int batchSize, Function<V, Boolean> isEmptyResponse) {
super(nodeId);
this.shardData = (V[]) Array.newInstance(clazz, batchSize);
this.emptyShardResponse = new boolean[batchSize];
this.isEmpty = isEmptyResponse;
}

void doneFetching(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
fillShardData(shardDataFromNode, shardIdKey);
super.doneFetching();
}

void clearShard(Integer shardIdIndex) {
this.shardData[shardIdIndex] = null;
emptyShardResponse[shardIdIndex] = false;
}

V[] getData() {
return this.shardData;
}

boolean[] getEmptyShardResponse() {
return emptyShardResponse;
}

private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
for (Map.Entry<ShardId, V> shardData : shardDataFromNode.entrySet()) {
if (shardData.getValue() != null) {
ShardId shardId = shardData.getKey();
if (isEmpty.apply(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
}
}
}
}
}
}
}
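The `NodeEntry` class above keeps a parallel `boolean[]` to remember genuinely empty shard responses instead of caching the full empty object, while `null` slots mean "ignore". A minimal standalone sketch of that trick (the `BatchSlots` name and shape are illustrative, not the OpenSearch API):

```java
import java.lang.reflect.Array;
import java.util.function.Function;

// Sketch of the null-vs-empty bookkeeping: data[i] == null means "ignore",
// empty[i] == true means "this shard really answered empty" so the empty
// response object can be recreated on the fly at read time.
final class BatchSlots<V> {
    private final V[] data;
    private final boolean[] empty;
    private final Function<V, Boolean> isEmpty;

    @SuppressWarnings("unchecked")
    BatchSlots(Class<V> clazz, int batchSize, Function<V, Boolean> isEmpty) {
        this.data = (V[]) Array.newInstance(clazz, batchSize);
        this.empty = new boolean[batchSize];
        this.isEmpty = isEmpty;
    }

    void put(int slot, V response) {
        if (response == null) return; // null responses are ignored outright
        if (isEmpty.apply(response)) {
            empty[slot] = true;  // remember emptiness cheaply
            data[slot] = null;   // don't hold the empty object itself
        } else {
            data[slot] = response;
        }
    }

    // Rebuild the response for a slot, substituting the shared empty instance.
    V get(int slot, V emptyResponse) {
        return empty[slot] ? emptyResponse : data[slot];
    }

    // Mirrors NodeEntry.clearShard: reset both the slot and its empty flag.
    void clear(int slot) {
        data[slot] = null;
        empty[slot] = false;
    }
}
```

For instance, with `String` responses and `String::isEmpty` as the predicate, an empty string is recorded only in the boolean array while the slot itself stays null.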
@@ -82,10 +82,10 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
protected final String type;
protected final Map<ShardId, ShardAttributes> shardAttributesMap;
private final Lister<BaseNodesResponse<T>, T> action;
private final AsyncShardFetchCache<T> cache;
protected final AsyncShardFetchCache<T> cache;
private final AtomicLong round = new AtomicLong();
private boolean closed;
private final String reroutingKey;
final String reroutingKey;
private final Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

@SuppressWarnings("unchecked")
@@ -99,7 +99,7 @@ protected AsyncShardFetch(
this.logger = logger;
this.type = type;
shardAttributesMap = new HashMap<>();
shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
shardAttributesMap.put(shardId, new ShardAttributes(customDataPath));
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
cache = new ShardCache<>(logger, reroutingKey, type);
@@ -120,14 +120,15 @@ protected AsyncShardFetch(
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
Lister<? extends BaseNodesResponse<T>, T> action,
String batchId
String batchId,
AsyncShardFetchCache<T> cache
) {
this.logger = logger;
this.type = type;
this.shardAttributesMap = shardAttributesMap;
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "BatchID=[" + batchId + "]";
cache = new ShardCache<>(logger, reroutingKey, type);
this.cache = cache;
}

@Override
@@ -48,6 +48,7 @@
* @opensearch.internal
*/
public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {

private final Logger logger;
private final String type;

@@ -15,6 +15,7 @@
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;

@@ -132,9 +133,7 @@ private static List<NodeGatewayStartedShard> adaptToNodeShardStates(

// build data for a shard from all the nodes
nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> {
TransportNodesGatewayStartedShardHelper.GatewayStartedShard shardData = nodeGatewayStartedShardsBatch
.getNodeGatewayStartedShardsBatch()
.get(unassignedShard.shardId());
GatewayStartedShard shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId());
nodeShardStates.add(
new NodeGatewayStartedShard(
shardData.allocationId(),
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;

import java.util.Map;

/**
* A factory class to create new responses of batch transport actions like
* {@link TransportNodesListGatewayStartedShardsBatch} or {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}
*
* @param <T> Node level response returned by batch transport actions.
* @param <V> Shard level metadata returned by batch transport actions.
*/
public class ShardBatchResponseFactory<T extends BaseNodeResponse, V> {
private final boolean primary;

public ShardBatchResponseFactory(boolean primary) {
this.primary = primary;
}

public T getNewResponse(DiscoveryNode node, Map<ShardId, V> shardData) {
if (primary) {
return (T) new NodeGatewayStartedShardsBatch(node, (Map<ShardId, GatewayStartedShard>) shardData);
} else {
return (T) new NodeStoreFilesMetadataBatch(node, (Map<ShardId, NodeStoreFilesMetadata>) shardData);
}
}

public Map<ShardId, V> getShardBatchData(T response) {
if (primary) {
return (Map<ShardId, V>) ((NodeGatewayStartedShardsBatch) response).getNodeGatewayStartedShardsBatch();
} else {
return (Map<ShardId, V>) ((NodeStoreFilesMetadataBatch) response).getNodeStoreFilesMetadataBatch();
}
}

}
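The `ShardBatchResponseFactory` above dispatches on a single `primary` flag and bridges the generic parameters with unchecked casts. A simplified, self-contained sketch of the same pattern, with hypothetical stand-in response types (`PrimaryBatch`, `StoreBatch`) in place of the real transport classes:

```java
import java.util.Map;

// Marker for node-level batch responses, standing in for BaseNodeResponse.
interface NodeResponse {}

final class PrimaryBatch implements NodeResponse {
    final Map<String, String> shardData;
    PrimaryBatch(Map<String, String> shardData) { this.shardData = shardData; }
}

final class StoreBatch implements NodeResponse {
    final Map<String, String> shardData;
    StoreBatch(Map<String, String> shardData) { this.shardData = shardData; }
}

// One factory instance per allocator: primary=true builds primary-shard
// batches, primary=false builds store-metadata batches. The casts are
// unchecked, exactly as in the real factory.
final class ResponseFactorySketch<T extends NodeResponse, V> {
    private final boolean primary;
    ResponseFactorySketch(boolean primary) { this.primary = primary; }

    @SuppressWarnings("unchecked")
    T getNewResponse(Map<String, V> shardData) {
        return primary
            ? (T) new PrimaryBatch((Map<String, String>) shardData)
            : (T) new StoreBatch((Map<String, String>) shardData);
    }

    @SuppressWarnings("unchecked")
    Map<String, V> getShardBatchData(T response) {
        return primary
            ? (Map<String, V>) ((PrimaryBatch) response).shardData
            : (Map<String, V>) ((StoreBatch) response).shardData;
    }
}
```

The design choice here is that the cache stays generic over `T` and `V`; only the factory knows which concrete pair of types a given batch uses.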
@@ -42,6 +42,8 @@
* @opensearch.internal
*/
public class TransportNodesGatewayStartedShardHelper {
public static final String INDEX_NOT_FOUND = "node doesn't have meta data for index";

public static GatewayStartedShard getShardInfoOnLocalNode(
Logger logger,
final ShardId shardId,
@@ -72,7 +74,7 @@ public static GatewayStartedShard getShardInfoOnLocalNode(
customDataPath = new IndexSettings(metadata, settings).customDataPath();
} else {
logger.trace("{} node doesn't have meta data for the requests index", shardId);
throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex());
throw new OpenSearchException(INDEX_NOT_FOUND + " " + shardId.getIndex());
}
}
// we don't have an open shard on the store, validate the files on disk are openable
@@ -230,6 +232,13 @@ public String toString() {
buf.append("]");
return buf.toString();
}

public static Boolean isEmpty(GatewayStartedShard gatewayStartedShard) {
return gatewayStartedShard.allocationId() == null
&& gatewayStartedShard.primary() == false
&& gatewayStartedShard.storeException() == null
&& gatewayStartedShard.replicationCheckpoint() == null;
}
}
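The `isEmpty` helper added above treats a shard response as empty only when every field is unset; this is the predicate the batch cache uses to decide which slots go into the boolean array. A hedged sketch with a simplified stand-in record (field names mirror `GatewayStartedShard`, but this is not the OpenSearch class):

```java
// Stand-in for GatewayStartedShard, reduced to the four fields the
// emptiness check inspects.
record StartedShardSketch(String allocationId, boolean primary, Exception storeException, Object replicationCheckpoint) {
    // Empty means: no allocation id, not primary, no store exception,
    // and no replication checkpoint — i.e. nothing useful was reported.
    static boolean isEmpty(StartedShardSketch s) {
        return s.allocationId() == null
            && s.primary() == false
            && s.storeException() == null
            && s.replicationCheckpoint() == null;
    }
}
```

Note that a response carrying only a `storeException` is deliberately not empty: the exception is meaningful data for allocation decisions.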

/**