From 5b0843e9d8f83c455fab145f38e81b7daa58b857 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 16 Dec 2024 13:03:25 +0530 Subject: [PATCH] Change Remote state read thread pool to Fixed type --- .../TransportClusterManagerNodeAction.java | 9 ++++++++- .../coordination/PublicationTransportHandler.java | 2 +- .../gateway/remote/RemoteClusterStateService.java | 14 ++++++++++++++ .../java/org/opensearch/threadpool/ThreadPool.java | 4 ++-- .../threadpool/ScalingThreadPoolTests.java | 1 - 5 files changed, 25 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 819e09312a0df..558b7370749d5 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -430,6 +430,13 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes if (remoteClusterStateService != null && termVersionResponse.isStatePresentInRemote()) { try { + logger.info( + () -> new ParameterizedMessage( + "Term version checker downloading full cluster state for term {}, version {}", + termVersion.getTerm(), + termVersion.getVersion() + ) + ); ClusterStateTermVersion clusterStateTermVersion = termVersionResponse.getClusterStateTermVersion(); Optional clusterMetadataManifest = remoteClusterStateService .getClusterMetadataManifestByTermVersion( @@ -454,7 +461,7 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes return clusterStateFromRemote; } } catch (Exception e) { - logger.trace("Error while fetching from remote cluster state", e); + logger.error("Error while fetching from remote cluster state", e); } } return null; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 7275d72f2db9f..4ad5b80038048 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -258,7 +258,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest } if (applyFullState == true) { - logger.debug( + logger.info( () -> new ParameterizedMessage( "Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index c5fc6d5cae6a7..76708460a6e0c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1473,8 +1473,22 @@ public ClusterState getClusterStateForManifest( try { ClusterState stateFromCache = remoteClusterStateCache.getState(clusterName, manifest); if (stateFromCache != null) { + logger.trace( + () -> new ParameterizedMessage( + "Found cluster state in cache for term {} and version {}", + manifest.getClusterTerm(), + manifest.getStateVersion() + ) + ); return stateFromCache; } + logger.info( + () -> new ParameterizedMessage( + "Cluster state not found in cache for term {} and version {}", + manifest.getClusterTerm(), + manifest.getStateVersion() + ) + ); final ClusterState clusterState; final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 269a4c87dfb72..59d3b110aeca8 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -198,7 +198,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); map.put(Names.REMOTE_REFRESH_RETRY, ThreadPoolType.SCALING); map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING); - map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING); + map.put(Names.REMOTE_STATE_READ, ThreadPoolType.FIXED); map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); @@ -306,7 +306,7 @@ public ThreadPool( ); builders.put( Names.REMOTE_STATE_READ, - new ScalingExecutorBuilder(Names.REMOTE_STATE_READ, 1, boundedBy(4 * allocatedProcessors, 4, 32), TimeValue.timeValueMinutes(5)) + new FixedExecutorBuilder(settings, Names.REMOTE_STATE_READ, boundedBy(4 * allocatedProcessors, 4, 32), 120000) ); builders.put( Names.INDEX_SEARCHER, diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index b4726bab50198..23c21648b1263 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -156,7 +156,6 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessors); sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors); sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors); - sizes.put(ThreadPool.Names.REMOTE_STATE_READ, n -> ThreadPool.boundedBy(4 * n, 4, 32)); return sizes.get(threadPoolName).apply(numberOfProcessors); }