Skip to content

Commit

Permalink
refactor query insight components into plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Dec 19, 2023
1 parent a1c3621 commit b082ae1
Show file tree
Hide file tree
Showing 51 changed files with 823 additions and 551 deletions.
1 change: 1 addition & 0 deletions gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ configure([
project(":plugins:mapper-annotated-text"),
project(":plugins:mapper-murmur3"),
project(":plugins:mapper-size"),
project(":plugins:query-insights"),
project(":plugins:repository-azure"),
project(":plugins:repository-gcs"),
project(":plugins:repository-hdfs"),
Expand Down
51 changes: 51 additions & 0 deletions plugins/query-insights/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

//apply plugin: 'opensearch.java-rest-test'
//apply plugin: 'opensearch.internal-cluster-test'

import org.opensearch.gradle.testclusters.RunTask
import org.opensearch.gradle.test.RestIntegTestTask
apply plugin: 'opensearch.testclusters'

opensearchplugin {
description 'OpenSearch Query Insights Plugin.'
classname 'org.opensearch.plugin.insights.QueryInsightsPlugin'
}

dependencies {
}

task integTest(type: RestIntegTestTask) {
description = "Run tests against a cluster"
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
}
tasks.named("check").configure { dependsOn(integTest) }

integTest {
// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}
}

testClusters.integTest {
testDistribution = "INTEG_TEST"

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
}

run {
useCluster testClusters.integTest
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights;

import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.insights.core.listener.SearchQueryLatencyListener;
import org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

/**
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin, SearchPlugin, EnginePlugin {
/**
* Default constructor
*/
public QueryInsightsPlugin() {}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// create top n queries service
TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client);
// top n queries listener
SearchQueryLatencyListener searchQueryLatencyListener = new SearchQueryLatencyListener(clusterService, topQueriesByLatencyService);
return List.of(topQueriesByLatencyService, searchQueryLatencyListener);
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestTopQueriesAction());
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionPlugin.ActionHandler<>(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class));
}

@Override
public List<Setting<?>> getSettings() {
return List.of(
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.exporter;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.List;

Expand All @@ -19,24 +20,19 @@
*
* @opensearch.internal
*/
abstract class QueryInsightExporter<T extends SearchQueryRecord<?>> {
public abstract class QueryInsightExporter<T extends SearchQueryRecord<?>> {

private boolean enabled = false;

/** The export interval of this exporter, default to 60 seconds */
private TimeValue exportInterval = TimeValue.timeValueSeconds(60);

/**
* Initial set up the exporter
*/
abstract void setup() throws Exception;
private TimeValue exportInterval = TimeValue.timeValueSeconds(5);

/**
* Export the data with the exporter.
*
* @param records the data to export
*/
abstract void export(List<T> records) throws Exception;
public abstract void export(List<T> records) throws Exception;

public boolean getEnabled() {
return enabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -28,11 +28,13 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

/**
Expand All @@ -42,7 +44,7 @@
*
* @opensearch.internal
*/
public class QueryInsightLocalIndexExporter<T extends SearchQueryRecord<?>> extends QueryInsightExporter<T>{
public class QueryInsightLocalIndexExporter<T extends SearchQueryRecord<?>> extends QueryInsightExporter<T> {

private static final Logger log = LogManager.getLogger(QueryInsightLocalIndexExporter.class);

Expand All @@ -60,23 +62,15 @@ public QueryInsightLocalIndexExporter(
ClusterService clusterService,
Client client,
String localIndexName,
InputStream localIndexMapping) {
InputStream localIndexMapping
) {
this.setEnabled(enabled);
this.clusterService = clusterService;
this.client = client;
this.localIndexName = localIndexName;
this.localIndexMapping = localIndexMapping;
try {
setup();
} catch (IOException e) {
log.error(String.format("failed to set up with error %s.", e));
}
}

@Override
public void setup() throws IOException {}


/**
* Export the data to the predefined local OpenSearch Index
*
Expand All @@ -96,21 +90,26 @@ public synchronized void export(List<T> records) throws IOException {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(String.format("successfully initialized local index %s for query insight.", localIndexName));
log.debug(String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", localIndexName));
try {
bulkRecord(records);
} catch (IOException e) {
log.error(String.format("fail to ingest query insight data to local index, error: %s", e));
log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e));
}
}
else {
log.error(String.format("request to created local index %s for query insight not acknowledged.", localIndexName));
} else {
log.error(
String.format(
Locale.ROOT,
"request to created local index %s for query insight not acknowledged.",
localIndexName
)
);
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format("error creating local index for query insight: %s", e));
log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e));
}
});
}
Expand All @@ -133,9 +132,8 @@ private boolean checkIfIndexExists() {
* @throws IOException if an error occurs
*/
private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.localIndexName).mapping(
getIndexMappings()
).settings(Settings.builder().put("index.hidden", false).build());
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.localIndexName).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
}

Expand All @@ -157,11 +155,7 @@ private synchronized void dropLocalIndex(ActionListener<AcknowledgedResponse> li
* @throws IOException if an error occurs
*/
private String getIndexMappings() throws IOException {
return new String(
Objects.requireNonNull(this.localIndexMapping)
.readAllBytes(),
Charset.defaultCharset()
);
return new String(Objects.requireNonNull(this.localIndexMapping).readAllBytes(), Charset.defaultCharset());
}

/**
Expand All @@ -171,27 +165,26 @@ private String getIndexMappings() throws IOException {
* @throws IOException if an error occurs
*/
private synchronized void bulkRecord(List<T> records) throws IOException {
BulkRequest bulkRequest = new BulkRequest()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).timeout(TimeValue.timeValueSeconds(60));
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.timeout(TimeValue.timeValueSeconds(60));
for (T record : records) {
bulkRequest.add(
new IndexRequest(localIndexName)
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
new IndexRequest(localIndexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
client.bulk(bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) {
log.debug(String.format("successfully ingest data for %s! ", localIndexName));
}
else {
log.error(String.format("error when ingesting data for %s", localIndexName));
log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", localIndexName));
} else {
log.error(String.format(Locale.ROOT, "error when ingesting data for %s", localIndexName));
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format("failed to ingest data for %s, %s", localIndexName, e));
log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", localIndexName, e));
}
});
}
Expand All @@ -203,25 +196,23 @@ public void onFailure(Exception e) {
* @throws IOException if an error occurs
*/
private synchronized void indexRecord(T record) throws IOException {
IndexRequest indexRequest = new IndexRequest(localIndexName)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
IndexRequest indexRequest = new IndexRequest(localIndexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.timeout(TimeValue.timeValueSeconds(60));

client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse response) {
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) {
log.debug(String.format("successfully indexed data for %s ", localIndexName));
}
else {
log.error(String.format("failed to index data for %s", localIndexName));
log.debug(String.format(Locale.ROOT, "successfully indexed data for %s ", localIndexName));
} else {
log.error(String.format(Locale.ROOT, "failed to index data for %s", localIndexName));
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format("failed to index data for %s, error: %s", localIndexName, e));
log.error(String.format(Locale.ROOT, "failed to index data for %s, error: %s", localIndexName, e));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
* compatible open source license.
*/

/** Top n queries transport handlers. */
package org.opensearch.action.admin.cluster.insights.top_queries;
/**
* Exporters for Query Insights
*/
package org.opensearch.plugin.insights.core.exporter;
Loading

0 comments on commit b082ae1

Please sign in to comment.