-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Query insights plugin implementation
Signed-off-by: Chenyang Ji <[email protected]>
- Loading branch information
Showing
29 changed files
with
1,734 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
opensearchplugin { | ||
description 'OpenSearch Query Insights Plugin.' | ||
classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' | ||
} | ||
|
||
dependencies { | ||
} |
87 changes: 87 additions & 0 deletions
87
plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights; | ||
|
||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.opensearch.cluster.node.DiscoveryNodes; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.settings.IndexScopedSettings; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.settings.SettingsFilter; | ||
import org.opensearch.core.action.ActionResponse; | ||
import org.opensearch.core.common.io.stream.NamedWriteableRegistry; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.env.Environment; | ||
import org.opensearch.env.NodeEnvironment; | ||
import org.opensearch.plugins.ActionPlugin; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.repositories.RepositoriesService; | ||
import org.opensearch.rest.RestController; | ||
import org.opensearch.rest.RestHandler; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.watcher.ResourceWatcherService; | ||
|
||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Plugin class for Query Insights. | ||
*/ | ||
public class QueryInsightsPlugin extends Plugin implements ActionPlugin { | ||
/** | ||
* Default constructor | ||
*/ | ||
public QueryInsightsPlugin() {} | ||
|
||
@Override | ||
public Collection<Object> createComponents( | ||
Client client, | ||
ClusterService clusterService, | ||
ThreadPool threadPool, | ||
ResourceWatcherService resourceWatcherService, | ||
ScriptService scriptService, | ||
NamedXContentRegistry xContentRegistry, | ||
Environment environment, | ||
NodeEnvironment nodeEnvironment, | ||
NamedWriteableRegistry namedWriteableRegistry, | ||
IndexNameExpressionResolver indexNameExpressionResolver, | ||
Supplier<RepositoriesService> repositoriesServiceSupplier | ||
) { | ||
return List.of(); | ||
} | ||
|
||
@Override | ||
public List<RestHandler> getRestHandlers( | ||
Settings settings, | ||
RestController restController, | ||
ClusterSettings clusterSettings, | ||
IndexScopedSettings indexScopedSettings, | ||
SettingsFilter settingsFilter, | ||
IndexNameExpressionResolver indexNameExpressionResolver, | ||
Supplier<DiscoveryNodes> nodesInCluster | ||
) { | ||
return List.of(); | ||
} | ||
|
||
@Override | ||
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() { | ||
return List.of(); | ||
} | ||
|
||
@Override | ||
public List<Setting<?>> getSettings() { | ||
return List.of(); | ||
} | ||
} |
53 changes: 53 additions & 0 deletions
53
...hts/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.exporter; | ||
|
||
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Simple abstract class to export data collected by search query analyzers | ||
* <p> | ||
* Mainly for use within the Query Insight framework | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> { | ||
private QueryInsightsExporterType type; | ||
private String identifier; | ||
|
||
QueryInsightsExporter(QueryInsightsExporterType type, String identifier) { | ||
this.type = type; | ||
this.identifier = identifier; | ||
} | ||
|
||
/** | ||
* Export the data with the exporter. | ||
* | ||
* @param records the data to export | ||
*/ | ||
public abstract void export(List<T> records) throws Exception; | ||
|
||
public void setType(QueryInsightsExporterType type) { | ||
this.type = type; | ||
} | ||
|
||
public QueryInsightsExporterType getType() { | ||
return type; | ||
} | ||
|
||
public void setIdentifier(String identifier) { | ||
this.identifier = identifier; | ||
} | ||
|
||
public String getIdentifier() { | ||
return identifier; | ||
} | ||
} |
31 changes: 31 additions & 0 deletions
31
...src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.exporter; | ||
|
||
import java.util.Locale; | ||
|
||
/** | ||
* Types for the Query Insights Exporters | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public enum QueryInsightsExporterType { | ||
/* local index exporter */ | ||
LOCAL_INDEX("local_index"); | ||
|
||
private final String type; | ||
|
||
QueryInsightsExporterType(String type) { | ||
this.type = type; | ||
} | ||
|
||
public static QueryInsightsExporterType parse(String type) { | ||
return valueOf(type.toUpperCase(Locale.ROOT)); | ||
} | ||
} |
181 changes: 181 additions & 0 deletions
181
...in/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.exporter; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.opensearch.action.admin.indices.create.CreateIndexResponse; | ||
import org.opensearch.action.bulk.BulkRequest; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.action.support.WriteRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.common.xcontent.XContentFactory; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.rest.RestStatus; | ||
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.charset.Charset; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Class to export data collected by search query analyzers to a local OpenSearch index | ||
* <p> | ||
* Internal used within the Query Insight framework | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class QueryInsightsLocalIndexExporter<T extends SearchQueryRecord<?>> extends QueryInsightsExporter<T> { | ||
private static final Logger log = LogManager.getLogger(QueryInsightsLocalIndexExporter.class); | ||
private static final int INDEX_TIMEOUT = 60; | ||
|
||
private final ClusterService clusterService; | ||
private final Client client; | ||
|
||
/** The mapping for the local index that holds the data */ | ||
private final InputStream localIndexMapping; | ||
|
||
public QueryInsightsLocalIndexExporter( | ||
ClusterService clusterService, | ||
Client client, | ||
String localIndexName, | ||
InputStream localIndexMapping | ||
) { | ||
super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName); | ||
this.clusterService = clusterService; | ||
this.client = client; | ||
this.localIndexMapping = localIndexMapping; | ||
} | ||
|
||
/** | ||
* Export the data to the predefined local OpenSearch Index | ||
* | ||
* @param records the data to export | ||
* @throws IOException if an error occurs | ||
*/ | ||
@Override | ||
public synchronized void export(List<T> records) throws IOException { | ||
if (records.size() == 0) { | ||
return; | ||
} | ||
if (checkIfIndexExists()) { | ||
bulkRecord(records); | ||
} else { | ||
// local index not exist | ||
initLocalIndex(new ActionListener<>() { | ||
@Override | ||
public void onResponse(CreateIndexResponse response) { | ||
if (response.isAcknowledged()) { | ||
log.debug( | ||
String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier()) | ||
); | ||
try { | ||
bulkRecord(records); | ||
} catch (IOException e) { | ||
log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e)); | ||
} | ||
} else { | ||
log.error( | ||
String.format( | ||
Locale.ROOT, | ||
"request to created local index %s for query insight not acknowledged.", | ||
getIdentifier() | ||
) | ||
); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e)); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* Util function to check if a local OpenSearch Index exists | ||
* | ||
* @return boolean | ||
*/ | ||
private boolean checkIfIndexExists() { | ||
ClusterState clusterState = clusterService.state(); | ||
return clusterState.getRoutingTable().hasIndex(this.getIdentifier()); | ||
} | ||
|
||
/** | ||
* Initialize the local OpenSearch Index for the exporter | ||
* | ||
* @param listener the listener to be notified upon completion | ||
* @throws IOException if an error occurs | ||
*/ | ||
private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException { | ||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings()) | ||
.settings(Settings.builder().put("index.hidden", false).build()); | ||
client.admin().indices().create(createIndexRequest, listener); | ||
} | ||
|
||
/** | ||
* Get the index mapping of the local index | ||
* | ||
* @return String to represent the index mapping | ||
* @throws IOException if an error occurs | ||
*/ | ||
private String getIndexMappings() throws IOException { | ||
return new String(Objects.requireNonNull(this.localIndexMapping).readAllBytes(), Charset.defaultCharset()); | ||
} | ||
|
||
/** | ||
* Bulk ingest the data into to the predefined local OpenSearch Index | ||
* | ||
* @param records the data to export | ||
* @throws IOException if an error occurs | ||
*/ | ||
private void bulkRecord(List<T> records) throws IOException { | ||
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT)); | ||
for (T record : records) { | ||
bulkRequest.add( | ||
new IndexRequest(getIdentifier()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
); | ||
} | ||
client.bulk(bulkRequest, new ActionListener<>() { | ||
@Override | ||
public void onResponse(BulkResponse response) { | ||
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) { | ||
log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", getIdentifier())); | ||
} else { | ||
log.error( | ||
String.format( | ||
Locale.ROOT, | ||
"error when ingesting data for %s, error: %s", | ||
getIdentifier(), | ||
response.buildFailureMessage() | ||
) | ||
); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", getIdentifier(), e)); | ||
} | ||
}); | ||
} | ||
} |
Oops, something went wrong.