diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index c892b475d71da..ad67a7baaf295 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1312,21 +1312,25 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List { - String clusterName = clusterState.getClusterName().value(); - logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); - Set allClustersUUIDsInRemote; - try { - allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value())); - } catch (IOException e) { - logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); - return; - } - // Retain last 2 cluster uuids data - allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); - allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); - deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); - }); + try { + threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + String clusterName = clusterState.getClusterName().value(); + logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); + Set allClustersUUIDsInRemote; + try { + allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value())); + } catch (IOException e) { + logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); + return; + } + // Retain last 2 cluster uuids data + allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); + allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); + deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); + }); + } catch (Exception e) { + logger.error("Exception occurred while scheduling deletion of stale cluster UUIDs from remote store", e); + } } public RemotePersistenceStats getStats() { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 2f6055df87804..55160570fe6f1 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -408,29 +408,33 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna */ public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) { logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep); - transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() { - @Override - public void onResponse(Set folders) { - Set primaryTermsInRemote = folders.stream().filter(folderName -> { - try { - Long.parseLong(folderName); - return true; - } catch (Exception ignored) { - // NO-OP - } - return false; - }).map(Long::parseLong).collect(Collectors.toSet()); - Set primaryTermsToDelete = primaryTermsInRemote.stream() - .filter(term -> term < minPrimaryTermToKeep) - .collect(Collectors.toSet()); - primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term)); - } + try { + transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() { + @Override + public void onResponse(Set folders) { + Set primaryTermsInRemote = folders.stream().filter(folderName -> { + try { + Long.parseLong(folderName); + return true; + } catch (Exception ignored) { + // NO-OP + } + return false; + }).map(Long::parseLong).collect(Collectors.toSet()); + Set primaryTermsToDelete = primaryTermsInRemote.stream() + .filter(term -> term < minPrimaryTermToKeep) + .collect(Collectors.toSet()); + primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term)); + } - @Override - public void onFailure(Exception e) { - logger.error("Exception occurred while getting primary terms from remote store", e); - } - }); + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred while getting primary terms from remote store", e); + } + }); + } catch (Exception e) { + logger.error("Exception occurred while scheduling listing primary terms from remote store", e); + } } /** @@ -457,18 +461,22 @@ public void onFailure(Exception e) { } public void delete() { - // cleans up all the translog contents in async fashion - transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info("Deleted all remote translog data"); - } + // cleans up all the translog contents in async fashion in a best effort way + try { + transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote translog data"); + } - @Override - public void onFailure(Exception e) { - logger.error("Exception occurred while cleaning translog", e); - } - }); + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred while cleaning translog", e); + } + }); + } catch (Exception e) { + logger.error("Exception occurred while scheduling delete from remote store", e); + } } public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) { @@ -547,7 +555,14 @@ public void onFailure(Exception e) { ); } catch (Exception e) { onCompletion.run(); - throw e; + logger.error( + () -> new ParameterizedMessage( + "Exception occurred while deleting translog for primaryTerm={} files={}", + primaryTerm, + files + ), + e + ); } } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 0b9026b81eb4e..4876365ddd957 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -183,7 +183,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED); map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); - map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); + map.put(Names.REMOTE_PURGE, ThreadPoolType.FIXED); map.put(Names.REMOTE_REFRESH_RETRY, ThreadPoolType.SCALING); map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING); map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); @@ -265,7 +265,7 @@ public ThreadPool( new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProc, TimeValue.timeValueMinutes(5)) ); builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000)); - builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProc, TimeValue.timeValueMinutes(5))); + builders.put(Names.REMOTE_PURGE, new FixedExecutorBuilder(settings, Names.REMOTE_PURGE, halfProc, 10000)); builders.put( Names.REMOTE_REFRESH_RETRY, new ScalingExecutorBuilder(Names.REMOTE_REFRESH_RETRY, 1, halfProc, TimeValue.timeValueMinutes(5))