From 9b494b6135f00582b437fc2340392b70a7e07080 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Fri, 2 Aug 2024 02:54:47 +0000 Subject: [PATCH] Mitigation for overflowed filecache Signed-off-by: Finn Carroll --- .../store/remote/utils/TransferManager.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index f07c4832d982c..9aaab7e0aa0d4 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -15,6 +15,7 @@ import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCachedIndexInput; +import org.opensearch.index.store.remote.utils.cache.CacheUsage; import java.io.BufferedOutputStream; import java.io.IOException; @@ -95,6 +96,25 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio @SuppressWarnings("removal") private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { try { + // This local file cache is ref counted and may not strictly enforce capacity. + // In our use case capacity directly relates to disk usage. + // If we find available capacity is exceeded deny further BlobFetchRequests. + if (fileCache.usage().usage() > fileCache.capacity()) { + System.gc(); + + // File reference cleanup is not immediate and is processed + // in a dedicated thread. + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + fileCache.prune(); + if (fileCache.usage().usage() > fileCache.capacity()) { + throw new IOException("Local file cache capacity exceeded - BlobFetchRequest failed: " + request.getFilePath()); + } + } if (Files.exists(request.getFilePath()) == false) { logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); try (