-
Notifications
You must be signed in to change notification settings - Fork 141
Commit
Signed-off-by: Vamsi Manohar <[email protected]>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,244 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.rest; | ||
|
||
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; | ||
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE; | ||
import static org.opensearch.rest.RestRequest.Method.DELETE; | ||
import static org.opensearch.rest.RestRequest.Method.GET; | ||
import static org.opensearch.rest.RestRequest.Method.POST; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.OpenSearchException; | ||
import org.opensearch.client.node.NodeClient; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.rest.RestStatus; | ||
import org.opensearch.rest.BaseRestHandler; | ||
import org.opensearch.rest.BytesRestResponse; | ||
import org.opensearch.rest.RestChannel; | ||
import org.opensearch.rest.RestRequest; | ||
import org.opensearch.sql.datasources.exceptions.ErrorMessage; | ||
import org.opensearch.sql.datasources.utils.Scheduler; | ||
import org.opensearch.sql.spark.rest.model.CreateJobRequest; | ||
import org.opensearch.sql.spark.transport.TransportDeleteJobRequest; | ||
import org.opensearch.sql.spark.transport.TransportGetQueryResultRequest; | ||
import org.opensearch.sql.spark.transport.TransportCreateJobRequest; | ||
import org.opensearch.sql.spark.transport.model.DeleteJobActionRequest; | ||
import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse; | ||
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest; | ||
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse; | ||
import org.opensearch.sql.spark.transport.model.CreateJobActionRequest; | ||
import org.opensearch.sql.spark.transport.model.CreateJobActionResponse; | ||
|
||
public class RestJobManagementAction extends BaseRestHandler { | ||
|
||
public static final String JOB_ACTIONS = "job_actions"; | ||
public static final String BASE_JOB_ACTION_URL = "/_plugins/_query/_jobs"; | ||
|
||
private static final Logger LOG = LogManager.getLogger(RestJobManagementAction.class); | ||
|
||
@Override | ||
public String getName() { | ||
return JOB_ACTIONS; | ||
} | ||
|
||
@Override | ||
public List<Route> routes() { | ||
return ImmutableList.of( | ||
|
||
/* | ||
* | ||
* Create a new job with spark execution engine. | ||
* Request URL: POST | ||
* Request body: | ||
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest] | ||
* Response body: | ||
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse] | ||
*/ | ||
new Route(POST, BASE_JOB_ACTION_URL), | ||
|
||
/* | ||
* | ||
* GET jobs with in spark execution engine. | ||
* Request URL: GET | ||
* Request body: | ||
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest] | ||
* Response body: | ||
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse] | ||
*/ | ||
new Route(GET, String.format(Locale.ROOT, "%s/{%s}", BASE_JOB_ACTION_URL, "jobId")), | ||
new Route(GET, BASE_JOB_ACTION_URL), | ||
|
||
/* | ||
* | ||
* Cancel a job within spark execution engine. | ||
* Request URL: DELETE | ||
* Request body: | ||
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest] | ||
* Response body: | ||
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse] | ||
*/ | ||
new Route(DELETE, String.format(Locale.ROOT, "%s/{%s}", BASE_JOB_ACTION_URL, "jobId")), | ||
|
||
/* | ||
* GET query result from job {{jobId}} execution. | ||
* Request URL: GET | ||
* Request body: | ||
* Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest] | ||
* Response body: | ||
* Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse] | ||
*/ | ||
new Route( | ||
GET, | ||
String.format(Locale.ROOT, "%s/{%s}/result", BASE_JOB_ACTION_URL, "jobId"))); | ||
} | ||
|
||
@Override | ||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) | ||
throws IOException { | ||
switch (restRequest.method()) { | ||
case POST: | ||
return executePostRequest(restRequest, nodeClient); | ||
case GET: | ||
return executeGetRequest(restRequest, nodeClient); | ||
case DELETE: | ||
return executeDeleteRequest(restRequest, nodeClient); | ||
default: | ||
return restChannel -> | ||
restChannel.sendResponse( | ||
Check warning on line 116 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L115-L116
|
||
new BytesRestResponse( | ||
RestStatus.METHOD_NOT_ALLOWED, String.valueOf(restRequest.method()))); | ||
} | ||
} | ||
|
||
private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient) | ||
throws IOException { | ||
|
||
CreateJobRequest submitJobRequest | ||
= CreateJobRequest.fromXContentParser(restRequest.contentParser()); | ||
return restChannel -> | ||
Scheduler.schedule( | ||
Check warning on line 128 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L125-L128
|
||
nodeClient, | ||
() -> | ||
nodeClient.execute( | ||
TransportCreateJobRequest.ACTION_TYPE, | ||
new CreateJobActionRequest(submitJobRequest), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse( | ||
CreateJobActionResponse createJobActionResponse) { | ||
restChannel.sendResponse( | ||
new BytesRestResponse( | ||
RestStatus.CREATED, | ||
"application/json; charset=UTF-8", | ||
submitJobRequest.getQuery())); | ||
} | ||
Check warning on line 143 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L142-L143
|
||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
handleException(e, restChannel); | ||
} | ||
Check warning on line 148 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L147-L148
|
||
})); | ||
} | ||
|
||
private RestChannelConsumer executeGetRequest(RestRequest restRequest, NodeClient nodeClient) { | ||
String jobId = restRequest.param("jobId"); | ||
return restChannel -> | ||
Scheduler.schedule( | ||
Check warning on line 155 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L153-L155
|
||
nodeClient, | ||
() -> | ||
nodeClient.execute( | ||
TransportGetQueryResultRequest.ACTION_TYPE, | ||
new GetJobQueryResultActionRequest(jobId), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse( | ||
GetJobQueryResultActionResponse getJobQueryResultActionResponse) { | ||
restChannel.sendResponse( | ||
new BytesRestResponse( | ||
RestStatus.OK, | ||
"application/json; charset=UTF-8", | ||
getJobQueryResultActionResponse.getResult())); | ||
} | ||
Check warning on line 170 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L169-L170
|
||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
handleException(e, restChannel); | ||
} | ||
Check warning on line 175 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L174-L175
|
||
})); | ||
} | ||
|
||
|
||
private void handleException(Exception e, RestChannel restChannel) { | ||
if (e instanceof OpenSearchException) { | ||
OpenSearchException exception = (OpenSearchException) e; | ||
reportError(restChannel, exception, exception.status()); | ||
} else { | ||
LOG.error("Error happened during request handling", e); | ||
Check warning on line 185 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L182-L185
|
||
if (isClientError(e)) { | ||
reportError(restChannel, e, BAD_REQUEST); | ||
} else { | ||
reportError(restChannel, e, SERVICE_UNAVAILABLE); | ||
} | ||
} | ||
} | ||
|
||
|
||
private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) { | ||
String jobId = restRequest.param("jobId"); | ||
return restChannel -> | ||
Scheduler.schedule( | ||
Check warning on line 198 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L196-L198
|
||
nodeClient, | ||
() -> | ||
nodeClient.execute( | ||
TransportDeleteJobRequest.ACTION_TYPE, | ||
new DeleteJobActionRequest(jobId), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse( | ||
DeleteJobActionResponse deleteJobActionResponse) { | ||
restChannel.sendResponse( | ||
new BytesRestResponse( | ||
RestStatus.OK, | ||
"application/json; charset=UTF-8", | ||
deleteJobActionResponse.getResult())); | ||
} | ||
Check warning on line 213 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L212-L213
|
||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
handleException(e, restChannel); | ||
} | ||
Check warning on line 218 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L217-L218
|
||
})); | ||
} | ||
|
||
private String handleGetAllJobs() { | ||
return "all_jobs"; | ||
} | ||
|
||
private String handleGetJob() { | ||
return "job"; | ||
} | ||
|
||
private String handleJobResult() { | ||
return "all_jobs"; | ||
} | ||
|
||
|
||
private void reportError(final RestChannel channel, final Exception e, final RestStatus status) { | ||
channel.sendResponse( | ||
new BytesRestResponse(status, new ErrorMessage(e, status.getStatus()).toString())); | ||
} | ||
Check warning on line 238 in spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java#L236-L238
|
||
|
||
private static boolean isClientError(Exception e) { | ||
return e instanceof IllegalArgumentException | ||
|| e instanceof IllegalStateException; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.rest.model; | ||
|
||
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; | ||
|
||
import java.io.IOException; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Data; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
|
||
@Data | ||
@AllArgsConstructor | ||
public class CreateJobRequest { | ||
|
||
private String query; | ||
|
||
public static CreateJobRequest fromXContentParser(XContentParser parser) throws IOException { | ||
String query = null; | ||
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); | ||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) { | ||
String fieldName = parser.currentName(); | ||
parser.nextToken(); | ||
if (fieldName.equals("query")) { | ||
query = parser.textOrNull(); | ||
} else { | ||
throw new IllegalArgumentException("Unknown field: " + fieldName); | ||
} | ||
} | ||
return new CreateJobRequest(query); | ||
} | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* | ||
* * Copyright OpenSearch Contributors | ||
* * SPDX-License-Identifier: Apache-2.0 | ||
* | ||
*/ | ||
|
||
package org.opensearch.sql.spark.transport; | ||
|
||
import org.opensearch.action.ActionType; | ||
import org.opensearch.action.support.ActionFilters; | ||
import org.opensearch.action.support.HandledTransportAction; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.sql.spark.transport.model.CreateJobActionRequest; | ||
import org.opensearch.sql.spark.transport.model.CreateJobActionResponse; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.transport.TransportService; | ||
|
||
public class TransportCreateJobRequest | ||
extends HandledTransportAction<CreateJobActionRequest, CreateJobActionResponse> { | ||
|
||
public static final String NAME = "cluster:admin/opensearch/ql/jobs/write"; | ||
public static final ActionType<CreateJobActionResponse> ACTION_TYPE = | ||
Check warning on line 23 in spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequest.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequest.java#L23
|
||
new ActionType<>(NAME, CreateJobActionResponse::new); | ||
|
||
|
||
|
||
protected TransportCreateJobRequest(TransportService transportService, | ||
ActionFilters actionFilters) { | ||
super(NAME, transportService, actionFilters, CreateJobActionRequest::new); | ||
} | ||
Check warning on line 31 in spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequest.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequest.java#L30-L31
|
||
|
||
@Override | ||
protected void doExecute(Task task, CreateJobActionRequest request, | ||
ActionListener<CreateJobActionResponse> listener) { | ||
try { | ||
String responseContent = "submitted_job"; | ||
listener.onResponse(new CreateJobActionResponse(responseContent)); | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} | ||
} | ||
Check warning on line 42 in spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequest.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequest.java#L37-L42
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* | ||
* * Copyright OpenSearch Contributors | ||
* * SPDX-License-Identifier: Apache-2.0 | ||
* | ||
*/ | ||
|
||
package org.opensearch.sql.spark.transport; | ||
|
||
import org.opensearch.action.ActionType; | ||
import org.opensearch.action.support.ActionFilters; | ||
import org.opensearch.action.support.HandledTransportAction; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.sql.spark.transport.model.DeleteJobActionRequest; | ||
import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.transport.TransportService; | ||
|
||
public class TransportDeleteJobRequest | ||
extends HandledTransportAction<DeleteJobActionRequest, DeleteJobActionResponse> { | ||
|
||
public static final String NAME = "cluster:admin/opensearch/ql/jobs/delete"; | ||
public static final ActionType<DeleteJobActionResponse> ACTION_TYPE = | ||
Check warning on line 23 in spark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequest.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequest.java#L23
|
||
new ActionType<>(NAME, DeleteJobActionResponse::new); | ||
|
||
|
||
protected TransportDeleteJobRequest(TransportService transportService, | ||
ActionFilters actionFilters) { | ||
super(NAME, transportService, actionFilters, DeleteJobActionRequest::new); | ||
} | ||
Check warning on line 30 in spark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequest.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequest.java#L29-L30
|
||
|
||
@Override | ||
protected void doExecute(Task task, DeleteJobActionRequest request, | ||
ActionListener<DeleteJobActionResponse> listener) { | ||
try { | ||
String responseContent = "deleted_job"; | ||
listener.onResponse(new DeleteJobActionResponse(responseContent)); | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} | ||
} | ||
Check warning on line 41 in spark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequest.java Codecov / codecov/patchspark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequest.java#L36-L41
|
||
} |