Skip to content

Commit

Permalink
refactor service for improving multithreading efficiency
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Feb 5, 2024
1 parent 7d2b3ab commit bfd7e93
Show file tree
Hide file tree
Showing 19 changed files with 663 additions and 648 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ public void testGetTopQueriesWhenFeatureDisabled() {
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
"Cannot get top n queries for [latency] when it is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);
}

/**
* Test update top query record when feature enabled
*/
public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException {
public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();

logger.info("--> starting nodes for query insight testing");
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
"Cannot get top n queries for [latency] when it is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);

Expand All @@ -143,7 +143,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
/**
* Test get top queries when feature enabled
*/
public void testGetTopQueriesWhenFeatureEnabled() {
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand Down Expand Up @@ -174,7 +174,8 @@ public void testGetTopQueriesWhenFeatureEnabled() {
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

// Sleep to wait for queue drained to top queries store
Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Expand All @@ -187,7 +188,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
/**
* Test get top queries with small top n size
*/
public void testGetTopQueriesWithSmallTopN() {
public void testGetTopQueriesWithSmallTopN() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1")
Expand Down Expand Up @@ -218,7 +219,7 @@ public void testGetTopQueriesWithSmallTopN() {
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Expand All @@ -231,7 +232,7 @@ public void testGetTopQueriesWithSmallTopN() {
/**
* Test get top queries with small window size
*/
public void testGetTopQueriesWithSmallWindowSize() {
public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand Down Expand Up @@ -267,7 +268,7 @@ public void testGetTopQueriesWithSmallWindowSize() {
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());

Thread.sleep(6000);
internalCluster().stopAllNodes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,25 @@ public QueryInsightsPlugin() {}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// create top n queries service
QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
return List.of(

Check warning on line 79 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java#L79

Added line #L79 was not covered by tests
new ScalingExecutorBuilder(
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR,
Expand All @@ -88,13 +88,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
final Settings settings,
final RestController restController,
final ClusterSettings clusterSettings,
final IndexScopedSettings indexScopedSettings,
final SettingsFilter settingsFilter,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestTopQueriesAction());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

Expand All @@ -34,7 +33,9 @@
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;

/**
* The listener for top N queries by latency
* The listener for query insights services.
* It forwards query-related data to the appropriate query insights stores,
* either for each request or for each phase.
*
* @opensearch.internal
*/
Expand All @@ -52,47 +53,53 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
*/
@Inject
public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) {
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v));
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
this.queryInsightsService::setTopNSize,
this.queryInsightsService::validateTopNSize
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v)

Check warning on line 64 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java#L63-L64

Added lines #L63 - L64 were not covered by tests
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
this.queryInsightsService::setWindowSize,
this.queryInsightsService::validateWindowSize
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v)

Check warning on line 70 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java#L69-L70

Added lines #L69 - L70 were not covered by tests
);
this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
}

/**
* Enable or disable metric collection for {@link MetricType}
* Enable or disable top queries insights collection for {@link MetricType}
* This function will enable or disable the corresponding listeners
* and query insights services.
*
* @param metricType {@link MetricType}
* @param enabled boolean
*/
public void setEnabled(MetricType metricType, boolean enabled) {
this.queryInsightsService.enableCollection(metricType, enabled);

// disable QueryInsightsListener only if collection for all metrics are disabled.
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
if (!enabled) {
// disable QueryInsightsListener only if all metrics collections are disabled.
for (MetricType t : MetricType.allMetricTypes()) {
if (this.queryInsightsService.isCollectionEnabled(t)) {
this.queryInsightsService.getTopQueriesService(metricType).setEnabled(false);
return;
}
}
super.setEnabled(false);
this.queryInsightsService.stop();
} else {
super.setEnabled(true);
this.queryInsightsService.start();
}
this.queryInsightsService.enableCollection(metricType, enabled);
}

@Override
Expand All @@ -113,17 +120,14 @@ public void onPhaseFailure(SearchPhaseContext context) {}
public void onRequestStart(SearchRequestContext searchRequestContext) {}

Check warning on line 120 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java#L120

Added line #L120 was not covered by tests

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
SearchRequest request = context.getRequest();
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
final SearchRequest request = context.getRequest();
try {
Map<MetricType, Measurement<? extends Number>> measurements = new HashMap<>();
Map<MetricType, Number> measurements = new HashMap<>();
if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
new Measurement<>(
MetricType.LATENCY.name(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
)
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
);
}
Map<Attribute, Object> attributes = new HashMap<>();
Expand Down
Loading

0 comments on commit bfd7e93

Please sign in to comment.