Skip to content

Commit

Permalink
Merge pull request #24 from mrc-ide/mrc-4800
Browse files Browse the repository at this point in the history
Task autosubmission
  • Loading branch information
weshinsley authored Dec 14, 2023
2 parents a91c9dc + 8ae214c commit 9908c8b
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 63 deletions.
39 changes: 31 additions & 8 deletions R/submit.R
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
##' Submit a task to a queue
##'
##' This is a lower-level function that you will not often need to call.
##' Submit a task to a queue. This is a lower-level function that you
##' will not often need to call. Typically a task will be submitted
##' automatically to your driver on creation (e.g., with
##' [hermod::task_create_expr()]), unless you specified `submit =
##' FALSE` or you had not yet configured a driver.
##'
##' @title Submit a task
##' @param id The task id
##'
##' @param id A vector of task ids
##'
##' @param ... Disallowed additional arguments, don't use.
##'
Expand All @@ -19,11 +22,31 @@ task_submit <- function(id, ..., driver = NULL, root = NULL) {
}
root <- hermod_root(root)

## This is a bit gross, could be tidied up later.
dat <- hermod_driver_prepare(driver, root, environment())
dat$driver$submit(id, dat$config, root$path$root)

writeLines(dat$name, file.path(root$path$tasks, id, STATUS_SUBMITTED))
root$cache$driver[[id]] <- dat$name
n <- length(id)
if (n == 0) {
return(invisible())
} else if (n > 1) {
fmt <- paste("Submitting tasks {cli::pb_bar} |",
"{cli::pb_current} / {n} {cli::pb_eta}")
cli::cli_progress_bar(format = fmt, total = n)
}

for (i in id) {
dat$driver$submit(i, dat$config, root$path$root)
writeLines(dat$name, file.path(root$path$tasks, i, STATUS_SUBMITTED))
root$cache$driver[[i]] <- dat$name
if (n > 1) {
cli::cli_progress_update()
}
}

if (n == 1) {
cli::cli_alert_success("Submitted task '{i}' using '{dat$name}'")
} else {
cli::cli_alert_success("Submitted {n} tasks using '{dat$name}'")
}

invisible()
}
99 changes: 91 additions & 8 deletions R/task.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@
##' @param environment Name of the hermod environment to evaluate the
##' task within.
##'
##' @param submit Control over task submission. This will expand over
##' time once we support specifying resources. The most simple
##' interface is to use `TRUE` here to automatically submit a task,
##' using your default configuration, or `FALSE` to prevent
##' submission. The default `NULL` will submit a task if a driver
##' is configured.
##'
##' @inheritParams task_eval
##'
##' @return A task id, a string of hex characters. Use this to
##' interact with the task.
##'
##' @export
task_create_explicit <- function(expr, export = NULL, envir = .GlobalEnv,
environment = "default", root = NULL) {
environment = "default", submit = NULL,
root = NULL) {
root <- hermod_root(root)
locals <- task_locals(export, envir)

Expand All @@ -47,6 +55,9 @@ task_create_explicit <- function(expr, export = NULL, envir = .GlobalEnv,
environment = environment)
saveRDS(data, file.path(dest, EXPR))
file.create(file.path(dest, STATUS_CREATED))

task_submit_maybe(id, submit, root, rlang::current_env())

id
}

Expand All @@ -63,7 +74,8 @@ task_create_explicit <- function(expr, export = NULL, envir = .GlobalEnv,
##'
##' @inherit task_create_explicit return
##' @export
task_create_expr <- function(expr, environment = "default", root = NULL) {
task_create_expr <- function(expr, environment = "default", submit = NULL,
root = NULL) {
root <- hermod_root(root)

quo <- rlang::enquo(expr)
Expand Down Expand Up @@ -113,6 +125,9 @@ task_create_expr <- function(expr, environment = "default", root = NULL) {
environment = environment)
saveRDS(data, file.path(dest, EXPR))
file.create(file.path(dest, STATUS_CREATED))

task_submit_maybe(id, submit, root, rlang::current_env())

id
}

Expand Down Expand Up @@ -344,19 +359,61 @@ task_result <- function(id, root = NULL) {
##' @export
task_cancel <- function(id, root = NULL) {
root <- hermod_root(root)
result <- rep(FALSE, length(id))
cancelled <- rep(FALSE, length(id))
status <- task_status(id, root)
i <- status %in% c("submitted", "running")
if (any(i)) {
eligible <- status %in% c("submitted", "running")
if (any(eligible)) {
task_driver <- vcapply(id, task_get_driver, root = root)
for (driver in unique(na_omit(task_driver))) {
dat <- hermod_driver_prepare(task_driver, root, environment())
j <- task_driver == driver
result[i][j] <- dat$driver$cancel(id[i][j], dat$config, root$path$root)
cancelled[eligible][j] <-
dat$driver$cancel(id[eligible][j], dat$config, root$path$root)
}
file.create(file.path(root$path$tasks, id[cancelled], STATUS_CANCELLED))
}
task_cancel_report(id, status, cancelled, eligible)
cancelled
}


## This is surprisingly disgusting.
task_cancel_report <- function(id, status, cancelled, eligible) {
n <- length(id)
if (n == 1) {
if (cancelled) {
cli::cli_alert_success("Successfully cancelled '{id}'")
} else if (!eligible) {
cli::cli_alert_warning(
"Did not try to cancel '{id}' as it had status '{status}'")
} else {
cli::cli_alert_danger(
"Did not manage to cancel '{id}' which had status '{status}'")
}
} else if (n > 1) {
m <- sum(eligible)
if (all(cancelled)) {
cli::cli_alert_success("Successfully cancelled {n} tasks")
} else if (!any(eligible)) {
cli::cli_alert_warning(
"Did not try to cancel any of {n} tasks as none were eligible")
} else if (all(cancelled[eligible])) {
cli::cli_alert_success(
paste("Successfully cancelled {cli::qty(m)}{?the/all} {m}",
"eligible {cli::qty(m)}task{?s}",
"(of the {n} requested)"))
} else if (!any(cancelled[eligible])) {
cli::cli_alert_danger(
paste("Failed to cancel {cli::qty(m)}{?the/all} {m}",
"eligible {cli::qty(m)}task{?s}",
"(of the {n} requested)"))
} else { # some cancelled, some not
k <- sum(cancelled[eligible])
cli::cli_alert_warning(
paste("Cancelled {k} of {m} eligible {cli::qty(m)}task{?s}",
"(of the {n} requested)"))
}
file.create(file.path(root$path$tasks, id[result], STATUS_CANCELLED))
}
result
}


Expand Down Expand Up @@ -397,3 +454,29 @@ task_locals <- function(names, envir) {
rlang::env_get_list(envir, names, inherit = TRUE, last = topenv())
}
}


task_submit_maybe <- function(id, submit, root, call) {
if (!is.null(submit)) {
## Could also allow character here soon.
assert_scalar_logical(submit, call = call)
}
has_config <- length(root$config) > 0
if (isFALSE(submit) || (!has_config && is.null(submit))) {
return(FALSE)
}
if (!has_config) {
cli::cli_abort(
c("Can't submit task because no driver configured",
i = "Run 'hermod::hermod_configure()' to configure a driver"),
call = call)
}
if (length(root$config) == 1) {
driver <- names(root$config)
} else {
cli::cli_abort("Can't cope with more than one driver configured yet",
call = call)
}
task_submit(id, driver = driver, root = root)
TRUE
}
29 changes: 22 additions & 7 deletions R/util_assert.R
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
assert_scalar <- function(x, name = deparse(substitute(x))) {
assert_scalar <- function(x, name = deparse(substitute(x)), call = NULL) {
if (length(x) != 1) {
stop(sprintf("'%s' must be a scalar", name), call. = FALSE)
cli::cli_abort("'{name}' must be a scalar", call = NULL)
}
}


assert_scalar_character <- function(x, name = deparse(substitute(x))) {
assert_scalar(x, name)
assert_character(x, name)
assert_scalar_character <- function(x, name = deparse(substitute(x)),
call = NULL) {
assert_scalar(x, name, call)
assert_character(x, name, call)
}


assert_character <- function(x, name = deparse(substitute(x))) {
assert_scalar_logical <- function(x, name = deparse(substitute(x)),
call = NULL) {
assert_scalar(x, name, call)
assert_logical(x, name, call)
}


assert_character <- function(x, name = deparse(substitute(x)), call = NULL) {
if (!is.character(x)) {
stop(sprintf("'%s' must be a character", name), call. = FALSE)
cli::cli_abort("'{name}' must be a character", call = call)
}
}


assert_logical <- function(x, name = deparse(substitute(x)), call = NULL) {
if (!is.logical(x)) {
cli::cli_abort("'{name}' must be logical", call = call)
}
}
2 changes: 1 addition & 1 deletion R/windows.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ windows_credentials <- function() {
##' @author Rich FitzJohn
windows_path <- function(name, path_local, path_remote, drive_remote) {
ns <- ensure_package("hermod.windows", rlang::current_env())
ns$windows_path(name, path_local,path_remote, drive_remote)
ns$windows_path(name, path_local, path_remote, drive_remote)
}
2 changes: 1 addition & 1 deletion drivers/windows/tests/testthat/test-batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ test_that("can write a runner batch file", {
config <- root$config$windows
id <- withr::with_dir(
path_root,
hermod::task_create_explicit(quote(sessionInfo())))
hermod::task_create_explicit(quote(sessionInfo()), submit = FALSE))
write_batch_task_run(id, config, path_root)
expect_true(file.exists(file.path(root$path$tasks, id, "run.bat")))
})
Expand Down
14 changes: 7 additions & 7 deletions drivers/windows/tests/testthat/test-driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ test_that("can submit a task", {

id <- withr::with_dir(
path_root,
hermod::task_create_explicit(quote(sessionInfo())))
hermod::task_create_explicit(quote(sessionInfo()), submit = FALSE))

windows_submit(id, config, path_root)

Expand Down Expand Up @@ -44,7 +44,7 @@ test_that("can get a task status", {
config <- root$config$windows
id <- withr::with_dir(
path_root,
hermod::task_create_explicit(quote(sessionInfo())))
hermod::task_create_explicit(quote(sessionInfo()), submit = FALSE))

path_root <- root$path$root
config <- root$config$windows
Expand All @@ -65,7 +65,7 @@ test_that("can get a task result", {
config <- root$config$windows
id <- withr::with_dir(
path_root,
hermod::task_create_explicit(quote(sqrt(2))))
hermod::task_create_explicit(quote(sqrt(2)), submit = FALSE))
hermod::task_eval(id, root = path_root)
expect_silent(windows_result(id, config, path_root))
expect_equal(
Expand All @@ -81,7 +81,7 @@ test_that("can cancel a task", {
config <- root$config$windows
id <- withr::with_dir(
path_root,
hermod::task_create_explicit(quote(sqrt(2))))
hermod::task_create_explicit(quote(sqrt(2)), submit = FALSE))
writeLines("1234", file.path(root$path$tasks, id, "dide_id"))

mock_client <- list(
Expand Down Expand Up @@ -111,9 +111,9 @@ test_that("can cancel a bunch of tasks, in reverse order", {
path_root <- root$path$root
config <- root$config$windows
withr::with_dir(path_root, {
id1 <- hermod::task_create_explicit(quote(sqrt(1)))
id2 <- hermod::task_create_explicit(quote(sqrt(2)))
id3 <- hermod::task_create_explicit(quote(sqrt(3)))
id1 <- hermod::task_create_explicit(quote(sqrt(1)), submit = FALSE)
id2 <- hermod::task_create_explicit(quote(sqrt(2)), submit = FALSE)
id3 <- hermod::task_create_explicit(quote(sqrt(3)), submit = FALSE)
})
ids <- c(id1, id2, id3)
writeLines("1234", file.path(root$path$tasks, id1, "dide_id"))
Expand Down
8 changes: 8 additions & 0 deletions man/task_create_explicit.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion man/task_create_expr.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions man/task_submit.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9908c8b

Please sign in to comment.