Got compute?

future.apply 1.0.0 - Apply Function to Elements in Parallel using Futures - is on CRAN. With this milestone release, all* base R apply functions now have corresponding futurized implementations. This makes it easier than ever before to parallelize your existing apply(), lapply(), mapply(), … code - just prepend future_ to an apply call that takes a long time to complete. That’s it! The default is sequential processing but by using plan(multiprocess) it’ll run in parallel.

Table: All future_nnn() functions in the future.apply package. Each function takes the same arguments as the corresponding base function does.

| Function | Description |
|----------|-------------|
| future_apply() | Apply Functions Over Array Margins |
| future_lapply() | Apply a Function over a List or Vector |
| future_sapply() | - “ - |
| future_vapply() | - “ - |
| future_replicate() | - “ - |
| future_mapply() | Apply a Function to Multiple List or Vector Arguments |
| future_Map() | - “ - |
| future_eapply() | Apply a Function Over Values in an Environment |
| future_tapply() | Apply a Function Over a Ragged Array |

* future_rapply() - Recursively Apply a Function to a List - is yet to be implemented.
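
As a minimal sketch of the "just prepend future_" idea, the following compares a base R lapply() call with its futurized counterpart. The slow_sqrt() helper is hypothetical and made up for illustration, and plan(multisession) is an assumption used here in place of plan(multiprocess), which later releases of the future package deprecated.

```r
library(future.apply)

## Hypothetical stand-in for a time-consuming computation
slow_sqrt <- function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}

## Base R: always sequential
y_seq <- lapply(1:8, slow_sqrt)

## Futurized: sequential by default, parallel once a plan is set
plan(multisession, workers = 2)
y_par <- future_lapply(1:8, slow_sqrt)
plan(sequential)  ## shut down the background workers

## Both calls return the same list
stopifnot(identical(y_seq, y_par))
```

The only change to the apply call itself is the future_ prefix; the choice of two workers is arbitrary.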

## A Motivating Example

In the parallel package there is an example - in ?clusterApply - showing how to perform bootstrap simulations in parallel. After some small modifications to clarify the steps, it looks like the following:

library(parallel)
library(boot)

run1 <- function(...) {
  library(boot)
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  boot(cd4, corr, R = 500, sim = "parametric",
       ran.gen = cd4.rg, mle = cd4.mle)
}

cl <- makeCluster(4)  ## Parallelize using four cores
clusterSetRNGStream(cl, 123)
cd4.boot <- do.call(c, parLapply(cl, 1:4, run1))
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)


The script defines a function run1() that produces 500 bootstrap samples, and then it calls this function four times, combines the four replicated samples into one cd4.boot, and at the end it uses boot.ci() to summarize the results.

The corresponding sequential implementation would look something like:

library(boot)

run1 <- function(...) {
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  boot(cd4, corr, R = 500, sim = "parametric",
       ran.gen = cd4.rg, mle = cd4.mle)
}

set.seed(123)
cd4.boot <- do.call(c, lapply(1:4, run1))
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)


We notice a few things about these two code snippets. First of all, in the parallel code, there are two library(boot) calls; one in the main code and one inside the run1() function. The reason for this is to make sure that the boot package is also attached in the parallel, background R session when run1() is called there. The boot package defines the boot.ci() function, as well as the boot() function and the cd4 data.frame - both used inside run1(). If boot is not attached inside the function, we would get an error on "object 'cd4' not found" when running the parallel code. In contrast, we do not need to do this in the sequential code. Also, if we later would turn our parallel script into a package, then R CMD check would complain if we kept the library(boot) call inside the run1() function.

Second, the example uses MASS::mvrnorm() in run1(). The reason for this is related to the above - if we use only mvrnorm(), we need to attach the MASS package using library(MASS) and also do so inside run1(). Since there is only one MASS function called, it’s easier and neater to use the form MASS::mvrnorm().

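Third, the random-seed setup differs between the sequential and the parallel approach: the sequential code calls set.seed(123), whereas the parallel code calls clusterSetRNGStream(cl, 123) to seed the workers.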

In summary, in order to turn the sequential script into a script that parallelizes using the parallel package, we would have to not only rewrite parts of the code but also be aware of important differences in order to avoid getting run-time errors due to missing packages or global variables.
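
To make the manual bookkeeping concrete, here is a small sketch of what the parallel package expects when a worker function depends on a global variable and on an attached package. The names prefix and tag_ext() are hypothetical, and the tools package merely stands in for any package whose functions are called without the pkg:: prefix.

```r
library(parallel)
library(tools)

prefix <- "ext:"  ## hypothetical global variable used by the worker function
tag_ext <- function(path) paste0(prefix, file_ext(path))

cl <- makeCluster(2)

## Global variables have to be exported to the workers by hand ...
clusterExport(cl, "prefix", envir = environment())

## ... and packages have to be attached on each worker by hand
invisible(clusterEvalQ(cl, library(tools)))

res <- parLapply(cl, c("a.csv", "b.txt"), tag_ext)
stopCluster(cl)
```

Forgetting either the clusterExport() or the clusterEvalQ() step is the kind of mistake that surfaces only at run time, as an "object not found" or "could not find function" error raised on a worker.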

One of the objectives of the future.apply package, and the future ecosystem in general, is to make transitions from writing sequential code to writing parallel code as simple and frictionless as possible.

Here is the same example parallelized using the future.apply package:

library(future.apply)
plan(multiprocess, workers = 4) ## Parallelize using four cores
library(boot)

run1 <- function(...) {
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  boot(cd4, corr, R = 500, sim = "parametric",
       ran.gen = cd4.rg, mle = cd4.mle)
}

set.seed(123)
cd4.boot <- do.call(c, future_lapply(1:4, run1, future.seed = TRUE))
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)


The difference between the sequential base-R implementation and the future.apply implementation is minimal. The future.apply package is attached, the parallel plan of four workers is set up, and the lapply() function is replaced by future_lapply(), where we specify future.seed = TRUE to get statistically sound and numerically reproducible parallel random number generation (RNG). More importantly, notice how there is no need to worry about which packages need to be attached on the workers and which global variables need to be exported. That is all taken care of automatically by the future framework.
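
The automatic handling of globals and packages can be illustrated with a smaller sketch. The names prefix and tag_ext() are hypothetical, the tools package merely stands in for any attached package, and plan(multisession) is an assumption used in place of plan(multiprocess), which later releases of the future package deprecated.

```r
library(future.apply)
library(tools)

plan(multisession, workers = 2)  ## background R sessions on the local machine

prefix <- "ext:"  ## hypothetical global variable used by the worker function
tag_ext <- function(path) paste0(prefix, file_ext(path))

## No manual export of 'prefix' and no library(tools) call on the workers
res <- future_lapply(c("a.csv", "b.txt"), tag_ext)

plan(sequential)  ## shut down the background workers
```

Here the framework is expected to detect that tag_ext() needs the global prefix and a function from tools, and to make both available on the workers before the function is called.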

## Q&A

Q. What are my options for parallelization?
A. Everything in future.apply is processed through the future framework. This means that all parallelization backends supported by the parallel package are supported out of the box, e.g. on your local machine, and on local or remote ad-hoc compute clusters (also in the cloud). Additional parallelization and distribution schemas are provided by backends such as future.callr (parallelization on your local machine) and future.batchtools (large-scale parallelization via HPC job schedulers). For other alternatives, see the CRAN Page for the future package and the High-Performance and Parallel Computing with R CRAN Task View.

Q. Righty-oh, so how do I specify which parallelization backend to use?
A. A fundamental design pattern of the future framework is that the end user decides how and where to parallelize while the developer decides what to parallelize. This means that you do not specify the backend via some argument to the future_nnn() functions. Instead, the backend is specified by the plan() function - you can almost think of it as a global option that the end user controls. For example, plan(multiprocess) will parallelize on the local machine, so will plan(future.callr::callr), whereas plan(cluster, workers = c("n1", "n2", "remote.server.org")) will parallelize on two local machines and one remote machine. Using plan(future.batchtools::batchtools_sge) will distribute the processing on your SGE-supported compute cluster. BTW, you can also have nested parallelization strategies, e.g. plan(list(tweak(cluster, workers = nodes), multiprocess)) where nodes = c("n1", "n2", "remote.server.org").
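
A rough sketch of this division of labor is shown below. The col_means() function is hypothetical "developer" code that never mentions a backend; the plan() calls are the "end user" part. Only the sequential and multisession backends are used, on the assumption that they are available on any local machine.

```r
library(future.apply)

## "Developer" code: decides what to parallelize, never names a backend
col_means <- function(df) future_sapply(df, mean)

## "End user" code: decides how and where it runs via plan()
plan(sequential)
m_seq <- col_means(mtcars)

plan(multisession, workers = 2)
m_par <- col_means(mtcars)

plan(sequential)  ## shut down the background workers

## The backend changes where the work runs, not what is returned
stopifnot(isTRUE(all.equal(m_seq, m_par)))
```

Swapping in another backend should only require changing the plan() line; col_means() stays untouched.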

Q. What about load balancing?
A. The default behavior of all functions is to distribute equally-sized chunks of elements to each available background worker, such that each worker processes exactly one chunk (= one future). If the processing times vary significantly across chunks, you can increase the average number of chunks processed by each worker, e.g. to have them process two chunks on average, specify future.scheduling = 2.0. Alternatively, you can specify the number of elements processed per chunk, e.g. future.chunk.size = 10L (an analog to the chunk.size argument added to the parallel package in R 3.5.0).

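
As a small sketch of these two arguments, the calls below process the same elements with the default chunking, with two chunks per worker on average, and with a fixed chunk size. The work() function is hypothetical and simply takes longer for larger inputs; plan(multisession) with two workers is an assumption made for illustration.

```r
library(future.apply)
plan(multisession, workers = 2)

## Hypothetical task whose run time grows with its input
work <- function(i) {
  Sys.sleep(0.005 * i)
  i^2
}

## Default: one chunk (= one future) per worker
y_default <- future_lapply(1:12, work)

## Two chunks per worker on average
y_sched <- future_lapply(1:12, work, future.scheduling = 2.0)

## Three elements per chunk
y_chunk <- future_lapply(1:12, work, future.chunk.size = 3L)

plan(sequential)  ## shut down the background workers
```

All three calls return the same values; only the way the elements are grouped into futures differs.
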
Q. What about random number generation (RNG)?
A. Just add future.seed = TRUE and you’re good. This will use parallel-safe and statistically sound L’Ecuyer-CMRG RNG, which is a well-established parallel RNG algorithm and used by the parallel package. The future.apply functions use this in a way that is also invariant to the future backend and the amount of “chunking” used. To produce numerically reproducible results, call set.seed(123) beforehand (as in the above example), or simply use future.seed = 123.
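
The invariance to backend and chunking can be checked with a short sketch like the one below. The draw() function is hypothetical, the seed value 123 is arbitrary, and plan(multisession) with two workers is an assumption made for illustration.

```r
library(future.apply)

## Hypothetical function that draws one random number per element
draw <- function(i) rnorm(1)

plan(multisession, workers = 2)
x_par <- future_sapply(1:6, draw, future.seed = 123)

## Same seed, different chunking
x_chunked <- future_sapply(1:6, draw, future.seed = 123,
                           future.chunk.size = 1L)

## Same seed, different backend
plan(sequential)
x_seq <- future_sapply(1:6, draw, future.seed = 123)

stopifnot(isTRUE(all.equal(x_par, x_chunked)),
          isTRUE(all.equal(x_par, x_seq)))
```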