Skip to content

Commit

Permalink
Make Remote Publication a dynamic setting
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 authored and soosinha committed Oct 10, 2024
1 parent 58adc18 commit 11fe7ad
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 16 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
Expand Down Expand Up @@ -87,7 +88,8 @@ public CoordinationState(
DiscoveryNode localNode,
PersistedStateRegistry persistedStateRegistry,
ElectionStrategy electionStrategy,
Settings settings
Settings settings,
ClusterSettings clusterSettings
) {
this.localNode = localNode;

Expand All @@ -105,10 +107,10 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
}

public boolean isRemotePublicationEnabled() {
Expand Down Expand Up @@ -651,6 +653,15 @@ private boolean shouldCommitRemotePersistedState() {
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isRemotePublicationEnabled = false;
} else {
this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured();
}

}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteClusterStateService remoteClusterStateService;
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final RemoteClusterStateService remoteClusterStateService;
private final ClusterSettings clusterSettings;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -314,6 +316,7 @@ public Coordinator(
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
this.clusterSettings = clusterSettings;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -869,7 +872,9 @@ boolean publicationInProgress() {
@Override
protected void doStart() {
synchronized (mutex) {
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings));
coordinationState.set(
new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings)
);
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand All @@ -132,7 +134,7 @@ public class RemoteClusterStateService implements Closeable {
REMOTE_PUBLICATION_SETTING_KEY,
false,
Property.NodeScope,
Property.Final
Property.Dynamic
);

/**
Expand Down Expand Up @@ -232,7 +234,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;
private boolean isPublicationEnabled;
private final String remotePathPrefix;

private final RemoteClusterStateCache remoteClusterStateCache;
Expand Down Expand Up @@ -273,9 +275,10 @@ public RemoteClusterStateService(
this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings)
this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
Expand Down Expand Up @@ -1115,6 +1118,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl
this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
}

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isPublicationEnabled = false;
} else {
this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
}
}

// Package private for unit test
RemoteRoutingTableService getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class RemotePersistenceStats {
RemoteDownloadStats remoteDiffDownloadStats;
RemoteDownloadStats remoteFullDownloadStats;

final String FULL_DOWNLOAD_STATS = "remote_full_download";
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";
public static final String FULL_DOWNLOAD_STATS = "remote_full_download";
public static final String DIFF_DOWNLOAD_STATS = "remote_diff_download";

public RemotePersistenceStats() {
remoteUploadStats = new RemoteUploadStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,7 @@ public static CoordinationState createCoordinationState(
DiscoveryNode localNode,
Settings settings
) {
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings);
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null);
}

public static ClusterState clusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ public void testPrevotingIndicatesElectionSuccess() {
localNode,
persistedStateRegistry,
ElectionStrategy.DEFAULT_INSTANCE,
Settings.EMPTY
Settings.EMPTY,
null
);

final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ class MockNode {
);
PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState));
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY);
coordinationState = new CoordinationState(
localNode,
persistedStateRegistry,
ElectionStrategy.DEFAULT_INSTANCE,
Settings.EMPTY,
null
);
}

final DiscoveryNode localNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static class ClusterNode {
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);

this.electionStrategy = electionStrategy;
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
}

void reboot() {
Expand Down Expand Up @@ -189,7 +189,7 @@ void reboot() {
localNode.getVersion()
);

state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
}

void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {
Expand Down

0 comments on commit 11fe7ad

Please sign in to comment.