Skip to content

Commit

Permalink
Reverted TestGatewayAllocator change
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Dec 11, 2023
1 parent 2aa0845 commit 1ec2ba1
Showing 1 changed file with 5 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,16 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.FailedShard;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PrimaryShardAllocator;
import org.opensearch.gateway.PrimaryShardBatchAllocator;
import org.opensearch.gateway.ReplicaShardAllocator;
import org.opensearch.gateway.ReplicaShardBatchAllocator;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards;
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -62,13 +57,13 @@
* A gateway allocator implementation that keeps an in memory list of started shard allocation
* that are used as replies to the, normally async, fetch data requests. The in memory list
* is adapted when shards are started and failed.
*
* <p>
* Nodes leaving and joining the cluster do not change the list of shards the class tracks but
* rather serves as a filter to what is returned by fetch data. Concretely - fetch data will
* only return shards that were started on nodes that are currently part of the cluster.
*
* <p>
* For now only primary shard related data is fetched. Replica request always get an empty response.
*
* <p>
*
* This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do
* not have all the infrastructure required to use it.
Expand Down Expand Up @@ -103,52 +98,7 @@ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardR
)
);

return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>() {
{
put(shardId, ignoreNodes);
}
});
}
};

PrimaryShardBatchAllocator primaryBatchShardAllocator = new PrimaryShardBatchAllocator() {
@Override
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch> fetchData(
Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation
) {
Map<DiscoveryNode, TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch> foundShards = new HashMap<>();
HashMap<ShardId, Set<String>> shardsToIgnoreNodes = new HashMap<>();
for (Map.Entry<String, Map<ShardId, ShardRouting>> entry : knownAllocations.entrySet()) {
String nodeId = entry.getKey();
Map<ShardId, ShardRouting> shardsOnNode = entry.getValue();
HashMap<ShardId, TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard> adaptedResponse = new HashMap<>();

for (ShardRouting shardRouting : shardsEligibleForFetch) {
ShardId shardId = shardRouting.shardId();
Set<String> ignoreNodes = allocation.getIgnoreNodes(shardId);

if (shardsOnNode.containsKey(shardId) && ignoreNodes.contains(nodeId) == false && currentNodes.nodeExists(nodeId)) {
TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard nodeShard =
new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard(
shardRouting.allocationId().getId(),
shardRouting.primary(),
getReplicationCheckpoint(shardId, nodeId)
);
adaptedResponse.put(shardId, nodeShard);
shardsToIgnoreNodes.put(shardId, ignoreNodes);
}
foundShards.put(
currentNodes.get(nodeId),
new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch(
currentNodes.get(nodeId),
adaptedResponse
)
);
}
}
return new AsyncShardFetch.FetchResult<>(foundShards, shardsToIgnoreNodes);
return new AsyncShardFetch.FetchResult<>(shardId, foundShards, ignoreNodes);
}
};

Expand All @@ -161,28 +111,7 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n
protected AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation) {
// for now, just pretend no node has data
final ShardId shardId = shard.shardId();
return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>() {
{
put(shardId, allocation.getIgnoreNodes(shardId));
}
});
}

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
return true;
}
};

ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() {

@Override
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> fetchData(
Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation
) {
return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), Collections.emptyMap());
return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), allocation.getIgnoreNodes(shardId));
}

@Override
Expand Down Expand Up @@ -228,12 +157,6 @@ public void allocateUnassigned(
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
}

@Override
public void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) {
currentNodes = allocation.nodes();
innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary);
}

/**
* manually add a specific shard to the allocations the gateway keeps track of
*/
Expand All @@ -248,34 +171,4 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) {
public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) {
shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint);
}

public Set<String> createAndUpdateBatches(RoutingAllocation allocation, boolean primary) {
return super.createAndUpdateBatches(allocation, primary);
}

public void safelyRemoveShardFromBatch(ShardRouting shard) {
super.safelyRemoveShardFromBatch(shard);
}

public void safelyRemoveShardFromBothBatch(ShardRouting shardRouting) {
super.safelyRemoveShardFromBothBatch(shardRouting);
}

public String getBatchId(ShardRouting shard, boolean primary) {
return super.getBatchId(shard, primary);
}

public Map<String, GatewayAllocator.ShardsBatch> getBatchIdToStartedShardBatch() {
return batchIdToStartedShardBatch;
}

public Map<String, GatewayAllocator.ShardsBatch> getBatchIdToStoreShardBatch() {
return batchIdToStoreShardBatch;
}

@Override
public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
setShardAllocators(primaryShardAllocator, replicaShardAllocator, primaryBatchShardAllocator, replicaBatchShardAllocator);
return super.explainUnassignedShardAllocation(unassignedShard, routingAllocation);
}
}

0 comments on commit 1ec2ba1

Please sign in to comment.