Skip to content

Commit

Permalink
Support for waiting on tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
richfitz committed Dec 15, 2023
1 parent d9b73c1 commit b8fe2b1
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 7 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ export(task_eval)
export(task_result)
export(task_status)
export(task_submit)
export(task_wait)
export(windows_credentials)
export(windows_path)
86 changes: 86 additions & 0 deletions R/task.R
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,82 @@ task_result <- function(id, root = NULL) {
}


status_to_logical <- function(status) {
switch(status,
# created = NA,
submitted = NA,
running = NA,
success = TRUE,
failure = FALSE,
cancelled = FALSE,
cli::cli_abort("Unhandled status '{status}'"))

Check warning on line 355 in R/task.R

View check run for this annotation

Codecov / codecov/patch

R/task.R#L354-L355

Added lines #L354 - L355 were not covered by tests
}


##' Wait for a single task to complete.
##'
##' @title Wait for a task to complete
##'
##' @inheritParams task_status
##'
##' @param timeout The time to wait for the task to complete. The
##' default is to wait forever.
##'
##' @param poll The interval to request an update; shorter values here
##' will return more quickly but will flail around a bit more. The
##' default is to check every second which means you wait for up to
##' a second longer than needed - for long running jobs you could
##' set this longer if you wanted.
##'
##' @param progress Logical value, indicating if a progress indicator
##' should be used. The default `NULL` uses the option
##' `hipercow.progress`, and if unset displays a progress bar in an
##' interactive session.
##'
##' @return Logical value, `TRUE` if the task completed successfully,
##' `FALSE` otherwise.
##'
##' @export
task_wait <- function(id, timeout = Inf, poll = 1, progress = NULL,
root = NULL) {
root <- hipercow_root(root)
path <- file.path(root$path$tasks, id)
status <- task_status(id, root = root)

if (status == "created") {
cli::cli_abort(
c("Cannot wait on '{id}', which has status '{status}'",
i = "You need to submit this task to wait on it"))
}

value <- status_to_logical(status)
progress <- show_progress(progress)
if (progress && is.na(value)) {
cli::cli_progress_bar(
format = paste("{cli::pb_spin} Waiting for task '{id}' |",
"{status} | {cli::pb_elapsed}"))
}

t_end <- Sys.time() + timeout
repeat {
if (!is.na(value)) {
break
}
if (Sys.time() > t_end) {
cli::cli_abort("Task '{id}' did not complete in time")
}
Sys.sleep(poll)
if (progress) {
cli::cli_progress_update()
}
status <- task_status(id, root = root)
value <- status_to_logical(status)
}

value
}


##' Cancel one or more tasks
##'
##' @title Cancel tasks
Expand Down Expand Up @@ -480,3 +556,13 @@ task_submit_maybe <- function(id, submit, root, call) {
task_submit(id, driver = driver, root = root)
TRUE
}


show_progress <- function(progress, call = NULL) {
if (is.null(progress)) {
getOption("hipercow.progress", interactive())
} else {
assert_scalar_logical(progress, call = call)
progress
}
}
35 changes: 35 additions & 0 deletions man/task_wait.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions tests/testthat/test-interface.R
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,24 @@ test_that("prevent autosubmission when more than one driver configured", {
"Submitted task")
expect_equal(task_status(id, root = root), "submitted")
})




test_that("can wait on a task, returning immediately", {
elsewhere_register()
path_here <- withr::local_tempdir()
path_there <- withr::local_tempdir()
init_quietly(path_here)
init_quietly(path_there)
suppressMessages(
hipercow_configure("elsewhere", path = path_there, root = path_here))
id <- withr::with_dir(
path_here,
suppressMessages(task_create_explicit(quote(sqrt(1)))))
expect_error(
task_wait(id, root = path_here, timeout = 0, progress = FALSE),
"Task '.+' did not complete in time")
task_eval(id, root = path_there)
expect_true(task_wait(id, root = path_here, timeout = 0, progress = FALSE))
})
71 changes: 64 additions & 7 deletions tests/testthat/test-task.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
test_that("Can create and run a simple task", {
path <- withr::local_tempdir()
init_quietly(path)
id <- withr::with_dir(path, task_create_explicit(sqrt(2)))
id <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
expect_type(id, "character")
expect_match(id, "^[[:xdigit:]]{32}$")
expect_equal(task_status(id, root = path), "created")
Expand Down Expand Up @@ -31,7 +31,7 @@ test_that("can run a task that uses local variables", {
test_that("tasks cannot be run twice", {
path <- withr::local_tempdir()
init_quietly(path)
id <- withr::with_dir(path, task_create_explicit(sqrt(2)))
id <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
expect_true(task_eval(id, root = path))
expect_error(
task_eval(id, root = path),
Expand All @@ -42,7 +42,7 @@ test_that("tasks cannot be run twice", {
test_that("throw if result not available", {
path <- withr::local_tempdir()
init_quietly(path)
id <- withr::with_dir(path, task_create_explicit(sqrt(2)))
id <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
expect_error(
task_result(id, root = path),
"Result for task '.+' not available, status is 'created'")
Expand Down Expand Up @@ -104,7 +104,7 @@ test_that("refuse to create a task outside of the root", {
test_that("use cache to avoid looking up known terminal states", {
path <- withr::local_tempdir()
init_quietly(path)
id <- withr::with_dir(path, task_create_explicit(sqrt(2)))
id <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
root <- hipercow_root(path)
root$cache$task_status_terminal[[id]] <- "failure"
expect_equal(task_status(id, root = path), "failure")
Expand All @@ -114,8 +114,8 @@ test_that("use cache to avoid looking up known terminal states", {
test_that("hipercow task status is vectorised", {
path <- withr::local_tempdir()
init_quietly(path)
id1 <- withr::with_dir(path, task_create_explicit(sqrt(1)))
id2 <- withr::with_dir(path, task_create_explicit(sqrt(2)))
id1 <- withr::with_dir(path, task_create_explicit(quote(sqrt(1))))
id2 <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
task_eval(id2, root = path)

expect_equal(task_status(character(), root = path), character(0))
Expand All @@ -130,7 +130,7 @@ test_that("hipercow task status is vectorised", {
test_that("protect against unknown task types", {
path <- withr::local_tempdir()
init_quietly(path)
id <- withr::with_dir(path, task_create_explicit(sqrt(2)))
id <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
p <- file.path(path, "hipercow", "tasks", id, "expr")
d <- readRDS(p)
d$type <- "magic"
Expand Down Expand Up @@ -199,3 +199,60 @@ test_that("can report about cancellation of a group of ids", {
"Failed to cancel the 1 eligible task (of the 10 requested)",
fixed = TRUE)
})


test_that("cannot wait on a task that has not been submitted", {
path <- withr::local_tempdir()
init_quietly(path)
id <- withr::with_dir(
path,
task_create_explicit(quote(identity(1))))
err <- expect_error(task_wait(id, root = path),
"Cannot wait on '.+', which has status 'created'")
expect_equal(err$body,
c(i = "You need to submit this task to wait on it"))
})


test_that("Can call progress while waiting", {
mock_progress_bar <- mockery::mock()
mock_progress_update <- mockery::mock()
mock_sleep <- mockery::mock()
mock_task_status <- mockery::mock("submitted", "running", "running", "success")
mockery::stub(task_wait, "cli::cli_progress_bar", mock_progress_bar)
mockery::stub(task_wait, "cli::cli_progress_update", mock_progress_update)
mockery::stub(task_wait, "Sys.sleep", mock_sleep)
mockery::stub(task_wait, "task_status", mock_task_status)

path <- withr::local_tempdir()
init_quietly(path)
id <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
expect_true(task_wait(id, progress = TRUE, poll = 0.5, root = path))

mockery::expect_called(mock_progress_bar, 1)
mockery::expect_called(mock_progress_update, 3)
expect_equal(mockery::mock_args(mock_progress_update),
rep(list(list()), 3))
mockery::expect_called(mock_sleep, 3)
expect_equal(mockery::mock_args(mock_sleep),
rep(list(list(0.5)), 3))
mockery::expect_called(mock_task_status, 4)
expect_equal(mockery::mock_args(mock_task_status),
rep(list(list(id, root = hipercow_root(path))), 4))
})


test_that("task wait is instant for completed tasks", {
path <- withr::local_tempdir()
init_quietly(path)
mock_sleep <- mockery::mock()
mockery::stub(task_wait, "Sys.sleep", mock_sleep)
id1 <- withr::with_dir(path, task_create_explicit(quote(sqrt(2))))
expect_true(task_eval(id1, root = path))
id2 <- withr::with_dir(path, task_create_explicit(quote(stop("error"))))
expect_false(task_eval(id2, root = path))

expect_true(task_wait(id1, timeout = 0, root = path))
expect_false(task_wait(id2, timeout = 0, root = path))
mockery::expect_called(mock_sleep, 0)
})

0 comments on commit b8fe2b1

Please sign in to comment.