diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 578c922c80a0d..3a3b28e91dbe6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -8,7 +8,10 @@ package org.opensearch.gateway.remote; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; @@ -19,6 +22,8 @@ import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.gateway.GatewayMetaState; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; @@ -27,19 +32,32 @@ import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginsService; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; +import org.opensearch.tasks.Task; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; import org.junit.Before; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -50,6 +68,8 @@ import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS; import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; +import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_REMOTE_STATE_ACTION_NAME; +import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME; import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; @@ -57,6 +77,8 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteDownloadStats.CHECKSUM_VALIDATION_FAILED_COUNT; +import static org.opensearch.gateway.remote.RemotePersistenceStats.DIFF_DOWNLOAD_STATS; +import static org.opensearch.gateway.remote.RemotePersistenceStats.FULL_DOWNLOAD_STATS; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; @@ -88,6 +110,7 @@ public void setup() { isRemotePublicationEnabled = true; hasRemoteStateCharPrefix = randomBoolean(); hasRemoteRoutingCharPrefix = randomBoolean(); + clearInterceptedActions(); } @Override @@ -132,6 +155,13 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(InterceptingTransportService.TestPlugin.class); + return plugins; + } + public void testPublication() throws Exception { // create cluster with multi node (3 master + 2 data) prepareCluster(3, 2, INDEX_NAME, 1, 2); @@ -232,7 +262,7 @@ public void testRemotePublicationDownloadStats() { .addMetric(DISCOVERY.metricName()) .get(); - assertDataNodeDownloadStats(nodesStatsResponseDataNode); + assertDataNodeDownloadStats(nodesStatsResponseDataNode.getNodes().get(0)); } public void testRemotePublicationDisabledByRollingRestart() throws Exception { @@ -395,9 +425,162 @@ public void testVotingConfigAreCommitted() throws ExecutionException, Interrupte }); } - private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) { + public void testRemotePublicationDisabledAfterSettingChange() throws ExecutionException, InterruptedException { + prepareCluster(3, 2, Settings.EMPTY); + ensureStableCluster(5); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + + // disable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build()) + .get(); + + clearInterceptedActions(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + createIndex(INDEX_NAME + "2", remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME + "2"); + + assertTrue(getRequestCount(PUBLISH_STATE_ACTION_NAME) > 0); + assertEquals(0, (int) getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME)); + } + + public void testRemotePublicationEnabledAfterSettingChange() { + isRemotePublicationEnabled = false; + prepareCluster(3, 2, Settings.EMPTY); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertTrue(getRequestCount(PUBLISH_STATE_ACTION_NAME) > 0); + assertEquals(0, (int) getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME)); + + // enable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, true).build()) + .get(); + + clearInterceptedActions(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + createIndex(INDEX_NAME + "2", remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME + "2"); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + } + + public void testRemotePublicationSettingChangePersistedAfterRestart() throws Exception { + isRemotePublicationEnabled = false; + prepareCluster(3, 2, Settings.EMPTY); + ensureStableCluster(5); + // enable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, true).build()) + .get(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + + // disable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build()) + .get(); + + clearInterceptedActions(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { + @Override + public void doAfterNodes(int n, Client client) { + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + } + }); + + ensureStableCluster(5); + assertEquals(0, (int) getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_STATE_ACTION_NAME) > 0); + } + + public void testRemotePublicationSettingChangePersistedAfterFullRestart() throws Exception { + isRemotePublicationEnabled = false; + prepareCluster(3, 2, Settings.EMPTY); + ensureStableCluster(5); + // enable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, true).build()) + .get(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + + // disable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build()) + .get(); + + internalCluster().fullRestart(); + + ensureStableCluster(5); + NodesStatsResponse response = internalCluster().client() + .admin() + .cluster() + .prepareNodesStats(internalCluster().getNodeNames()) + .clear() + .addMetric(DISCOVERY.metricName()) + .get(); + response.getNodes().forEach(nodeStats -> { + DiscoveryStats discoveryStats = nodeStats.getDiscoveryStats(); + assertNotNull(discoveryStats.getClusterStateStats()); + // ensure none of the nodes received remote publication + discoveryStats.getClusterStateStats() + .getPersistenceStats() + .stream() + .filter( + persistedStateStats -> persistedStateStats.getStatsName().equals(FULL_DOWNLOAD_STATS) + || persistedStateStats.getStatsName().equals(DIFF_DOWNLOAD_STATS) + ) + .forEach(persistedStateStats -> { + assertEquals(0, persistedStateStats.getSuccessCount()); + assertEquals(0, persistedStateStats.getFailedCount()); + assertEquals(0, persistedStateStats.getTotalTimeInMillis()); + }); + }); + } + + private void assertDataNodeDownloadStats(NodeStats nodeStats) { // assert cluster state stats for data node - DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats(); + DiscoveryStats dataNodeDiscoveryStats = nodeStats.getDiscoveryStats(); assertNotNull(dataNodeDiscoveryStats.getClusterStateStats()); assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess()); assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0); @@ -442,4 +625,94 @@ private Map getMetadataFiles(BlobStoreRepository repository, St return fileName.split(DELIMITER)[0]; }).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum)); } + + private static void clearInterceptedActions() { + Iterable pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance + .clearInterceptedActions(); + } + } + + private static void interceptTransportActions(String... actions) { + Iterable pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance + .interceptTransportActions(actions); + } + } + + private static Integer getRequestCount(String action) { + Iterable pluginsServices = internalCluster().getInstances(PluginsService.class); + return pluginsServices.iterator() + .next() + .filterPlugins(InterceptingTransportService.TestPlugin.class) + .stream() + .findFirst() + .get().instance.getRequestCount(action); + } + + public static class InterceptingTransportService implements TransportInterceptor { + + public static class TestPlugin extends Plugin implements NetworkPlugin { + public final InterceptingTransportService instance = new InterceptingTransportService(); + + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + return Collections.singletonList(instance); + } + } + + private final Set actions = new HashSet<>(); + private final Map requests = new HashMap<>(); + + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new InterceptingTransportService.InterceptingHandler<>(action, actualHandler); + } + + synchronized Integer getRequestCount(String action) { + return requests.getOrDefault(action, 0); + } + + synchronized void interceptTransportActions(String... actions) { + Collections.addAll(this.actions, actions); + } + + synchronized void clearInterceptedActions() { + actions.clear(); + requests.clear(); + } + + private class InterceptingHandler implements TransportRequestHandler { + private final String action; + private final TransportRequestHandler handler; + private final Logger logger = LogManager.getLogger(InterceptingHandler.class); + + InterceptingHandler(String action, TransportRequestHandler handler) { + this.action = action; + this.handler = handler; + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + synchronized (this) { + if (actions.contains(action)) { + Integer requestCount = requests.getOrDefault(action, 0); + requests.put(action, requestCount + 1); + logger.info("intercepted action: {} count: {}", action, requestCount + 1); + } + } + handler.messageReceived(request, channel, task); + } + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9cffc7051d756..c1d5c0949e07e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -87,7 +88,8 @@ public CoordinationState( DiscoveryNode localNode, PersistedStateRegistry persistedStateRegistry, ElectionStrategy electionStrategy, - Settings settings + Settings settings, + ClusterSettings clusterSettings ) { this.localNode = localNode; @@ -105,10 +107,10 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); - // ToDo: revisit this check while making the setting dynamic this.isRemotePublicationEnabled = isRemoteStateEnabled && REMOTE_PUBLICATION_SETTING.get(settings) - && localNode.isRemoteStatePublicationEnabled(); + && localNode.isRemoteStatePublicationConfigured(); + clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); } public boolean isRemotePublicationEnabled() { @@ -651,6 +653,15 @@ private boolean shouldCommitRemotePersistedState() { && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; } + private void setRemotePublicationSetting(boolean remotePublicationSetting) { + if (remotePublicationSetting == false) { + this.isRemotePublicationEnabled = false; + } else { + this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured(); + } + + } + /** * Pluggable persistence layer for {@link CoordinationState}. * diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 9859abe503eaa..56172bf376e7a 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -187,8 +187,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; private final PersistedStateRegistry persistedStateRegistry; + private final RemoteClusterStateService remoteClusterStateService; private final RemoteStoreNodeService remoteStoreNodeService; private NodeConnectionsService nodeConnectionsService; + private final ClusterSettings clusterSettings; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -312,6 +314,8 @@ public Coordinator( this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; this.remoteStoreNodeService = remoteStoreNodeService; + this.remoteClusterStateService = remoteClusterStateService; + this.clusterSettings = clusterSettings; } private ClusterFormationState getClusterFormationState() { @@ -867,7 +871,9 @@ boolean publicationInProgress() { @Override protected void doStart() { synchronized (mutex) { - coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings)); + coordinationState.set( + new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings) + ); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); @@ -912,9 +918,9 @@ public DiscoveryStats stats() { stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); } }); - if (coordinationState.get().isRemotePublicationEnabled()) { - stats.add(publicationHandler.getFullDownloadStats()); - stats.add(publicationHandler.getDiffDownloadStats()); + if (remoteClusterStateService != null) { + stats.add(remoteClusterStateService.getFullDownloadStats()); + stats.add(remoteClusterStateService.getDiffDownloadStats()); } clusterStateStats.setPersistenceStats(stats); return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 13033b670d44b..aca8653be4dd8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -513,10 +513,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi assert existingNodes.isEmpty() == false; Optional remotePublicationNode = existingNodes.stream() - .filter(DiscoveryNode::isRemoteStatePublicationEnabled) + .filter(DiscoveryNode::isRemoteStatePublicationConfigured) .findFirst(); - if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) { + if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) { ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES); } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index caed2b6eceb49..6277b70f9a7c8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() { ); } - public PersistedStateStats getFullDownloadStats() { - return remoteClusterStateService.getFullDownloadStats(); - } - - public PersistedStateStats getDiffDownloadStats() { - return remoteClusterStateService.getDiffDownloadStats(); - } - private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) { ClusterState incomingState; @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext( ) { if (isRemotePublicationEnabled == true) { if (allNodesRemotePublicationEnabled.get() == false) { - if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) { + if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) { allNodesRemotePublicationEnabled.set(true); } } @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext( return publicationContext; } - private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) { + private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) { assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0; for (DiscoveryNode node : discoveryNodes.getNodes().values()) { // if a node is non-remote then created local publication context - if (node.isRemoteStatePublicationEnabled() == false) { + if (node.isRemoteStatePublicationConfigured() == false) { return false; } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index a6f0a457f7f9b..5f5783425a08c 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -518,7 +518,7 @@ public boolean isRemoteStoreNode() { * Returns whether remote cluster state publication is enabled on this node * @return true if the node contains remote cluster state node attribute and remote routing table node attribute */ - public boolean isRemoteStatePublicationEnabled() { + public boolean isRemoteStatePublicationConfigured() { return this.getAttributes() .keySet() .stream() diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ce5e57b79dadb..e684e9d26fa7a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -111,6 +111,7 @@ import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -132,7 +133,7 @@ public class RemoteClusterStateService implements Closeable { REMOTE_PUBLICATION_SETTING_KEY, false, Property.NodeScope, - Property.Final + Property.Dynamic ); /** @@ -232,7 +233,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; - private final boolean isPublicationEnabled; + private boolean isPublicationEnabled; private final String remotePathPrefix; private final RemoteClusterStateCache remoteClusterStateCache; @@ -273,9 +274,10 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings) + this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( repositoriesService, @@ -1115,6 +1117,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; } + private void setRemotePublicationSetting(boolean remotePublicationSetting) { + if (remotePublicationSetting == false) { + this.isPublicationEnabled = false; + } else { + this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); + } + } + // Package private for unit test RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java index 11f26ac8b3ed9..1a8e85f30527d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java @@ -21,8 +21,8 @@ public class RemotePersistenceStats { RemoteDownloadStats remoteDiffDownloadStats; RemoteDownloadStats remoteFullDownloadStats; - final String FULL_DOWNLOAD_STATS = "remote_full_download"; - final String DIFF_DOWNLOAD_STATS = "remote_diff_download"; + public static final String FULL_DOWNLOAD_STATS = "remote_full_download"; + public static final String DIFF_DOWNLOAD_STATS = "remote_diff_download"; public RemotePersistenceStats() { remoteUploadStats = new RemoteUploadStats(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 32cb95e0c04f6..a77a94723d966 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -1283,7 +1283,7 @@ public static CoordinationState createCoordinationState( DiscoveryNode localNode, Settings settings ) { - return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings); + return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null); } public static ClusterState clusterState( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java index 5ddf614db3334..1852be6310f05 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java @@ -302,7 +302,8 @@ public void testPrevotingIndicatesElectionSuccess() { localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, - Settings.EMPTY + Settings.EMPTY, + null ); final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java index 4d18ff95887dd..a2c54dcc88efb 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java @@ -94,7 +94,13 @@ class MockNode { ); PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState)); - coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY); + coordinationState = new CoordinationState( + localNode, + persistedStateRegistry, + ElectionStrategy.DEFAULT_INSTANCE, + Settings.EMPTY, + null + ); } final DiscoveryNode localNode; diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java index cbe695cbb2136..2215ecafc16c9 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java @@ -150,7 +150,7 @@ static class ClusterNode { persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); this.electionStrategy = electionStrategy; - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); } void reboot() { @@ -189,7 +189,7 @@ void reboot() { localNode.getVersion() ); - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); } void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {