diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
index 50774f7e0cb1c..3d129d4794a10 100644
--- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
@@ -32,9 +32,6 @@
package org.opensearch.gateway;
import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.opensearch.ExceptionsHelper;
-import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
@@ -43,21 +40,22 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
+import org.opensearch.common.logging.Loggers;
import org.opensearch.core.action.ActionListener;
-import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;
-import org.opensearch.transport.ReceiveTimeoutTransportException;
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import reactor.util.annotation.NonNull;
+
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@@ -65,11 +63,9 @@
* Allows asynchronously fetching shard-related data from other nodes for allocation, without blocking
* the cluster update thread.
*
- * The async fetch logic maintains a map of which nodes are being fetched from in an async manner,
- * and once the results are back, it makes sure to schedule a reroute to make sure those results will
- * be taken into account.
+ * The async fetch logic maintains a cache, {@link AsyncShardFetchCache}, which is filled asynchronously as nodes respond.
+ * It also schedules a reroute to make sure those results will be taken into account.
*
- * It comes in two modes, to single fetch a shard or fetch a batch of shards.
* @opensearch.internal
*/
public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
@@ -86,14 +82,12 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
protected final String type;
protected final Map<ShardId, ShardAttributes> shardAttributesMap;
private final Lister<BaseNodesResponse<T>, T> action;
- private final Map<String, NodeEntry<T>> cache = new HashMap<>();
+ private final AsyncShardFetchCache<T> cache;
private final AtomicLong round = new AtomicLong();
private boolean closed;
private final String reroutingKey;
private final Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
- private final boolean enableBatchMode;
-
@SuppressWarnings("unchecked")
protected AsyncShardFetch(
Logger logger,
@@ -108,17 +102,17 @@ protected AsyncShardFetch(
shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
- enableBatchMode = false;
+ cache = new ShardCache<>(logger, reroutingKey, type);
}
/**
* Added to fetch a batch of shards from nodes
*
- * @param logger Logger
- * @param type type of action
+ * @param logger             Logger
+ * @param type               type of action
* @param shardAttributesMap Map of {@link ShardId} to {@link ShardAttributes} to perform fetching on them
- * @param action Transport Action
- * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification
+ * @param action             Transport Action
+ * @param batchId            For the given ShardAttributesMap, we expect them to be tied to a single batch id for logging and later identification
*/
@SuppressWarnings("unchecked")
protected AsyncShardFetch(
@@ -133,7 +127,7 @@ protected AsyncShardFetch(
this.shardAttributesMap = shardAttributesMap;
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "BatchID=[" + batchId + "]";
- enableBatchMode = true;
+ cache = new ShardCache<>(logger, reroutingKey, type);
}
@Override
@@ -141,19 +135,6 @@ public synchronized void close() {
this.closed = true;
}
- /**
- * Returns the number of async fetches that are currently ongoing.
- */
- public synchronized int getNumberOfInFlightFetches() {
- int count = 0;
- for (NodeEntry<T> nodeEntry : cache.values()) {
- if (nodeEntry.isFetching()) {
- count++;
- }
- }
- return count;
- }
-
/**
* Fetches the data for the relevant shard. If there are any ongoing async fetches, or new ones have
* been initiated by this call, the result will have no data.
@@ -166,7 +147,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
- if (enableBatchMode == false) {
+ if (shardAttributesMap.size() == 1) {
// we will do assertions here on ignoreNodes
if (ignoreNodes.size() > 1) {
throw new IllegalStateException(
@@ -187,48 +168,24 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
- fillShardCacheWithDataNodes(cache, nodes);
- List<NodeEntry<T>> nodesToFetch = findNodesToFetch(cache);
- if (nodesToFetch.isEmpty() == false) {
+ cache.fillShardCacheWithDataNodes(nodes);
+ List<String> nodeIds = cache.findNodesToFetch();
+ if (nodeIds.isEmpty() == false) {
// mark all node as fetching and go ahead and async fetch them
// use a unique round id to detect stale responses in processAsyncFetch
final long fetchingRound = round.incrementAndGet();
- for (NodeEntry<T> nodeEntry : nodesToFetch) {
- nodeEntry.markAsFetching(fetchingRound);
- }
- DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream()
- .map(NodeEntry::getNodeId)
- .map(nodes::get)
- .toArray(DiscoveryNode[]::new);
+ cache.markAsFetching(nodeIds, fetchingRound);
+ DiscoveryNode[] discoNodesToFetch = nodeIds.stream().map(nodes::get).toArray(DiscoveryNode[]::new);
asyncFetch(discoNodesToFetch, fetchingRound);
}
// if we are still fetching, return null to indicate it
- if (hasAnyNodeFetching(cache)) {
+ if (cache.hasAnyNodeFetching()) {
return new FetchResult<>(null, emptyMap());
} else {
// nothing to fetch, yay, build the return value
- Map<DiscoveryNode, T> fetchData = new HashMap<>();
Set<String> failedNodes = new HashSet<>();
- for (Iterator<Map.Entry<String, NodeEntry<T>>> it = cache.entrySet().iterator(); it.hasNext();) {
- Map.Entry<String, NodeEntry<T>> entry = it.next();
- String nodeId = entry.getKey();
- NodeEntry<T> nodeEntry = entry.getValue();
-
- DiscoveryNode node = nodes.get(nodeId);
- if (node != null) {
- if (nodeEntry.isFailed()) {
- // if its failed, remove it from the list of nodes, so if this run doesn't work
- // we try again next round to fetch it again
- it.remove();
- failedNodes.add(nodeEntry.getNodeId());
- } else {
- if (nodeEntry.getValue() != null) {
- fetchData.put(node, nodeEntry.getValue());
- }
- }
- }
- }
+ Map<DiscoveryNode, T> fetchData = cache.getCacheData(nodes, failedNodes);
Map<ShardId, Set<String>> allIgnoreNodesMap = unmodifiableMap(new HashMap<>(shardToIgnoreNodes));
// clear the nodes to ignore, we had a successful run in fetching everything we can
@@ -268,77 +225,18 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNod
if (responses != null) {
- for (T response : responses) {
- NodeEntry<T> nodeEntry = cache.get(response.getNode().getId());
- if (nodeEntry != null) {
- if (nodeEntry.getFetchingRound() != fetchingRound) {
- assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
- logger.trace(
- "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})",
- reroutingKey,
- nodeEntry.getNodeId(),
- type,
- nodeEntry.getFetchingRound(),
- fetchingRound
- );
- } else if (nodeEntry.isFailed()) {
- logger.trace(
- "{} node {} has failed for [{}] (failure [{}])",
- reroutingKey,
- nodeEntry.getNodeId(),
- type,
- nodeEntry.getFailure()
- );
- } else {
- // if the entry is there, for the right fetching round and not marked as failed already, process it
- logger.trace("{} marking {} as done for [{}], result is [{}]", reroutingKey, nodeEntry.getNodeId(), type, response);
- nodeEntry.doneFetching(response);
- }
- }
- }
+ cache.processResponses(responses, fetchingRound);
}
if (failures != null) {
- for (FailedNodeException failure : failures) {
- logger.trace("{} processing failure {} for [{}]", reroutingKey, failure, type);
- NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
- if (nodeEntry != null) {
- if (nodeEntry.getFetchingRound() != fetchingRound) {
- assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
- logger.trace(
- "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})",
- reroutingKey,
- nodeEntry.getNodeId(),
- type,
- nodeEntry.getFetchingRound(),
- fetchingRound
- );
- } else if (nodeEntry.isFailed() == false) {
- // if the entry is there, for the right fetching round and not marked as failed already, process it
- Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
- // if the request got rejected or timed out, we need to try it again next time...
- if (unwrappedCause instanceof OpenSearchRejectedExecutionException
- || unwrappedCause instanceof ReceiveTimeoutTransportException
- || unwrappedCause instanceof OpenSearchTimeoutException) {
- nodeEntry.restartFetching();
- } else {
- logger.warn(
- () -> new ParameterizedMessage(
- "{}: failed to list shard for {} on node [{}]",
- reroutingKey,
- type,
- failure.nodeId()
- ),
- failure
- );
- nodeEntry.doneFetching(failure.getCause());
- }
- }
- }
- }
+ cache.processFailures(failures, fetchingRound);
}
reroute(reroutingKey, "post_response");
}
+ public synchronized int getNumberOfInFlightFetches() {
+ return cache.getInflightFetches();
+ }
+
/**
* Implement this in order to schedule another round that causes a call to fetch data.
*/
@@ -351,47 +249,6 @@ synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
}
- /**
- * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
- * it nodes that are no longer part of the state.
- */
- private void fillShardCacheWithDataNodes(Map<String, NodeEntry<T>> shardCache, DiscoveryNodes nodes) {
- // verify that all current data nodes are there
- for (final DiscoveryNode node : nodes.getDataNodes().values()) {
- if (shardCache.containsKey(node.getId()) == false) {
- shardCache.put(node.getId(), new NodeEntry<>(node.getId()));
- }
- }
- // remove nodes that are not longer part of the data nodes set
- shardCache.keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId));
- }
-
- /**
- * Finds all the nodes that need to be fetched. Those are nodes that have no
- * data, and are not in fetch mode.
- */
- private List<NodeEntry<T>> findNodesToFetch(Map<String, NodeEntry<T>> shardCache) {
- List<NodeEntry<T>> nodesToFetch = new ArrayList<>();
- for (NodeEntry<T> nodeEntry : shardCache.values()) {
- if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
- nodesToFetch.add(nodeEntry);
- }
- }
- return nodesToFetch;
- }
-
- /**
- * Are there any nodes that are fetching data?
- */
- private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
- for (NodeEntry<T> nodeEntry : shardCache.values()) {
- if (nodeEntry.isFetching()) {
- return true;
- }
- }
- return false;
- }
-
/**
* Async fetches data for the provided shard with the set of nodes that need to be fetched from.
*/
@@ -415,6 +272,72 @@ public void onFailure(Exception e) {
});
}
+ /**
+ * Cache implementation for transport actions that return single-shard related data in the response.
+ * Stores node-level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or
+ * {@link TransportNodesListShardStoreMetadata}.
+ *
+ * @param <K> Response type of transport action.
+ */
+ static class ShardCache<K extends BaseNodeResponse> extends AsyncShardFetchCache<K> {
+
+ private final Map<String, NodeEntry<K>> cache;
+
+ public ShardCache(Logger logger, String logKey, String type) {
+ super(Loggers.getLogger(logger, "_" + logKey), type);
+ cache = new HashMap<>();
+ }
+
+ @Override
+ public void initData(DiscoveryNode node) {
+ cache.put(node.getId(), new NodeEntry<>(node.getId()));
+ }
+
+ @Override
+ public void putData(DiscoveryNode node, K response) {
+ cache.get(node.getId()).doneFetching(response);
+ }
+
+ @Override
+ public K getData(DiscoveryNode node) {
+ return cache.get(node.getId()).getValue();
+ }
+
+ @NonNull
+ @Override
+ public Map<String, ? extends BaseNodeEntry> getCache() {
+ return cache;
+ }
+
+ @Override
+ public void deleteShard(ShardId shardId) {
+ cache.clear(); // single shard cache can clear the full map
+ }
+
+ /**
+ * A node entry, holding the state of the fetched data for a specific shard
+ * for a given node.
+ */
+ static class NodeEntry<U> extends AsyncShardFetchCache.BaseNodeEntry {
+ @Nullable
+ private U value;
+
+ void doneFetching(U value) {
+ super.doneFetching();
+ this.value = value;
+ }
+
+ NodeEntry(String nodeId) {
+ super(nodeId);
+ }
+
+ U getValue() {
+ return value;
+ }
+
+ }
+ }
+
/**
* The result of a fetch operation. Make sure to first check {@link #hasData()} before
* fetching the actual data.
@@ -460,83 +383,4 @@ public void processAllocation(RoutingAllocation allocation) {
}
}
-
- /**
- * A node entry, holding the state of the fetched data for a specific shard
- * for a giving node.
- */
- static class NodeEntry<T> {
- private final String nodeId;
- private boolean fetching;
- @Nullable
- private T value;
- private boolean valueSet;
- private Throwable failure;
- private long fetchingRound;
-
- NodeEntry(String nodeId) {
- this.nodeId = nodeId;
- }
-
- String getNodeId() {
- return this.nodeId;
- }
-
- boolean isFetching() {
- return fetching;
- }
-
- void markAsFetching(long fetchingRound) {
- assert fetching == false : "double marking a node as fetching";
- this.fetching = true;
- this.fetchingRound = fetchingRound;
- }
-
- void doneFetching(T value) {
- assert fetching : "setting value but not in fetching mode";
- assert failure == null : "setting value when failure already set";
- this.valueSet = true;
- this.value = value;
- this.fetching = false;
- }
-
- void doneFetching(Throwable failure) {
- assert fetching : "setting value but not in fetching mode";
- assert valueSet == false : "setting failure when already set value";
- assert failure != null : "setting failure can't be null";
- this.failure = failure;
- this.fetching = false;
- }
-
- void restartFetching() {
- assert fetching : "restarting fetching, but not in fetching mode";
- assert valueSet == false : "value can't be set when restarting fetching";
- assert failure == null : "failure can't be set when restarting fetching";
- this.fetching = false;
- }
-
- boolean isFailed() {
- return failure != null;
- }
-
- boolean hasData() {
- return valueSet || failure != null;
- }
-
- Throwable getFailure() {
- assert hasData() : "getting failure when data has not been fetched";
- return failure;
- }
-
- @Nullable
- T getValue() {
- assert failure == null : "trying to fetch value, but its marked as failed, check isFailed";
- assert valueSet : "value is not set, hasn't been fetched yet";
- return value;
- }
-
- long getFetchingRound() {
- return fetchingRound;
- }
- }
}
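A note on the control flow that fetchData keeps after this refactor: each call stamps newly started fetches with a fresh round from the shared counter, and replies carrying an older round are dropped, so a slow response can never overwrite state that a newer round already owns. The sketch below is a minimal, self-contained restatement of that loop; the Toy* names are hypothetical stand-ins for the OpenSearch types (DiscoveryNodes, NodeEntry, the transport responses), not code from this change.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy model of the round-based fetch loop. ToyEntry mirrors BaseNodeEntry's
    // bookkeeping fields; the methods mirror what fetchData does with the cache.
    public class ToyFetchLoop {
        static class ToyEntry {
            final String nodeId;
            boolean fetching;
            String value;       // stands in for the transport response
            long fetchingRound;
            ToyEntry(String nodeId) { this.nodeId = nodeId; }
            boolean hasData() { return value != null; }
        }

        private final Map<String, ToyEntry> cache = new HashMap<>();
        private long round = 0;

        // fillShardCacheWithDataNodes: one entry per current data node,
        // entries for departed nodes are removed
        void syncDataNodes(List<String> dataNodes) {
            for (String id : dataNodes) {
                cache.putIfAbsent(id, new ToyEntry(id));
            }
            cache.keySet().removeIf(id -> dataNodes.contains(id) == false);
        }

        // findNodesToFetch + markAsFetching: nodes with no data and no
        // in-flight request get stamped with the new round
        List<String> startRound(List<String> dataNodes) {
            syncDataNodes(dataNodes);
            round++;
            List<String> toFetch = new ArrayList<>();
            for (ToyEntry e : cache.values()) {
                if (e.hasData() == false && e.fetching == false) {
                    e.fetching = true;
                    e.fetchingRound = round;
                    toFetch.add(e.nodeId);
                }
            }
            return toFetch;
        }

        // processResponses: a reply from an older round is silently ignored
        void onResponse(String nodeId, String value, long responseRound) {
            ToyEntry e = cache.get(nodeId);
            if (e != null && e.fetchingRound == responseRound) {
                e.fetching = false;
                e.value = value;
            }
        }

        public static void main(String[] args) {
            ToyFetchLoop loop = new ToyFetchLoop();
            List<String> fetch = loop.startRound(List.of("n1", "n2")); // [n1, n2]
            loop.onResponse("n1", "meta", 1);   // accepted: current round
            loop.onResponse("n2", "stale", 0);  // dropped: older round
            System.out.println(fetch);
        }
    }
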
diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java
new file mode 100644
index 0000000000000..3140ceef4f3ee
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java
@@ -0,0 +1,316 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.ExceptionsHelper;
+import org.opensearch.OpenSearchTimeoutException;
+import org.opensearch.action.FailedNodeException;
+import org.opensearch.action.support.nodes.BaseNodeResponse;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.transport.ReceiveTimeoutTransportException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import reactor.util.annotation.NonNull;
+
+/**
+ * AsyncShardFetchCache operates on a node-level cache, a map of node id (String) to BaseNodeEntry. initData,
+ * putData and getData are implemented by concrete caches and are invoked per node. This class is responsible for
+ * managing the fetch flow across all the nodes.
+ * It also provides useful insights, such as how many fetches are ongoing, which nodes still need to be fetched,
+ * and marking nodes as currently fetching. All of these operations inspect the cache state and respond
+ * accordingly.
+ *
+ * initData : how to initialize an entry of shard cache for a node.
+ * putData : how to store the response of transport action in the cache.
+ * getData : how to get the stored data for any shard allocators like {@link PrimaryShardAllocator} or
+ * {@link ReplicaShardAllocator}
+ * deleteShard : how to clean up the stored data from cache for a shard.
+ *
+ * @param <K> Response type of transport action which has the data to be stored in the cache.
+ *
+ * @opensearch.internal
+ */
+public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {
+ private final Logger logger;
+ private final String type;
+
+ protected AsyncShardFetchCache(Logger logger, String type) {
+ this.logger = logger;
+ this.type = type;
+ }
+
+ abstract void initData(DiscoveryNode node);
+
+ abstract void putData(DiscoveryNode node, K response);
+
+ abstract K getData(DiscoveryNode node);
+
+ @NonNull
+ abstract Map<String, ? extends BaseNodeEntry> getCache();
+
+ /**
+ * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will
+ * automatically be cleaned up once shards are assigned.
+ *
+ * @param shardId for which we need to free up the cached data.
+ */
+ abstract void deleteShard(ShardId shardId);
+
+ /**
+ * Returns the number of fetches that are currently ongoing.
+ */
+ int getInflightFetches() {
+ int count = 0;
+ for (BaseNodeEntry nodeEntry : getCache().values()) {
+ if (nodeEntry.isFetching()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
+ * it nodes that are no longer part of the state.
+ */
+ void fillShardCacheWithDataNodes(DiscoveryNodes nodes) {
+ // verify that all current data nodes are there
+ for (final DiscoveryNode node : nodes.getDataNodes().values()) {
+ if (getCache().containsKey(node.getId()) == false) {
+ initData(node);
+ }
+ }
+ // remove nodes that are no longer part of the data nodes set
+ getCache().keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId));
+ }
+
+ /**
+ * Finds all the nodes that need to be fetched. Those are nodes that have no
+ * data, and are not in fetch mode.
+ */
+ List<String> findNodesToFetch() {
+ List<String> nodesToFetch = new ArrayList<>();
+ for (BaseNodeEntry nodeEntry : getCache().values()) {
+ if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
+ nodesToFetch.add(nodeEntry.getNodeId());
+ }
+ }
+ return nodesToFetch;
+ }
+
+ /**
+ * Are there any nodes that are fetching data?
+ */
+ boolean hasAnyNodeFetching() {
+ for (BaseNodeEntry nodeEntry : getCache().values()) {
+ if (nodeEntry.isFetching()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get the data from the cache, ignoring failed entries. Uses the abstract getData method to read each node's data, as
+ * different implementations may have different ways to populate the data from the cache.
+ *
+ * @param nodes Discovery nodes for which we need to return the cache data.
+ * @param failedNodes return failedNodes with the nodes where fetch has failed.
+ * @return Map of cache data for every DiscoveryNode.
+ */
+ Map<DiscoveryNode, K> getCacheData(DiscoveryNodes nodes, Set<String> failedNodes) {
+ Map<DiscoveryNode, K> fetchData = new HashMap<>();
+ for (Iterator<? extends Map.Entry<String, ? extends BaseNodeEntry>> it = getCache().entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, BaseNodeEntry> entry = (Map.Entry<String, BaseNodeEntry>) it.next();
+ String nodeId = entry.getKey();
+ BaseNodeEntry nodeEntry = entry.getValue();
+
+ DiscoveryNode node = nodes.get(nodeId);
+ if (node != null) {
+ if (nodeEntry.isFailed()) {
+ // if it's failed, remove it from the list of nodes, so if this run doesn't work
+ // we try again next round to fetch it again
+ it.remove();
+ failedNodes.add(nodeEntry.getNodeId());
+ } else {
+ K nodeResponse = getData(node);
+ if (nodeResponse != null) {
+ fetchData.put(node, nodeResponse);
+ }
+ }
+ }
+ }
+ return fetchData;
+ }
+
+ void processResponses(List<K> responses, long fetchingRound) {
+ for (K response : responses) {
+ BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
+ if (nodeEntry != null) {
+ if (validateNodeResponse(nodeEntry, fetchingRound)) {
+ // if the entry is there, for the right fetching round and not marked as failed already, process it
+ logger.trace("marking {} as done for [{}], result is [{}]", nodeEntry.getNodeId(), type, response);
+ putData(response.getNode(), response);
+ }
+ }
+ }
+ }
+
+ private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) {
+ if (nodeEntry.getFetchingRound() != fetchingRound) {
+ assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
+ logger.trace(
+ "received response for [{}] from node {} for an older fetching round (expected: {} but was: {})",
+ nodeEntry.getNodeId(),
+ type,
+ nodeEntry.getFetchingRound(),
+ fetchingRound
+ );
+ return false;
+ } else if (nodeEntry.isFailed()) {
+ logger.trace("node {} has failed for [{}] (failure [{}])", nodeEntry.getNodeId(), type, nodeEntry.getFailure());
+ return false;
+ }
+ return true;
+ }
+
+ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) {
+ if (nodeEntry.getFetchingRound() != fetchingRound) {
+ assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
+ logger.trace(
+ "received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})",
+ nodeEntry.getNodeId(),
+ type,
+ nodeEntry.getFetchingRound(),
+ fetchingRound
+ );
+ } else if (nodeEntry.isFailed() == false) {
+ // if the entry is there, for the right fetching round and not marked as failed already, process it
+ Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
+ // if the request got rejected or timed out, we need to try it again next time...
+ if (retryableException(unwrappedCause)) {
+ nodeEntry.restartFetching();
+ } else {
+ logger.warn(() -> new ParameterizedMessage("failed to list shard for {} on node [{}]", type, failure.nodeId()), failure);
+ nodeEntry.doneFetching(failure.getCause());
+ }
+ }
+ }
+
+ boolean retryableException(Throwable unwrappedCause) {
+ return unwrappedCause instanceof OpenSearchRejectedExecutionException
+ || unwrappedCause instanceof ReceiveTimeoutTransportException
+ || unwrappedCause instanceof OpenSearchTimeoutException;
+ }
+
+ void processFailures(List<FailedNodeException> failures, long fetchingRound) {
+ for (FailedNodeException failure : failures) {
+ logger.trace("processing failure {} for [{}]", failure, type);
+ BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());
+ if (nodeEntry != null) {
+ handleNodeFailure(nodeEntry, failure, fetchingRound);
+ }
+ }
+ }
+
+ /**
+ * Common function for removing whole node entry.
+ *
+ * @param nodeId nodeId to be cleaned.
+ */
+ void remove(String nodeId) {
+ this.getCache().remove(nodeId);
+ }
+
+ void markAsFetching(List<String> nodeIds, long fetchingRound) {
+ for (String nodeId : nodeIds) {
+ getCache().get(nodeId).markAsFetching(fetchingRound);
+ }
+ }
+
+ /**
+ * A node entry, holding only node-level, fetch-related information.
+ * Actual shard metadata is stored in child classes.
+ */
+ static class BaseNodeEntry {
+ private final String nodeId;
+ private boolean fetching;
+ private boolean valueSet;
+ private Throwable failure;
+ private long fetchingRound;
+
+ BaseNodeEntry(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ String getNodeId() {
+ return this.nodeId;
+ }
+
+ boolean isFetching() {
+ return fetching;
+ }
+
+ void markAsFetching(long fetchingRound) {
+ assert fetching == false : "double marking a node as fetching";
+ this.fetching = true;
+ this.fetchingRound = fetchingRound;
+ }
+
+ void doneFetching() {
+ assert fetching : "setting value but not in fetching mode";
+ assert failure == null : "setting value when failure already set";
+ this.valueSet = true;
+ this.fetching = false;
+ }
+
+ void doneFetching(Throwable failure) {
+ assert fetching : "setting value but not in fetching mode";
+ assert valueSet == false : "setting failure when already set value";
+ assert failure != null : "setting failure can't be null";
+ this.failure = failure;
+ this.fetching = false;
+ }
+
+ void restartFetching() {
+ assert fetching : "restarting fetching, but not in fetching mode";
+ assert valueSet == false : "value can't be set when restarting fetching";
+ assert failure == null : "failure can't be set when restarting fetching";
+ this.fetching = false;
+ }
+
+ boolean isFailed() {
+ return failure != null;
+ }
+
+ boolean hasData() {
+ return valueSet || failure != null;
+ }
+
+ Throwable getFailure() {
+ assert hasData() : "getting failure when data has not been fetched";
+ return failure;
+ }
+
+ long getFetchingRound() {
+ return fetchingRound;
+ }
+ }
+}
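
Viewed as a whole, AsyncShardFetchCache is a template-method split: the base class owns the generic bookkeeping (rounds, in-flight counts, failure handling) over whatever map getCache() exposes, while a subclass such as ShardCache only decides how entries are created, stored and read. The sketch below restates that contract with hypothetical Toy* names rather than the real OpenSearch types; it is illustrative only.

    import java.util.HashMap;
    import java.util.Map;

    // Toy restatement of the AsyncShardFetchCache contract: the abstract base
    // writes its logic once against getCache(); subclasses supply only storage.
    abstract class ToyCache<K> {
        static class Entry<K> {
            final String nodeId;
            K value;
            Entry(String nodeId) { this.nodeId = nodeId; }
        }

        abstract void initData(String nodeId);          // create an empty entry
        abstract void putData(String nodeId, K value);  // store a node response
        abstract K getData(String nodeId);              // read it back
        abstract Map<String, ? extends Entry<K>> getCache();

        // generic logic lives in the base class, like getInflightFetches()
        int size() {
            return getCache().size();
        }
    }

    class ToyShardCache<K> extends ToyCache<K> {
        private final Map<String, Entry<K>> cache = new HashMap<>();

        @Override
        void initData(String nodeId) {
            cache.put(nodeId, new Entry<>(nodeId));
        }

        @Override
        void putData(String nodeId, K value) {
            cache.get(nodeId).value = value;
        }

        @Override
        K getData(String nodeId) {
            return cache.get(nodeId).value;
        }

        @Override
        Map<String, ? extends Entry<K>> getCache() {
            return cache;
        }
    }

    public class ToyCacheDemo {
        public static void main(String[] args) {
            ToyShardCache<String> c = new ToyShardCache<>();
            c.initData("n1");
            c.putData("n1", "shard-store-metadata");
            System.out.println(c.getData("n1") + " entries=" + c.size());
        }
    }

Keeping the storage hooks this small is presumably what would let a batch-oriented cache (one entry holding data for many shards) reuse the round and failure logic unchanged, which matches the batch-mode constructor retained in AsyncShardFetch above.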