
Commit

WIP
richfitz committed Jul 25, 2024
1 parent 2930219 commit 9717960
113 changes: 74 additions & 39 deletions vignettes_src/workers.Rmd
@@ -9,11 +9,11 @@ vignette: >

<!-- This vignette really requires a real cluster behind it to work
properly; we're going to submit a bunch of workers and tasks at
them, and the example driver cannot support this without
involving something better for the queue (otherwise it's very
racy and we can't get multiple runners going). We'll do the first
bit of the PR with a single runner using 'example', then switch
to the windows runner for some more exciting work. -->

```{r setup, include = FALSE}
source("../vignettes/common.R")
@@ -55,8 +55,7 @@
cleanup <- withr::with_dir(
vignette_root,
hipercow::hipercow_example_helper(with_logging = TRUE,
new_directory = FALSE,
initialise = FALSE))
```

# The lightweight queue pattern
@@ -73,12 +72,26 @@ The interface to this controller is subject to change, but many of the ideas will
```{r}
r
```

The other thing you'll need is some workers. Let's submit a single worker to the cluster, and wait for it to become available:

```{r}
info <- hipercow_rrq_workers_submit(1)
info
```

TODO:

* check on the job status while waiting for workers - this should be easy to do
* rrq not installed in the bootstrap, but it should be

The workers are submitted as task bundles and can be inspected using their bundle name like any other task:

```{r}
hipercow_bundle_status(info$bundle_name)
```

This worker will remain running for 10 minutes after the last piece of work it runs.
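
If you are done with the worker before that timeout, you can ask it to exit as soon as it has nothing left to do; the same call appears again at the end of this vignette:

```r
# Ask any submitted rrq workers to exit once their queues are empty
hipercow_rrq_stop_workers_once_idle()
```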

## Basic usage

We'll load the `rrq` package to make the calls a little clearer to read:
@@ -119,13 +132,6 @@
```{r}
system.time({
})
```
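
In outline, the basic single-task workflow looks something like this sketch (not the exact contents of the chunk above), assuming `r` is the rrq controller created earlier:

```r
library(rrq)

# Create a single task, wait for it to complete, then fetch its result;
# `r` is the rrq controller for this hipercow root
id <- rrq_task_create_expr(runif(10), controller = r)
rrq_task_wait(id, controller = r)
rrq_task_result(id, controller = r)
```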

## Scaling up

Let's submit 1,000 trivial tasks, using `rrq_task_create_bulk_expr`, taking the square root of the first thousand positive integers.
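
As a sketch (assuming the rrq controller `r` from above), the submission might look something like this:

```r
# Create 1000 tasks in one call: sqrt(x) for x = 1, ..., 1000
ids <- rrq_task_create_bulk_expr(sqrt(x), data.frame(x = 1:1000),
                                 controller = r)

# Wait for them all, then collect the results
rrq_task_wait(ids, controller = r)
result <- rrq_task_results(ids, controller = r)
```
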
@@ -146,34 +152,38 @@ result <- rrq_task_results(ids)

```{r, include = FALSE}
t1 <- Sys.time()
elapsed <- as.numeric(t1 - t0, "secs")
```

This process has taken `r round(elapsed, 2)` seconds, which is likely much faster than submitting this many tasks directly.

```{r, include = FALSE}
rrq_destroy()
cleanup()
```

# Interprocess communication pattern

You can submit a task to the cluster that accesses a pool of workers. This is quite difficult to demonstrate in a clear way, but bear with us.

The general pattern we try to achieve here is this:

1. Submit a normal hipercow task to the cluster
2. This task distributes work over a set of workers; this might happen several times
3. Return some summary of this work from your main task

A practical example of this approach might be to submit a hipercow task that runs an orderly report and within that you run an MCMC where you send each chain to a different worker (which may each use multiple cores). This gives you three levels of parallelism and the ability to span past a single node fairly easily. Though you may need to use a pen and paper to keep track of what you're doing.

For this example, we'll use the "windows" driver and submit work to a series of four workers running on the DIDE windows cluster.

The example here is contrived but contains all the ingredients needed to do a map-reduce style task, where you submit a task to the cluster and that task distributes subtasks across a pool of workers, performs some reduction on them, and returns.
```{r}
hipercow_init(driver = "windows")
r <- hipercow_rrq_controller()
```

```{r, include = FALSE}
vignette_root <- new_hipercow_root_path(TRUE)
set_vignette_root(vignette_root)
code <- c(
"example <- function(n) {",
" ids <- rrq:rrq_task_create_bulk_call(sqrt, seq_len(n))",
Expand All @@ -186,25 +196,32 @@ code <- c(
writeLines(code, "code.R")
```

We'll submit a hipercow task that runs this small piece of code:

```{r, echo = FALSE, results = "asis"}
writeLines("code.R")
r_output(readLines("code.R"))
```

Hopefully this is fairly self-explanatory, if a bit pointless. Note that we delete the rrq tasks after completing them, which prevents rrq tasks accumulating.

This code needs to be available in the hipercow environment run by the task:

```{r}
hipercow_environment_create("default", sources = "code.R")
```

We also need some workers; let's submit four of them:

```{r}
info <- hipercow_rrq_workers_submit(4)
```

Now submit a task that will call this function:

```{r}
id <- task_create_expr(
  example(16),
  parallel = hipercow_parallel(use_rrq = TRUE))
id
```

@@ -240,3 +257,21 @@
```{r}
hipercow_rrq_stop_workers_once_idle()
```

which is hopefully self-explanatory.

# General considerations

## Permanence

You should not treat data in a submitted task as permanent; it is subject to deletion at any point! So your aim should be to pull the data out of rrq as soon as you can. Practically, we won't delete data from the database for at least a few days after creation, but we make no guarantees. We'll describe cleanup here later.
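
As a sketch of that workflow, assuming a vector of completed task ids `ids` and an rrq controller `r`: collect the results, store them somewhere you control, then delete the tasks from rrq.

```r
# Pull results out of rrq as soon as the tasks have completed...
results <- rrq_task_results(ids, controller = r)
# ...store them somewhere permanent that you control...
saveRDS(results, "results.rds")
# ...and delete the tasks, since rrq's copy may not persist
rrq_task_delete(ids, controller = r)
```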

## Controlling the worker

The workers will use the `rrq` environment if it exists, falling back on the `default` environment otherwise. So if you need different packages and sources loaded on the workers than on your normal tasks, you can do this by creating a different environment:

```{r}
hipercow_environment_create("rrq", packages = "cowsay")
```

**TODO**: *work out how to refresh this environment; I think that's just a message to send*

You can submit your workers with any resources and parallel control you want (see `vignette("parallel")` for details); pass these as `resources` and `parallel` to `hipercow_rrq_workers_submit()`.
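
For example, to submit two workers that each request four cores, something like the following should work (a sketch; `hipercow_resources()` is covered in `vignette("parallel")`):

```r
# Request four cores for each worker, then submit two workers with
# those resources
resources <- hipercow_resources(cores = 4)
info <- hipercow_rrq_workers_submit(2, resources = resources)
```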
