Skip to content

Commit

Permalink
Bind SegmentReplicarionStatsTracker in Node.java
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Oct 12, 2023
1 parent 2235f68 commit 1c11770
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ public SegmentReplicationPressureService(
ClusterService clusterService,
IndicesService indicesService,
ShardStateAction shardStateAction,
SegmentReplicationStatsTracker tracker,
ThreadPool threadPool
) {
this.indicesService = indicesService;
this.tracker = new SegmentReplicationStatsTracker(this.indicesService);
this.tracker = tracker;
this.shardStateAction = shardStateAction;
this.threadPool = threadPool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index;

import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -28,6 +29,7 @@ public class SegmentReplicationStatsTracker {
private final IndicesService indicesService;
private final Map<ShardId, AtomicInteger> rejectionCount;

@Inject
public SegmentReplicationStatsTracker(IndicesService indicesService) {
this.indicesService = indicesService;
rejectionCount = ConcurrentCollections.newConcurrentMap();
Expand Down
8 changes: 3 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.metadata.AliasValidator;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
Expand Down Expand Up @@ -137,7 +136,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
Expand Down Expand Up @@ -966,8 +965,7 @@ protected Node(
transportService.getTaskManager()
);

ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, clusterModule.getAllocationService(), rerouteService, threadPool);
final SegmentReplicationPressureService segmentReplicationPressureService =new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, threadPool);
final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class),
Expand Down Expand Up @@ -1096,7 +1094,7 @@ protected Node(
searchPipelineService,
fileCache,
taskCancellationMonitoringService,
segmentReplicationPressureService
segmentReplicationStatsTracker
);

final SearchService searchService = newSearchService(
Expand Down
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.opensearch.discovery.Discovery;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
Expand Down Expand Up @@ -94,7 +94,7 @@ public class NodeService implements Closeable {
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;

private final SegmentReplicationPressureService segmentReplicationPressureService;
private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;

NodeService(
Settings settings,
Expand All @@ -118,7 +118,7 @@ public class NodeService implements Closeable {
SearchPipelineService searchPipelineService,
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService,
SegmentReplicationPressureService segmentReplicationPressureService
SegmentReplicationStatsTracker segmentReplicationStatsTracker
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -143,7 +143,7 @@ public class NodeService implements Closeable {
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationPressureService = segmentReplicationPressureService;
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
}

public NodeInfo info(
Expand Down Expand Up @@ -223,7 +223,7 @@ public NodeStats stats(
boolean fileCacheStats,
boolean taskCancellation,
boolean searchPipelineStats,
boolean segmentReplicationBackPressureStats
boolean segmentReplicationTrackerStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -252,7 +252,7 @@ public NodeStats stats(
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationBackPressureStats ? this.segmentReplicationPressureService.nodeStats() : null
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ private SegmentReplicationPressureService buildPressureService(Settings settings
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));

return new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, mock(ThreadPool.class));
return new SegmentReplicationPressureService(
settings,
clusterService,
indicesService,
shardStateAction,
new SegmentReplicationStatsTracker(indicesService),
mock(ThreadPool.class)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
import org.opensearch.gateway.TransportNodesListGatewayStartedShards;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
Expand Down Expand Up @@ -2186,6 +2187,7 @@ public void onFailure(final Exception e) {
clusterService,
mock(IndicesService.class),
mock(ShardStateAction.class),
mock(SegmentReplicationStatsTracker.class),
mock(ThreadPool.class)
),
mock(RemoteStorePressureService.class),
Expand Down

0 comments on commit 1c11770

Please sign in to comment.