Enable PPL lang and add datasource to async query API (#2187) (#2195)
(cherry picked from commit ea2ed26)

Signed-off-by: Vamsi Manohar <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 8c5ecba commit 3f51079
Showing 11 changed files with 204 additions and 170 deletions.
1 change: 1 addition & 0 deletions docs/user/interfaces/asyncqueryinterface.rst
@@ -45,6 +45,7 @@ Sample Request::
curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'
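This commit also enables PPL as an accepted lang value. Assuming a PPL request takes the same shape, with the lowercase language name as the lang value and an illustrative query text, an equivalent PPL call would be::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
    "datasource" : "my_glue",
    "lang" : "ppl",
    "query" : "source = my_glue.default.http_logs | head 10"
}'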
5 changes: 2 additions & 3 deletions spark/build.gradle
@@ -19,9 +19,8 @@ tasks.register('downloadG4Files', Exec) {

executable 'curl'

- // Need to add these back once the grammar issues with indexName and tableName is addressed in flint integration jar.
- // args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
- // args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
+ args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
+ args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
args '-o', 'src/main/antlr/SqlBaseParser.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4'
args '-o', 'src/main/antlr/SqlBaseLexer.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4'
}
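With the downloads re-enabled, the grammar files can be refreshed on demand with ./gradlew :spark:downloadG4Files before ANTLR code generation; the :spark: project path is an assumption based on this file being spark/build.gradle.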
6 changes: 4 additions & 2 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -27,7 +27,8 @@ skippingIndexStatement
;

createSkippingIndexStatement
- : CREATE SKIPPING INDEX ON tableName
+ : CREATE SKIPPING INDEX (IF NOT EXISTS)?
+ ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
@@ -53,7 +54,8 @@ coveringIndexStatement
;

createCoveringIndexStatement
- : CREATE INDEX indexName ON tableName
+ : CREATE INDEX (IF NOT EXISTS)? indexName
+ ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
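With the optional (IF NOT EXISTS)? group in both rules, the grammar is intended to accept guarded statements such as CREATE SKIPPING INDEX IF NOT EXISTS ON catalog.db.tbl (...) and CREATE INDEX IF NOT EXISTS idx ON catalog.db.tbl (...) alongside the original unguarded forms; the table and index names here are hypothetical and the column lists are elided.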
11 changes: 10 additions & 1 deletion spark/src/main/antlr/SparkSqlBase.g4
@@ -158,14 +158,16 @@ CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
+ EXISTS: 'EXISTS';
FALSE: 'FALSE';
+ IF: 'IF';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
+ NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
- STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';

@@ -174,6 +176,13 @@ EQ : '=' | '==';
MINUS: '-';


+ STRING
+ : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+ | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+ | 'R\'' (~'\'')* '\''
+ | 'R"'(~'"')* '"'
+ ;

INTEGER_VALUE
: DIGIT+
;
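In net terms, STRING stops being the literal keyword 'STRING' and becomes a lexer rule for quoted string literals: single- or double-quoted text with backslash escapes, plus R-prefixed raw strings (e.g. 'it\'s', "a \"b\"", R'no\escapes').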
AsyncQueryExecutorServiceImpl.java
@@ -69,6 +69,7 @@ public CreateAsyncQueryResponse createAsyncQuery(
new DispatchQueryRequest(
sparkExecutionEngineConfig.getApplicationId(),
createAsyncQueryRequest.getQuery(),
+ createAsyncQueryRequest.getDatasource(),
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
clusterName.value()));
SparkQueryDispatcher.java
@@ -48,7 +48,7 @@ public class SparkQueryDispatcher {
public static final String TABLE_TAG_KEY = "table";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";

- private EMRServerlessClient EMRServerlessClient;
+ private EMRServerlessClient emrServerlessClient;

private DataSourceService dataSourceService;

@@ -57,12 +57,12 @@ public class SparkQueryDispatcher {
private JobExecutionResponseReader jobExecutionResponseReader;

public String dispatch(DispatchQueryRequest dispatchQueryRequest) {
- return EMRServerlessClient.startJobRun(getStartJobRequest(dispatchQueryRequest));
+ return emrServerlessClient.startJobRun(getStartJobRequest(dispatchQueryRequest));
}

// TODO : Fetch from Result Index and then make call to EMR Serverless.
public JSONObject getQueryResponse(String applicationId, String queryId) {
- GetJobRunResult getJobRunResult = EMRServerlessClient.getJobRunResult(applicationId, queryId);
+ GetJobRunResult getJobRunResult = emrServerlessClient.getJobRunResult(applicationId, queryId);
JSONObject result = new JSONObject();
if (getJobRunResult.getJobRun().getState().equals(JobRunState.SUCCESS.toString())) {
result = jobExecutionResponseReader.getResultFromOpensearchIndex(queryId);
@@ -72,20 +72,23 @@ public JSONObject getQueryResponse(String applicationId, String queryId) {
}

public String cancelJob(String applicationId, String jobId) {
- CancelJobRunResult cancelJobRunResult = EMRServerlessClient.cancelJobRun(applicationId, jobId);
+ CancelJobRunResult cancelJobRunResult = emrServerlessClient.cancelJobRun(applicationId, jobId);
return cancelJobRunResult.getJobRunId();
}

+ // we currently don't support index queries in PPL language.
+ // so we are treating all of them as non-index queries which don't require any kind of query
+ // parsing.
private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryRequest) {
if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery()))
return getStartJobRequestForIndexRequest(dispatchQueryRequest);
else {
return getStartJobRequestForNonIndexQueries(dispatchQueryRequest);
}
+ } else {
+ return getStartJobRequestForNonIndexQueries(dispatchQueryRequest);
}
- throw new UnsupportedOperationException(
- String.format("UnSupported Lang type:: %s", dispatchQueryRequest.getLangType()));
}

private String getDataSourceRoleARN(DataSourceMetadata dataSourceMetadata) {
@@ -133,27 +136,17 @@ private String constructSparkParameters(String datasourceName) {
private StartJobRequest getStartJobRequestForNonIndexQueries(
DispatchQueryRequest dispatchQueryRequest) {
StartJobRequest startJobRequest;
- FullyQualifiedTableName fullyQualifiedTableName =
- SQLQueryUtils.extractFullyQualifiedTableName(dispatchQueryRequest.getQuery());
- if (fullyQualifiedTableName.getDatasourceName() == null) {
- throw new UnsupportedOperationException("Missing datasource in the query syntax.");
- }
dataSourceUserAuthorizationHelper.authorizeDataSource(
- this.dataSourceService.getRawDataSourceMetadata(
- fullyQualifiedTableName.getDatasourceName()));
- String jobName =
- dispatchQueryRequest.getClusterName()
- + ":"
- + fullyQualifiedTableName.getFullyQualifiedName();
- Map<String, String> tags =
- getDefaultTagsForJobSubmission(dispatchQueryRequest, fullyQualifiedTableName);
+ this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()));
+ String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
+ Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
jobName,
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
- constructSparkParameters(fullyQualifiedTableName.getDatasourceName()),
+ constructSparkParameters(dispatchQueryRequest.getDatasource()),
tags);
return startJobRequest;
}
@@ -163,46 +156,29 @@ private StartJobRequest getStartJobRequestForIndexRequest(
StartJobRequest startJobRequest;
IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
- if (fullyQualifiedTableName.getDatasourceName() == null) {
- throw new UnsupportedOperationException("Queries without a datasource are not supported");
- }
dataSourceUserAuthorizationHelper.authorizeDataSource(
- this.dataSourceService.getRawDataSourceMetadata(
- fullyQualifiedTableName.getDatasourceName()));
- String jobName =
- getJobNameForIndexQuery(dispatchQueryRequest, indexDetails, fullyQualifiedTableName);
- Map<String, String> tags =
- getDefaultTagsForJobSubmission(dispatchQueryRequest, fullyQualifiedTableName);
+ this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()));
+ String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
+ Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexDetails.getIndexName());
+ tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
+ tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
jobName,
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
- constructSparkParameters(fullyQualifiedTableName.getDatasourceName()),
+ constructSparkParameters(dispatchQueryRequest.getDatasource()),
tags);
return startJobRequest;
}

private static Map<String, String> getDefaultTagsForJobSubmission(
- DispatchQueryRequest dispatchQueryRequest, FullyQualifiedTableName fullyQualifiedTableName) {
+ DispatchQueryRequest dispatchQueryRequest) {
Map<String, String> tags = new HashMap<>();
tags.put(CLUSTER_NAME_TAG_KEY, dispatchQueryRequest.getClusterName());
- tags.put(DATASOURCE_TAG_KEY, fullyQualifiedTableName.getDatasourceName());
- tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
- tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
+ tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource());
return tags;
}

- private static String getJobNameForIndexQuery(
- DispatchQueryRequest dispatchQueryRequest,
- IndexDetails indexDetails,
- FullyQualifiedTableName fullyQualifiedTableName) {
- return dispatchQueryRequest.getClusterName()
- + ":"
- + fullyQualifiedTableName.getFullyQualifiedName()
- + "."
- + indexDetails.getIndexName();
- }
}
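Taken together, the dispatcher now routes every PPL query through the non-index path and reads the datasource from the request instead of parsing it out of the query text. A minimal usage sketch follows; the package paths and the LangType.PPL constant are assumptions inferred from this diff and the commit title, not shown verbatim in it.

import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;       // assumed package path
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; // assumed package path
import org.opensearch.sql.spark.rest.model.LangType;                   // assumed package path

public class DispatchExample {
  static String dispatchPpl(SparkQueryDispatcher dispatcher) {
    // Argument order mirrors the DispatchQueryRequest usage in the test change below.
    DispatchQueryRequest request =
        new DispatchQueryRequest(
            "00fd775baqpu4g0p",                             // EMR Serverless application id (sample value from the test)
            "source = my_glue.default.http_logs | head 10", // illustrative PPL query
            "my_glue",                                      // datasource now travels on the request
            LangType.PPL,                                   // assumed constant; PPL bypasses index-query parsing
            "arn:aws:iam::270824043731:role/emr-job-execution-role",
            "TEST_CLUSTER");                                // hypothetical cluster name
    return dispatcher.dispatch(request);                    // returns the started EMR Serverless job run id
  }
}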
DispatchQueryRequest.java
@@ -12,6 +12,7 @@
public class DispatchQueryRequest {
private final String applicationId;
private final String query;
+ private final String datasource;
private final LangType langType;
private final String executionRoleARN;
private final String clusterName;
CreateAsyncQueryRequest.java
@@ -8,36 +8,43 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
- import lombok.AllArgsConstructor;
import lombok.Data;
+ import org.apache.commons.lang3.Validate;
import org.opensearch.core.xcontent.XContentParser;

@Data
- @AllArgsConstructor
public class CreateAsyncQueryRequest {

private String query;
+ private String datasource;
private LangType lang;

+ public CreateAsyncQueryRequest(String query, String datasource, LangType lang) {
+ this.query = Validate.notNull(query, "Query can't be null");
+ this.datasource = Validate.notNull(datasource, "Datasource can't be null");
+ this.lang = Validate.notNull(lang, "lang can't be null");
+ }

public static CreateAsyncQueryRequest fromXContentParser(XContentParser parser)
throws IOException {
String query = null;
LangType lang = null;
+ String datasource = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (fieldName.equals("query")) {
query = parser.textOrNull();
} else if (fieldName.equals("lang")) {
- lang = LangType.fromString(parser.textOrNull());
+ String langString = parser.textOrNull();
+ lang = LangType.fromString(langString);
+ } else if (fieldName.equals("datasource")) {
+ datasource = parser.textOrNull();
} else {
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
- if (lang == null || query == null) {
- throw new IllegalArgumentException("lang and query are required fields.");
- }
- return new CreateAsyncQueryRequest(query, lang);
+ return new CreateAsyncQueryRequest(query, datasource, lang);
}
}
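For reference, a short sketch of the stricter construction path introduced above; the package paths are assumptions, and Validate.notNull (commons-lang3) throws NullPointerException with the supplied message when an argument is null.

import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; // assumed package path
import org.opensearch.sql.spark.rest.model.LangType;                // assumed package path

public class CreateAsyncQueryRequestExample {
  public static void main(String[] args) {
    // All three fields supplied: constructs normally.
    CreateAsyncQueryRequest ok =
        new CreateAsyncQueryRequest(
            "select * from my_glue.default.http_logs limit 10", "my_glue", LangType.SQL);
    System.out.println(ok.getDatasource()); // getter generated by lombok's @Data

    // A missing datasource now fails fast in the constructor rather than
    // surfacing later in the dispatcher:
    try {
      new CreateAsyncQueryRequest("select 1", null, LangType.SQL);
    } catch (NullPointerException e) {
      System.out.println(e.getMessage()); // "Datasource can't be null"
    }
  }
}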
AsyncQueryExecutorServiceImplTest.java
@@ -48,7 +48,8 @@ void testCreateAsyncQuery() {
new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, settings);
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest("select * from my_glue.default.http_logs", LangType.SQL);
new CreateAsyncQueryRequest(
"select * from my_glue.default.http_logs", "my_glue", LangType.SQL);
when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG))
.thenReturn(
"{\"applicationId\":\"00fd775baqpu4g0p\",\"executionRoleARN\":\"arn:aws:iam::270824043731:role/emr-job-execution-role\",\"region\":\"eu-west-1\"}");
@@ -58,6 +59,7 @@ void testCreateAsyncQuery() {
new DispatchQueryRequest(
"00fd775baqpu4g0p",
"select * from my_glue.default.http_logs",
"my_glue",
LangType.SQL,
"arn:aws:iam::270824043731:role/emr-job-execution-role",
TEST_CLUSTER_NAME)))
@@ -73,6 +75,7 @@ void testCreateAsyncQuery() {
new DispatchQueryRequest(
"00fd775baqpu4g0p",
"select * from my_glue.default.http_logs",
"my_glue",
LangType.SQL,
"arn:aws:iam::270824043731:role/emr-job-execution-role",
TEST_CLUSTER_NAME));