Control concurrency and handle retries
Signed-off-by: Rishab Nahata <[email protected]>
imRishN committed Nov 8, 2022
1 parent 97a506a commit c63621f
Showing 5 changed files with 135 additions and 15 deletions.
DecommissionRequest.java
@@ -32,6 +32,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest>
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;
private boolean retryOnClusterManagerChange;

private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

@@ -41,14 +42,20 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest>
public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
this(decommissionAttribute, false);
}

public DecommissionRequest(DecommissionAttribute decommissionAttribute, boolean retryOnClusterManagerChange) {
this.decommissionAttribute = decommissionAttribute;
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
}

public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
this.retryOnClusterManagerChange = in.readBoolean();
}

@Override
@@ -57,6 +64,7 @@ public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
out.writeBoolean(retryOnClusterManagerChange);
}

/**
@@ -96,6 +104,24 @@ public boolean isNoDelay() {
return noDelay;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange whether the decommission action should be retried on a cluster manager change
* @return this request
*/
public DecommissionRequest setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
return this;
}

/**
* @return whether the decommission action is eligible for retry on a cluster manager change
*/
public boolean retryOnClusterManagerChange() {
return this.retryOnClusterManagerChange;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@@ -122,6 +148,12 @@ public ActionRequestValidationException validate() {

@Override
public String toString() {
return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}';
return "DecommissionRequest{" +
"decommissionAttribute=" + decommissionAttribute +
", retryOnClusterManagerChange=" + retryOnClusterManagerChange +
", delayTimeout=" + delayTimeout +
", noDelay=" + noDelay +
", clusterManagerNodeTimeout=" + clusterManagerNodeTimeout +
'}';
}
}
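
Since the new flag is appended after noDelay on the wire, the read order in DecommissionRequest(StreamInput) must mirror the write order in writeTo. A minimal round-trip sketch, assuming org.opensearch.common.io.stream.BytesStreamOutput and illustrative attribute values:

    // "zone"/"zone-1" are hypothetical; any configured awareness attribute behaves the same
    DecommissionRequest original = new DecommissionRequest(new DecommissionAttribute("zone", "zone-1"), true);
    BytesStreamOutput out = new BytesStreamOutput();
    original.writeTo(out);
    DecommissionRequest copy = new DecommissionRequest(out.bytes().streamInput());
    assert copy.retryOnClusterManagerChange(); // survives the round trip only if read/write order stays in sync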
DecommissionRequestBuilder.java
@@ -46,4 +46,15 @@ public DecommissionRequestBuilder setNoDelay(boolean noDelay) {
request.setNoDelay(noDelay);
return this;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange whether the decommission action should be retried on a cluster manager change
* @return current object
*/
public DecommissionRequestBuilder setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
request.setRetryOnClusterManagerChange(retryOnClusterManagerChange);
return this;
}
}
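
Through the client API the new setter chains like the existing ones. A hedged usage sketch (the client handle is assumed, and setDecommissionAttribute comes from the unchanged part of this builder):

    DecommissionResponse response = new DecommissionRequestBuilder(client, DecommissionAction.INSTANCE, new DecommissionRequest())
        .setDecommissionAttribute(new DecommissionAttribute("zone", "zone-1")) // illustrative attribute
        .setRetryOnClusterManagerChange(false) // external callers start with the flag off
        .get();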
DecommissionController.java
@@ -12,6 +12,9 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
@@ -74,6 +77,57 @@ public class DecommissionController {
this.threadPool = threadPool;
}

/**
* This method sends a transport call to retry the decommission action, provided that -
* 1. the cluster_manager_node_timeout has not yet elapsed, and
* 2. a cluster manager change has occurred
*
* @param decommissionRequest decommission request object
* @param startTime start time of previous request
* @param listener callback for the retry action
*/
public void retryDecommissionAction(
DecommissionRequest decommissionRequest,
long startTime,
ActionListener<DecommissionResponse> listener
) {
final long remainingTimeoutMS = decommissionRequest.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime);
if (remainingTimeoutMS <= 0) {
String errorMsg = "cluster manager node timed out before retrying [" + DecommissionAction.NAME + "] for attribute [" + decommissionRequest.getDecommissionAttribute() + "] after cluster manager change";
logger.debug(errorMsg);
listener.onFailure(new OpenSearchTimeoutException(errorMsg));
return;
}
decommissionRequest.setRetryOnClusterManagerChange(true);
decommissionRequest.clusterManagerNodeTimeout(TimeValue.timeValueMillis(remainingTimeoutMS));
transportService.sendRequest(
transportService.getLocalNode(),
DecommissionAction.NAME,
decommissionRequest,
new TransportResponseHandler<DecommissionResponse>() {
@Override
public void handleResponse(DecommissionResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public DecommissionResponse read(StreamInput in) throws IOException {
return new DecommissionResponse(in);
}
}
);
}

/**
* This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor}
* Once the tasks are submitted, it waits for an expected cluster state to guarantee
…
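
The retry inherits only whatever budget is left from the original attempt. A worked example of the remainingTimeoutMS arithmetic in retryDecommissionAction above, with illustrative numbers:

    // assume cluster_manager_node_timeout = 30s and 12s have elapsed since startTime
    long timeoutMS = TimeValue.timeValueSeconds(30).millis();  // 30000
    long elapsedMS = 12_000L;                                  // threadPool.relativeTimeInMillis() - startTime
    long remainingTimeoutMS = timeoutMS - elapsedMS;           // 18000 -> retry dispatched with an 18s budget
    // a value <= 0 short-circuits: the listener fails with OpenSearchTimeoutException and no retry is sent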
DecommissionService.java
@@ -74,6 +74,7 @@ public class DecommissionService {
private final TransportService transportService;
private final ThreadPool threadPool;
private final DecommissionController decommissionController;
private final long startTime;
private volatile List<String> awarenessAttributes;
private volatile Map<String, List<String>> forcedAwarenessAttributes;
private volatile int maxVotingConfigExclusions;
@@ -91,6 +92,7 @@ public DecommissionService(
this.transportService = transportService;
this.threadPool = threadPool;
this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool);
this.startTime = threadPool.relativeTimeInMillis();
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);

@@ -127,7 +129,7 @@ private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) {
* Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT}
* Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration
*
-     * @param decommissionRequest decommission request Object
+     * @param decommissionRequest request for decommission action
* @param listener register decommission listener
*/
public void startDecommissionAction(
@@ -146,6 +148,7 @@ public ClusterState execute(ClusterState currentState) {
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed and attribute is weighed away
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata);
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute);

ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute);
@@ -238,18 +241,22 @@ public void onNewClusterState(ClusterState state) {
drainNodesWithDecommissionedAttribute(decommissionRequest);
}
} else {
-                // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not leader
-                // this will ensures that request is retried until cluster manager times out
-                logger.info(
-                    "local node is not eligible to process the request, "
-                        + "throwing NotClusterManagerException to attempt a retry on an eligible node"
-                );
-                listener.onFailure(
-                    new NotClusterManagerException(
-                        "node ["
-                            + transportService.getLocalNode().toString()
-                            + "] not eligible to execute decommission request. Will retry until timeout."
-                    )
+                // the local node is no longer the cluster manager, which could have happened due to leader abdication;
+                // retry the decommission action until it times out
+                logger.info("local node is not eligible to process the request, retrying the transport action until it times out");
+                decommissionController.retryDecommissionAction(
+                    decommissionRequest,
+                    startTime,
+                    ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
+                        logger.debug(
+                            () -> new ParameterizedMessage(
+                                "failed to retry decommission action for attribute [{}]",
+                                decommissionRequest.getDecommissionAttribute()
+                            ),
+                            t
+                        );
+                        delegatedListener.onFailure(t);
+                    })
);
}
}
@@ -498,6 +505,21 @@ private static void ensureEligibleRequest(
}
}

private static void ensureEligibleRetry(
DecommissionRequest decommissionRequest,
DecommissionAttributeMetadata decommissionAttributeMetadata
) {
if (decommissionAttributeMetadata != null) {
if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT)
&& decommissionRequest.retryOnClusterManagerChange() == false) {
throw new DecommissioningFailedException(
decommissionRequest.getDecommissionAttribute(),
"concurrent request received to decommission attribute"
);
}
}
}

private ActionListener<DecommissionStatus> statusUpdateListener() {
return new ActionListener<>() {
@Override
…
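
Taken together with the retry flag, ensureEligibleRetry above is the concurrency guard: while the registered metadata still reads INIT, a second external request is rejected, but the controller's internal retry (which sets the flag) passes. An illustrative trace, with hypothetical metadata states:

    // metadata.status() == INIT, external request (retryOnClusterManagerChange == false):
    //   ensureEligibleRetry(request, metadata)
    //   -> throws DecommissioningFailedException("concurrent request received to decommission attribute")
    // metadata.status() == INIT, internal retry (retryOnClusterManagerChange == true):
    //   ensureEligibleRetry(request, metadata)   -> no exception, request proceeds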
RestDecommissionAction.java
@@ -58,6 +58,7 @@ DecommissionRequest createRequest(RestRequest request) throws IOException {
TimeValue delayTimeout = request.paramAsTime("delay_timeout", DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT);
decommissionRequest.setDelayTimeout(delayTimeout);
}
-        return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue));
+        return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue))
+            .setRetryOnClusterManagerChange(false);
}
}
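
Net effect at the REST boundary: every request built by createRequest starts with the retry flag off, so only DecommissionController.retryDecommissionAction can flip it on. A hedged sketch of the request the handler produces for a zone decommission (attribute values illustrative; the route registration is outside this diff):

    DecommissionRequest req = new DecommissionRequest();
    req.setDelayTimeout(TimeValue.timeValueSeconds(100)); // e.g. from ?delay_timeout=100s
    req = req.setDecommissionAttribute(new DecommissionAttribute("zone", "zone-1"))
        .setRetryOnClusterManagerChange(false);
    assert req.retryOnClusterManagerChange() == false;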
