Skip to content

Commit

Permalink
added ClusterStateTermVersion as a seperate class for reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Mar 8, 2024
1 parent 8770af1 commit aff33ff
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -23,55 +23,32 @@
*/
public class GetTermVersionResponse extends ActionResponse {

private final ClusterName clusterName;
private final String clusterUUID;
private final long term;
private final long version;
private final ClusterStateTermVersion clusterStateTermVersion;

public GetTermVersionResponse(ClusterName clusterName, String clusterUUID, long term, long version) {
this.clusterName = clusterName;
this.clusterUUID = clusterUUID;
this.term = term;
this.version = version;
public GetTermVersionResponse(ClusterStateTermVersion clusterStateTermVersion) {
this.clusterStateTermVersion = clusterStateTermVersion;
}

public GetTermVersionResponse(StreamInput in) throws IOException {
super(in);
this.clusterName = new ClusterName(in);
this.clusterUUID = in.readString();
this.term = in.readLong();
this.version = in.readLong();
this.clusterStateTermVersion = new ClusterStateTermVersion(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
out.writeString(clusterUUID);
out.writeLong(term);
out.writeLong(version);
clusterStateTermVersion.writeTo(out);
}

public long getTerm() {
return term;
}

public long getVersion() {
return version;
}

public ClusterName getClusterName() {
return clusterName;
}

public String getClusterUUID() {
return clusterUUID;
public ClusterStateTermVersion getClusterStateTermVersion() {
return clusterStateTermVersion;
}

public boolean matches(ClusterState clusterState) {
return clusterName.equals(clusterState.getClusterName())
&& clusterUUID.equals(clusterState.metadata().clusterUUID())
&& term == clusterState.term()
&& version == clusterState.version();
return clusterStateTermVersion != null
&& clusterStateTermVersion.getClusterName().equals(clusterState.getClusterName())
&& clusterStateTermVersion.getClusterUUID().equals(clusterState.metadata().clusterUUID())
&& clusterStateTermVersion.getTerm() == clusterState.term()
&& clusterStateTermVersion.getVersion() == clusterState.version();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -79,6 +80,8 @@ protected void clusterManagerOperation(
}

private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) {
return new GetTermVersionResponse(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion());
return new GetTermVersionResponse(
new ClusterStateTermVersion(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,8 @@ protected BiConsumer<DiscoveryNode, ClusterState> clusterStateLatestChecker(
public void handleResponse(GetTermVersionResponse response) {
boolean isLatestClusterStatePresentOnLocalNode = response.matches(clusterState);
logger.trace(
"Received GetTermVersionResponse response : term {}, version {}, latest-on-local {}",
response.getTerm(),
response.getVersion(),
"Received GetTermVersionResponse response : ClusterStateTermVersion {}, latest-on-local {}",
response.getClusterStateTermVersion(),
isLatestClusterStatePresentOnLocalNode
);
if (isLatestClusterStatePresentOnLocalNode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterName;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;

public class ClusterStateTermVersion implements Writeable {

private final ClusterName clusterName;
private final String clusterUUID;
private final long term;
private final long version;

public ClusterStateTermVersion(ClusterName clusterName, String clusterUUID, long term, long version) {
this.clusterName = clusterName;
this.clusterUUID = clusterUUID;
this.term = term;
this.version = version;
}

public ClusterStateTermVersion(StreamInput in) throws IOException {
this.clusterName = new ClusterName(in);
this.clusterUUID = in.readString();
this.term = in.readLong();
this.version = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
out.writeString(clusterUUID);
out.writeLong(term);
out.writeLong(version);
}

public ClusterName getClusterName() {
return clusterName;
}

public String getClusterUUID() {
return clusterUUID;
}

public long getTerm() {
return term;
}

public long getVersion() {
return version;
}

@Override
public String toString() {
return "ClusterStateTermVersion{"
+ "clusterName="
+ clusterName
+ ", clusterUUID='"
+ clusterUUID
+ '\''
+ ", term="
+ term
+ ", version="
+ version
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ClusterStateTermVersion that = (ClusterStateTermVersion) o;

if (term != that.term) return false;
if (version != that.version) return false;
if (!clusterName.equals(that.clusterName)) return false;
return clusterUUID.equals(that.clusterUUID);
}

@Override
public int hashCode() {
int result = clusterName.hashCode();
result = 31 * result + clusterUUID.hashCode();
result = 31 * result + (int) (term ^ (term >>> 32));
result = 31 * result + (int) (version ^ (version >>> 32));
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -113,7 +114,7 @@ private void addCallCountInterceptor(String nodeName, Map<String, AtomicInteger>
private void stubClusterTermResponse(String master) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
channel.sendResponse(new GetTermVersionResponse(new ClusterName("test"), "1", -1, -1));
channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,19 @@

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.is;

public class ClusterTermVersionTests extends OpenSearchSingleNodeTestCase {

public void testTransportTermResponse() throws ExecutionException, InterruptedException {
GetTermVersionRequest request = new GetTermVersionRequest();
GetTermVersionResponse resp = client().execute(GetTermVersionAction.INSTANCE, request).get();

final ClusterService clusterService = getInstanceFromNode(ClusterService.class);
final ClusterState clusterState = clusterService.state();

assertThat(resp.getTerm(), is(clusterState.term()));
assertThat(resp.getVersion(), is(clusterState.version()));
assertThat(resp.getClusterUUID(), is(clusterState.metadata().clusterUUID()));
assertThat(resp.getClusterName().value().startsWith("single-node-cluster"), is(true));
assertTrue(resp.matches(clusterService.state()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -224,10 +225,12 @@ public void testTermCheckMatchWithClusterManager() throws ExecutionException, In
assertTrue(capturedRequest.node.isClusterManagerNode());
assertThat(capturedRequest.action, equalTo("cluster:monitor/term"));
GetTermVersionResponse response = new GetTermVersionResponse(
clusterService.state().getClusterName(),
clusterService.state().metadata().clusterUUID(),
clusterService.state().term(),
clusterService.state().version()
new ClusterStateTermVersion(
clusterService.state().getClusterName(),
clusterService.state().metadata().clusterUUID(),
clusterService.state().term(),
clusterService.state().version()
)
);
transport.handleResponse(capturedRequest.requestId, response);
assertTrue(listener.isDone());
Expand All @@ -248,10 +251,12 @@ public void testTermCheckNoMatchWithClusterManager() throws ExecutionException,
assertTrue(termCheckRequest.node.isClusterManagerNode());
assertThat(termCheckRequest.action, equalTo("cluster:monitor/term"));
GetTermVersionResponse termVersionResponse = new GetTermVersionResponse(
clusterService.state().getClusterName(),
clusterService.state().stateUUID(),
clusterService.state().term(),
clusterService.state().version() - 1
new ClusterStateTermVersion(
clusterService.state().getClusterName(),
clusterService.state().stateUUID(),
clusterService.state().term(),
clusterService.state().version() - 1
)
);
transport.handleResponse(termCheckRequest.requestId, termVersionResponse);
assertFalse(listener.isDone());
Expand Down

0 comments on commit aff33ff

Please sign in to comment.