Skip to content

Commit

Permalink
Cancel Job API (#2126)
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Sep 22, 2023
1 parent 71f4155 commit be82714
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public List<Route> routes() {
new Route(GET, BASE_DATASOURCE_ACTION_URL),

/*
* GET datasources
* Request URL: GET
* PUT datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.UpdateDataSourceActionRequest]
Expand All @@ -100,8 +99,7 @@ public List<Route> routes() {
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* GET datasources
* Request URL: GET
* DELETE datasources
* Request body: Ref
* [org.opensearch.sql.plugin.transport.datasource.model.DeleteDataSourceActionRequest]
* Response body: Ref
Expand Down
27 changes: 17 additions & 10 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,13 @@ We make use of default aws credentials chain to make calls to the emr serverless
have pass role permissions for emr-job-execution-role mentioned in the engine configuration.



Async Query Creation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/create``.

HTTP URI: _plugins/_query/_async_query
HTTP URI: _plugins/_async_query
HTTP VERB: POST



Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
Expand All @@ -57,23 +54,19 @@ Sample Response::
"queryId": "00fd796ut1a7eg0q"
}


Async Query Result API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/result``.
Async Query Creation and Result Query permissions are orthogonal, so any user with result api permissions and queryId can query the corresponding query results irrespective of the user who created the async query.


HTTP URI: _plugins/_query/_async_query/{queryId}
HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: GET


Sample Request BODY::

curl --location --request GET 'http://localhost:9200/_plugins/_async_query/00fd796ut1a7eg0q' \
--header 'Content-Type: application/json' \
--data '{
"query" : "select * from default.http_logs limit 1"
}'

Sample Response if the Query is in Progress ::

Expand Down Expand Up @@ -106,3 +99,17 @@ Sample Response If the Query is successful ::
"total": 1,
"size": 1
}


Async Query Cancellation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: DELETE

Sample Request Body ::

curl --location --request DELETE 'http://localhost:9200/_plugins/_async_query/00fdalrvgkbh2g0q' \
--header 'Content-Type: application/json' \

Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ public interface AsyncQueryExecutorService {
* @return {@link AsyncQueryExecutionResponse}
*/
AsyncQueryExecutionResponse getAsyncQueryResults(String queryId);

/**
* Cancels running async query and returns the cancelled queryId.
*
* @param queryId queryId.
* @return {@link String} cancelledQueryId.
*/
String cancelQuery(String queryId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
}

@Override
public String cancelQuery(String queryId) {
Optional<AsyncQueryJobMetadata> asyncQueryJobMetadata =
asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
if (asyncQueryJobMetadata.isPresent()) {
return sparkQueryDispatcher.cancelJob(
asyncQueryJobMetadata.get().getApplicationId(), queryId);
}
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
}

private void validateSparkExecutionEngineSettings() {
if (!isSparkJobExecutionEnabled) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobDriver;
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -65,4 +68,21 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
logger.info("Job Run state: " + getJobRunResult.getJobRun().getState());
return getJobRunResult;
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
try {
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> emrServerless.cancelJobRun(cancelJobRunRequest));
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
} catch (ValidationException e) {
throw new IllegalArgumentException(
String.format("Couldn't cancel the queryId: %s due to %s", jobId, e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.opensearch.sql.spark.client;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;

public interface SparkJobClient {
Expand All @@ -19,4 +20,6 @@ String startJobRun(
String sparkSubmitParams);

GetJobRunResult getJobRunResult(String applicationId, String jobId);

CancelJobRunResult cancelJobRun(String applicationId, String jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRunState;
import java.net.URI;
Expand Down Expand Up @@ -64,6 +65,11 @@ public JSONObject getQueryResponse(String applicationId, String queryId) {
return result;
}

public String cancelJob(String applicationId, String jobId) {
CancelJobRunResult cancelJobRunResult = sparkJobClient.cancelJobRun(applicationId, jobId);
return cancelJobRunResult.getJobRunId();
}

// TODO: Analyze given query
// Extract datasourceName
// Apply Authorizaiton.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void onResponse(
CancelAsyncQueryActionResponse cancelAsyncQueryActionResponse) {
restChannel.sendResponse(

Check warning on line 195 in spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java#L195

Added line #L195 was not covered by tests
new BytesRestResponse(
RestStatus.OK,
RestStatus.NO_CONTENT,
"application/json; charset=UTF-8",
cancelAsyncQueryActionResponse.getResult()));
}

Check warning on line 200 in spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java#L199-L200

Added lines #L199 - L200 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionRequest;
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
import org.opensearch.tasks.Task;
Expand All @@ -21,21 +22,31 @@ public class TransportCancelAsyncQueryRequestAction
extends HandledTransportAction<CancelAsyncQueryActionRequest, CancelAsyncQueryActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/async_query/delete";
private final AsyncQueryExecutorServiceImpl asyncQueryExecutorService;
public static final ActionType<CancelAsyncQueryActionResponse> ACTION_TYPE =
new ActionType<>(NAME, CancelAsyncQueryActionResponse::new);

@Inject
public TransportCancelAsyncQueryRequestAction(
TransportService transportService, ActionFilters actionFilters) {
TransportService transportService,
ActionFilters actionFilters,
AsyncQueryExecutorServiceImpl asyncQueryExecutorService) {
super(NAME, transportService, actionFilters, CancelAsyncQueryActionRequest::new);
this.asyncQueryExecutorService = asyncQueryExecutorService;
}

@Override
protected void doExecute(
Task task,
CancelAsyncQueryActionRequest request,
ActionListener<CancelAsyncQueryActionResponse> listener) {
String responseContent = "deleted_job";
listener.onResponse(new CancelAsyncQueryActionResponse(responseContent));
try {
String jobId = asyncQueryExecutorService.cancelQuery(request.getQueryId());
listener.onResponse(
new CancelAsyncQueryActionResponse(
String.format("Deleted async query with id: %s", jobId)));
} catch (Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@

import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

@AllArgsConstructor
@Getter
public class CancelAsyncQueryActionRequest extends ActionRequest {

private String queryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,34 @@ void testGetAsyncQueryResultsWithDisabledExecutionEngine() {
+ " to enable Async Query APIs",
illegalArgumentException.getMessage());
}

@Test
void testCancelJobWithJobNotFound() {
AsyncQueryExecutorService asyncQueryExecutorService =
new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, settings);
when(asyncQueryJobMetadataStorageService.getJobMetadata(EMR_JOB_ID))
.thenReturn(Optional.empty());
AsyncQueryNotFoundException asyncQueryNotFoundException =
Assertions.assertThrows(
AsyncQueryNotFoundException.class,
() -> asyncQueryExecutorService.cancelQuery(EMR_JOB_ID));
Assertions.assertEquals(
"QueryId: " + EMR_JOB_ID + " not found", asyncQueryNotFoundException.getMessage());
verifyNoInteractions(sparkQueryDispatcher);
verifyNoInteractions(settings);
}

@Test
void testCancelJob() {
AsyncQueryExecutorService asyncQueryExecutorService =
new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, settings);
when(asyncQueryJobMetadataStorageService.getJobMetadata(EMR_JOB_ID))
.thenReturn(Optional.of(new AsyncQueryJobMetadata(EMR_JOB_ID, EMRS_APPLICATION_ID)));
when(sparkQueryDispatcher.cancelJob(EMRS_APPLICATION_ID, EMR_JOB_ID)).thenReturn(EMR_JOB_ID);
String jobId = asyncQueryExecutorService.cancelQuery(EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, jobId);
verifyNoInteractions(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@
package org.opensearch.sql.spark.client;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_JOB_NAME;
import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID;
import static org.opensearch.sql.spark.constants.TestConstants.QUERY;
import static org.opensearch.sql.spark.constants.TestConstants.SPARK_SUBMIT_PARAMETERS;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import com.amazonaws.services.emrserverless.model.ValidationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -45,4 +50,28 @@ void testGetJobRunState() {
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123");
}

@Test
void testCancelJobRun() {
when(emrServerless.cancelJobRun(any()))
.thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID));
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
CancelJobRunResult cancelJobRunResult =
emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId());
}

@Test
void testCancelJobRunWithValidationException() {
doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID));
Assertions.assertEquals(
"Couldn't cancel the queryId: job-123xxx due to Error (Service: null; Status Code: 0; Error"
+ " Code: null; Request ID: null; Proxy: null)",
illegalArgumentException.getMessage());
}
}
Loading

0 comments on commit be82714

Please sign in to comment.