Skip to content

Commit

Permalink
Add sessionId parameters for create async query API (#2312)
Browse files Browse the repository at this point in the history
* add InteractiveSession and SessionManager

Signed-off-by: Peng Huo <[email protected]>

* add statement

Signed-off-by: Peng Huo <[email protected]>

* add statement

Signed-off-by: Peng Huo <[email protected]>

* fix format

Signed-off-by: Peng Huo <[email protected]>

* snapshot

Signed-off-by: Peng Huo <[email protected]>

* address comments

Signed-off-by: Peng Huo <[email protected]>

* update

Signed-off-by: Peng Huo <[email protected]>

* Update REST and Transport interface

Signed-off-by: Peng Huo <[email protected]>

* Revert on transport layer

Signed-off-by: Peng Huo <[email protected]>

* format code

Signed-off-by: Peng Huo <[email protected]>

* add API doc

Signed-off-by: Peng Huo <[email protected]>

* modify api

Signed-off-by: Peng Huo <[email protected]>

* address comments

Signed-off-by: Peng Huo <[email protected]>

* update doc

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
(cherry picked from commit 8f5e01d)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 18, 2023
1 parent b3c2e94 commit 292a5f3
Show file tree
Hide file tree
Showing 26 changed files with 625 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.common.setting;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.List;
Expand Down Expand Up @@ -36,7 +38,8 @@ public enum Key {
METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled");

@Getter private final String keyValue;

Expand All @@ -60,4 +63,9 @@ public static Optional<Key> of(String keyValue) {
public abstract <T> T getSettingValue(Key key);

public abstract List<?> getSettings();

/** Helper class */
public static boolean isSparkExecutionSessionEnabled(Settings settings) {
return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED);
}
}
36 changes: 36 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,39 @@ SQL query::
"status": 400
}

plugins.query.executionengine.spark.session.enabled
===================================================

Description
-----------

By default, execution engine is executed in job mode. You can enable session mode by this setting.

1. The default value is false.
2. This setting is node scope.
3. This setting can be updated dynamically.

You can update the setting with a new value like this.

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.enabled":"true"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"session": {
"enabled": "true"
}
}
}
}
}
}
}

44 changes: 44 additions & 0 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,50 @@ Sample Response::
"queryId": "00fd796ut1a7eg0q"
}

Execute query in session
------------------------

if plugins.query.executionengine.spark.session.enabled is set to true, session based execution is enabled. Under the hood, all queries submitted to the same session will be executed in the same SparkContext. Session is auto closed if not query submission in 10 minutes.

Async query response include ``sessionId`` indicate the query is executed in session.

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Sample Response::

{
"queryId": "HlbM61kX6MDkAktO",
"sessionId": "1Giy65ZnzNlmsPAm"
}

User could reuse the session by using ``sessionId`` query parameters.

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10",
"sessionId" : "1Giy65ZnzNlmsPAm"
}'

Sample Response::

{
"queryId": "7GC4mHhftiTejvxN",
"sessionId": "1Giy65ZnzNlmsPAm"
}


Async Query Result API
======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_SESSION_ENABLED_SETTING =
Setting.boolSetting(
Key.SPARK_EXECUTION_SESSION_ENABLED.getKeyValue(),
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -205,6 +212,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_SESSION_ENABLED,
SPARK_EXECUTION_SESSION_ENABLED_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -270,6 +283,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
Expand Down Expand Up @@ -99,6 +100,8 @@
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
Expand Down Expand Up @@ -318,7 +321,11 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client);
client,
new SessionManager(
new StateStore(SPARK_REQUEST_BUFFER_INDEX_NAME, client),
emrServerlessClient,
pluginSettings));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,17 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
sparkExecutionEngineConfig.getClusterName(),
sparkExecutionEngineConfig.getSparkSubmitParameters()));
sparkExecutionEngineConfig.getSparkSubmitParameters(),
createAsyncQueryRequest.getSessionId()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.isDropIndexQuery(),
dispatchQueryResponse.getResultIndex()));
return new CreateAsyncQueryResponse(dispatchQueryResponse.getJobId());
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getJobId(), dispatchQueryResponse.getSessionId());
}

@Override
Expand All @@ -81,6 +84,7 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
Optional<AsyncQueryJobMetadata> jobMetadata =
asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
if (jobMetadata.isPresent()) {
String sessionId = jobMetadata.get().getSessionId();
JSONObject jsonObject = sparkQueryDispatcher.getQueryResponse(jobMetadata.get());
if (JobRunState.SUCCESS.toString().equals(jsonObject.getString(STATUS_FIELD))) {
DefaultSparkSqlFunctionResponseHandle sparkSqlFunctionResponseHandle =
Expand All @@ -90,13 +94,18 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
result.add(sparkSqlFunctionResponseHandle.next());
}
return new AsyncQueryExecutionResponse(
JobRunState.SUCCESS.toString(), sparkSqlFunctionResponseHandle.schema(), result, null);
JobRunState.SUCCESS.toString(),
sparkSqlFunctionResponseHandle.schema(),
result,
null,
sessionId);
} else {
return new AsyncQueryExecutionResponse(
jsonObject.optString(STATUS_FIELD, JobRunState.FAILED.toString()),
null,
null,
jsonObject.optString(ERROR_FIELD, ""));
jsonObject.optString(ERROR_FIELD, ""),
sessionId);
}
}
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ public class AsyncQueryExecutionResponse {
private final ExecutionEngine.Schema schema;
private final List<ExprValue> results;
private final String error;
private final String sessionId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ public class AsyncQueryJobMetadata {
private String jobId;
private boolean isDropIndexQuery;
private String resultIndex;
// optional sessionId.
private String sessionId;

public AsyncQueryJobMetadata(String applicationId, String jobId, String resultIndex) {
this.applicationId = applicationId;
this.jobId = jobId;
this.isDropIndexQuery = false;
this.resultIndex = resultIndex;
this.sessionId = null;
}

@Override
Expand All @@ -57,6 +60,7 @@ public static XContentBuilder convertToXContent(AsyncQueryJobMetadata metadata)
builder.field("applicationId", metadata.getApplicationId());
builder.field("isDropIndexQuery", metadata.isDropIndexQuery());
builder.field("resultIndex", metadata.getResultIndex());
builder.field("sessionId", metadata.getSessionId());
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -92,6 +96,7 @@ public static AsyncQueryJobMetadata toJobMetadata(XContentParser parser) throws
String applicationId = null;
boolean isDropIndexQuery = false;
String resultIndex = null;
String sessionId = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
Expand All @@ -109,13 +114,17 @@ public static AsyncQueryJobMetadata toJobMetadata(XContentParser parser) throws
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;

Check warning on line 119 in spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java#L118-L119

Added lines #L118 - L119 were not covered by tests
default:
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(applicationId, jobId, isDropIndexQuery, resultIndex);
return new AsyncQueryJobMetadata(
applicationId, jobId, isDropIndexQuery, resultIndex, sessionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class SparkConstants {
public static final String SPARK_SQL_APPLICATION_JAR =
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar";
public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result";
public static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request";
// TODO should be replaced with mvn jar.
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
Expand Down
Loading

0 comments on commit 292a5f3

Please sign in to comment.