diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 4284daf9ffef4..d9d480e7b2b27 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -106,10 +106,11 @@ public SegmentReplicationPressureService( ClusterService clusterService, IndicesService indicesService, ShardStateAction shardStateAction, + SegmentReplicationStatsTracker tracker, ThreadPool threadPool ) { this.indicesService = indicesService; - this.tracker = new SegmentReplicationStatsTracker(this.indicesService); + this.tracker = tracker; this.shardStateAction = shardStateAction; this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index 2a7aca0a1c7ed..f018aa5a800de 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -8,6 +8,7 @@ package org.opensearch.index; +import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; @@ -28,6 +29,7 @@ public class SegmentReplicationStatsTracker { private final IndicesService indicesService; private final Map rejectionCount; + @Inject public SegmentReplicationStatsTracker(IndicesService indicesService) { this.indicesService = indicesService; rejectionCount = ConcurrentCollections.newConcurrentMap(); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 2b9c79b21ff29..5d8dd0401ce35 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -62,7 +62,6 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; -import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexTemplateMetadata; @@ -137,7 +136,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexingPressureService; -import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.recovery.RemoteStoreRestoreService; @@ -966,8 +965,7 @@ protected Node( transportService.getTaskManager() ); - ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, clusterModule.getAllocationService(), rerouteService, threadPool); - final SegmentReplicationPressureService segmentReplicationPressureService =new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, threadPool); + final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), @@ -1096,7 +1094,7 @@ protected Node( searchPipelineService, fileCache, taskCancellationMonitoringService, - segmentReplicationPressureService + segmentReplicationStatsTracker ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 68dc4c3a00581..59c44ee90a475 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -48,7 +48,7 @@ import org.opensearch.discovery.Discovery; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexingPressureService; -import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.indices.IndicesService; import org.opensearch.ingest.IngestService; @@ -94,7 +94,7 @@ public class NodeService implements Closeable { private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; - private final SegmentReplicationPressureService segmentReplicationPressureService; + private final SegmentReplicationStatsTracker segmentReplicationStatsTracker; NodeService( Settings settings, @@ -118,7 +118,7 @@ public class NodeService implements Closeable { SearchPipelineService searchPipelineService, FileCache fileCache, TaskCancellationMonitoringService taskCancellationMonitoringService, - SegmentReplicationPressureService segmentReplicationPressureService + SegmentReplicationStatsTracker segmentReplicationStatsTracker ) { this.settings = settings; this.threadPool = threadPool; @@ -143,7 +143,7 @@ public class NodeService implements Closeable { this.taskCancellationMonitoringService = taskCancellationMonitoringService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); - this.segmentReplicationPressureService = segmentReplicationPressureService; + this.segmentReplicationStatsTracker = segmentReplicationStatsTracker; } public NodeInfo info( @@ -223,7 +223,7 @@ public NodeStats stats( boolean fileCacheStats, boolean taskCancellation, boolean searchPipelineStats, - boolean segmentReplicationBackPressureStats + boolean segmentReplicationTrackerStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -252,7 +252,7 @@ public NodeStats stats( fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, searchPipelineStats ? this.searchPipelineService.stats() : null, - segmentReplicationBackPressureStats ? this.segmentReplicationPressureService.nodeStats() : null + segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getStats() : null ); } diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index 34fa13f0ba62c..478fdcb24f76a 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -278,6 +278,13 @@ private SegmentReplicationPressureService buildPressureService(Settings settings ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - return new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, mock(ThreadPool.class)); + return new SegmentReplicationPressureService( + settings, + clusterService, + indicesService, + shardStateAction, + new SegmentReplicationStatsTracker(indicesService), + mock(ThreadPool.class) + ); } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 97c5d23831965..8458b887a8c3c 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -178,6 +178,7 @@ import org.opensearch.gateway.TransportNodesListGatewayStartedShards; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; @@ -2186,6 +2187,7 @@ public void onFailure(final Exception e) { clusterService, mock(IndicesService.class), mock(ShardStateAction.class), + mock(SegmentReplicationStatsTracker.class), mock(ThreadPool.class) ), mock(RemoteStorePressureService.class),