Skip to content

Commit

Permalink
Create RemoteStatePublishRequest, simplify if checks in CoordinationS…
Browse files Browse the repository at this point in the history
…tate

Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Sep 4, 2024
1 parent c03b343 commit bd9c314
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& localNode.isRemoteStatePublicationEnabled();
Expand Down Expand Up @@ -461,7 +462,6 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);

if (shouldUpdateRemotePersistedState(publishRequest)) {
updateRemotePersistedStateOnPublishRequest(publishRequest);
}
Expand Down Expand Up @@ -626,32 +626,28 @@ public void close() throws IOException {
}

private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
return isRemotePublicationEnabled
&& localNode.isClusterManagerNode()
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false;
}

private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
if (publishRequest.hasManifest()) {
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null;
if (publishRequest instanceof RemoteStatePublishRequest) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState());
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)
.setLastAcceptedManifest(publishRequest.getAcceptedManifest().get());
.setLastAcceptedManifest(((RemoteStatePublishRequest) publishRequest).getAcceptedManifest());
} else {
// We will end up here if PublishRequest was sent not using Remote Store even with remotePublication enabled on this node
// We will end up here if PublishRequest was sent not using Remote Store even with remote persisted state on this node
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null);
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null);
}
}

private boolean shouldCommitRemotePersistedState() {
return isRemotePublicationEnabled
&& localNode.isClusterManagerNode()
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
.getLastAcceptedState()
.getNodes()
.isLocalNodeElectedClusterManager() == false
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterM
}
}
if (manifest != null) {
return handlePublishRequest.apply(new PublishRequest(incomingState, manifest));
return handlePublishRequest.apply(new RemoteStatePublishRequest(incomingState, manifest));
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
}
Expand Down Expand Up @@ -542,7 +542,7 @@ public String executor() {
}

public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
logger.info("sending cluster state over transport to node: {}", destination.getName());
logger.debug("sending cluster state over transport to node: {}", destination.getName());
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, listener);
Expand Down Expand Up @@ -642,7 +642,7 @@ public class RemotePublicationContext extends PublicationContext {
@Override
public void sendClusterState(final DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
try {
logger.info("sending remote cluster state to node: {}", destination.getName());
logger.debug("sending remote cluster state to node: {}", destination.getName());
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;
import java.util.Optional;

/**
* Request which is used by the cluster-manager node to publish cluster state changes.
Expand All @@ -46,57 +44,32 @@
public class PublishRequest {

private final ClusterState acceptedState;
private final ClusterMetadataManifest acceptedManifest;

public PublishRequest(ClusterState acceptedState) {
this.acceptedState = acceptedState;
this.acceptedManifest = null;
}

public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) {
this.acceptedState = acceptedState;
this.acceptedManifest = acceptedManifest;
}

public ClusterState getAcceptedState() {
return acceptedState;
}

public boolean hasManifest() {
return acceptedManifest != null;
}

public Optional<ClusterMetadataManifest> getAcceptedManifest() {
return Optional.ofNullable(acceptedManifest);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PublishRequest)) return false;

PublishRequest that = (PublishRequest) o;

return acceptedState.term() == that.acceptedState.term()
&& acceptedState.version() == that.acceptedState.version()
&& Objects.equals(acceptedManifest, that.acceptedManifest);
return acceptedState.term() == that.acceptedState.term() && acceptedState.version() == that.acceptedState.version();
}

@Override
public int hashCode() {
return Objects.hash(acceptedState.term(), acceptedState.version(), acceptedManifest);
return Objects.hash(acceptedState.term(), acceptedState.version());
}

@Override
public String toString() {
return "PublishRequest{term="
+ acceptedState.term()
+ ", version="
+ acceptedState.version()
+ ", state="
+ acceptedState
+ ", manifest="
+ acceptedManifest
+ '}';
return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

/**
* PublishRequest created by downloading the accepted {@link ClusterState} from Remote Store, using the published {@link ClusterMetadataManifest}
*
* @opensearch.internal
*/
public class RemoteStatePublishRequest extends PublishRequest {
private final ClusterMetadataManifest manifest;

public RemoteStatePublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) {
super(acceptedState);
this.manifest = acceptedManifest;
}

public ClusterMetadataManifest getAcceptedManifest() {
return manifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RemoteStatePublishRequest that = (RemoteStatePublishRequest) o;
return Objects.equals(manifest, that.manifest);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), manifest);
}

@Override
public String toString() {
return "RemoteStatePublishRequest{" + super.toString() + "manifest=" + manifest + "} ";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(
emptyMap(),
false,
emptyList(),
emptyMap()
null
).uploadedCoordinationMetadata;
}
UploadedMetadataResults uploadedMetadataResults = new UploadedMetadataResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ public void testHandlePublishRequestOnFollowerWhenRemotePublicationEnabled() {
.clusterUUIDCommitted(true)
.build();

PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishRequest(state2, manifest));
PublishResponse publishResponse = coordinationState.handlePublishRequest(new RemoteStatePublishRequest(state2, manifest));
assertEquals(state2.term(), publishResponse.getTerm());
assertEquals(state2.version(), publishResponse.getVersion());
verifyNoInteractions(remoteClusterStateService);
Expand Down Expand Up @@ -1141,7 +1141,7 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabled() {
.build();

PublishRequest publishRequest = coordinationState.handleClientValue(state2);
coordinationState.handlePublishRequest(new PublishRequest(publishRequest.getAcceptedState(), manifest));
coordinationState.handlePublishRequest(new RemoteStatePublishRequest(publishRequest.getAcceptedState(), manifest));
ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version());
coordinationState.handleCommit(applyCommitRequest);
verifyNoInteractions(remoteClusterStateService);
Expand Down

0 comments on commit bd9c314

Please sign in to comment.