Skip to content

Commit

Permalink
AJ-1011: higher-level methods on JobDao, like running() (#375)
Browse files Browse the repository at this point in the history
higher-level methods on JobDao, like running()
  • Loading branch information
davidangb authored Oct 18, 2023
1 parent bb412b3 commit a607971
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,17 @@ public interface JobDao {

GenericJobServerModel updateStatus(UUID jobId, GenericJobServerModel.StatusEnum status);

GenericJobServerModel queued(UUID jobId);

GenericJobServerModel running(UUID jobId);

GenericJobServerModel succeeded(UUID jobId);

GenericJobServerModel fail(UUID jobId, String errorMessage);

GenericJobServerModel fail(UUID jobId, String errorMessage, StackTraceElement[] stackTrace);
GenericJobServerModel fail(UUID jobId, String errorMessage, Exception e);

GenericJobServerModel fail(UUID jobId, Exception e);

GenericJobServerModel getJob(UUID jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,39 @@ public GenericJobServerModel createJob(Job<JobInput, JobResult> job) {
return getJob(job.getJobId());
}

/**
* Mark a job as QUEUED.
*
* @param jobId the job to update
* @return the updated job
*/
@Override
public GenericJobServerModel queued(UUID jobId) {
return updateStatus(jobId, StatusEnum.QUEUED);
}

/**
* Mark a job as RUNNING.
*
* @param jobId the job to update
* @return the updated job
*/
@Override
public GenericJobServerModel running(UUID jobId) {
return updateStatus(jobId, StatusEnum.RUNNING);
}

/**
* Mark a job as SUCCEEDED.
*
* @param jobId the job to update
* @return the updated job
*/
@Override
public GenericJobServerModel succeeded(UUID jobId) {
return updateStatus(jobId, StatusEnum.SUCCEEDED);
}

/**
* update this import job with a new status. note that the table's trigger will automatically
* update the `updated` column's value. Do not use this method to mark a job as failed; use one of
Expand Down Expand Up @@ -95,17 +128,28 @@ public GenericJobServerModel fail(UUID jobId, String errorMessage) {
}

/**
* Mark a job as failed, specifying a short human-readable error message and a stack trace.
* Mark a job as failed, specifying the Exception that caused the failure
*
* @param jobId id of the job to update
* @param e the exception that caused this job to fail
* @return the updated job
*/
@Override
public GenericJobServerModel fail(UUID jobId, Exception e) {
return fail(jobId, e.getMessage(), e);
}

/**
* Mark a job as failed, specifying a short human-readable error message and the Exception that
* caused the failure
*
* @param jobId id of the job to update
* @param errorMessage a short error message, if the job is in error
* @param stackTrace a full stack trace for debugging, if the job is in error
* @return the updated job
*/
@Override
public GenericJobServerModel fail(
UUID jobId, String errorMessage, StackTraceElement[] stackTrace) {
return update(jobId, StatusEnum.ERROR, errorMessage, stackTrace);
public GenericJobServerModel fail(UUID jobId, String errorMessage, Exception e) {
return update(jobId, StatusEnum.ERROR, errorMessage, e.getStackTrace());
}

private GenericJobServerModel update(
Expand Down Expand Up @@ -147,6 +191,12 @@ private GenericJobServerModel update(
return getJob(jobId);
}

/**
* Retrieve a job.
*
* @param jobId the job to retrieve
* @return the retrieved job
*/
@Override
public GenericJobServerModel getJob(UUID jobId) {
return namedTemplate.queryForObject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.net.URL;
import java.util.UUID;
import org.databiosphere.workspacedataservice.dao.JobDao;
import org.databiosphere.workspacedataservice.generated.GenericJobServerModel;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
Expand Down Expand Up @@ -39,7 +38,7 @@ public void execute(JobExecutionContext context) throws org.quartz.JobExecutionE
UUID jobId = UUID.fromString(context.getJobDetail().getKey().getName());
try {
// mark this job as running
getJobDao().updateStatus(jobId, GenericJobServerModel.StatusEnum.RUNNING);
getJobDao().running(jobId);
// look for an auth token in the Quartz JobDataMap
String authToken = getJobDataString(context.getMergedJobDataMap(), ARG_TOKEN);
// and stash the auth token into job context
Expand All @@ -50,10 +49,10 @@ public void execute(JobExecutionContext context) throws org.quartz.JobExecutionE
// execute the specifics of this job
executeInternal(jobId, context);
// if we reached here, mark this job as successful
getJobDao().updateStatus(jobId, GenericJobServerModel.StatusEnum.SUCCEEDED);
getJobDao().succeeded(jobId);
} catch (Exception e) {
// on any otherwise-unhandled exception, mark the job as failed
getJobDao().fail(jobId, e.getMessage(), e.getStackTrace());
getJobDao().fail(jobId, e);
} finally {
JobContextHolder.destroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public GenericJobServerModel createImport(
// schedule the job. after successfully scheduling, mark the job as queued
schedulerDao.schedule(schedulable);
logger.debug("Job {} scheduled", createdJob.getJobId());
jobDao.updateStatus(job.getJobId(), GenericJobServerModel.StatusEnum.QUEUED);
jobDao.queued(job.getJobId());

// return the queued job
return createdJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void update(StatusEnum status) {
"should properly update the job");
}

// update status with an error message
// fail the job with an error message
@Test
void failWithErrorMessage() {
JobType jobType = JobType.DATA_IMPORT;
Expand Down Expand Up @@ -154,15 +154,17 @@ void failWithErrorMessage() {
"should properly update the job with an error message");
}

// update status with an error message and stacktrace
// fail the job with an exception
@Test
void failWithErrorMessageAndStackTrace() throws JsonProcessingException {
void failWithException() throws JsonProcessingException {
JobType jobType = JobType.DATA_IMPORT;
String errorMessage = "my stack trace error message";
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
Exception e = new Exception(errorMessage);
e.setStackTrace(stackTrace);

GenericJobServerModel testJob = assertJobCreation(jobType);
jobDao.fail(testJob.getJobId(), errorMessage, stackTrace);
jobDao.fail(testJob.getJobId(), e);

// after updating the job, there should be exactly one row with:
// this jobId and type, the new status, the new error message, and the new stack trace
Expand All @@ -187,6 +189,69 @@ void failWithErrorMessageAndStackTrace() throws JsonProcessingException {
"should properly update the job with an error message and a stack trace");
}

// fail the job with a custom error message and an exception
@Test
void failWithExceptionAndMessage() throws JsonProcessingException {
JobType jobType = JobType.DATA_IMPORT;
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
Exception e = new Exception("The exception's error message");
e.setStackTrace(stackTrace);

String customErrorMessage = "my override of the error message";

GenericJobServerModel testJob = assertJobCreation(jobType);
jobDao.fail(testJob.getJobId(), customErrorMessage, e);

// after updating the job, there should be exactly one row with:
// this jobId and type, the new status, the new error message, and the new stack trace
// and updated timestamp greater than the created timestamp.
// input should still be {}, and result should still be null
var params = new MapSqlParameterSource("jobId", testJob.getJobId().toString());
params.addValue("type", jobType.name());
params.addValue("status", StatusEnum.ERROR.name());
params.addValue("error", customErrorMessage);
params.addValue("stacktrace", mapper.writeValueAsString(stackTrace));
assertDoesNotThrow(
() ->
namedTemplate.queryForObject(
"select id from sys_wds.job where id = :jobId and type = :type and status = :status "
+ "and error = :error "
+ "and stacktrace = :stacktrace::jsonb "
+ "and updated > created "
+ "and input = '{}'::jsonb "
+ "and result is null",
params,
String.class),
"should properly update the job with an error message and a stack trace");
}

@Test
void queue() {
JobType jobType = JobType.DATA_IMPORT;
GenericJobServerModel testJob = assertJobCreation(jobType);
jobDao.queued(testJob.getJobId());
GenericJobServerModel actualJob = jobDao.getJob(testJob.getJobId());
assertEquals(StatusEnum.QUEUED, actualJob.getStatus());
}

@Test
void running() {
JobType jobType = JobType.DATA_IMPORT;
GenericJobServerModel testJob = assertJobCreation(jobType);
jobDao.running(testJob.getJobId());
GenericJobServerModel actualJob = jobDao.getJob(testJob.getJobId());
assertEquals(StatusEnum.RUNNING, actualJob.getStatus());
}

@Test
void success() {
JobType jobType = JobType.DATA_IMPORT;
GenericJobServerModel testJob = assertJobCreation(jobType);
jobDao.succeeded(testJob.getJobId());
GenericJobServerModel actualJob = jobDao.getJob(testJob.getJobId());
assertEquals(StatusEnum.SUCCEEDED, actualJob.getStatus());
}

// TODO: AJ-1011 get job, does it deserialize correctly?
@Test
void getJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -45,9 +46,9 @@ void beforeAll() {
when(jobDao.createJob(any())).thenReturn(genericJobServerModel);
when(jobDao.getJob(any())).thenReturn(genericJobServerModel);
when(jobDao.updateStatus(any(), any())).thenReturn(genericJobServerModel);
when(jobDao.fail(any(), any()))
when(jobDao.fail(any(), any(Exception.class)))
.thenThrow(new RuntimeException("test failed via jobDao.fail()"));
when(jobDao.fail(any(), any(), any()))
when(jobDao.fail(any(), anyString()))
.thenThrow(new RuntimeException("test failed via jobDao.fail()"));
}

Expand Down

0 comments on commit a607971

Please sign in to comment.