Skip to content

Commit

Permalink
refactored NodeAction methods to avoid naming on TermCheck verificati…
Browse files Browse the repository at this point in the history
…on details
  • Loading branch information
rajiv-kv committed Mar 8, 2024
1 parent e5f41dc commit 8770af1
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi
}

@Override
protected boolean checkTermVersion() {
protected boolean canUseLocalNodeClusterState() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,29 @@
public class GetTermVersionResponse extends ActionResponse {

private final ClusterName clusterName;
private final String stateUUID;
private final String clusterUUID;
private final long term;
private final long version;

public GetTermVersionResponse(ClusterName clusterName, String stateUUID, long term, long version) {
public GetTermVersionResponse(ClusterName clusterName, String clusterUUID, long term, long version) {
this.clusterName = clusterName;
this.stateUUID = stateUUID;
this.clusterUUID = clusterUUID;
this.term = term;
this.version = version;
}

public GetTermVersionResponse(StreamInput in) throws IOException {
super(in);
this.clusterName = new ClusterName(in);
this.stateUUID = in.readString();
this.clusterUUID = in.readString();
this.term = in.readLong();
this.version = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
out.writeString(stateUUID);
out.writeString(clusterUUID);
out.writeLong(term);
out.writeLong(version);
}
Expand All @@ -63,13 +63,13 @@ public ClusterName getClusterName() {
return clusterName;
}

public String getStateUUID() {
return stateUUID;
public String getClusterUUID() {
return clusterUUID;
}

public boolean matches(ClusterState clusterState) {
return clusterName.equals(clusterState.getClusterName())
&& stateUUID.equals(clusterState.stateUUID())
&& clusterUUID.equals(clusterState.metadata().clusterUUID())
&& term == clusterState.term()
&& version == clusterState.version();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ protected void clusterManagerOperation(
}

private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) {
logger.trace("Serving cluster term version request using term {} and version {}", state.term(), state.version());
return new GetTermVersionResponse(state.getClusterName(), state.stateUUID(), state.term(), state.getVersion());
return new GetTermVersionResponse(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static org.opensearch.Version.CURRENT;
import static org.opensearch.Version.V_3_0_0;

/**
* A base class for operations that needs to be performed on the cluster-manager node.
Expand Down Expand Up @@ -272,9 +274,12 @@ protected void doStart(ClusterState clusterState) {
retryOnMasterChange(clusterState, null);
} else {
DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode();
boolean shouldCheckTerm = clusterManagerNode.getVersion().onOrAfter(CURRENT) && checkTermVersion();
if (shouldCheckTerm) {
execOnClusterManagerOnTermMismatch(clusterManagerNode, clusterState);
if (clusterManagerNode.getVersion().onOrAfter(V_3_0_0) && canUseLocalNodeClusterState()) {
BiConsumer<DiscoveryNode, ClusterState> executeOnLocalOrClusterManager = clusterStateLatestChecker(
this::executeOnLocalNode,
this::executeOnClusterManager
);
executeOnLocalOrClusterManager.accept(clusterManagerNode, clusterState);
} else {
executeOnClusterManager(clusterManagerNode, clusterState);
}
Expand Down Expand Up @@ -333,7 +338,8 @@ private ActionListener<Response> getDelegateForLocalExecute(ClusterState cluster
if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) {
logger.debug(
() -> new ParameterizedMessage(
"master could not publish cluster state or " + "stepped down before publishing action [{}], scheduling a retry",
"cluster-manager could not publish cluster state or "
+ "stepped down before publishing action [{}], scheduling a retry",
actionName
),
t
Expand All @@ -346,44 +352,58 @@ private ActionListener<Response> getDelegateForLocalExecute(ClusterState cluster
});
}

private void execOnClusterManagerOnTermMismatch(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
transportService.sendRequest(
clusterManagerNode,
GetTermVersionAction.NAME,
new GetTermVersionRequest(),
new TransportResponseHandler<GetTermVersionResponse>() {
@Override
public void handleResponse(GetTermVersionResponse response) {
boolean shouldExecuteOnClusterManger = !response.matches(clusterState);
if (shouldExecuteOnClusterManger) {
executeOnClusterManager(clusterManagerNode, clusterState);
} else {
Runnable runTask = ActionRunnable.wrap(
getDelegateForLocalExecute(clusterState),
l -> clusterManagerOperation(task, request, clusterState, l)
protected BiConsumer<DiscoveryNode, ClusterState> clusterStateLatestChecker(
Consumer<ClusterState> onLatestWithLocalState,
BiConsumer<DiscoveryNode, ClusterState> onMisMatchWithLocalState
) {
return (clusterManagerNode, clusterState) -> {
transportService.sendRequest(
clusterManagerNode,
GetTermVersionAction.NAME,
new GetTermVersionRequest(),
new TransportResponseHandler<GetTermVersionResponse>() {
@Override
public void handleResponse(GetTermVersionResponse response) {
boolean isLatestClusterStatePresentOnLocalNode = response.matches(clusterState);
logger.trace(
"Received GetTermVersionResponse response : term {}, version {}, latest-on-local {}",
response.getTerm(),
response.getVersion(),
isLatestClusterStatePresentOnLocalNode
);
threadPool.executor(executor).execute(runTask);
if (isLatestClusterStatePresentOnLocalNode) {
onLatestWithLocalState.accept(clusterState);
} else {
onMisMatchWithLocalState.accept(clusterManagerNode, clusterState);
}
}
}

@Override
public void handleException(TransportException exp) {
handleTransportException(clusterManagerNode, clusterState, exp);
}
@Override
public void handleException(TransportException exp) {
handleTransportException(clusterManagerNode, clusterState, exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public GetTermVersionResponse read(StreamInput in) throws IOException {
return new GetTermVersionResponse(in);
}
@Override
public GetTermVersionResponse read(StreamInput in) throws IOException {
return new GetTermVersionResponse(in);
}

}
}
);
};
}

private void executeOnLocalNode(ClusterState localClusterState) {
Runnable runTask = ActionRunnable.wrap(
getDelegateForLocalExecute(localClusterState),
l -> clusterManagerOperation(task, request, localClusterState, l)
);
threadPool.executor(executor).execute(runTask);
}

private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
Expand Down Expand Up @@ -421,7 +441,6 @@ private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterS
listener.onFailure(exp);
}
}

}

/**
Expand All @@ -444,13 +463,13 @@ protected String getMasterActionName(DiscoveryNode node) {
}

/**
* Determines if transport action needs to check local cluster-state term with manager before
* executing the action on manager. This is generally true for actions that are read-only and can be executed locally
* on node if the term matches with cluster-manager.
* @return - true to perform term check and then execute the action
* Override to true if the transport action need NOT be executed always on cluster-manager (example Read-only actions).
* The action is executed locally if this method returns true AND
* the ClusterState on local node is in-sync with ClusterManager.
*
* @return - boolean if the action can be run locally
*/
protected boolean checkTermVersion() {
protected boolean canUseLocalNodeClusterState() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void testTransportTermResponse() throws ExecutionException, InterruptedEx

assertThat(resp.getTerm(), is(clusterState.term()));
assertThat(resp.getVersion(), is(clusterState.version()));
assertThat(resp.getStateUUID(), is(clusterState.stateUUID()));
assertThat(resp.getClusterUUID(), is(clusterState.metadata().clusterUUID()));
assertThat(resp.getClusterName().value().startsWith("single-node-cluster"), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,6 @@
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected String executor() {
}

@Override
protected boolean checkTermVersion() {
protected boolean canUseLocalNodeClusterState() {
return true;
}

Expand Down Expand Up @@ -225,7 +225,7 @@ public void testTermCheckMatchWithClusterManager() throws ExecutionException, In
assertThat(capturedRequest.action, equalTo("cluster:monitor/term"));
GetTermVersionResponse response = new GetTermVersionResponse(
clusterService.state().getClusterName(),
clusterService.state().stateUUID(),
clusterService.state().metadata().clusterUUID(),
clusterService.state().term(),
clusterService.state().version()
);
Expand Down

0 comments on commit 8770af1

Please sign in to comment.