diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml
index c18046f873477..0d8bf59823acf 100644
--- a/.idea/runConfigurations/Debug_OpenSearch.xml
+++ b/.idea/runConfigurations/Debug_OpenSearch.xml
@@ -6,10 +6,6 @@
-
-
-
-
-
\ No newline at end of file
+
diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md
index 964383078c38d..af788c40ae304 100644
--- a/CHANGELOG-3.0.md
+++ b/CHANGELOG-3.0.md
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/))
- Deprecate CamelCase `PathHierarchy` tokenizer name in favor to lowercase `path_hierarchy` ([#10894](https://github.com/opensearch-project/OpenSearch/pull/10894))
- Breaking change: Do not request "search_pipelines" metrics by default in NodesInfoRequest ([#12497](https://github.com/opensearch-project/OpenSearch/pull/12497))
+- Remove deprecated constant META_FIELDS_BEFORE_7DOT8 ([#13860](https://github.com/opensearch-project/OpenSearch/pull/13860))
### Deprecated
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ff3fa9a737ec..dc0233f5ff2f8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))
- [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
+- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
### Dependencies
@@ -42,7 +43,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove handling of index.mapper.dynamic in AutoCreateIndex([#13067](https://github.com/opensearch-project/OpenSearch/pull/13067))
### Fixed
-- Fix negative RequestStats metric issue ([#13553](https://github.com/opensearch-project/OpenSearch/pull/13553))
- Fix get field mapping API returns 404 error in mixed cluster with multiple versions ([#13624](https://github.com/opensearch-project/OpenSearch/pull/13624))
- Allow clearing `remote_store.compatibility_mode` setting ([#13646](https://github.com/opensearch-project/OpenSearch/pull/13646))
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
diff --git a/release-notes/opensearch.release-notes-2.14.0.md b/release-notes/opensearch.release-notes-2.14.0.md
index 8ef0215baa67a..c5fc3e895c45d 100644
--- a/release-notes/opensearch.release-notes-2.14.0.md
+++ b/release-notes/opensearch.release-notes-2.14.0.md
@@ -84,4 +84,5 @@
- Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012))
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))
- Fix mapper_parsing_exception when using flat_object fields with names longer than 11 characters ([#13259](https://github.com/opensearch-project/OpenSearch/pull/13259))
-- DATETIME_FORMATTER_CACHING_SETTING experimental feature should not default to 'true' ([#13532](https://github.com/opensearch-project/OpenSearch/pull/13532))
\ No newline at end of file
+- DATETIME_FORMATTER_CACHING_SETTING experimental feature should not default to 'true' ([#13532](https://github.com/opensearch-project/OpenSearch/pull/13532))
+- Fix negative RequestStats metric issue ([#13553](https://github.com/opensearch-project/OpenSearch/pull/13553))
diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java
new file mode 100644
index 0000000000000..0f441fe01a368
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java
@@ -0,0 +1,145 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.junit.Before;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
+import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
+
+ private static final String INDEX_NAME = "test-index";
+
+ @Before
+ public void setup() {
+ asyncUploadMockFsRepo = false;
+ }
+
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) {
+ return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
+ }
+
+ private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
+ prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
+ Map indexStats = indexData(1, false, INDEX_NAME);
+ assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
+ ensureGreen(INDEX_NAME);
+ return indexStats;
+ }
+
+ public void testRemoteCleanupTaskUpdated() {
+ int shardCount = randomIntBetween(1, 2);
+ int replicaCount = 1;
+ int dataNodeCount = shardCount * (replicaCount + 1);
+ int clusterManagerNodeCount = 1;
+
+ initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
+ RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
+ RemoteClusterStateCleanupManager.class
+ );
+
+ assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
+ assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
+
+ // disable the cleanup task by setting the interval to -1; the task must then report the new interval and be unscheduled
+ client().admin()
+ .cluster()
+ .prepareUpdateSettings()
+ .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
+ .get();
+
+ assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
+ assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
+
+ // set the cleanup interval to 1 minute and check that the task picks up the new value
+ client().admin()
+ .cluster()
+ .prepareUpdateSettings()
+ .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
+ .get();
+ assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
+ }
+
+ public void testRemoteCleanupDeleteStale() throws Exception {
+ int shardCount = randomIntBetween(1, 2);
+ int replicaCount = 1;
+ int dataNodeCount = shardCount * (replicaCount + 1);
+ int clusterManagerNodeCount = 1;
+
+ initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
+
+ // set cleanup interval to 100 ms to make the test faster
+ ClusterUpdateSettingsResponse response = client().admin()
+ .cluster()
+ .prepareUpdateSettings()
+ .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
+ .get();
+
+ assertTrue(response.isAcknowledged());
+
+ // Apply RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1 (= 21) settings updates so the cleanup threshold is crossed.
+ // Per the original note each update uploads 2 manifests (42 in total), so a lower count means the cleanup has run.
+ updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);
+
+ RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
+ BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
+ BlobPath baseMetadataPath = repository.basePath()
+ .add(
+ Base64.getUrlEncoder()
+ .withoutPadding()
+ .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
+ )
+ .add("cluster-state")
+ .add(getClusterState().metadata().clusterUUID());
+ BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
+
+ assertBusy(() -> {
+ int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
+ logger.info("number of current manifest file: {}", manifestFiles);
+ // The exact count cannot be pinned to RETAINED_MANIFESTS: other queued cluster state tasks, besides the settings updates
+ // issued above, may upload additional manifests. So only check that the count is at least RETAINED_MANIFESTS and below
+ // RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES (each cluster state update uploads 2 manifests).
+ assertTrue(
+ "Current number of manifest files: " + manifestFiles,
+ manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
+ );
+ }, 500, TimeUnit.MILLISECONDS);
+ }
+
+    private void updateClusterStateNTimes(int n) {
+        // Issues n acknowledged persistent-settings updates, each with a different refresh interval (i seconds), so every request changes the settings.
+        for (int i = n; i > 0; i--) {
+            ClusterUpdateSettingsResponse response = client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
+                .get();
+            assertTrue(response.isAcknowledged());
+        }
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
index 42120aa32eb47..ab2f0f0080566 100644
--- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
@@ -10,7 +10,6 @@
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
-import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
@@ -27,7 +26,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
@@ -51,16 +49,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}
- private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
- internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
- internalCluster().startDataOnlyNodes(numDataOnlyNodes);
- for (String index : indices.split(",")) {
- createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
- ensureYellowAndNoInitializingShards(index);
- ensureGreen(index);
- }
- }
-
private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
Map indexStats = indexData(1, false, INDEX_NAME);
@@ -69,49 +57,6 @@ private Map initialTestSetup(int shardCount, int replicaCount, int
return indexStats;
}
- public void testFullClusterRestoreStaleDelete() throws Exception {
- int shardCount = randomIntBetween(1, 2);
- int replicaCount = 1;
- int dataNodeCount = shardCount * (replicaCount + 1);
- int clusterManagerNodeCount = 1;
-
- initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
- setReplicaCount(0);
- setReplicaCount(2);
- setReplicaCount(0);
- setReplicaCount(1);
- setReplicaCount(0);
- setReplicaCount(1);
- setReplicaCount(0);
- setReplicaCount(2);
- setReplicaCount(0);
-
- RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
- RemoteClusterStateService.class
- );
-
- RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
-
- BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
- BlobPath baseMetadataPath = repository.basePath()
- .add(
- Base64.getUrlEncoder()
- .withoutPadding()
- .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
- )
- .add("cluster-state")
- .add(getClusterState().metadata().clusterUUID());
-
- assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());
-
- Map indexMetadataMap = remoteClusterStateService.getLatestClusterState(
- cluster().getClusterName(),
- getClusterState().metadata().clusterUUID()
- ).getMetadata().getIndices();
- assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
- assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
- }
-
public void testRemoteStateStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
@@ -241,12 +186,4 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) {
assertNotNull(nodesStatsResponse.getNodes().get(0));
assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats());
}
-
- private void setReplicaCount(int replicaCount) {
- client().admin()
- .indices()
- .prepareUpdateSettings(INDEX_NAME)
- .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
- .get();
- }
}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
index 740aee69f7d80..64efcee6ef1b5 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
@@ -350,4 +350,14 @@ protected void restore(boolean restoreAllShards, String... indices) {
PlainActionFuture.newFuture()
);
}
+
+ protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
+ internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
+ internalCluster().startDataOnlyNodes(numDataOnlyNodes);
+ for (String index : indices.split(",")) {
+ createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
+ ensureYellowAndNoInitializingShards(index);
+ ensureGreen(index);
+ }
+ }
}
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index a2be2ea4510e0..d57ef7e780602 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -104,6 +104,7 @@
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
+import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
@@ -711,6 +712,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL,
// Remote cluster state settings
+ RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java
new file mode 100644
index 0000000000000..8a106a25e5630
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java
@@ -0,0 +1,408 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Strings;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.service.ClusterApplierService;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.blobstore.BlobMetadata;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.concurrent.AbstractAsyncTask;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.index.translog.transfer.BlobStoreTransferService;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
+
+/**
+ * A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
+ *
+ * @opensearch.internal
+ */
+public class RemoteClusterStateCleanupManager implements Closeable {
+
+ public static final int RETAINED_MANIFESTS = 10;
+ public static final int SKIP_CLEANUP_STATE_CHANGES = 10;
+ public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5);
+ public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE;
+
+ /**
+ * Dynamic, node-scoped setting for the interval at which the stale file cleanup job runs (default: 5 minutes).
+ * The minimum allowed value, -1, is intended to disable the stale file cleanup job.
+ */
+ public static final Setting REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting(
+ "cluster.remote_store.state.cleanup_interval",
+ CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT,
+ CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM,
+ Setting.Property.NodeScope,
+ Setting.Property.Dynamic
+ );
+ private static final Logger logger = LogManager.getLogger(RemoteClusterStateCleanupManager.class);
+ private final RemoteClusterStateService remoteClusterStateService;
+ private final RemotePersistenceStats remoteStateStats;
+ private BlobStoreTransferService blobStoreTransferService;
+ private TimeValue staleFileCleanupInterval;
+ private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
+ private AsyncStaleFileDeletion staleFileDeletionTask;
+ private long lastCleanupAttemptStateVersion;
+ private final ThreadPool threadpool;
+ private final ClusterApplierService clusterApplierService;
+
+ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
+ this.remoteClusterStateService = remoteClusterStateService;
+ this.remoteStateStats = remoteClusterStateService.getStats();
+ ClusterSettings clusterSettings = clusterService.getClusterSettings();
+ this.clusterApplierService = clusterService.getClusterApplierService();
+ this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);
+ this.threadpool = remoteClusterStateService.getThreadpool();
+ // start from 0: cleanup only proceeds once this node is the elected cluster manager and the applied state version exceeds this value by more than SKIP_CLEANUP_STATE_CHANGES
+ this.lastCleanupAttemptStateVersion = 0;
+ clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
+ }
+
+ void start() {
+ staleFileDeletionTask = new AsyncStaleFileDeletion(this);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (staleFileDeletionTask != null) {
+ staleFileDeletionTask.close();
+ }
+ }
+
+ private BlobStoreTransferService getBlobStoreTransferService() {
+ if (blobStoreTransferService == null) {
+ blobStoreTransferService = new BlobStoreTransferService(remoteClusterStateService.getBlobStore(), threadpool);
+ }
+ return blobStoreTransferService;
+ }
+
+ private void updateCleanupInterval(TimeValue updatedInterval) {
+ this.staleFileCleanupInterval = updatedInterval;
+ logger.info("updated remote state cleanup interval to {}", updatedInterval);
+ // Apply the new interval to the existing task via setInterval (no new task is created); skipped when the task already uses this interval
+ if (staleFileDeletionTask != null && !staleFileDeletionTask.getInterval().equals(updatedInterval)) {
+ staleFileDeletionTask.setInterval(updatedInterval);
+ }
+ }
+
+ // Runs stale file cleanup when the local node is the elected cluster manager and the applied state version moved by more than SKIP_CLEANUP_STATE_CHANGES since the last attempt; visible for testing
+ void cleanUpStaleFiles() {
+ ClusterState currentAppliedState = clusterApplierService.state();
+ if (currentAppliedState.nodes().isLocalNodeElectedClusterManager()) {
+ long cleanUpAttemptStateVersion = currentAppliedState.version();
+ assert Strings.isNotEmpty(currentAppliedState.getClusterName().value()) : "cluster name is not set";
+ assert Strings.isNotEmpty(currentAppliedState.metadata().clusterUUID()) : "cluster uuid is not set";
+ if (cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion > SKIP_CLEANUP_STATE_CHANGES) {
+ logger.info(
+ "Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates",
+ currentAppliedState.getClusterName().value(),
+ currentAppliedState.metadata().clusterUUID(),
+ cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion
+ );
+ this.deleteStaleClusterMetadata(
+ currentAppliedState.getClusterName().value(),
+ currentAppliedState.metadata().clusterUUID(),
+ RETAINED_MANIFESTS
+ );
+ lastCleanupAttemptStateVersion = cleanUpAttemptStateVersion;
+ } else {
+ logger.debug(
+ "Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}",
+ currentAppliedState.getClusterName().value(),
+ currentAppliedState.metadata().clusterUUID(),
+ cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion,
+ SKIP_CLEANUP_STATE_CHANGES
+ );
+ }
+ } else {
+ logger.debug("Skipping cleanup task as local node is not elected Cluster Manager");
+ }
+ }
+
+ private void addStaleGlobalMetadataPath(String fileName, Set filesToKeep, Set staleGlobalMetadataPaths) {
+ if (!filesToKeep.contains(fileName)) {
+ String[] splitPath = fileName.split("/");
+ staleGlobalMetadataPaths.add(
+ new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
+ splitPath[splitPath.length - 1]
+ )
+ );
+ }
+ }
+
+ // Deletes the given stale manifests and the index/global metadata files they reference that no active manifest still uses; visible for testing
+ void deleteClusterMetadata(
+ String clusterName,
+ String clusterUUID,
+ List activeManifestBlobMetadata,
+ List staleManifestBlobMetadata
+ ) {
+ try {
+ Set filesToKeep = new HashSet<>();
+ Set staleManifestPaths = new HashSet<>();
+ Set staleIndexMetadataPaths = new HashSet<>();
+ Set staleGlobalMetadataPaths = new HashSet<>();
+ activeManifestBlobMetadata.forEach(blobMetadata -> {
+ ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
+ clusterName,
+ clusterUUID,
+ blobMetadata.name()
+ );
+ clusterMetadataManifest.getIndices()
+ .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
+ if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
+ filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
+ } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
+ filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
+ filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
+ filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
+ clusterMetadataManifest.getCustomMetadataMap()
+ .values()
+ .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
+ }
+ });
+ staleManifestBlobMetadata.forEach(blobMetadata -> {
+ ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
+ clusterName,
+ clusterUUID,
+ blobMetadata.name()
+ );
+ staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
+ if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
+ addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
+ } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
+ addStaleGlobalMetadataPath(
+ clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
+ filesToKeep,
+ staleGlobalMetadataPaths
+ );
+ addStaleGlobalMetadataPath(
+ clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(),
+ filesToKeep,
+ staleGlobalMetadataPaths
+ );
+ addStaleGlobalMetadataPath(
+ clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(),
+ filesToKeep,
+ staleGlobalMetadataPaths
+ );
+ clusterMetadataManifest.getCustomMetadataMap()
+ .values()
+ .forEach(
+ attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
+ );
+ }
+
+ clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
+ if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
+ staleIndexMetadataPaths.add(
+ new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
+ );
+ }
+ });
+ });
+
+ if (staleManifestPaths.isEmpty()) {
+ logger.debug("No stale Remote Cluster Metadata files found");
+ return;
+ }
+
+ deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
+ deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
+ deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
+ } catch (IllegalStateException e) {
+ logger.error("Error while fetching Remote Cluster Metadata manifests", e);
+ } catch (IOException e) {
+ logger.error("Error while deleting stale Remote Cluster Metadata files", e);
+ remoteStateStats.cleanUpAttemptFailed();
+ } catch (Exception e) {
+ logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e);
+ remoteStateStats.cleanUpAttemptFailed();
+ }
+ }
+
+    /**
+     * Deletes manifests older than the latest {@code manifestsToRetain}, plus metadata files referenced only by them; package private for testing
+     *
+     * @param clusterName name of the cluster
+     * @param clusterUUID uuid of cluster state to refer to in remote
+     * @param manifestsToRetain no of latest manifest files to keep in remote
+     */
+    void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
+        if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { // guard: only one cleanup run at a time
+            logger.info("Delete stale cluster metadata task is already in progress.");
+            return;
+        }
+        try {
+            getBlobStoreTransferService().listAllInSortedOrderAsync(
+                ThreadPool.Names.REMOTE_PURGE,
+                remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
+                MANIFEST_FILE_PREFIX,
+                Integer.MAX_VALUE,
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(List<BlobMetadata> blobMetadata) {
+                        if (blobMetadata.size() > manifestsToRetain) {
+                            deleteClusterMetadata(
+                                clusterName,
+                                clusterUUID,
+                                blobMetadata.subList(0, manifestsToRetain), // retained (active) manifests
+                                blobMetadata.subList(manifestsToRetain, blobMetadata.size()) // stale manifests
+                            );
+                        }
+                        deleteStaleMetadataRunning.set(false); // release the guard once the listing has been processed
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.error(
+                            new ParameterizedMessage(
+                                "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
+                                clusterUUID
+                            ),
+                            e
+                        );
+                        deleteStaleMetadataRunning.set(false); // release the guard on failure as well
+                    }
+                }
+            );
+        } catch (Exception e) {
+            deleteStaleMetadataRunning.set(false); // the listener will never fire, so release the guard here
+            throw e;
+        }
+    }
+
+ /**
+ * Issues an async delete, on the REMOTE_PURGE thread pool, of the whole remote cluster metadata base path of each given cluster UUID
+ *
+ * @param clusterName name of the cluster
+ * @param clusterUUIDs cluster UUIDs for which the remote state needs to be purged
+ */
+ void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) {
+ clusterUUIDs.forEach(
+ clusterUUID -> getBlobStoreTransferService().deleteAsync(
+ ThreadPool.Names.REMOTE_PURGE,
+ remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
+ new ActionListener<>() {
+ @Override
+ public void onResponse(Void unused) {
+ logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ logger.error(
+ new ParameterizedMessage(
+ "Exception occurred while deleting all remote cluster metadata for cluster UUID {}",
+ clusterUUID
+ ),
+ e
+ );
+ remoteStateStats.cleanUpAttemptFailed();
+ }
+ }
+ )
+ );
+ }
+
+    // Passes the cluster's remote metadata base path and the given stale paths to deleteBlobs; package private for testing
+    void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
+        logger.debug("Deleting stale files from remote - {}", stalePaths); // parameterized: no formatting cost when debug is off
+        getBlobStoreTransferService().deleteBlobs(
+            remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
+            stalePaths
+        );
+    }
+
+ /**
+ * Purges all remote cluster state against provided cluster UUIDs
+ * @param clusterState current state of the cluster
+ * @param committedManifest last committed ClusterMetadataManifest
+ */
+ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
+ threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
+ String clusterName = clusterState.getClusterName().value();
+ logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
+ Set allClustersUUIDsInRemote;
+ try {
+ allClustersUUIDsInRemote = new HashSet<>(
+ remoteClusterStateService.getAllClusterUUIDs(clusterState.getClusterName().value())
+ );
+ } catch (IOException e) {
+ logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
+ return;
+ }
+ // Retain last 2 cluster uuids data
+ allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
+ allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
+ deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
+ });
+ }
+
+ public TimeValue getStaleFileCleanupInterval() {
+ return this.staleFileCleanupInterval;
+ }
+
+ AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing
+ return this.staleFileDeletionTask;
+ }
+
+ RemotePersistenceStats getStats() {
+ return this.remoteStateStats;
+ }
+
+ static final class AsyncStaleFileDeletion extends AbstractAsyncTask {
+ private final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
+
+ AsyncStaleFileDeletion(RemoteClusterStateCleanupManager remoteClusterStateCleanupManager) {
+ super(
+ logger,
+ remoteClusterStateCleanupManager.threadpool,
+ remoteClusterStateCleanupManager.getStaleFileCleanupInterval(),
+ true
+ );
+ this.remoteClusterStateCleanupManager = remoteClusterStateCleanupManager;
+ rescheduleIfNecessary();
+ }
+
+ @Override
+ protected boolean mustReschedule() {
+ return true;
+ }
+
+ @Override
+ protected void runInternal() {
+ remoteClusterStateCleanupManager.cleanUpStaleFiles();
+ }
+ }
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
index ac821cd15a5b3..9bf1dff2359ca 100644
--- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
@@ -18,11 +18,13 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
+import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
@@ -59,7 +61,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongSupplier;
@@ -81,8 +82,6 @@ public class RemoteClusterStateService implements Closeable {
public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
- public static final int RETAINED_MANIFESTS = 10;
-
public static final String DELIMITER = "__";
public static final String CUSTOM_DELIMITER = "--";
@@ -207,8 +206,7 @@ public class RemoteClusterStateService implements Closeable {
private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;
private volatile TimeValue metadataManifestUploadTimeout;
-
- private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
+ private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
private final RemotePersistenceStats remoteStateStats;
private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]";
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
@@ -232,7 +230,7 @@ public RemoteClusterStateService(
String nodeId,
Supplier repositoriesService,
Settings settings,
- ClusterSettings clusterSettings,
+ ClusterService clusterService,
LongSupplier relativeTimeNanosSupplier,
ThreadPool threadPool,
List indexMetadataUploadListeners
@@ -243,6 +241,7 @@ public RemoteClusterStateService(
this.settings = settings;
this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
this.threadpool = threadPool;
+ ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
@@ -252,16 +251,10 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
+ this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
}
- private BlobStoreTransferService getBlobStoreTransferService() {
- if (blobStoreTransferService == null) {
- blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);
- }
- return blobStoreTransferService;
- }
-
/**
* This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
* invoked by the elected cluster manager when the remote cluster state is enabled.
@@ -417,7 +410,6 @@ public ClusterMetadataManifest writeIncrementalMetadata(
: previousManifest.getCustomMetadataMap(),
false
);
- deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateSucceeded();
@@ -721,6 +713,10 @@ private CheckedRunnable getAsyncMetadataWriteAction(
);
}
+    /**
+     * Returns the cleanup manager created in this service's constructor.
+     *
+     * @return the {@link RemoteClusterStateCleanupManager} owned by this service
+     */
+    public RemoteClusterStateCleanupManager getCleanupManager() {
+        return this.remoteClusterStateCleanupManager;
+    }
+
@Nullable
public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
throws IOException {
@@ -740,12 +736,16 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
previousManifest.getCustomMetadataMap(),
true
);
- deleteStaleClusterUUIDs(clusterState, committedManifest);
+ if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) {
+ remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest);
+ }
+
return committedManifest;
}
@Override
public void close() throws IOException {
+ remoteClusterStateCleanupManager.close();
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
@@ -760,6 +760,7 @@ public void start() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
+ remoteClusterStateCleanupManager.start();
}
private ClusterMetadataManifest uploadManifest(
@@ -850,6 +851,14 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust
);
}
+    /**
+     * Package-private accessor for the thread pool passed to this service's constructor.
+     *
+     * @return the {@link ThreadPool} held in {@code threadpool}
+     */
+    ThreadPool getThreadpool() {
+        return this.threadpool;
+    }
+
+    /**
+     * Package-private accessor for the blob store of the backing repository.
+     * <p>
+     * {@code blobStoreRepository} is only assigned in {@link #start()}, so this presumably must
+     * not be called before the service has been started — TODO confirm with callers.
+     *
+     * @return the result of {@code blobStoreRepository.blobStore()}
+     */
+    BlobStore getBlobStore() {
+        return this.blobStoreRepository.blobStore();
+    }
+
private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
@@ -867,7 +876,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID)
return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID));
}
- private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
+ BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
}
@@ -982,7 +991,7 @@ private static String metadataAttributeFileName(String componentPrefix, Long met
);
}
- private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
+ BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN);
}
@@ -1235,7 +1244,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) {
}
}
- private Set getAllClusterUUIDs(String clusterName) throws IOException {
+ Set getAllClusterUUIDs(String clusterName) throws IOException {
Map clusterUUIDMetadata = clusterUUIDContainer(clusterName).children();
if (clusterUUIDMetadata == null) {
return Collections.emptySet();
@@ -1426,7 +1435,7 @@ private Optional getLatestManifestFileName(String clusterName, String cl
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
- private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
+ ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
throws IllegalStateException {
try {
return getClusterMetadataManifestBlobStoreFormat(filename).read(
@@ -1486,234 +1495,6 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) {
}
}
- /**
- * Purges all remote cluster state against provided cluster UUIDs
- *
- * @param clusterName name of the cluster
- * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged
- */
- void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) {
- clusterUUIDs.forEach(clusterUUID -> {
- getBlobStoreTransferService().deleteAsync(
- ThreadPool.Names.REMOTE_PURGE,
- getCusterMetadataBasePath(clusterName, clusterUUID),
- new ActionListener<>() {
- @Override
- public void onResponse(Void unused) {
- logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID);
- }
-
- @Override
- public void onFailure(Exception e) {
- logger.error(
- new ParameterizedMessage(
- "Exception occurred while deleting all remote cluster metadata for cluster UUID {}",
- clusterUUID
- ),
- e
- );
- remoteStateStats.cleanUpAttemptFailed();
- }
- }
- );
- });
- }
-
- /**
- * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests
- *
- * @param clusterName name of the cluster
- * @param clusterUUID uuid of cluster state to refer to in remote
- * @param manifestsToRetain no of latest manifest files to keep in remote
- */
- // package private for testing
- void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
- if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
- logger.info("Delete stale cluster metadata task is already in progress.");
- return;
- }
- try {
- getBlobStoreTransferService().listAllInSortedOrderAsync(
- ThreadPool.Names.REMOTE_PURGE,
- getManifestFolderPath(clusterName, clusterUUID),
- "manifest",
- Integer.MAX_VALUE,
- new ActionListener<>() {
- @Override
- public void onResponse(List blobMetadata) {
- if (blobMetadata.size() > manifestsToRetain) {
- deleteClusterMetadata(
- clusterName,
- clusterUUID,
- blobMetadata.subList(0, manifestsToRetain - 1),
- blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size())
- );
- }
- deleteStaleMetadataRunning.set(false);
- }
-
- @Override
- public void onFailure(Exception e) {
- logger.error(
- new ParameterizedMessage(
- "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
- clusterUUID
- )
- );
- deleteStaleMetadataRunning.set(false);
- }
- }
- );
- } catch (Exception e) {
- deleteStaleMetadataRunning.set(false);
- throw e;
- }
- }
-
- private void deleteClusterMetadata(
- String clusterName,
- String clusterUUID,
- List activeManifestBlobMetadata,
- List staleManifestBlobMetadata
- ) {
- try {
- Set filesToKeep = new HashSet<>();
- Set staleManifestPaths = new HashSet<>();
- Set staleIndexMetadataPaths = new HashSet<>();
- Set staleGlobalMetadataPaths = new HashSet<>();
- activeManifestBlobMetadata.forEach(blobMetadata -> {
- ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
- clusterName,
- clusterUUID,
- blobMetadata.name()
- );
- clusterMetadataManifest.getIndices()
- .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
- if (clusterMetadataManifest.getGlobalMetadataFileName() != null) {
- filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
- } else {
- filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
- filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
- filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
- clusterMetadataManifest.getCustomMetadataMap()
- .forEach((key, value) -> { filesToKeep.add(value.getUploadedFilename()); });
- }
- });
- staleManifestBlobMetadata.forEach(blobMetadata -> {
- ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
- clusterName,
- clusterUUID,
- blobMetadata.name()
- );
- staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
- if (clusterMetadataManifest.getGlobalMetadataFileName() != null) {
- if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) {
- String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/");
- staleGlobalMetadataPaths.add(
- new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
- globalMetadataSplitPath[globalMetadataSplitPath.length - 1]
- )
- );
- }
- } else {
- if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) {
- String[] coordinationMetadataSplitPath = clusterMetadataManifest.getCoordinationMetadata()
- .getUploadedFilename()
- .split("/");
- staleGlobalMetadataPaths.add(
- new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
- coordinationMetadataSplitPath[coordinationMetadataSplitPath.length - 1]
- )
- );
- }
- if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) {
- String[] templatesMetadataSplitPath = clusterMetadataManifest.getTemplatesMetadata()
- .getUploadedFilename()
- .split("/");
- staleGlobalMetadataPaths.add(
- new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
- templatesMetadataSplitPath[templatesMetadataSplitPath.length - 1]
- )
- );
- }
- if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) {
- String[] settingsMetadataSplitPath = clusterMetadataManifest.getSettingsMetadata().getUploadedFilename().split("/");
- staleGlobalMetadataPaths.add(
- new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
- settingsMetadataSplitPath[settingsMetadataSplitPath.length - 1]
- )
- );
- }
- clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> {
- if (filesToKeep.contains(value.getUploadedFilename()) == false) {
- String[] customMetadataSplitPath = value.getUploadedFilename().split("/");
- staleGlobalMetadataPaths.add(
- new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
- customMetadataSplitPath[customMetadataSplitPath.length - 1]
- )
- );
- }
- });
- }
-
- clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
- if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
- staleIndexMetadataPaths.add(
- new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
- + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
- );
- }
- });
- });
-
- if (staleManifestPaths.isEmpty()) {
- logger.debug("No stale Remote Cluster Metadata files found");
- return;
- }
-
- deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
- deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
- deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
- } catch (IllegalStateException e) {
- logger.error("Error while fetching Remote Cluster Metadata manifests", e);
- } catch (IOException e) {
- logger.error("Error while deleting stale Remote Cluster Metadata files", e);
- remoteStateStats.cleanUpAttemptFailed();
- } catch (Exception e) {
- logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e);
- remoteStateStats.cleanUpAttemptFailed();
- }
- }
-
- private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException {
- logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
- getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths);
- }
-
- /**
- * Purges all remote cluster state against provided cluster UUIDs
- *
- * @param clusterState current state of the cluster
- * @param committedManifest last committed ClusterMetadataManifest
- */
- public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
- threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
- String clusterName = clusterState.getClusterName().value();
- logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
- Set allClustersUUIDsInRemote;
- try {
- allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
- } catch (IOException e) {
- logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
- return;
- }
- // Retain last 2 cluster uuids data
- allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
- allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
- deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
- });
- }
-
public RemotePersistenceStats getStats() {
return remoteStateStats;
}
diff --git a/server/src/main/java/org/opensearch/index/mapper/MapperService.java b/server/src/main/java/org/opensearch/index/mapper/MapperService.java
index a1f3894c9f14c..83fb743305702 100644
--- a/server/src/main/java/org/opensearch/index/mapper/MapperService.java
+++ b/server/src/main/java/org/opensearch/index/mapper/MapperService.java
@@ -73,7 +73,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -201,11 +200,6 @@ public enum MergeReason {
Property.IndexScope,
Property.Deprecated
);
- // Deprecated set of meta-fields, for checking if a field is meta, use an instance method isMetadataField instead
- @Deprecated
- public static final Set META_FIELDS_BEFORE_7DOT8 = Collections.unmodifiableSet(
- new HashSet<>(Arrays.asList("_id", IgnoredFieldMapper.NAME, "_index", "_routing", "_size", "_timestamp", "_ttl", "_type"))
- );
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MapperService.class);
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 9462aeddbd0e4..04bd31e6a5809 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -138,6 +138,7 @@
import org.opensearch.gateway.MetaStateService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
+import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.identity.IdentityService;
@@ -751,6 +752,7 @@ protected Node(
threadPool::relativeTimeInMillis
);
final RemoteClusterStateService remoteClusterStateService;
+ final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
final RemoteIndexPathUploader remoteIndexPathUploader;
if (isRemoteStoreClusterStateEnabled(settings)) {
remoteIndexPathUploader = new RemoteIndexPathUploader(
@@ -763,14 +765,16 @@ protected Node(
nodeEnvironment.nodeId(),
repositoriesServiceReference::get,
settings,
- clusterService.getClusterSettings(),
+ clusterService,
threadPool::preciseRelativeTimeInNanos,
threadPool,
List.of(remoteIndexPathUploader)
);
+ remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager();
} else {
remoteClusterStateService = null;
remoteIndexPathUploader = null;
+ remoteClusterStateCleanupManager = null;
}
// collect engine factory providers from plugins
@@ -1374,6 +1378,7 @@ protected Node(
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
+ b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory);
diff --git a/server/src/main/java/org/opensearch/plugins/PluginsService.java b/server/src/main/java/org/opensearch/plugins/PluginsService.java
index a6eefd2f4fd17..f08c9c738f1b4 100644
--- a/server/src/main/java/org/opensearch/plugins/PluginsService.java
+++ b/server/src/main/java/org/opensearch/plugins/PluginsService.java
@@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesFormat;
+import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.util.SPIClassIterator;
import org.opensearch.Build;
@@ -762,6 +763,7 @@ static void reloadLuceneSPI(ClassLoader loader) {
// Codecs:
PostingsFormat.reloadPostingsFormats(loader);
DocValuesFormat.reloadDocValuesFormats(loader);
+ KnnVectorsFormat.reloadKnnVectorsFormat(loader);
Codec.reloadCodecs(loader);
}
diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
index 3ba98c44f8d3e..418e6d8de6adb 100644
--- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
+++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
@@ -462,9 +462,7 @@ public void testDataOnlyNodePersistence() throws Exception {
});
when(transportService.getThreadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
- when(clusterService.getClusterSettings()).thenReturn(
- new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
- );
+ when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(
nodeEnvironment,
xContentRegistry(),
@@ -487,7 +485,7 @@ public void testDataOnlyNodePersistence() throws Exception {
nodeEnvironment.nodeId(),
repositoriesServiceSupplier,
settings,
- clusterSettings,
+ clusterService,
() -> 0L,
threadPool,
List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java
new file mode 100644
index 0000000000000..24fd1b164a4ff
--- /dev/null
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java
@@ -0,0 +1,446 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterApplierService;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobMetadata;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.BlobStore;
+import org.opensearch.common.blobstore.support.PlainBlobMetadata;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.concurrent.AbstractAsyncTask;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.repositories.fs.FsRepository;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.test.VersionUtils;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.CLUSTER_STATE_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.encodeString;
+import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex;
+import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase {
+ private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
+ private Supplier repositoriesServiceSupplier;
+ private RepositoriesService repositoriesService;
+ private BlobStoreRepository blobStoreRepository;
+ private BlobStore blobStore;
+ private ClusterSettings clusterSettings;
+ private ClusterApplierService clusterApplierService;
+ private ClusterState clusterState;
+ private Metadata metadata;
+ private RemoteClusterStateService remoteClusterStateService;
+ private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
+
+    /**
+     * Builds the fixture used by the tests in this class: node settings that enable remote
+     * cluster state on an FS repository, mocked cluster/repository collaborators, and a
+     * {@link RemoteClusterStateCleanupManager} wired to a mocked {@link RemoteClusterStateService}.
+     */
+    @Before
+    public void setup() {
+        final String repoName = "remote_store_repository";
+        final String nodeAttrPrefix = "node.attr.";
+
+        // Node settings: repository name/type/location attributes plus the remote-state enable flag.
+        String repoTypeKey = String.format(Locale.getDefault(), nodeAttrPrefix + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, repoName);
+        String repoSettingsKeyPrefix = String.format(
+            Locale.getDefault(),
+            nodeAttrPrefix + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
+            repoName
+        );
+        Settings.Builder settingsBuilder = Settings.builder();
+        settingsBuilder.put(nodeAttrPrefix + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName);
+        settingsBuilder.put(repoTypeKey, FsRepository.TYPE);
+        settingsBuilder.put(repoSettingsKeyPrefix + "location", "randomRepoPath");
+        settingsBuilder.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
+        Settings settings = settingsBuilder.build();
+        clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+
+        // Cluster state as seen through the applier service: cluster "test" with UUID "testUUID".
+        metadata = mock(Metadata.class);
+        when(metadata.clusterUUID()).thenReturn("testUUID");
+        clusterState = mock(ClusterState.class);
+        when(clusterState.getClusterName()).thenReturn(new ClusterName("test"));
+        when(clusterState.metadata()).thenReturn(metadata);
+        clusterApplierService = mock(ClusterApplierService.class);
+        when(clusterApplierService.state()).thenReturn(clusterState);
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+        when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
+
+        // Repository lookup chain: supplier -> repositories service -> blob store repository -> blob store.
+        blobStore = mock(BlobStore.class);
+        blobStoreRepository = mock(BlobStoreRepository.class);
+        when(blobStoreRepository.blobStore()).thenReturn(blobStore);
+        repositoriesService = mock(RepositoriesService.class);
+        when(repositoriesService.repository(repoName)).thenReturn(blobStoreRepository);
+        repositoriesServiceSupplier = mock(Supplier.class);
+        when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
+
+        // The service is mocked; only the accessors stubbed here return real collaborators.
+        remoteClusterStateService = mock(RemoteClusterStateService.class);
+        when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats());
+        when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool);
+        when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
+
+        // Constructed last so every stub above is in place when the manager's constructor runs.
+        remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService);
+    }
+
+    /**
+     * Releases the per-test fixture: runs the parent tear-down, closes the cleanup manager and
+     * shuts the test thread pool down.
+     *
+     * @throws Exception if the parent tear-down or closing the manager fails
+     */
+    @After
+    public void teardown() throws Exception {
+        try {
+            super.tearDown();
+            remoteClusterStateCleanupManager.close();
+        } finally {
+            // Always shut the pool down: if the steps above throw (e.g. setup failed before the
+            // manager was assigned), skipping this would leak the pool's threads into later tests.
+            threadPool.shutdown();
+        }
+    }
+
+ public void testDeleteClusterMetadata() throws IOException {
+ String clusterUUID = "clusterUUID";
+ String clusterName = "test-cluster";
+ List inactiveBlobs = Arrays.asList(
+ new PlainBlobMetadata("manifest1.dat", 1L),
+ new PlainBlobMetadata("manifest2.dat", 1L),
+ new PlainBlobMetadata("manifest3.dat", 1L)
+ );
+ List activeBlobs = Arrays.asList(
+ new PlainBlobMetadata("manifest4.dat", 1L),
+ new PlainBlobMetadata("manifest5.dat", 1L)
+ );
+ UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1");
+ UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2");
+ UploadedIndexMetadata index1UpdatedMetadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1_updated");
+ UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata");
+ UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata");
+ UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata");
+ UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute(
+ COORDINATION_METADATA,
+ "coordination_metadata_updated"
+ );
+ UploadedMetadataAttribute templateMetadataUpdated = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata_updated");
+ UploadedMetadataAttribute settingMetadataUpdated = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata_updated");
+ ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder()
+ .indices(List.of(index1Metadata))
+ .globalMetadataFileName("global_metadata")
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .codecVersion(CODEC_V1)
+ .stateUUID(randomAlphaOfLength(10))
+ .clusterUUID(clusterUUID)
+ .nodeId("nodeA")
+ .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
+ .previousClusterUUID(ClusterState.UNKNOWN_UUID)
+ .committed(true)
+ .build();
+ ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1)
+ .indices(List.of(index1Metadata, index2Metadata))
+ .codecVersion(CODEC_V2)
+ .globalMetadataFileName(null)
+ .coordinationMetadata(coordinationMetadata)
+ .templatesMetadata(templateMetadata)
+ .settingMetadata(settingMetadata)
+ .build();
+ ClusterMetadataManifest manifest3 = ClusterMetadataManifest.builder(manifest2)
+ .indices(List.of(index1UpdatedMetadata, index2Metadata))
+ .settingMetadata(settingMetadataUpdated)
+ .build();
+
+ // active manifests (manifest4, manifest5) reference index1Updated, index2, settingsUpdated, coordinationUpdated, templates and templatesUpdated
+ ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3)
+ .coordinationMetadata(coordinationMetadataUpdated)
+ .build();
+ ClusterMetadataManifest manifest5 = ClusterMetadataManifest.builder(manifest4).templatesMetadata(templateMetadataUpdated).build();
+
+ when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn(
+ manifest4,
+ manifest5,
+ manifest1,
+ manifest2,
+ manifest3
+ );
+ BlobContainer container = mock(BlobContainer.class);
+ when(blobStore.blobContainer(any())).thenReturn(container);
+ doNothing().when(container).deleteBlobsIgnoringIfNotExists(any());
+
+ remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs);
+ verify(container).deleteBlobsIgnoringIfNotExists(
+ List.of(
+ new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + coordinationMetadata.getUploadedFilename() + ".dat",
+ new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + settingMetadata.getUploadedFilename() + ".dat",
+ new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + "global_metadata.dat"
+ )
+ );
+ verify(container).deleteBlobsIgnoringIfNotExists(
+ List.of(
+ new BlobPath().add(INDEX_PATH_TOKEN).add(index1Metadata.getIndexUUID()).buildAsString()
+ + index1Metadata.getUploadedFilePath()
+ + ".dat"
+ )
+ );
+ Set staleManifest = new HashSet<>();
+ inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name()));
+ verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));
+ }
+
+ public void testDeleteStaleClusterUUIDs() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder()
+ .indices(List.of())
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .stateUUID(randomAlphaOfLength(10))
+ .clusterUUID("cluster-uuid1")
+ .nodeId("nodeA")
+ .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
+ .previousClusterUUID(ClusterState.UNKNOWN_UUID)
+ .committed(true)
+ .build();
+
+ BlobPath blobPath = new BlobPath().add("random-path");
+ BlobContainer uuidContainerContainer = mock(BlobContainer.class);
+ BlobContainer manifest2Container = mock(BlobContainer.class);
+ BlobContainer manifest3Container = mock(BlobContainer.class);
+ when(blobStore.blobContainer(any())).then(invocation -> {
+ BlobPath blobPath1 = invocation.getArgument(0);
+ if (blobPath1.buildAsString().endsWith("cluster-state/")) {
+ return uuidContainerContainer;
+ } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) {
+ return manifest2Container;
+ } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) {
+ return manifest3Container;
+ } else {
+ throw new IllegalArgumentException("Unexpected blob path " + blobPath1);
+ }
+ });
+ when(
+ manifest2Container.listBlobsByPrefixInSortedOrder(
+ MANIFEST_FILE_PREFIX + DELIMITER,
+ Integer.MAX_VALUE,
+ BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
+ )
+ ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
+ when(
+ manifest3Container.listBlobsByPrefixInSortedOrder(
+ MANIFEST_FILE_PREFIX + DELIMITER,
+ Integer.MAX_VALUE,
+ BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
+ )
+ ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));
+ Set uuids = new HashSet<>(Arrays.asList("cluster-uuid1", "cluster-uuid2", "cluster-uuid3"));
+ when(remoteClusterStateService.getAllClusterUUIDs(any())).thenReturn(uuids);
+ when(remoteClusterStateService.getCusterMetadataBasePath(any(), any())).then(
+ invocationOnMock -> blobPath.add(encodeString(invocationOnMock.getArgument(0)))
+ .add(CLUSTER_STATE_PATH_TOKEN)
+ .add((String) invocationOnMock.getArgument(1))
+ );
+ remoteClusterStateCleanupManager.start();
+ remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest);
+ try {
+ assertBusy(() -> {
+ verify(manifest2Container, times(1)).delete();
+ verify(manifest3Container, times(1)).delete();
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void testRemoteStateCleanupFailureStats() throws IOException {
+ BlobContainer blobContainer = mock(BlobContainer.class);
+ doThrow(IOException.class).when(blobContainer).delete();
+ when(blobStore.blobContainer(any())).thenReturn(blobContainer);
+ BlobPath blobPath = new BlobPath().add("random-path");
+ when((blobStoreRepository.basePath())).thenReturn(blobPath);
+ remoteClusterStateCleanupManager.start();
+ remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", List.of("cluster-uuid1"));
+ try {
+ assertBusy(() -> {
+ // wait for stats to get updated
+ assertNotNull(remoteClusterStateCleanupManager.getStats());
+ assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount());
+ assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount());
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
+ BlobContainer blobContainer = mock(BlobContainer.class);
+ when(blobStore.blobContainer(any())).thenReturn(blobContainer);
+ // the stubbed listing below blocks on this latch until the test releases it, after both cleanup requests were issued
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger callCount = new AtomicInteger(0);
+ doAnswer(invocation -> {
+ callCount.incrementAndGet();
+ // bounded wait (5 seconds, not 5000): a stuck worker should fail fast instead of outliving the test
+ if (latch.await(5, TimeUnit.SECONDS) == false) {
+ throw new Exception("Timed out waiting for delete task queuing to complete");
+ }
+ return null;
+ }).when(blobContainer)
+ .listBlobsByPrefixInSortedOrder(
+ any(String.class),
+ any(int.class),
+ any(BlobContainer.BlobNameSortOrder.class),
+ any(ActionListener.class));
+
+ remoteClusterStateCleanupManager.start();
+ remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
+ remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
+ // release the blocked listing; exactly one listing call is expected even though cleanup was requested twice
+ latch.countDown();
+ assertBusy(() -> assertEquals(1, callCount.get()));
+ }
+
+ public void testRemoteClusterStateCleanupSetting() {
+ remoteClusterStateCleanupManager.start();
+ // verify default value
+ assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileCleanupInterval());
+
+ // verify update interval
+ int cleanupInterval = randomIntBetween(1, 10);
+ Settings newSettings = Settings.builder().put("cluster.remote_store.state.cleanup_interval", cleanupInterval + "s").build();
+ clusterSettings.applySettings(newSettings);
+ assertEquals(cleanupInterval, remoteClusterStateCleanupManager.getStaleFileCleanupInterval().seconds());
+ }
+
+ public void testRemoteCleanupTaskScheduled() {
+ AbstractAsyncTask cleanupTask = remoteClusterStateCleanupManager.getStaleFileDeletionTask();
+ assertNull(cleanupTask);
+ // now the task should be initialized
+ remoteClusterStateCleanupManager.start();
+ assertNotNull(remoteClusterStateCleanupManager.getStaleFileDeletionTask());
+ assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().mustReschedule());
+ assertEquals(
+ clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING),
+ remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval()
+ );
+ assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
+ assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isClosed());
+ }
+
+ public void testRemoteCleanupSkipsOnOnlyElectedClusterManager() {
+ DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+ when(nodes.isLocalNodeElectedClusterManager()).thenReturn(false);
+ when(clusterState.nodes()).thenReturn(nodes);
+ RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+ AtomicInteger callCount = new AtomicInteger(0);
+ doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
+ spyManager.cleanUpStaleFiles();
+ assertEquals(0, callCount.get()); // local node is not the elected cluster manager: no delete issued
+ // same spy, but the node now reports as elected cluster manager with a state version in [11, 20]: one delete expected
+ when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
+ when(clusterState.version()).thenReturn(randomLongBetween(11, 20));
+ spyManager.cleanUpStaleFiles();
+ assertEquals(1, callCount.get());
+ }
+
+ public void testRemoteCleanupSkipsIfVersionIncrementLessThanThreshold() {
+ DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+ long version = randomLongBetween(1, SKIP_CLEANUP_STATE_CHANGES);
+ when(clusterApplierService.state()).thenReturn(clusterState);
+ when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
+ when(clusterState.nodes()).thenReturn(nodes);
+ when(clusterState.version()).thenReturn(version);
+
+ RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+ AtomicInteger callCount = new AtomicInteger(0);
+ doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
+ // call through the spy: on the un-spied manager the stub is never consulted, so the counter could not detect a delete
+ spyManager.cleanUpStaleFiles();
+ assertEquals(0, callCount.get());
+ }
+
+ public void testRemoteCleanupCallsDeleteIfVersionIncrementGreaterThanThreshold() {
+ DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+ long version = randomLongBetween(SKIP_CLEANUP_STATE_CHANGES + 1, SKIP_CLEANUP_STATE_CHANGES + 10);
+ when(clusterApplierService.state()).thenReturn(clusterState);
+ when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
+ when(clusterState.nodes()).thenReturn(nodes);
+ when(clusterState.version()).thenReturn(version);
+
+ RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+ AtomicInteger callCount = new AtomicInteger(0);
+ doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
+
+ // using spied cleanup manager so that stubbed deleteStaleClusterMetadata is called
+ spyManager.cleanUpStaleFiles();
+ assertEquals(1, callCount.get());
+ }
+
+ public void testRemoteCleanupSchedulesEvenAfterFailure() {
+ remoteClusterStateCleanupManager.start();
+ RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+ AtomicInteger callCount = new AtomicInteger(0);
+ doAnswer(invocationOnMock -> {
+ callCount.incrementAndGet();
+ throw new RuntimeException("Test exception");
+ }).when(spyManager).cleanUpStaleFiles();
+ AsyncStaleFileDeletion task = new AsyncStaleFileDeletion(spyManager);
+ assertTrue(task.isScheduled());
+ task.run();
+ // the stubbed cleanUpStaleFiles threw, yet the task is expected to remain scheduled
+ assertTrue(task.isScheduled());
+ assertEquals(1, callCount.get());
+ // a second failing run is expected to behave the same way
+ task.run();
+ // still scheduled, and the stub was invoked a second time
+ assertTrue(task.isScheduled());
+ assertEquals(2, callCount.get());
+ }
+}
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
index 1b242b921c0d7..5f0c371a3137e 100644
--- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
@@ -19,6 +19,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
@@ -72,9 +73,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -93,7 +91,6 @@
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
@@ -109,13 +106,12 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
private RemoteClusterStateService remoteClusterStateService;
+ private ClusterService clusterService;
private ClusterSettings clusterSettings;
private Supplier repositoriesServiceSupplier;
private RepositoriesService repositoriesService;
@@ -148,6 +144,8 @@ public void setup() {
.build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ clusterService = mock(ClusterService.class);
+ when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
Stream.of(
NetworkModule.getNamedXContents().stream(),
@@ -165,7 +163,7 @@ public void setup() {
"test-node-id",
repositoriesServiceSupplier,
settings,
- clusterSettings,
+ clusterService,
() -> 0L,
threadPool,
List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
@@ -187,14 +185,14 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException
public void testFailInitializationWhenRemoteStateDisabled() {
final Settings settings = Settings.builder().build();
- ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
assertThrows(
AssertionError.class,
() -> new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
settings,
- clusterSettings,
+ clusterService,
() -> 0L,
threadPool,
List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
@@ -1280,72 +1278,6 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx
assertThat(previousClusterUUID, equalTo("cluster-uuid2"));
}
- public void testDeleteStaleClusterUUIDs() throws IOException {
- final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
- ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder()
- .indices(List.of())
- .clusterTerm(1L)
- .stateVersion(1L)
- .stateUUID(randomAlphaOfLength(10))
- .clusterUUID("cluster-uuid1")
- .nodeId("nodeA")
- .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
- .previousClusterUUID(ClusterState.UNKNOWN_UUID)
- .committed(true)
- .build();
-
- BlobPath blobPath = new BlobPath().add("random-path");
- when((blobStoreRepository.basePath())).thenReturn(blobPath);
- BlobContainer uuidContainerContainer = mock(BlobContainer.class);
- BlobContainer manifest2Container = mock(BlobContainer.class);
- BlobContainer manifest3Container = mock(BlobContainer.class);
- when(blobStore.blobContainer(any())).then(invocation -> {
- BlobPath blobPath1 = invocation.getArgument(0);
- if (blobPath1.buildAsString().endsWith("cluster-state/")) {
- return uuidContainerContainer;
- } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) {
- return manifest2Container;
- } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) {
- return manifest3Container;
- } else {
- throw new IllegalArgumentException("Unexpected blob path " + blobPath1);
- }
- });
- Map blobMetadataMap = Map.of(
- "cluster-uuid1",
- mock(BlobContainer.class),
- "cluster-uuid2",
- mock(BlobContainer.class),
- "cluster-uuid3",
- mock(BlobContainer.class)
- );
- when(uuidContainerContainer.children()).thenReturn(blobMetadataMap);
- when(
- manifest2Container.listBlobsByPrefixInSortedOrder(
- MANIFEST_FILE_PREFIX + DELIMITER,
- Integer.MAX_VALUE,
- BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
- )
- ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
- when(
- manifest3Container.listBlobsByPrefixInSortedOrder(
- MANIFEST_FILE_PREFIX + DELIMITER,
- Integer.MAX_VALUE,
- BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
- )
- ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));
- remoteClusterStateService.start();
- remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest);
- try {
- assertBusy(() -> {
- verify(manifest2Container, times(1)).delete();
- verify(manifest3Container, times(1)).delete();
- });
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
public void testRemoteStateStats() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
@@ -1358,26 +1290,6 @@ public void testRemoteStateStats() throws IOException {
assertEquals(0, remoteClusterStateService.getStats().getFailedCount());
}
- public void testRemoteStateCleanupFailureStats() throws IOException {
- BlobContainer blobContainer = mock(BlobContainer.class);
- doThrow(IOException.class).when(blobContainer).delete();
- when(blobStore.blobContainer(any())).thenReturn(blobContainer);
- BlobPath blobPath = new BlobPath().add("random-path");
- when((blobStoreRepository.basePath())).thenReturn(blobPath);
- remoteClusterStateService.start();
- remoteClusterStateService.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1"));
- try {
- assertBusy(() -> {
- // wait for stats to get updated
- assertTrue(remoteClusterStateService.getStats() != null);
- assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
- assertEquals(1, remoteClusterStateService.getStats().getCleanupAttemptFailedCount());
- });
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
public void testFileNames() {
final Index index = new Index("test-index", "index-uuid");
final Settings idxSettings = Settings.builder()
@@ -1418,36 +1330,6 @@ private void verifyManifestFileNameWithCodec(int codecVersion) {
assertThat(splittedName[3], is("P"));
}
- public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
- BlobContainer blobContainer = mock(BlobContainer.class);
- BlobPath blobPath = new BlobPath().add("random-path");
- when((blobStoreRepository.basePath())).thenReturn(blobPath);
- when(blobStore.blobContainer(any())).thenReturn(blobContainer);
-
- CountDownLatch latch = new CountDownLatch(1);
- AtomicInteger callCount = new AtomicInteger(0);
- doAnswer(invocation -> {
- callCount.incrementAndGet();
- if (latch.await(5000, TimeUnit.SECONDS) == false) {
- throw new Exception("Timed out waiting for delete task queuing to complete");
- }
- return null;
- }).when(blobContainer)
- .listBlobsByPrefixInSortedOrder(
- any(String.class),
- any(int.class),
- any(BlobContainer.BlobNameSortOrder.class),
- any(ActionListener.class)
- );
-
- remoteClusterStateService.start();
- remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
- remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
-
- latch.countDown();
- assertBusy(() -> assertEquals(1, callCount.get()));
- }
-
public void testIndexMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
@@ -1891,7 +1773,7 @@ private static ClusterState.Builder generateClusterStateWithGlobalMetadata() {
);
}
- private static ClusterState.Builder generateClusterStateWithOneIndex() {
+ static ClusterState.Builder generateClusterStateWithOneIndex() {
final Index index = new Index("test-index", "index-uuid");
final Settings idxSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
@@ -1921,7 +1803,7 @@ private static ClusterState.Builder generateClusterStateWithOneIndex() {
);
}
- private static DiscoveryNodes nodesWithLocalNodeClusterManager() {
+ static DiscoveryNodes nodesWithLocalNodeClusterManager() {
return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build();
}