From c564ee35ec000c38fe9d525a4b6add90cfc969fc Mon Sep 17 00:00:00 2001 From: panguixin Date: Sat, 3 Feb 2024 08:27:13 +0800 Subject: [PATCH] Cleanup file cache on deleting index/shard directory (#11443) * cleanup file cache on deleting index/shard directory Signed-off-by: panguixin * add index store listener Signed-off-by: panguixin --------- Signed-off-by: panguixin --- .../snapshots/SearchableSnapshotIT.java | 74 +++++++++++++ .../org/opensearch/env/NodeEnvironment.java | 43 +++++++- .../remote/filecache/FileCacheCleaner.java | 100 ++++++++++-------- .../opensearch/indices/IndicesService.java | 5 - .../main/java/org/opensearch/node/Node.java | 8 +- .../opensearch/env/NodeEnvironmentTests.java | 56 ++++++++++ .../filecache/FileCacheCleanerTests.java | 18 +--- .../snapshots/SnapshotResiliencyTests.java | 3 - 8 files changed, 235 insertions(+), 72 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 21554a8e4fb15..c89fef20aafb1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -30,14 +30,17 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.Priority; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.monitor.fs.FsInfo; @@ -859,4 +862,75 @@ private void assertCacheDirectoryReplicaAndIndexCount(int numCacheFolderCount, i // Verifies if all the shards (primary and replica) have been deleted assertEquals(numCacheFolderCount, searchNodeFileCachePaths.size()); } + + public void testRelocateSearchableSnapshotIndex() throws Exception { + final String snapshotName = "test-snap"; + final String repoName = "test-repo"; + final String indexName = "test-idx-1"; + final String restoredIndexName = indexName + "-copy"; + final Client client = client(); + + internalCluster().ensureAtLeastNumDataNodes(1); + createIndexWithDocsAndEnsureGreen(0, 100, indexName); + + createRepositoryWithSettings(null, repoName); + takeSnapshot(client, snapshotName, repoName, indexName); + deleteIndicesAndEnsureGreen(client, indexName); + + String searchNode1 = internalCluster().startSearchOnlyNodes(1).get(0); + internalCluster().validateClusterFormed(); + restoreSnapshotAndEnsureGreen(client, snapshotName, repoName); + assertRemoteSnapshotIndexSettings(client, restoredIndexName); + + String searchNode2 = internalCluster().startSearchOnlyNodes(1).get(0); + internalCluster().validateClusterFormed(); + + final Index index = resolveIndex(restoredIndexName); + assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, true); + assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false); + + // relocate the shard from node1 to node2 + client.admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(restoredIndexName, 0, searchNode1, searchNode2)) + .execute() + .actionGet(); + ClusterHealthResponse clusterHealthResponse = client.admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(new TimeValue(5, TimeUnit.MINUTES)) + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + assertDocCount(restoredIndexName, 100L); + + assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, false); + assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, true); + deleteIndicesAndEnsureGreen(client, restoredIndexName); + assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false); + } + + private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception { + final Node node = internalCluster().getInstance(Node.class, nodeName); + final ShardId shardId = new ShardId(index, 0); + final ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardId); + + assertBusy(() -> { + assertTrue( + "shard state path should " + (exists ? "exist" : "not exist"), + Files.exists(shardPath.getShardStatePath()) == exists + ); + assertTrue("shard cache path should " + (exists ? "exist" : "not exist"), Files.exists(shardPath.getDataPath()) == exists); + }, 30, TimeUnit.SECONDS); + + final Path indexDataPath = node.getNodeEnvironment().fileCacheNodePath().fileCachePath.resolve(index.getUUID()); + final Path indexPath = node.getNodeEnvironment().fileCacheNodePath().indicesPath.resolve(index.getUUID()); + assertBusy(() -> { + assertTrue("index path should " + (exists ? "exist" : "not exist"), Files.exists(indexDataPath) == exists); + assertTrue("index cache path should " + (exists ? "exist" : "not exist"), Files.exists(indexPath) == exists); + }, 30, TimeUnit.SECONDS); + } } diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index c7ba5eb040a1f..2748938d8b761 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -199,6 +199,8 @@ public String toString() { private final NodeMetadata nodeMetadata; + private final IndexStoreListener indexStoreListener; + /** * Maximum number of data nodes that should run in an environment. */ @@ -295,18 +297,23 @@ public void close() { } } + public NodeEnvironment(Settings settings, Environment environment) throws IOException { + this(settings, environment, IndexStoreListener.EMPTY); + } + /** * Setup the environment. * @param settings settings from opensearch.yml */ - public NodeEnvironment(Settings settings, Environment environment) throws IOException { - if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { + public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException { + if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) { nodePaths = null; fileCacheNodePath = null; sharedDataPath = null; locks = null; nodeLockId = -1; nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT); + this.indexStoreListener = IndexStoreListener.EMPTY; return; } boolean success = false; @@ -385,6 +392,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce } this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths); + this.indexStoreListener = indexStoreListener; success = true; } finally { if (success == false) { @@ -577,6 +585,9 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException { final ShardId shardId = lock.getShardId(); assert isShardLocked(shardId) : "shard " + shardId + " is not locked"; + + indexStoreListener.beforeShardPathDeleted(shardId, indexSettings, this); + final Path[] paths = availableShardPaths(shardId); logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths); acquireFSLockForPaths(indexSettings, paths); @@ -653,6 +664,8 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti * @param indexSettings settings for the index being deleted */ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException { + indexStoreListener.beforeIndexPathDeleted(index, indexSettings, this); + final Path[] indexPaths = indexPaths(index); logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths); IOUtils.rm(indexPaths); @@ -663,6 +676,18 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin } } + private void deleteIndexFileCacheDirectory(Index index) { + final Path indexCachePath = fileCacheNodePath().fileCachePath.resolve(index.getUUID()); + logger.trace("deleting index {} file cache directory, path: [{}]", index, indexCachePath); + if (Files.exists(indexCachePath)) { + try { + IOUtils.rm(indexCachePath); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e); + } + } + } + /** * Tries to lock all local shards for the given index. If any of the shard locks can't be acquired * a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released. @@ -1387,4 +1412,18 @@ private static void tryWriteTempFile(Path path) throws IOException { } } } + + /** + * A listener that is executed on per-index and per-shard store events, like deleting shard path + * + * @opensearch.internal + */ + public interface IndexStoreListener { + default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {} + + default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {} + + IndexStoreListener EMPTY = new IndexStoreListener() { + }; + } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java index fb89e651e7616..0261ab24dfa7a 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java @@ -11,16 +11,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.common.settings.Settings; +import org.opensearch.common.inject.Provider; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; -import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.ShardPath; -import org.opensearch.indices.cluster.IndicesClusterStateService; import java.io.IOException; import java.nio.file.DirectoryStream; @@ -30,79 +27,90 @@ import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION; /** - * IndexEventListener to clean up file cache when the index is deleted. The cached entries will be eligible + * IndexStoreListener to clean up file cache when the index is deleted. The cached entries will be eligible * for eviction when the shard is deleted, but this listener deterministically removes entries from memory and * from disk at the time of shard deletion as opposed to waiting for the cache to need to perform eviction. * * @opensearch.internal */ -public class FileCacheCleaner implements IndexEventListener { - private static final Logger log = LogManager.getLogger(FileCacheCleaner.class); +public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener { + private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class); - private final NodeEnvironment nodeEnvironment; - private final FileCache fileCache; + private final Provider fileCacheProvider; - public FileCacheCleaner(NodeEnvironment nodeEnvironment, FileCache fileCache) { - this.nodeEnvironment = nodeEnvironment; - this.fileCache = fileCache; + public FileCacheCleaner(Provider fileCacheProvider) { + this.fileCacheProvider = fileCacheProvider; } /** - * before shard deleted and after shard closed, cleans up the corresponding index file path entries from FC. - * @param shardId The shard id - * @param settings the shards index settings + * before shard path deleted, cleans up the corresponding index file path entries from FC and delete the corresponding shard file + * cache path. + * + * @param shardId the shard id + * @param indexSettings the index settings + * @param nodeEnvironment the node environment */ @Override - public void beforeIndexShardDeleted(ShardId shardId, Settings settings) { + public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) { + if (indexSettings.isRemoteSnapshot()) { + final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId); + cleanupShardFileCache(shardPath); + deleteShardFileCacheDirectory(shardPath); + } + } + + /** + * Cleans up the corresponding index file path entries from FileCache + * + * @param shardPath the shard path + */ + private void cleanupShardFileCache(ShardPath shardPath) { try { - if (isRemoteSnapshot(settings)) { - final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId); - final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION); - try (DirectoryStream ds = Files.newDirectoryStream(localStorePath)) { - for (Path subPath : ds) { - fileCache.remove(subPath.toRealPath()); - } + final FileCache fc = fileCacheProvider.get(); + assert fc != null; + final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION); + try (DirectoryStream ds = Files.newDirectoryStream(localStorePath)) { + for (Path subPath : ds) { + fc.remove(subPath.toRealPath()); } } } catch (IOException ioe) { - log.error(() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardId), ioe); + logger.error( + () -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()), + ioe + ); } } - @Override - public void afterIndexShardDeleted(ShardId shardId, Settings settings) { - if (isRemoteSnapshot(settings)) { - final Path path = ShardPath.loadFileCachePath(nodeEnvironment, shardId).getDataPath(); - try { - if (Files.exists(path)) { - IOUtils.rm(path); - } - } catch (IOException e) { - log.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardId), e); + private void deleteShardFileCacheDirectory(ShardPath shardPath) { + final Path path = shardPath.getDataPath(); + try { + if (Files.exists(path)) { + IOUtils.rm(path); } + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e); } } + /** + * before index path deleted, delete the corresponding index file cache path. + * + * @param index the index + * @param indexSettings the index settings + * @param nodeEnvironment the node environment + */ @Override - public void afterIndexRemoved( - Index index, - IndexSettings indexSettings, - IndicesClusterStateService.AllocatedIndices.IndexRemovalReason reason - ) { - if (isRemoteSnapshot(indexSettings.getSettings()) - && reason == IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED) { + public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) { + if (indexSettings.isRemoteSnapshot()) { final Path indexCachePath = nodeEnvironment.fileCacheNodePath().fileCachePath.resolve(index.getUUID()); if (Files.exists(indexCachePath)) { try { IOUtils.rm(indexCachePath); } catch (IOException e) { - log.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e); + logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e); } } } } - - private static boolean isRemoteSnapshot(Settings settings) { - return IndexModule.Type.REMOTE_SNAPSHOT.match(settings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())); - } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index db5b93f073b03..c83f2a4c5cd5d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -135,7 +135,6 @@ import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; -import org.opensearch.index.store.remote.filecache.FileCacheCleaner; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.TranslogFactory; @@ -362,7 +361,6 @@ public class IndicesService extends AbstractLifecycleComponent private final BiFunction translogFactorySupplier; private volatile TimeValue clusterDefaultRefreshInterval; private volatile TimeValue clusterRemoteTranslogBufferInterval; - private final FileCacheCleaner fileCacheCleaner; private final SearchRequestStats searchRequestStats; @@ -395,7 +393,6 @@ public IndicesService( Map recoveryStateFactories, IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, Supplier repositoriesServiceSupplier, - FileCacheCleaner fileCacheCleaner, SearchRequestStats searchRequestStats, @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, RecoverySettings recoverySettings @@ -450,7 +447,6 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.directoryFactories = directoryFactories; this.recoveryStateFactories = recoveryStateFactories; - this.fileCacheCleaner = fileCacheCleaner; // doClose() is called when shutting down a node, yet there might still be ongoing requests // that we need to wait for before closing some resources such as the caches. In order to // avoid closing these resources while ongoing requests are still being processed, we use a @@ -766,7 +762,6 @@ public void onStoreClosed(ShardId shardId) { }; finalListeners.add(onStoreClose); finalListeners.add(oldShardsStats); - finalListeners.add(fileCacheCleaner); final IndexService indexService = createIndexService( CREATE_INDEX, indexMetadata, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 8510122c39fcb..547f610f4a752 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -527,7 +527,11 @@ protected Node( */ this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings)); Environment.assertEquivalent(initialEnvironment, this.environment); - nodeEnvironment = new NodeEnvironment(tmpSettings, environment); + if (DiscoveryNode.isSearchNode(settings) == false) { + nodeEnvironment = new NodeEnvironment(tmpSettings, environment); + } else { + nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache)); + } logger.info( "node name [{}], node ID [{}], cluster name [{}], roles {}", NODE_NAME_SETTING.get(tmpSettings), @@ -678,7 +682,6 @@ protected Node( ); // File cache will be initialized by the node once circuit breakers are in place. initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)); - final FileCacheCleaner fileCacheCleaner = new FileCacheCleaner(nodeEnvironment, fileCache); final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache); pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> { @@ -812,7 +815,6 @@ protected Node( recoveryStateFactories, remoteDirectoryFactory, repositoriesServiceReference::get, - fileCacheCleaner, searchRequestStats, remoteStoreStatsTrackerFactory, recoverySettings diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java index 7f669934579ee..962eb743dca6e 100644 --- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java @@ -359,6 +359,57 @@ protected void doRun() throws Exception { env.close(); } + public void testIndexStoreListener() throws Exception { + final AtomicInteger shardCounter = new AtomicInteger(0); + final AtomicInteger indexCounter = new AtomicInteger(0); + final Index index = new Index("foo", "fooUUID"); + final ShardId shardId = new ShardId(index, 0); + final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() { + @Override + public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) { + assertEquals(shardId, inShardId); + shardCounter.incrementAndGet(); + } + + @Override + public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) { + assertEquals(index, inIndex); + indexCounter.incrementAndGet(); + } + }; + final NodeEnvironment env = newNodeEnvironment(listener); + + for (Path path : env.indexPaths(index)) { + Files.createDirectories(path.resolve("0")); + } + + for (Path path : env.indexPaths(index)) { + assertTrue(Files.exists(path.resolve("0"))); + } + assertEquals(0, shardCounter.get()); + + env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings); + + for (Path path : env.indexPaths(index)) { + assertFalse(Files.exists(path.resolve("0"))); + } + assertEquals(1, shardCounter.get()); + + for (Path path : env.indexPaths(index)) { + assertTrue(Files.exists(path)); + } + assertEquals(0, indexCounter.get()); + + env.deleteIndexDirectorySafe(index, 5000, idxSettings); + + for (Path path : env.indexPaths(index)) { + assertFalse(Files.exists(path)); + } + assertEquals(1, indexCounter.get()); + assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); + env.close(); + } + public void testStressShardLock() throws IOException, InterruptedException { class Int { int value = 0; @@ -629,6 +680,11 @@ public NodeEnvironment newNodeEnvironment() throws IOException { return newNodeEnvironment(Settings.EMPTY); } + public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException { + Settings build = buildEnvSettings(Settings.EMPTY); + return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener); + } + @Override public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException { Settings build = buildEnvSettings(settings); diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java index 04434fa52e555..e2a6a4011a6b7 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java @@ -18,7 +18,6 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; -import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.test.OpenSearchTestCase; import org.hamcrest.MatcherAssert; import org.junit.After; @@ -59,7 +58,7 @@ public class FileCacheCleanerTests extends OpenSearchTestCase { @Before public void setUpFileCache() throws IOException { env = newNodeEnvironment(SETTINGS); - cleaner = new FileCacheCleaner(env, fileCache); + cleaner = new FileCacheCleaner(() -> fileCache); files.put(SHARD_0, addFile(fileCache, env, SHARD_0)); files.put(SHARD_1, addFile(fileCache, env, SHARD_1)); MatcherAssert.assertThat(fileCache.size(), equalTo(2L)); @@ -103,12 +102,11 @@ public void testShardRemoved() { final Path cachePath = ShardPath.loadFileCachePath(env, SHARD_0).getDataPath(); assertTrue(Files.exists(cachePath)); - cleaner.beforeIndexShardDeleted(SHARD_0, SETTINGS); + cleaner.beforeShardPathDeleted(SHARD_0, INDEX_SETTINGS, env); MatcherAssert.assertThat(fileCache.size(), equalTo(1L)); assertNull(fileCache.get(files.get(SHARD_0))); assertFalse(Files.exists(files.get(SHARD_0))); assertTrue(Files.exists(files.get(SHARD_1))); - cleaner.afterIndexShardDeleted(SHARD_0, SETTINGS); assertFalse(Files.exists(cachePath)); } @@ -116,15 +114,9 @@ public void testIndexRemoved() { final Path indexCachePath = env.fileCacheNodePath().fileCachePath.resolve(SHARD_0.getIndex().getUUID()); assertTrue(Files.exists(indexCachePath)); - cleaner.beforeIndexShardDeleted(SHARD_0, SETTINGS); - cleaner.afterIndexShardDeleted(SHARD_0, SETTINGS); - cleaner.beforeIndexShardDeleted(SHARD_1, SETTINGS); - cleaner.afterIndexShardDeleted(SHARD_1, SETTINGS); - cleaner.afterIndexRemoved( - SHARD_0.getIndex(), - INDEX_SETTINGS, - IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED - ); + cleaner.beforeShardPathDeleted(SHARD_0, INDEX_SETTINGS, env); + cleaner.beforeShardPathDeleted(SHARD_1, INDEX_SETTINGS, env); + cleaner.beforeIndexPathDeleted(SHARD_0.getIndex(), INDEX_SETTINGS, env); MatcherAssert.assertThat(fileCache.size(), equalTo(0L)); assertFalse(Files.exists(indexCachePath)); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9bb1f51c51cf6..7c50e961853b5 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -188,7 +188,6 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCache; -import org.opensearch.index.store.remote.filecache.FileCacheCleaner; import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesService; @@ -2037,7 +2036,6 @@ public void onFailure(final Exception e) { final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); final SetOnce repositoriesServiceReference = new SetOnce<>(); repositoriesServiceReference.set(repositoriesService); - FileCacheCleaner fileCacheCleaner = new FileCacheCleaner(nodeEnv, null); indicesService = new IndicesService( settings, mock(PluginsService.class), @@ -2072,7 +2070,6 @@ public void onFailure(final Exception e) { emptyMap(), new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, - fileCacheCleaner, null, new RemoteStoreStatsTrackerFactory(clusterService, settings), DefaultRecoverySettings.INSTANCE