Skip to content

Commit

Permalink
Status and termination
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Dec 8, 2023
1 parent 3ef1e8b commit 6480de8
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 4 deletions.
155 changes: 152 additions & 3 deletions R/crew_aws_batch_monitor.R
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ crew_class_aws_batch_monitor <- R6::R6Class(
message = paste(name, "must be NULL or a character of length 1.")
)
}
if (identical(memory_units, "gigabytes")) {
if (!is.null(memory) && identical(memory_units, "gigabytes")) {
memory <- memory * ((5L ^ 9L) / (2L ^ 11L))
}
args <- list()
Expand Down Expand Up @@ -279,7 +279,7 @@ crew_class_aws_batch_monitor <- R6::R6Class(
isTRUE(.) || isFALSE(.),
message = "propagate_tags must be NULL or TRUE or FALSE"
)
if (identical(memory_units, "gigabytes")) {
if (!is.null(memory) && identical(memory_units, "gigabytes")) {
memory <- memory * ((5L ^ 9L) / (2L ^ 11L))
}
args <- list()
Expand Down Expand Up @@ -547,7 +547,9 @@ crew_class_aws_batch_monitor <- R6::R6Class(
},
#' @description Submit a single AWS Batch job to the given job queue
#' under the given job definition.
#' @details Any jobs submitted this way are different from the
#' @details This method uses the job queue and job definition
#' that were supplied through [crew_aws_batch_monitor()].
#' Any jobs submitted this way are different from the
#' `crew` workers that the `crew` controller starts automatically
#' using the AWS Batch launcher plugin.
#' You may use the `submit()` method in the monitor for different
Expand Down Expand Up @@ -625,6 +627,153 @@ crew_class_aws_batch_monitor <- R6::R6Class(
arn = out$jobArn
)
# nocov end
},
#' @description Terminate an AWS Batch job.
#' @return `NULL` (invisibly).
#' @param id Character of length 1, ID of the AWS Batch job to terminate.
#' @param reason Character of length 1, natural language explaining
#' the reason the job was terminated.
terminate = function(
id,
reason = "terminated by crew.aws.batch monitor"
) {
# Covered in tests/interactive/definitions.R
# nocov start
crew::crew_assert(
id,
is.character(.),
!anyNA(.),
length(.) == 1L,
nzchar(.),
message = "job ID must be a valid character of length 1"
)
crew::crew_assert(
reason,
is.character(.),
!anyNA(.),
length(.) == 1L,
nzchar(.),
message = "'reason' must be a valid character of length 1"
)
client <- private$.client()
client$terminate_job(jobId = id, reason = reason)
invisible()
# nocov end
},
#' @description List all the jobs in the given job queue
#' with the given job definition.
#' @details This method uses the job queue and job definition
#' that were supplied through [crew_aws_batch_monitor()].
#' @return A `tibble` with one row per job and columns
#' with job information.
#' @param before Optional character of length 1, a time stamp.
#' Results are limited to jobs created before this time.
#' @param after Character of length 1, a time stamp.
#' Results are limited to jobs created after this time.
#' @param status Character vector of job states. Results are limited
#' to these job states.
jobs = function(
before = NULL,
after = NULL,
status = c(
"submitted",
"pending",
"runnable",
"starting",
"running",
"succeeded",
"failed"
)
) {
crew::crew_assert(
before %|||% "x",
is.character(.),
!anyNA(.),
length(.) == 1L,
nzchar(.),
message = "'before' must be NULL or a valid character of length 1"
)
crew::crew_assert(
after %|||% "x",
is.character(.),
!anyNA(.),
length(.) == 1L,
nzchar(.),
message = "'before' must be NULL or a valid character of length 1"
)
crew::crew_assert(
status,
is.character(.),
!anyNA(.),
nzchar(.),
message = "'status' must be a valid character of length 1"
)
crew::crew_assert(
status,
. %in% c(
"submitted",
"pending",
"runnable",
"starting",
"running",
"succeeded",
"failed"
),
message = paste(
"elements of 'status' must be \"submitted\", \"pending\",",
"\"runnable\", \"starting\", \"running\", \"succeeded\", or",
"\"failed\"."
)
)
status <- unique(status)
filters <- list(
definition = list(
name = "JOB_DEFINITION",
values = private$.job_definition
)
)
if (!is.null(before)) {
filters$before <- list(
name = "BEFORE_CREATED_AT",
values = before
)
}
if (!is.null(after)) {
filters$after <- list(
name = "AFTER_CREATED_AT",
values = after
)
}
filters <- unname(filters)
client <- private$.client()
pages <- paws.common::paginate(
Operation = client$list_jobs(
jobQueue = private$.job_queue,
filters = filters
)
)
out <- list()
for (page in pages) {
for (job in page$jobSummaryList) {
out[[length(out) + 1L]] <- tibble::tibble(
name = job$jobName,
id = job$jobId,
arn = job$jobArn,
status = job$status,
reason = if_any(
length(job$statusReason),
job$statusReason,
NA_character_
),
created = job$createdAt,
started = if_any(length(job$startedAt), job$startedAt, NA_real_),
stopped = if_any(length(job$stoppedAt), job$stoppedAt, NA_real_)
)
}
}
out <- do.call(what = rbind, args = out)
out$status <- tolower(out$status)
out[out$status %in% status, ]
}
)
)
5 changes: 4 additions & 1 deletion R/utils_aws_batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ crew_aws_batch_terminate <- function(args_client, job_id) {
config = args_client$config,
args = args_client
)
client$terminate_job(jobId = job_id, reason = "terminated by crew")
client$terminate_job(
jobId = job_id,
reason = "terminated by crew controller"
)
}

crew_aws_batch_client <- function(config, args) {
Expand Down
69 changes: 69 additions & 0 deletions man/crew_class_aws_batch_monitor.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tests/interactive/definitions.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
library(crew.aws.batch)
library(testthat)
x <- crew_aws_batch_monitor(
job_definition = "crew-aws-batch-test",
job_queue = "crew-aws-batch-job-queue",
region = "us-east-2"
)
Expand Down
30 changes: 30 additions & 0 deletions tests/interactive/jobs.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
library(crew.aws.batch)
library(testthat)
x <- crew_aws_batch_monitor(
job_definition = "crew-aws-batch-test",
job_queue = "crew-aws-batch-job-queue",
region = "us-east-2"
)
Expand All @@ -23,4 +24,33 @@ x$submit(
tags = c("crew_aws_batch_1", "crew_aws_batch_2"),
propagate_tags = TRUE
)
job <- x$submit()
out <- x$jobs()
expect_true(nrow(out) > 0L)
expect_true(job$name %in% out$name)
expect_true(job$id %in% out$id)
expect_true(job$arn %in% out$arn)
info <- out[out$name == job$name, ]
expect_true(is.na(info$reason))
expect_false(info$status %in% c("succeeded", "failed"))
good_reason <- "I have my reasons..."
x$terminate(id = info$id, reason = good_reason)
attempts <- 0
while (!info$status %in% c("succeeded", "failed")) {
message(
paste(
"checking terminated job",
sample(c("-", "\\", "|", "/"), size = 1L)
)
)
out <- x$jobs()
info <- out[out$name == job$name, ]
attempts <- attempts + 1L
if (attempts > 20L) {
stop("job did not terminate")
}
Sys.sleep(5)
}
expect_true(info$status %in% c("succeeded", "failed"))
expect_equal(info$reason, good_reason)
x$deregister()

0 comments on commit 6480de8

Please sign in to comment.