Skip to content

Commit

Permalink
Merge pull request #20 from mrc-ide/mrc-4793
Browse files Browse the repository at this point in the history
Support for cancelling tasks
  • Loading branch information
weshinsley authored Dec 13, 2023
2 parents b1dd7fa + 6d17537 commit 71f4846
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 23 deletions.
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ Imports:
rprojroot,
withr
Suggests:
callr,
conan2,
mockery,
ps,
testthat (>= 3.0.0)
Config/testthat/edition: 3
Remotes:
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export(hermod_environment_list)
export(hermod_environment_show)
export(hermod_init)
export(hermod_provision)
export(hermod_task_cancel)
export(hermod_task_create_explicit)
export(hermod_task_create_expression)
export(hermod_task_driver)
Expand Down
9 changes: 8 additions & 1 deletion R/configure.R
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,24 @@ hermod_configure <- function(driver, ..., root = NULL) {
##' available (i.e., we've already checked that the task status is
##' terminal)
##'
##' @param cancel Cancel one or more tasks. Takes a vector of task
##' ids, and requests that these tasks are cancelled, returning a
##' logical vector the same length indicating if cancellation was
##' successful.
##'
##' @param provision Provision a library. Works with conan, and must
##' accept `method`, `config`, `path_root` followed by `...` to pass
##' through to `conan2::conan_configure`. It is expected this
##' function will trigger running conan to provision a library.
##'
##' @export
hermod_driver <- function(configure, submit, status, result, provision) {
hermod_driver <- function(configure, submit, status, result, cancel,
provision) {
structure(list(configure = configure,
submit = submit,
status = status,
result = result,
cancel = cancel,
provision = provision),
class = "hermod_driver")
}
Expand Down
3 changes: 2 additions & 1 deletion R/constants.R
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# nolint start
STATUS_CREATED <- "status-created"
STATUS_SUBMITTED <- "status-submitted"
STATUS_STARTED <- "status-started"
STATUS_RUNNING <- "status-running"
STATUS_SUCCESS <- "status-success"
STATUS_FAILURE <- "status-failure"
STATUS_CANCELLED <- "status-cancelled"
RESULT <- "result"
EXPR <- "expr"
# nolint end
47 changes: 40 additions & 7 deletions R/task.R
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ hermod_task_create_expression <- function(expr, environment = "default",
hermod_task_eval <- function(id, envir = .GlobalEnv, root = NULL) {
root <- hermod_root(root)
path <- file.path(root$path$tasks, id)
if (file.exists(file.path(path, STATUS_STARTED))) {
## TODO: we could report more about when it was started?
cli::cli_abort("Task '{id}' has already been started")
status <- hermod_task_status(id, root = root)
if (status %in% c("running", "success", "failure", "cancelled")) {
cli::cli_abort("Can't start task '{id}', which has status '{status}'")
}
file.create(file.path(path, STATUS_STARTED))
file.create(file.path(path, STATUS_RUNNING))
data <- readRDS(file.path(path, EXPR))

top <- rlang::current_env() # not quite right, but better than nothing
Expand Down Expand Up @@ -181,7 +181,7 @@ hermod_task_eval <- function(id, envir = .GlobalEnv, root = NULL) {
##'
##' * `created`
##' * `submitted`
##' * `started`
##' * `running`
##' * `success`, `failure`, `cancelled`
##'
##' These occur in increasing order and the result of this function is
Expand Down Expand Up @@ -230,7 +230,9 @@ hermod_task_status <- function(id, root = NULL) {
return(status)
}

terminal <- c(success = STATUS_SUCCESS, failure = STATUS_FAILURE)
terminal <- c(success = STATUS_SUCCESS,
failure = STATUS_FAILURE,
cancelled = STATUS_CANCELLED)

## Next, check to see if we have a terminal status for each
## task. This will be the case (with the above exit being missed) in
Expand Down Expand Up @@ -262,7 +264,7 @@ hermod_task_status <- function(id, root = NULL) {
## know that they are not in a terminal state:
i <- is.na(status)
if (any(i)) {
for (s in c(STATUS_STARTED, STATUS_CREATED)) {
for (s in c(STATUS_RUNNING, STATUS_CREATED)) {
if (any(j <- file.exists(file.path(path[i], s)))) {
status[i][j] <- sub("status-", "", s)
i <- is.na(status)
Expand Down Expand Up @@ -334,6 +336,37 @@ hermod_task_result <- function(id, root = NULL) {
}


##' Cancel one or more tasks
##'
##' @title Cancel tasks
##'
##' @param id The task id or task ids to cancel
##'
##' @inheritParams hermod_task_status
##'
##' @return A logical vector the same length as `id` indicating if the
##' task was cancelled. This will be `FALSE` if the job was already
##' completed, not running, etc.
##'
##' @export
hermod_task_cancel <- function(id, root = NULL) {
root <- hermod_root(root)
result <- rep(FALSE, length(id))
status <- hermod_task_status(id, root)
i <- status %in% c("submitted", "running")
if (any(i)) {
task_driver <- vcapply(id, hermod_task_driver, root = root)
for (driver in unique(na_omit(task_driver))) {
dat <- hermod_driver_prepare(task_driver, root, environment())
j <- task_driver == driver
result[i][j] <- dat$driver$cancel(id[i][j], dat$config, root$path$root)
}
file.create(file.path(root$path$tasks, id[result], STATUS_CANCELLED))
}
result
}


task_eval_explicit <- function(data, envir, root) {
if (!is.null(data$locals)) {
list2env(data$locals, envir)
Expand Down
7 changes: 4 additions & 3 deletions R/windows.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
##' the same as your Imperial credentials. We store this information
##' securely using the [keyring](https://keyring.r-lib.org/) package,
##' so when unlocking your credentials you will be prompted for your
##' **computer** password, which will be your DIDE password if you use a windows
##' machine connected to the DIDE domain, but will likely differ from either your DIDE or
##' Imperial password if you are outside the DIDE domain, or if you don't use Windows.
##' **computer** password, which will be your DIDE password if you use
##' a windows machine connected to the DIDE domain, but will likely
##' differ from either your DIDE or Imperial password if you are
##' outside the DIDE domain, or if you don't use Windows.
##'
##' @title DIDE windows credentials
##'
Expand Down
15 changes: 14 additions & 1 deletion drivers/windows/R/driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ hermod_driver_windows <- function() {
submit = windows_submit,
status = windows_status,
result = windows_result,
cancel = windows_cancel,
provision = windows_provision)
}

Expand Down Expand Up @@ -37,7 +38,7 @@ windows_status <- function(id, config, path_root) {
status <- rep(NA_character_, length(id))
check <- c("success" = "status-success",
"failure" = "status-failure",
"started" = "status-started")
"running" = "status-running")
path <- file.path(path_root, "hermod", "tasks", id)
for (s in names(check)) {
i <- is.na(status)
Expand All @@ -54,3 +55,15 @@ windows_result <- function(id, config, path_root) {
## Nothing to do here, but we might want to do something in the
## cases where the result is not found but the task has failed.
}


windows_cancel <- function(id, config, path_root) {
path_dide_id <- file.path(path_root, "hermod", "tasks", id, DIDE_ID)
dide_id <- vcapply(path_dide_id, readLines, USE.NAMES = FALSE)
dide_id <- dide_id[order(as.integer(dide_id), decreasing = TRUE)]
client <- get_web_client()
## Cancel here returns a named vector of "OK", and will return
## "WRONG_STATE" if cancellation fails.
res <- client$cancel(dide_id)
unname(res == "OK")
}
67 changes: 65 additions & 2 deletions drivers/windows/tests/testthat/test-driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ test_that("can get a task status", {
config <- root$config$windows
expect_equal(windows_status(id, config, path_root), "submitted")

file.create(file.path(path_root, "hermod", "tasks", id, "status-started"))
expect_equal(windows_status(id, config, path_root), "started")
file.create(file.path(path_root, "hermod", "tasks", id, "status-running"))
expect_equal(windows_status(id, config, path_root), "running")

file.create(file.path(path_root, "hermod", "tasks", id, "status-success"))
expect_equal(windows_status(id, config, path_root), "success")
Expand All @@ -72,3 +72,66 @@ test_that("can get a task result", {
hermod::hermod_task_result(id, root = path_root),
sqrt(2))
})


test_that("can cancel a task", {
mount <- withr::local_tempfile()
root <- example_root(mount, "b/c")
path_root <- root$path$root
config <- root$config$windows
id <- withr::with_dir(
path_root,
hermod::hermod_task_create_explicit(quote(sqrt(2))))
writeLines("1234", file.path(root$path$tasks, id, "dide_id"))

mock_client <- list(
cancel = mockery::mock(c("1234" = "OK"), c("1234" = "WRONG_STATE")))
mock_get_client <- mockery::mock(mock_client, cycle = TRUE)
mockery::stub(windows_cancel, "get_web_client", mock_get_client)

expect_true(windows_cancel(id, config, path_root))

mockery::expect_called(mock_get_client, 1)
expect_equal(mockery::mock_args(mock_get_client)[[1]], list())
mockery::expect_called(mock_client$cancel, 1)
expect_equal(mockery::mock_args(mock_client$cancel)[[1]], list("1234"))

expect_false(windows_cancel(id, config, path_root))

mockery::expect_called(mock_get_client, 2)
expect_equal(mockery::mock_args(mock_get_client)[[2]], list())
mockery::expect_called(mock_client$cancel, 2)
expect_equal(mockery::mock_args(mock_client$cancel)[[2]], list("1234"))
})


test_that("can cancel a bunch of tasks, in reverse order", {
mount <- withr::local_tempfile()
root <- example_root(mount, "b/c")
path_root <- root$path$root
config <- root$config$windows
withr::with_dir(path_root, {
id1 <- hermod::hermod_task_create_explicit(quote(sqrt(1)))
id2 <- hermod::hermod_task_create_explicit(quote(sqrt(2)))
id3 <- hermod::hermod_task_create_explicit(quote(sqrt(3)))
})
ids <- c(id1, id2, id3)
writeLines("1234", file.path(root$path$tasks, id1, "dide_id"))
writeLines("1235", file.path(root$path$tasks, id2, "dide_id"))
writeLines("1236", file.path(root$path$tasks, id3, "dide_id"))

mock_client <- list(
cancel = mockery::mock(c("1236" = "OK", "1235" = "OK",
"1234" = "WRONG_STATE")))
mock_get_client <- mockery::mock(mock_client)
mockery::stub(windows_cancel, "get_web_client", mock_get_client)

expect_equal(windows_cancel(ids, config, path_root),
c(TRUE, TRUE, FALSE))

mockery::expect_called(mock_get_client, 1)
expect_equal(mockery::mock_args(mock_get_client)[[1]], list())
mockery::expect_called(mock_client$cancel, 1)
expect_equal(mockery::mock_args(mock_client$cancel)[[1]],
list(c("1236", "1235", "1234")))
})
7 changes: 6 additions & 1 deletion man/hermod_driver.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions man/hermod_task_cancel.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/hermod_task_status.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions man/windows_credentials.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions tests/testthat/helper-hermod.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ elsewhere_driver <- function() {
submit = elsewhere_submit,
status = elsewhere_status,
result = elsewhere_result,
cancel = elsewhere_cancel,
provision = elsewhere_provision)
}

Expand Down Expand Up @@ -45,9 +46,7 @@ elsewhere_submit <- function(id, config, path_root) {


elsewhere_status <- function(id, config, path_root) {
## Once we rework this to use callr, this might hit the process id?
status <- hermod_task_status(id, root = config$path)
## this is really the worst we can do:
status[is.na(status)] <- "submitted"
status
}
Expand All @@ -60,6 +59,21 @@ elsewhere_result <- function(id, config, path_root) {
}


elsewhere_cancel <- function(id, config, path_root) {
queue <- file.path(config$path, "elsewhere.queue")
if (file.exists(queue)) {
queued <- readLines(queue)
writeLines(setdiff(queued, id), queue)
file.create(
file.path(config$path, "hermod", "tasks", intersect(id, queued),
"status-cancelled"))
} else {
queued <- character()
}
id %in% queued
}


elsewhere_provision <- function(method, config, path_root, environment, ...) {
conan_config <- conan2::conan_configure(
method,
Expand Down
25 changes: 25 additions & 0 deletions tests/testthat/test-interface.R
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,28 @@ test_that("can call provision", {
mockery::mock_args(mock_provision)[[1]],
list(NULL, config, path_root, environment, show_log = FALSE))
})



test_that("can cancel tasks", {
elsewhere_register()
path_here <- withr::local_tempdir()
path_there <- withr::local_tempdir()
init_quietly(path_here)
init_quietly(path_there)
root <- hermod_root(path_here)
hermod_configure("elsewhere", path = path_there, root = path_here)
id <- withr::with_dir(
path_here,
hermod_task_create_explicit(quote(sqrt(2))))

expect_equal(hermod_task_status(id, root = path_here), "created")
withr::with_dir(path_here, hermod_task_submit(id))
expect_equal(hermod_task_status(id, root = path_here), "submitted")
expect_true(hermod_task_cancel(id, root = path_here))
expect_false(hermod_task_cancel(id, root = path_here))

expect_error(
withr::with_dir(path_here, hermod_task_eval(id)),
"Can't start task '[[:xdigit:]]{32}', which has status 'cancelled'")
})
Loading

0 comments on commit 71f4846

Please sign in to comment.