diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index 18a8ddd522023..9f40fe2e83461 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -232,7 +232,7 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi } @Override - protected boolean checkTermVersion() { + protected boolean canUseLocalNodeClusterState() { return true; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java index 85f2a7133e576..91e53e8d2b146 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java @@ -24,13 +24,13 @@ public class GetTermVersionResponse extends ActionResponse { private final ClusterName clusterName; - private final String stateUUID; + private final String clusterUUID; private final long term; private final long version; - public GetTermVersionResponse(ClusterName clusterName, String stateUUID, long term, long version) { + public GetTermVersionResponse(ClusterName clusterName, String clusterUUID, long term, long version) { this.clusterName = clusterName; - this.stateUUID = stateUUID; + this.clusterUUID = clusterUUID; this.term = term; this.version = version; } @@ -38,7 +38,7 @@ public GetTermVersionResponse(ClusterName clusterName, String stateUUID, long te public GetTermVersionResponse(StreamInput in) throws IOException { super(in); this.clusterName = new ClusterName(in); - this.stateUUID = in.readString(); + this.clusterUUID = in.readString(); this.term = in.readLong(); this.version = in.readLong(); } @@ -46,7 +46,7 @@ public GetTermVersionResponse(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { clusterName.writeTo(out); - out.writeString(stateUUID); + out.writeString(clusterUUID); out.writeLong(term); out.writeLong(version); } @@ -63,13 +63,13 @@ public ClusterName getClusterName() { return clusterName; } - public String getStateUUID() { - return stateUUID; + public String getClusterUUID() { + return clusterUUID; } public boolean matches(ClusterState clusterState) { return clusterName.equals(clusterState.getClusterName()) - && stateUUID.equals(clusterState.stateUUID()) + && clusterUUID.equals(clusterState.metadata().clusterUUID()) && term == clusterState.term() && version == clusterState.version(); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java index 5f51f16c2e80a..c043501ad0c4a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java @@ -79,7 +79,6 @@ protected void clusterManagerOperation( } private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) { - logger.trace("Serving cluster term version request using term {} and version {}", state.term(), state.version()); - return new GetTermVersionResponse(state.getClusterName(), state.stateUUID(), state.term(), state.getVersion()); + return new GetTermVersionResponse(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion()); } } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 329902dc5e397..57ea8f5382fab 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -73,9 +73,11 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Predicate; -import static org.opensearch.Version.CURRENT; +import static org.opensearch.Version.V_3_0_0; /** * A base class for operations that needs to be performed on the cluster-manager node. @@ -272,9 +274,12 @@ protected void doStart(ClusterState clusterState) { retryOnMasterChange(clusterState, null); } else { DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode(); - boolean shouldCheckTerm = clusterManagerNode.getVersion().onOrAfter(CURRENT) && checkTermVersion(); - if (shouldCheckTerm) { - execOnClusterManagerOnTermMismatch(clusterManagerNode, clusterState); + if (clusterManagerNode.getVersion().onOrAfter(V_3_0_0) && canUseLocalNodeClusterState()) { + BiConsumer executeOnLocalOrClusterManager = clusterStateLatestChecker( + this::executeOnLocalNode, + this::executeOnClusterManager + ); + executeOnLocalOrClusterManager.accept(clusterManagerNode, clusterState); } else { executeOnClusterManager(clusterManagerNode, clusterState); } @@ -333,7 +338,8 @@ private ActionListener getDelegateForLocalExecute(ClusterState cluster if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) { logger.debug( () -> new ParameterizedMessage( - "master could not publish cluster state or " + "stepped down before publishing action [{}], scheduling a retry", + "cluster-manager could not publish cluster state or " + + "stepped down before publishing action [{}], scheduling a retry", actionName ), t @@ -346,44 +352,58 @@ private ActionListener getDelegateForLocalExecute(ClusterState cluster }); } - private void execOnClusterManagerOnTermMismatch(DiscoveryNode clusterManagerNode, ClusterState clusterState) { - transportService.sendRequest( - clusterManagerNode, - GetTermVersionAction.NAME, - new GetTermVersionRequest(), - new TransportResponseHandler() { - @Override - public void handleResponse(GetTermVersionResponse response) { - boolean shouldExecuteOnClusterManger = !response.matches(clusterState); - if (shouldExecuteOnClusterManger) { - executeOnClusterManager(clusterManagerNode, clusterState); - } else { - Runnable runTask = ActionRunnable.wrap( - getDelegateForLocalExecute(clusterState), - l -> clusterManagerOperation(task, request, clusterState, l) + protected BiConsumer clusterStateLatestChecker( + Consumer onLatestWithLocalState, + BiConsumer onMisMatchWithLocalState + ) { + return (clusterManagerNode, clusterState) -> { + transportService.sendRequest( + clusterManagerNode, + GetTermVersionAction.NAME, + new GetTermVersionRequest(), + new TransportResponseHandler() { + @Override + public void handleResponse(GetTermVersionResponse response) { + boolean isLatestClusterStatePresentOnLocalNode = response.matches(clusterState); + logger.trace( + "Received GetTermVersionResponse response : term {}, version {}, latest-on-local {}", + response.getTerm(), + response.getVersion(), + isLatestClusterStatePresentOnLocalNode ); - threadPool.executor(executor).execute(runTask); + if (isLatestClusterStatePresentOnLocalNode) { + onLatestWithLocalState.accept(clusterState); + } else { + onMisMatchWithLocalState.accept(clusterManagerNode, clusterState); + } } - } - @Override - public void handleException(TransportException exp) { - handleTransportException(clusterManagerNode, clusterState, exp); - } + @Override + public void handleException(TransportException exp) { + handleTransportException(clusterManagerNode, clusterState, exp); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - @Override - public GetTermVersionResponse read(StreamInput in) throws IOException { - return new GetTermVersionResponse(in); - } + @Override + public GetTermVersionResponse read(StreamInput in) throws IOException { + return new GetTermVersionResponse(in); + } - } + } + ); + }; + } + private void executeOnLocalNode(ClusterState localClusterState) { + Runnable runTask = ActionRunnable.wrap( + getDelegateForLocalExecute(localClusterState), + l -> clusterManagerOperation(task, request, localClusterState, l) ); + threadPool.executor(executor).execute(runTask); } private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) { @@ -421,7 +441,6 @@ private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterS listener.onFailure(exp); } } - } /** @@ -444,13 +463,13 @@ protected String getMasterActionName(DiscoveryNode node) { } /** - * Determines if transport action needs to check local cluster-state term with manager before - * executing the action on manager. This is generally true for actions that are read-only and can be executed locally - * on node if the term matches with cluster-manager. - * @return - true to perform term check and then execute the action + * Override to true if the transport action need NOT be executed always on cluster-manager (example Read-only actions). + * The action is executed locally if this method returns true AND + * the ClusterState on local node is in-sync with ClusterManager. + * + * @return - boolean if the action can be run locally */ - protected boolean checkTermVersion() { + protected boolean canUseLocalNodeClusterState() { return false; } - } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java index fba9ef4b2f526..0d7c5ae3df670 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java @@ -27,7 +27,7 @@ public void testTransportTermResponse() throws ExecutionException, InterruptedEx assertThat(resp.getTerm(), is(clusterState.term())); assertThat(resp.getVersion(), is(clusterState.version())); - assertThat(resp.getStateUUID(), is(clusterState.stateUUID())); + assertThat(resp.getClusterUUID(), is(clusterState.metadata().clusterUUID())); assertThat(resp.getClusterName().value().startsWith("single-node-cluster"), is(true)); } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 9ae1310a8b15c..538416e1137f5 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -6,24 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java index f1e672093f177..0dce99e6b6255 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java @@ -189,7 +189,7 @@ protected String executor() { } @Override - protected boolean checkTermVersion() { + protected boolean canUseLocalNodeClusterState() { return true; } @@ -225,7 +225,7 @@ public void testTermCheckMatchWithClusterManager() throws ExecutionException, In assertThat(capturedRequest.action, equalTo("cluster:monitor/term")); GetTermVersionResponse response = new GetTermVersionResponse( clusterService.state().getClusterName(), - clusterService.state().stateUUID(), + clusterService.state().metadata().clusterUUID(), clusterService.state().term(), clusterService.state().version() );