future 1.3.0 is available on CRAN. With futures, you write your R code once, and the user chooses how to evaluate it in parallel using whatever resources are available, e.g. a local machine, a set of local machines, a set of remote machines, a high-end compute cluster (via future.BatchJobs and soon also future.batchtools), or the cloud (e.g. via googleComputeEngineR).

[Figure: silent movie clip of a man in a cart catching a ride with a passing car by using a giant magnet]
Futures make it easy to harness any resources at hand.

Thanks to great feedback from the community, this new version provides:

  • A convenient lapply() function

    • Added future_lapply() that works like lapply() and gives identical results with the difference that futures are used internally. Depending on the user’s choice of plan(), these calculations may be processed sequentially, in parallel, or distributed on multiple machines.
    • Load balancing can be controlled by argument future.scheduling, which is a scalar adjusting how many futures each worker should process.
    • Perfectly reproducible random number generation (RNG) is guaranteed given the same initial seed, regardless of the type of futures used and the choice of load balancing. Argument future.seed = TRUE (default) will use a random initial seed, which may also be specified as future.seed = <integer>. L’Ecuyer-CMRG RNG streams are used internally.
  • A clearer distinction between developer and end user

    • The end user controls what future strategy to use by default, e.g. plan(multiprocess) or plan(cluster, workers = c("machine1", "machine2", "remote.server.org")).
    • The developer controls whether futures should be resolved eagerly (default) or lazily, e.g. f <- future(..., lazy = TRUE). Because of this, plan(lazy) is now deprecated.
  • Better support for multi-tenant compute environments

    • availableCores() returns the number of cores available to the current R process. On a regular machine, this typically corresponds to the number of cores on the machine (parallel::detectCores()). If option mc.cores or environment variable MC_CORES is set, then that will be returned. However, on compute clusters using schedulers such as SGE, Slurm, and TORQUE / PBS, the function detects the number of cores allotted to the job by the scheduler and returns that instead. This way developers don’t have to adjust their code to match a certain compute environment; the default works everywhere.
    • With the new version, the fallback value used when nothing else is specified can be overridden: instead of the number of cores on the machine, it can be set via option future.availableCores.fallback or environment variable R_FUTURE_AVAILABLECORES_FALLBACK. For instance, by setting R_FUTURE_AVAILABLECORES_FALLBACK=1 system-wide in HPC environments, any user running outside of the scheduler will automatically use single-core processing unless explicitly requesting more cores. This lowers the risk of overloading the CPU by mistake.
    • Analogously to how availableCores() returns the number of cores, the new function availableWorkers() returns the host names available to the R process. The default is rep("localhost", times = availableCores()), but when using HPC schedulers it may be the host names of the other compute nodes allocated to the job.
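
As a rough illustration of the last three items, the following sketch queries the cores and workers visible to the current R process and then sets the fallback option. It assumes the future package is attached. The value 1L is only an example, and the numbers returned depend entirely on the machine or scheduler the code runs on.

```r
library("future")

# Number of cores available to this R process; on a scheduler-managed
# cluster this reflects the job's allotment rather than the whole machine
n_cores <- availableCores()
print(n_cores)

# Show every setting that availableCores() consulted, not just the result
print(availableCores(which = "all"))

# One host name per available worker; on a plain local machine this is
# "localhost" repeated availableCores() times
workers <- availableWorkers()
print(workers)

# Example setting (an assumption for illustration): use a single core
# whenever no other core-specifying setting is detected
options(future.availableCores.fallback = 1L)
print(availableCores())
```

Setting the fallback through an R option, as here, affects only the current session. A system-wide default would be set with the environment variable instead.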

For full details on updates, please see the NEWS file. The future package installs out-of-the-box on all operating systems.
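
To make the eager versus lazy distinction from the list above concrete, here is a minimal sketch. It assumes plan(sequential) so that timing is easy to reason about; the one-second pause and the use of Sys.time() as the future's expression are arbitrary choices for illustration.

```r
library("future")
plan(sequential)

# Eager (default): with sequential futures, the expression is evaluated
# as soon as the future is created
f_eager <- future({ Sys.time() })

# Lazy: evaluation is deferred until the value is asked for
f_lazy <- future({ Sys.time() }, lazy = TRUE)

Sys.sleep(1)

t_eager <- value(f_eager)  # timestamp taken before the pause
t_lazy  <- value(f_lazy)   # timestamp taken now, after the pause

print(difftime(t_lazy, t_eager, units = "secs"))
```

The developer decides on lazy = TRUE inside the code, while the end user's plan() still decides where the future is resolved.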

A quick example

Below is the bootstrap example from help("clusterApply", package = "parallel"), adapted to make use of futures.

library("future")
library("boot")

run <- function(...) {
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  boot(cd4, corr, R = 5000, sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle)
}

# base::lapply()
system.time(boot <- lapply(1:100, FUN = run))
###    user  system elapsed 
### 133.637   0.000 133.744
   
# Sequentially on the local machine
plan(sequential)
system.time(boot0 <- future_lapply(1:100, FUN = run, future.seed = 0xBEEF))
###    user  system elapsed 
### 134.916   0.003 135.039 

# In parallel on the local machine (with 8 cores)
plan(multisession)
system.time(boot1 <- future_lapply(1:100, FUN = run, future.seed = 0xBEEF))
###    user  system elapsed
###   0.960   0.041  29.527 
stopifnot(all.equal(boot1, boot0))
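
The same reproducibility check can be run on a much smaller scale. The sketch below is an illustration under stated assumptions: it targets future 1.3.0, where future_lapply() is exported by the future package itself (in later releases the function lives in the separate future.apply package, which would need to be attached instead). The toy function draw(), the seed 42L, and the value future.scheduling = 2 are hypothetical choices.

```r
library("future")

# Toy task (hypothetical): three normal random numbers per element
draw <- function(i) rnorm(3)

# Sequential reference run with a fixed initial seed
plan(sequential)
y0 <- future_lapply(1:8, FUN = draw, future.seed = 42L)

# Parallel run with the same seed; future.scheduling = 2 asks for
# roughly two futures per worker instead of one
plan(multisession)
y1 <- future_lapply(1:8, FUN = draw, future.seed = 42L,
                    future.scheduling = 2)

# Same seed, different backend and chunking: results should agree
stopifnot(isTRUE(all.equal(y1, y0)))

# Return to sequential processing
plan(sequential)
```

Changing future.scheduling alters how the eight elements are grouped into futures, so agreement here reflects that the random numbers are tied to the elements rather than to the workers.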

What’s next?

The future.BatchJobs package, which builds on top of BatchJobs, provides future strategies for various HPC schedulers, e.g. SGE, Slurm, and TORQUE / PBS. For example, by using plan(batchjobs_torque) instead of plan(multiprocess), your futures will be resolved in a distributed fashion on a compute cluster instead of in parallel on your local machine. That’s it! However, since last year, the BatchJobs package has been decommissioned and its authors recommend that everyone use their new batchtools package instead. Just like BatchJobs, it is a very well written package, but it is also more robust against cluster problems and supports more types of HPC schedulers. Because of this, I’ve been working on future.batchtools, which I hope to be able to release soon.

Finally, I’m really keen on looking into how futures can be used with Shaun Jackman’s lambdar, a proof-of-concept that allows you to execute R code on Amazon’s “serverless” AWS Lambda framework. My hope is that, in a not too distant future (pun not intended*), we’ll be able to resolve our futures on AWS Lambda using plan(aws_lambda).

Happy futuring!

(*) Alright, I admit, it was intended.

See also