From aff33ff3c98f2643bbebea979d6e7cd300b85524 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Fri, 8 Mar 2024 18:51:10 +0530 Subject: [PATCH] added ClusterStateTermVersion as a seperate class for reuse --- .../state/term/GetTermVersionResponse.java | 49 +++------ .../term/TransportGetTermVersionAction.java | 5 +- .../TransportClusterManagerNodeAction.java | 5 +- .../coordination/ClusterStateTermVersion.java | 99 +++++++++++++++++++ .../state/term/ClusterTermVersionIT.java | 3 +- .../state/term/ClusterTermVersionTests.java | 9 +- ...TransportClusterManagerTermCheckTests.java | 21 ++-- 7 files changed, 134 insertions(+), 57 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java index 91e53e8d2b146..4b0cfce9f717f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java @@ -8,8 +8,8 @@ package org.opensearch.action.admin.cluster.state.term; -import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -23,55 +23,32 @@ */ public class GetTermVersionResponse extends ActionResponse { - private final ClusterName clusterName; - private final String clusterUUID; - private final long term; - private final long version; + private final ClusterStateTermVersion clusterStateTermVersion; - public GetTermVersionResponse(ClusterName clusterName, String clusterUUID, long term, long version) { - this.clusterName = clusterName; - this.clusterUUID = clusterUUID; - this.term = term; - this.version = version; + public GetTermVersionResponse(ClusterStateTermVersion clusterStateTermVersion) { + this.clusterStateTermVersion = clusterStateTermVersion; } public GetTermVersionResponse(StreamInput in) throws IOException { super(in); - this.clusterName = new ClusterName(in); - this.clusterUUID = in.readString(); - this.term = in.readLong(); - this.version = in.readLong(); + this.clusterStateTermVersion = new ClusterStateTermVersion(in); } @Override public void writeTo(StreamOutput out) throws IOException { - clusterName.writeTo(out); - out.writeString(clusterUUID); - out.writeLong(term); - out.writeLong(version); + clusterStateTermVersion.writeTo(out); } - public long getTerm() { - return term; - } - - public long getVersion() { - return version; - } - - public ClusterName getClusterName() { - return clusterName; - } - - public String getClusterUUID() { - return clusterUUID; + public ClusterStateTermVersion getClusterStateTermVersion() { + return clusterStateTermVersion; } public boolean matches(ClusterState clusterState) { - return clusterName.equals(clusterState.getClusterName()) - && clusterUUID.equals(clusterState.metadata().clusterUUID()) - && term == clusterState.term() - && version == clusterState.version(); + return clusterStateTermVersion != null + && clusterStateTermVersion.getClusterName().equals(clusterState.getClusterName()) + && clusterStateTermVersion.getClusterUUID().equals(clusterState.metadata().clusterUUID()) + && clusterStateTermVersion.getTerm() == clusterState.term() + && clusterStateTermVersion.getVersion() == clusterState.version(); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java index c043501ad0c4a..6c2e20695df5e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java @@ -14,6 +14,7 @@ import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -79,6 +80,8 @@ protected void clusterManagerOperation( } private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) { - return new GetTermVersionResponse(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion()); + return new GetTermVersionResponse( + new ClusterStateTermVersion(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion()) + ); } } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 57ea8f5382fab..a976dd87ca213 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -366,9 +366,8 @@ protected BiConsumer clusterStateLatestChecker( public void handleResponse(GetTermVersionResponse response) { boolean isLatestClusterStatePresentOnLocalNode = response.matches(clusterState); logger.trace( - "Received GetTermVersionResponse response : term {}, version {}, latest-on-local {}", - response.getTerm(), - response.getVersion(), + "Received GetTermVersionResponse response : ClusterStateTermVersion {}, latest-on-local {}", + response.getClusterStateTermVersion(), isLatestClusterStatePresentOnLocalNode ); if (isLatestClusterStatePresentOnLocalNode) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java new file mode 100644 index 0000000000000..5c292e85d5301 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; + +public class ClusterStateTermVersion implements Writeable { + + private final ClusterName clusterName; + private final String clusterUUID; + private final long term; + private final long version; + + public ClusterStateTermVersion(ClusterName clusterName, String clusterUUID, long term, long version) { + this.clusterName = clusterName; + this.clusterUUID = clusterUUID; + this.term = term; + this.version = version; + } + + public ClusterStateTermVersion(StreamInput in) throws IOException { + this.clusterName = new ClusterName(in); + this.clusterUUID = in.readString(); + this.term = in.readLong(); + this.version = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + clusterName.writeTo(out); + out.writeString(clusterUUID); + out.writeLong(term); + out.writeLong(version); + } + + public ClusterName getClusterName() { + return clusterName; + } + + public String getClusterUUID() { + return clusterUUID; + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + @Override + public String toString() { + return "ClusterStateTermVersion{" + + "clusterName=" + + clusterName + + ", clusterUUID='" + + clusterUUID + + '\'' + + ", term=" + + term + + ", version=" + + version + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ClusterStateTermVersion that = (ClusterStateTermVersion) o; + + if (term != that.term) return false; + if (version != that.version) return false; + if (!clusterName.equals(that.clusterName)) return false; + return clusterUUID.equals(that.clusterUUID); + } + + @Override + public int hashCode() { + int result = clusterName.hashCode(); + result = 31 * result + clusterUUID.hashCode(); + result = 31 * result + (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + return result; + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java index 8740edcea2398..fa2a6121af349 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java @@ -12,6 +12,7 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -113,7 +114,7 @@ private void addCallCountInterceptor(String nodeName, Map private void stubClusterTermResponse(String master) { MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> { - channel.sendResponse(new GetTermVersionResponse(new ClusterName("test"), "1", -1, -1)); + channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))); }); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java index 0d7c5ae3df670..22d9623eebdbe 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java @@ -8,14 +8,11 @@ package org.opensearch.action.admin.cluster.state.term; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.util.concurrent.ExecutionException; -import static org.hamcrest.Matchers.is; - public class ClusterTermVersionTests extends OpenSearchSingleNodeTestCase { public void testTransportTermResponse() throws ExecutionException, InterruptedException { @@ -23,11 +20,7 @@ public void testTransportTermResponse() throws ExecutionException, InterruptedEx GetTermVersionResponse resp = client().execute(GetTermVersionAction.INSTANCE, request).get(); final ClusterService clusterService = getInstanceFromNode(ClusterService.class); - final ClusterState clusterState = clusterService.state(); - assertThat(resp.getTerm(), is(clusterState.term())); - assertThat(resp.getVersion(), is(clusterState.version())); - assertThat(resp.getClusterUUID(), is(clusterState.metadata().clusterUUID())); - assertThat(resp.getClusterName().value().startsWith("single-node-cluster"), is(true)); + assertTrue(resp.matches(clusterService.state())); } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java index 0dce99e6b6255..1890a8d646e35 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.replication.ClusterStateCreationUtils; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -224,10 +225,12 @@ public void testTermCheckMatchWithClusterManager() throws ExecutionException, In assertTrue(capturedRequest.node.isClusterManagerNode()); assertThat(capturedRequest.action, equalTo("cluster:monitor/term")); GetTermVersionResponse response = new GetTermVersionResponse( - clusterService.state().getClusterName(), - clusterService.state().metadata().clusterUUID(), - clusterService.state().term(), - clusterService.state().version() + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term(), + clusterService.state().version() + ) ); transport.handleResponse(capturedRequest.requestId, response); assertTrue(listener.isDone()); @@ -248,10 +251,12 @@ public void testTermCheckNoMatchWithClusterManager() throws ExecutionException, assertTrue(termCheckRequest.node.isClusterManagerNode()); assertThat(termCheckRequest.action, equalTo("cluster:monitor/term")); GetTermVersionResponse termVersionResponse = new GetTermVersionResponse( - clusterService.state().getClusterName(), - clusterService.state().stateUUID(), - clusterService.state().term(), - clusterService.state().version() - 1 + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().stateUUID(), + clusterService.state().term(), + clusterService.state().version() - 1 + ) ); transport.handleResponse(termCheckRequest.requestId, termVersionResponse); assertFalse(listener.isDone());