orderedAllocationCandidates, int allocationsFound) {
this.orderedAllocationCandidates = orderedAllocationCandidates;
this.allocationsFound = allocationsFound;
}
@@ -530,93 +533,12 @@ protected static class NodesToAllocate {
* by the allocator for allocating to the node that holds the shard copy.
*/
private static class DecidedNode {
- final NodeShardState nodeShardState;
+ final NodeGatewayStartedShards nodeShardState;
final Decision decision;
- private DecidedNode(NodeShardState nodeShardState, Decision decision) {
+ private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) {
this.nodeShardState = nodeShardState;
this.decision = decision;
}
}
-
- /**
- * The NodeShardState class represents the state of a node shard in a distributed system.
- * It includes several key data points about the shard state, such as its allocation ID,
- * whether it's a primary shard, any store exception, the replication checkpoint, and the
- * DiscoveryNode it belongs to.
- */
- protected static class NodeShardState {
- // Allocation ID of the shard
- private final String allocationId;
- // Whether the shard is primary
- private final boolean primary;
- // Any store exception associated with the shard
- private final Exception storeException;
- // The replication checkpoint of the shard
- private final ReplicationCheckpoint replicationCheckpoint;
- // The DiscoveryNode the shard belongs to
- private final DiscoveryNode node;
-
- /**
- * Constructs a new NodeShardState with the given parameters.
- * @param node The DiscoveryNode the shard belongs to.
- * @param allocationId The allocation ID of the shard.
- * @param primary Whether the shard is a primary shard.
- * @param replicationCheckpoint The replication checkpoint of the shard.
- * @param storeException Any store exception associated with the shard.
- */
- public NodeShardState(
- DiscoveryNode node,
- String allocationId,
- boolean primary,
- ReplicationCheckpoint replicationCheckpoint,
- Exception storeException
- ) {
- this.node = node;
- this.allocationId = allocationId;
- this.primary = primary;
- this.replicationCheckpoint = replicationCheckpoint;
- this.storeException = storeException;
- }
-
- /**
- * Returns the allocation ID of the shard.
- * @return The allocation ID of the shard.
- */
- public String allocationId() {
- return this.allocationId;
- }
-
- /**
- * Returns whether the shard is a primary shard.
- * @return True if the shard is a primary shard, false otherwise.
- */
- public boolean primary() {
- return this.primary;
- }
-
- /**
- * Returns the replication checkpoint of the shard.
- * @return The replication checkpoint of the shard.
- */
- public ReplicationCheckpoint replicationCheckpoint() {
- return this.replicationCheckpoint;
- }
-
- /**
- * Returns any store exception associated with the shard.
- * @return The store exception associated with the shard, or null if there isn't one.
- */
- public Exception storeException() {
- return this.storeException;
- }
-
- /**
- * Returns the DiscoveryNode the shard belongs to.
- * @return The DiscoveryNode the shard belongs to.
- */
- public DiscoveryNode getNode() {
- return this.node;
- }
- }
}
diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java
index 7462062a0cd46..caab04d3fc434 100644
--- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java
+++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java
@@ -35,16 +35,21 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.FailedShard;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PrimaryShardAllocator;
+import org.opensearch.gateway.PrimaryShardBatchAllocator;
import org.opensearch.gateway.ReplicaShardAllocator;
+import org.opensearch.gateway.ReplicaShardBatchAllocator;
+import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards;
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import java.util.Collections;
import java.util.HashMap;
@@ -57,13 +62,13 @@
* A gateway allocator implementation that keeps an in memory list of started shard allocation
* that are used as replies to the, normally async, fetch data requests. The in memory list
* is adapted when shards are started and failed.
- *
+ *
* Nodes leaving and joining the cluster do not change the list of shards the class tracks but
* rather serves as a filter to what is returned by fetch data. Concretely - fetch data will
* only return shards that were started on nodes that are currently part of the cluster.
- *
+ *
* For now only primary shard related data is fetched. Replica request always get an empty response.
- *
+ *
*
* This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do
* not have all the infrastructure required to use it.
@@ -98,7 +103,52 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR
)
);
- return new AsyncShardFetch.FetchResult<>(shardId, foundShards, ignoreNodes);
+ return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>() {
+ {
+ put(shardId, ignoreNodes);
+ }
+ });
+ }
+ };
+
+ PrimaryShardBatchAllocator primaryBatchShardAllocator = new PrimaryShardBatchAllocator() {
+ @Override
+ protected AsyncShardFetch.FetchResult fetchData(
+ Set shardsEligibleForFetch,
+ Set inEligibleShards,
+ RoutingAllocation allocation
+ ) {
+ Map foundShards = new HashMap<>();
+ HashMap> shardsToIgnoreNodes = new HashMap<>();
+ for (Map.Entry> entry : knownAllocations.entrySet()) {
+ String nodeId = entry.getKey();
+ Map shardsOnNode = entry.getValue();
+ HashMap adaptedResponse = new HashMap<>();
+
+ for (ShardRouting shardRouting : shardsEligibleForFetch) {
+ ShardId shardId = shardRouting.shardId();
+ Set ignoreNodes = allocation.getIgnoreNodes(shardId);
+
+ if (shardsOnNode.containsKey(shardId) && ignoreNodes.contains(nodeId) == false && currentNodes.nodeExists(nodeId)) {
+ TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard nodeShard =
+ new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard(
+ shardRouting.allocationId().getId(),
+ shardRouting.primary(),
+ getReplicationCheckpoint(shardId, nodeId)
+ );
+ adaptedResponse.put(shardId, nodeShard);
+ shardsToIgnoreNodes.put(shardId, ignoreNodes);
+ }
+ foundShards.put(
+ currentNodes.get(nodeId),
+ new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch(
+ currentNodes.get(nodeId),
+ adaptedResponse
+ )
+ );
+ }
+ }
+ return new AsyncShardFetch.FetchResult<>(foundShards, shardsToIgnoreNodes);
}
};
@@ -111,7 +161,28 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n
protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) {
// for now, just pretend no node has data
final ShardId shardId = shard.shardId();
- return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), allocation.getIgnoreNodes(shardId));
+ return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>() {
+ {
+ put(shardId, allocation.getIgnoreNodes(shardId));
+ }
+ });
+ }
+
+ @Override
+ protected boolean hasInitiatedFetching(ShardRouting shard) {
+ return true;
+ }
+ };
+
+ ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() {
+
+ @Override
+ protected AsyncShardFetch.FetchResult fetchData(
+ Set shardsEligibleForFetch,
+ Set inEligibleShards,
+ RoutingAllocation allocation
+ ) {
+ return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), Collections.emptyMap());
}
@Override
@@ -157,6 +228,12 @@ public void allocateUnassigned(
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
}
+ @Override
+ public void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) {
+ currentNodes = allocation.nodes();
+ innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary);
+ }
+
/**
* manually add a specific shard to the allocations the gateway keeps track of
*/
@@ -171,4 +248,34 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) {
public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) {
shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint);
}
+
+ public Set createAndUpdateBatches(RoutingAllocation allocation, boolean primary) {
+ return super.createAndUpdateBatches(allocation, primary);
+ }
+
+ public void safelyRemoveShardFromBatch(ShardRouting shard) {
+ super.safelyRemoveShardFromBatch(shard);
+ }
+
+ public void safelyRemoveShardFromBothBatch(ShardRouting shardRouting) {
+ super.safelyRemoveShardFromBothBatch(shardRouting);
+ }
+
+ public String getBatchId(ShardRouting shard, boolean primary) {
+ return super.getBatchId(shard, primary);
+ }
+
+ public Map getBatchIdToStartedShardBatch() {
+ return batchIdToStartedShardBatch;
+ }
+
+ public Map getBatchIdToStoreShardBatch() {
+ return batchIdToStoreShardBatch;
+ }
+
+ @Override
+ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
+ setShardAllocators(primaryShardAllocator, replicaShardAllocator, primaryBatchShardAllocator, replicaBatchShardAllocator);
+ return super.explainUnassignedShardAllocation(unassignedShard, routingAllocation);
+ }
}