Skip to content

Commit

Permalink
Add sessionId parameters for create async query API (#2312) (#2323)
Browse files Browse the repository at this point in the history
* add InteractiveSession and SessionManager

---------


(cherry picked from commit 8f5e01d)

Signed-off-by: Peng Huo <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 9383c7c commit fbf6143
Show file tree
Hide file tree
Showing 26 changed files with 625 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.common.setting;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.List;
Expand Down Expand Up @@ -36,7 +38,8 @@ public enum Key {
METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled");

@Getter private final String keyValue;

Expand All @@ -60,4 +63,9 @@ public static Optional<Key> of(String keyValue) {
public abstract <T> T getSettingValue(Key key);

public abstract List<?> getSettings();

/** Helper class */
public static boolean isSparkExecutionSessionEnabled(Settings settings) {
return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED);
}
}
36 changes: 36 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,39 @@ SQL query::
"status": 400
}

plugins.query.executionengine.spark.session.enabled
===================================================

Description
-----------

By default, execution engine is executed in job mode. You can enable session mode by this setting.

1. The default value is false.
2. This setting is node scope.
3. This setting can be updated dynamically.

You can update the setting with a new value like this.

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.enabled":"true"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"session": {
"enabled": "true"
}
}
}
}
}
}
}

44 changes: 44 additions & 0 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,50 @@ Sample Response::
"queryId": "00fd796ut1a7eg0q"
}

Execute query in session
------------------------

if plugins.query.executionengine.spark.session.enabled is set to true, session based execution is enabled. Under the hood, all queries submitted to the same session will be executed in the same SparkContext. Session is auto closed if not query submission in 10 minutes.

Async query response include ``sessionId`` indicate the query is executed in session.

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Sample Response::

{
"queryId": "HlbM61kX6MDkAktO",
"sessionId": "1Giy65ZnzNlmsPAm"
}

User could reuse the session by using ``sessionId`` query parameters.

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10",
"sessionId" : "1Giy65ZnzNlmsPAm"
}'

Sample Response::

{
"queryId": "7GC4mHhftiTejvxN",
"sessionId": "1Giy65ZnzNlmsPAm"
}


Async Query Result API
======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_SESSION_ENABLED_SETTING =
Setting.boolSetting(
Key.SPARK_EXECUTION_SESSION_ENABLED.getKeyValue(),
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -205,6 +212,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_SESSION_ENABLED,
SPARK_EXECUTION_SESSION_ENABLED_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -270,6 +283,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
Expand Down Expand Up @@ -99,6 +100,8 @@
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
Expand Down Expand Up @@ -318,7 +321,11 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client);
client,
new SessionManager(
new StateStore(SPARK_REQUEST_BUFFER_INDEX_NAME, client),
emrServerlessClient,
pluginSettings));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,17 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
sparkExecutionEngineConfig.getClusterName(),
sparkExecutionEngineConfig.getSparkSubmitParameters()));
sparkExecutionEngineConfig.getSparkSubmitParameters(),
createAsyncQueryRequest.getSessionId()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.isDropIndexQuery(),
dispatchQueryResponse.getResultIndex()));
return new CreateAsyncQueryResponse(dispatchQueryResponse.getJobId());
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getJobId(), dispatchQueryResponse.getSessionId());
}

@Override
Expand All @@ -81,6 +84,7 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
Optional<AsyncQueryJobMetadata> jobMetadata =
asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
if (jobMetadata.isPresent()) {
String sessionId = jobMetadata.get().getSessionId();
JSONObject jsonObject = sparkQueryDispatcher.getQueryResponse(jobMetadata.get());
if (JobRunState.SUCCESS.toString().equals(jsonObject.getString(STATUS_FIELD))) {
DefaultSparkSqlFunctionResponseHandle sparkSqlFunctionResponseHandle =
Expand All @@ -90,13 +94,18 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
result.add(sparkSqlFunctionResponseHandle.next());
}
return new AsyncQueryExecutionResponse(
JobRunState.SUCCESS.toString(), sparkSqlFunctionResponseHandle.schema(), result, null);
JobRunState.SUCCESS.toString(),
sparkSqlFunctionResponseHandle.schema(),
result,
null,
sessionId);
} else {
return new AsyncQueryExecutionResponse(
jsonObject.optString(STATUS_FIELD, JobRunState.FAILED.toString()),
null,
null,
jsonObject.optString(ERROR_FIELD, ""));
jsonObject.optString(ERROR_FIELD, ""),
sessionId);
}
}
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ public class AsyncQueryExecutionResponse {
private final ExecutionEngine.Schema schema;
private final List<ExprValue> results;
private final String error;
private final String sessionId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ public class AsyncQueryJobMetadata {
private String jobId;
private boolean isDropIndexQuery;
private String resultIndex;
// optional sessionId.
private String sessionId;

public AsyncQueryJobMetadata(String applicationId, String jobId, String resultIndex) {
this.applicationId = applicationId;
this.jobId = jobId;
this.isDropIndexQuery = false;
this.resultIndex = resultIndex;
this.sessionId = null;
}

@Override
Expand All @@ -57,6 +60,7 @@ public static XContentBuilder convertToXContent(AsyncQueryJobMetadata metadata)
builder.field("applicationId", metadata.getApplicationId());
builder.field("isDropIndexQuery", metadata.isDropIndexQuery());
builder.field("resultIndex", metadata.getResultIndex());
builder.field("sessionId", metadata.getSessionId());
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -92,6 +96,7 @@ public static AsyncQueryJobMetadata toJobMetadata(XContentParser parser) throws
String applicationId = null;
boolean isDropIndexQuery = false;
String resultIndex = null;
String sessionId = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
Expand All @@ -109,13 +114,17 @@ public static AsyncQueryJobMetadata toJobMetadata(XContentParser parser) throws
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;
default:
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(applicationId, jobId, isDropIndexQuery, resultIndex);
return new AsyncQueryJobMetadata(
applicationId, jobId, isDropIndexQuery, resultIndex, sessionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class SparkConstants {
public static final String SPARK_SQL_APPLICATION_JAR =
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar";
public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result";
public static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request";
// TODO should be replaced with mvn jar.
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
Expand Down
Loading

0 comments on commit fbf6143

Please sign in to comment.