Skip to content

Commit

Permalink
Create Job API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Sep 9, 2023
1 parent a7af359 commit d094cb7
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 3 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
Expand Down
1 change: 1 addition & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
Expand Down
3 changes: 3 additions & 0 deletions plugin/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ grant {

// ml-commons client
permission java.lang.RuntimePermission "setContextClassLoader";

// aws credentials
permission java.io.FilePermission "${user.home}${/}.aws${/}*", "read";
};
3 changes: 2 additions & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ dependencies {

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.sql.spark.client;

public interface EmrServerlessClient {

String startJobRun(String query);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobDriver;
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import java.io.IOException;
import java.util.Set;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrServerlessClientImpl implements EmrServerlessClient {

private final AWSEMRServerless emrServerless;
private final String applicationId;
private final String executionRoleArn;
private final FlintHelper flint;
private final String sparkApplicationJar;
private SparkResponse sparkResponse;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);
private static final Set<String> terminalStates = Set.of("CANCELLED", "FAILED", "SUCCESS");

public EmrServerlessClientImpl(
AWSEMRServerless emrServerless,
String applicationId,
String executionRoleArn,
FlintHelper flint,
String sparkApplicationJar,
SparkResponse sparkResponse) {
this.emrServerless = emrServerless;
this.applicationId = applicationId;
this.executionRoleArn = executionRoleArn;
this.flint = flint;
this.sparkApplicationJar =
sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
this.sparkResponse = sparkResponse;
}

public String startJobRun(String jobName, String query) {
StartJobRunRequest request =
new StartJobRunRequest()
.withName(jobName)
.withApplicationId(applicationId)
.withExecutionRoleArn(executionRoleArn)
.withJobDriver(
new JobDriver()
.withSparkSubmit(
new SparkSubmit()
.withEntryPoint(sparkApplicationJar)
.withEntryPointArguments(
query,
SPARK_INDEX_NAME,
flint.getFlintHost(),
flint.getFlintPort(),
flint.getFlintScheme(),
flint.getFlintAuth(),
flint.getFlintRegion())
.withSparkSubmitParameters(
"--class org.opensearch.sql.SQLJob"
+ " --conf spark.driver.cores=1"
+ " --conf spark.driver.memory=1g"
+ " --conf spark.executor.cores=2"
+ " --conf spark.executor.memory=4g"
+ " --conf spark.jars="
+ flint.getFlintIntegrationJar()
+ " --conf spark.datasource.flint.host="
+ flint.getFlintHost()
+ " --conf spark.datasource.flint.port="
+ flint.getFlintPort()
+ " --conf spark.datasource.flint.scheme="
+ flint.getFlintScheme()
+ " --conf spark.datasource.flint.auth="
+ flint.getFlintAuth()
+ " --conf spark.datasource.flint.region="
+ flint.getFlintRegion()
+ " --conf"
+ " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf"
+ " spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf"
+ " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")));
StartJobRunResult response = emrServerless.startJobRun(request);
logger.info("Job Run ID: " + response.getJobRunId());
waitForJobToComplete(response.getJobRunId());
return response.getJobRunId();
}

@SneakyThrows
public void waitForJobToComplete(String jobRunId) {
logger.info(String.format("Waiting for job %s/%s", applicationId, jobRunId));
int retries = 0;
while (retries < 100) {
if (!terminalStates.contains(getJobRunState(jobRunId))) {
Thread.sleep(10000);
} else {
break;
}
retries++;
}

if (!terminalStates.contains(getJobRunState(jobRunId))) {
throw new RuntimeException("Job was not finished after 100 retries" + jobRunId);
}
}

public String getJobRunState(String jobRunId) {
GetJobRunRequest request =
new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId);
GetJobRunResult response = emrServerless.getJobRun(request);
logger.info("Job Run state: " + response.getJobRun().getState());
return response.getJobRun().getState();
}

public void cancelJobRun(String jobRunId) {
// Cancel the job run
emrServerless.cancelJobRun(
new CancelJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId));
}

@Override
public String startJobRun(String query) {
String jobId = startJobRun("temp", query);
sparkResponse.setValue(jobId);
getJobRunState(jobId);
return jobId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

public class SparkConstants {
public static final String EMR = "emr";
public static final String EMRS = "emr-serverless";
public static final String STEP_ID_FIELD = "stepId.keyword";
public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar";
public static final String SPARK_INDEX_NAME = ".query_execution_result";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.spark.client.EmrServerlessClient;

public class SparkQueryDispatcher {

private DataSourceService dataSourceService;
private EmrServerlessClient emrServerlessClient;

public void dispatch(String query) {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.jobs;

public interface JobManagementService {

String createJob(String query);

String getJob(String jobId);

String cancelJob(String jobIds);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.jobs;

public class JobManagementServiceImpl implements JobManagementService{

private JobMetadataStorageService jobMetadataStorageService;
@Override
public String createJob(String query) {
return null;
}

@Override
public String getJob(String jobId) {
return null;
}

@Override
public String cancelJob(String jobIds) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.sql.spark.jobs;

public interface JobMetadataStorageService {

void createJobMetadata();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_JOB_NAME;
import static org.opensearch.sql.spark.constants.TestConstants.QUERY;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

@ExtendWith(MockitoExtension.class)
public class EmrServerlessClientImplTest {
@Mock private AWSEMRServerless emrServerless;
@Mock private FlintHelper flint;
@Mock private SparkResponse sparkResponse;

@Test
void testStartJobRun() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImpl emrServerlessClient =
new EmrServerlessClientImpl(
emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, null, sparkResponse);

emrServerlessClient.startJobRun(EMRS_JOB_NAME, QUERY);
}

@Test
void testGetJobRunState() {
JobRun jobRun = new JobRun();
jobRun.setState("Running");
GetJobRunResult response = new GetJobRunResult();
response.setJobRun(jobRun);
when(emrServerless.getJobRun(any())).thenReturn(response);

EmrServerlessClientImpl emrServerlessClient =
new EmrServerlessClientImpl(
emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, "", sparkResponse);

emrServerlessClient.getJobRunState("123");
}

@Test
void testCancelJobRun() {
when(emrServerless.cancelJobRun(any())).thenReturn(new CancelJobRunResult());

EmrServerlessClientImpl emrServerlessClient =
new EmrServerlessClientImpl(
emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, null, sparkResponse);

emrServerlessClient.cancelJobRun("123");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@
public class TestConstants {
public static final String QUERY = "select 1";
public static final String EMR_CLUSTER_ID = "j-123456789";
public static final String EMRS_APPLICATION_ID = "xxxxx";
public static final String EMRS_EXECUTION_ROLE = "execution_role";
public static final String EMRS_JOB_NAME = "job_name";
}

0 comments on commit d094cb7

Please sign in to comment.