Skip to content

Commit

Permalink
Separate Remote State and Publication enabled and configured methods
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Sep 30, 2024
1 parent 8ddb3ee commit 70947b5
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote;

import org.junit.Assert;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -118,6 +119,26 @@ public Settings.Builder remoteWithRoutingTableNodeSetting() {
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
}

public void testRemoteClusterStateServiceNotInitialized_WhenNodeAttributesNotPresent() {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

ensureStableCluster(3);
ensureGreen();

internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNull);
}

public void testServiceInitialized_WhenNodeAttributesPresent() {
internalCluster().startClusterManagerOnlyNode(buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE));
internalCluster().startDataOnlyNodes(2, buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE));

ensureStableCluster(3);
ensureGreen();

internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNotNull);
}

public void testRemotePublishConfigNodeJoinNonRemoteCluster() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
ensureStableCluster(5);
ensureGreen(INDEX_NAME);

assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
RemoteClusterStateService remoteClusterStateService = internalCluster().getCurrentClusterManagerNodeInstance(
RemoteClusterStateService.class
);

assertFalse(remoteClusterStateService.isRemotePublicationEnabled());
}

public void testRemotePublicationDownloadStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public CoordinationState(
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
&& localNode.isRemoteStatePublicationConfigured();
}

public boolean isRemotePublicationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final RemoteClusterStateService remoteClusterStateService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -312,6 +313,7 @@ public Coordinator(
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -912,9 +914,9 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
if (remoteClusterStateService != null) {
stats.add(remoteClusterStateService.getFullDownloadStats());
stats.add(remoteClusterStateService.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi

assert existingNodes.isEmpty() == false;
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.filter(DiscoveryNode::isRemoteStatePublicationConfigured)
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext(
) {
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
Expand All @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext(
return publicationContext;
}

private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
if (node.isRemoteStatePublicationConfigured() == false) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ public boolean isRemoteStoreNode() {
}

/**
* Returns whether remote cluster state publication is enabled on this node
* Returns whether settings required for remote cluster state publication is configured
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
*/
public boolean isRemoteStatePublicationEnabled() {
public boolean isRemoteStatePublicationConfigured() {
return this.getAttributes()
.keySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand Down Expand Up @@ -256,7 +256,7 @@ public RemoteClusterStateService(
List<IndexMetadataUploadListener> indexMetadataUploadListeners,
NamedWriteableRegistry namedWriteableRegistry
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) : "Remote cluster state is not configured";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
Expand Down Expand Up @@ -1055,7 +1055,7 @@ public void close() throws IOException {
}

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory}
Expand Down Expand Up @@ -235,7 +235,7 @@ private Repository validateAndGetRepository(String repoSetting) {
}

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not configured";
if (isRemoteDataAttributePresent == false) {
// If remote store data attributes are not present than we skip this.
return;
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -796,7 +796,7 @@ protected Node(
final RemoteClusterStateService remoteClusterStateService;
final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
final RemoteIndexPathUploader remoteIndexPathUploader;
if (isRemoteStoreClusterStateEnabled(settings)) {
if (isRemoteClusterStateConfigured(settings)) {
remoteIndexPathUploader = new RemoteIndexPathUploader(
threadPool,
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static boolean isRemoteDataAttributePresent(Settings settings) {
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
}

public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
public static boolean isRemoteClusterStateConfigured(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}
Expand All @@ -194,8 +194,7 @@ public static String getRemoteStoreTranslogRepo(Settings settings) {
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
&& isRemoteClusterStateAttributePresent(settings);
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteClusterStateConfigured(settings);
}

private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,30 @@ public static Settings buildRemoteStoreNodeAttributes(
);
}

public static Settings buildRemoteStateNodeAttributes(
String stateRepoName,
Path stateRepoPath,
String stateRepoType
) {
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
stateRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
stateRepoName
);
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
Settings.Builder settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, stateRepoName)
.put(stateRepoTypeAttributeKey, stateRepoType)
.put(stateRepoSettingsAttributeKeyPrefix + "location", stateRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);
return settings.build();
}

private static Settings buildRemoteStoreNodeAttributes(
String segmentRepoName,
Path segmentRepoPath,
Expand Down Expand Up @@ -2850,16 +2874,6 @@ private static Settings buildRemoteStoreNodeAttributes(
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
segmentRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
segmentRepoName
);
String routingTableRepoAttributeKey = null, routingTableRepoSettingsAttributeKeyPrefix = null;
if (routingTableRepoName != null) {
routingTableRepoAttributeKey = String.format(
Expand All @@ -2885,10 +2899,7 @@ private static Settings buildRemoteStoreNodeAttributes(
.put(translogRepoTypeAttributeKey, translogRepoType)
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
.put(translogRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, segmentRepoType)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);
.put(buildRemoteStateNodeAttributes(segmentRepoName, segmentRepoPath, segmentRepoType));
if (routingTableRepoName != null) {
settings.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoAttributeKey, routingTableRepoType)
Expand Down

0 comments on commit 70947b5

Please sign in to comment.