Skip to content

Commit

Permalink
Applied spotlesscheck
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Aug 30, 2023
1 parent 43beda5 commit 6c82091
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;

public abstract class AsyncBatchShardFetch<T extends BaseNodeResponse> implements Releasable {
Expand All @@ -46,13 +45,13 @@ public abstract class AsyncBatchShardFetch<T extends BaseNodeResponse> implement
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(DiscoveryNode[] nodes,Map<ShardId, String> shardIdsWithCustomDataPath, ActionListener<NodesResponse> listener);
void list(DiscoveryNode[] nodes, Map<ShardId, String> shardIdsWithCustomDataPath, ActionListener<NodesResponse> listener);
}

protected final Logger logger;
protected final String type;
private final String batchUUID;
protected Map<ShardId,String> shardsToCustomDataPathMap;
protected Map<ShardId, String> shardsToCustomDataPathMap;
private Map<ShardId, Set<String>> ignoredShardToNodes = new HashMap<>();
private final AsyncBatchShardFetch.Lister<BaseNodesResponse<T>, T> action;
private final Map<String, AsyncBatchShardFetch.NodeEntry<T>> cache = new HashMap<>();
Expand Down Expand Up @@ -362,7 +361,7 @@ public Map<DiscoveryNode, T> getData() {
* Process any changes needed to the allocation based on this fetch result.
*/
public void processAllocation(RoutingAllocation allocation) {
for(Map.Entry<ShardId, Set<String>> entry : ignoredShardToNodes.entrySet()) {
for (Map.Entry<ShardId, Set<String>> entry : ignoredShardToNodes.entrySet()) {
ShardId shardId = entry.getKey();
Set<String> ignoreNodes = entry.getValue();
if (ignoreNodes.isEmpty() == false) {
Expand Down
63 changes: 35 additions & 28 deletions server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
protected final Logger logger;
protected final String type;

protected final Map<ShardId,String> shardToCustomDataPath;
protected final Map<ShardId, String> shardToCustomDataPath;
private final Lister<BaseNodesResponse<T>, T> action;
private final Map<String, NodeEntry<T>> cache = new HashMap<>();
private final AtomicLong round = new AtomicLong();
Expand All @@ -103,7 +103,7 @@ protected AsyncShardFetch(
) {
this.logger = logger;
this.type = type;
shardToCustomDataPath =new HashMap<>();
shardToCustomDataPath = new HashMap<>();
shardToCustomDataPath.put(shardId, customDataPath);
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.logKey = "ShardId=[" + shardId.toString() + "]";
Expand All @@ -122,11 +122,10 @@ protected AsyncShardFetch(
this.type = type;
this.shardToCustomDataPath = shardToCustomDataPath;
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.logKey = "BatchID=[" + batchId+ "]";
this.logKey = "BatchID=[" + batchId + "]";
enableBatchMode = true;
}


@Override
public synchronized void close() {
this.closed = true;
Expand Down Expand Up @@ -157,11 +156,12 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
throw new IllegalStateException(logKey + ": can't fetch data on closed async fetch");
}

if(enableBatchMode == false){
if (enableBatchMode == false) {
// we will do assertions here on ignoreNodes
assert ignoreNodes.size() <=1 : "Can only have at-most one shard";
if(ignoreNodes.size() == 1) {
assert shardToCustomDataPath.containsKey(ignoreNodes.keySet().iterator().next()) : "ShardId should be same as initialised in fetcher";
assert ignoreNodes.size() <= 1 : "Can only have at-most one shard";
if (ignoreNodes.size() == 1) {
assert shardToCustomDataPath.containsKey(ignoreNodes.keySet().iterator().next())
: "ShardId should be same as initialised in fetcher";
}
}

Expand Down Expand Up @@ -221,9 +221,16 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
shardToIgnoreNodes.clear();
// if at least one node failed, make sure to have a protective reroute
// here, just case this round won't find anything, and we need to retry fetching data
if (failedNodes.isEmpty() == false || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) {
reroute(logKey, "nodes failed [" + failedNodes.size() + "], ignored ["
+ allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + "]");
if (failedNodes.isEmpty() == false
|| allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) {
reroute(
logKey,
"nodes failed ["
+ failedNodes.size()
+ "], ignored ["
+ allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum()
+ "]"
);
}

return new FetchResult<>(fetchData, allIgnoreNodesMap);
Expand Down Expand Up @@ -398,13 +405,13 @@ public void onFailure(Exception e) {
*/
public static class FetchResult<T extends BaseNodeResponse> {

private final Map<DiscoveryNode, T> data;
private final Map<ShardId, Set<String>> ignoredShardToNodes;
private final Map<DiscoveryNode, T> data;
private final Map<ShardId, Set<String>> ignoredShardToNodes;

public FetchResult(Map<DiscoveryNode, T> data, Map<ShardId, Set<String>> ignoreNodes) {
this.data = data;
this.ignoredShardToNodes = ignoreNodes;
}
public FetchResult(Map<DiscoveryNode, T> data, Map<ShardId, Set<String>> ignoreNodes) {
this.data = data;
this.ignoredShardToNodes = ignoreNodes;
}

/**
* Does the result actually contain data? If not, then there are on going fetch
Expand All @@ -423,20 +430,20 @@ public Map<DiscoveryNode, T> getData() {
return this.data;
}

/**
* Process any changes needed to the allocation based on this fetch result.
*/
public void processAllocation(RoutingAllocation allocation) {
for(Map.Entry<ShardId, Set<String>> entry : ignoredShardToNodes.entrySet()) {
ShardId shardId = entry.getKey();
Set<String> ignoreNodes = entry.getValue();
if (ignoreNodes.isEmpty() == false) {
ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId));
/**
* Process any changes needed to the allocation based on this fetch result.
*/
public void processAllocation(RoutingAllocation allocation) {
for (Map.Entry<ShardId, Set<String>> entry : ignoredShardToNodes.entrySet()) {
ShardId shardId = entry.getKey();
Set<String> ignoreNodes = entry.getValue();
if (ignoreNodes.isEmpty() == false) {
ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId));
}
}
}

}
}
}

/**
* A node entry, holding the state of the fetched data for a specific shard
Expand Down
17 changes: 11 additions & 6 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ protected void reroute(String logKey, String reason) {
);
}
}

class InternalPrimaryShardAllocator extends PrimaryShardAllocator {

private final TransportNodesListGatewayStartedShards startedAction;
Expand All @@ -295,9 +296,11 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
);
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(
allocation.nodes(),
new HashMap<>() {{
put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId()));
}}
new HashMap<>() {
{
put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId()));
}
}
);

if (shardState.hasData()) {
Expand Down Expand Up @@ -332,9 +335,11 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
);
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> shardStores = fetch.fetchData(
allocation.nodes(),
new HashMap<>() {{
put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId()));
}}
new HashMap<>() {
{
put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId()));
}
}
);
if (shardStores.hasData()) {
shardStores.processAllocation(allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ public TransportNodesListGatewayStartedShards(
}

@Override
public void list(Map<ShardId, String> shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener<NodesGatewayStartedShards> listener) {
public void list(
Map<ShardId, String> shardIdsWithCustomDataPath,
DiscoveryNode[] nodes,
ActionListener<NodesGatewayStartedShards> listener
) {
assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified";
final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next();
final String customDataPath = shardIdsWithCustomDataPath.get(shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,12 @@ public TransportNodesListShardStoreMetadata(
}

@Override
public void list(Map<ShardId, String> shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener<NodesStoreFilesMetadata> listener) {
assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified";
public void list(
Map<ShardId, String> shardIdsWithCustomDataPath,
DiscoveryNode[] nodes,
ActionListener<NodesStoreFilesMetadata> listener
) {
assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified";
final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next();
final String customDataPath = shardIdsWithCustomDataPath.get(shardId);
execute(new Request(shardId, customDataPath, nodes), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,9 +850,11 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
ShardRouting shard,
RoutingAllocation allocation
) {
return new AsyncShardFetch.FetchResult<>( data, new HashMap<>(){{
put(shardId, Collections.<String>emptySet());
}});
return new AsyncShardFetch.FetchResult<>(data, new HashMap<>() {
{
put(shardId, Collections.<String>emptySet());
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,11 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
);
}
}
return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>(){{
put(shardId, Collections.emptySet());
}});
return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>() {
{
put(shardId, Collections.emptySet());
}
});
}

@Override
Expand Down

0 comments on commit 6c82091

Please sign in to comment.