Statistical Computing 2

2. RcppParallel


Here we briefly introduce the RcppParallel R package. As explained in the previous section, Rcpp and R’s C API are not guaranteed to be thread-safe, hence calling them within parallel code is ‘for experts only’. RcppParallel provides tools for accessing R vectors and matrices in a thread-safe way, thus making parallel coding easier. It also provides simple tools for parallelising your code at a higher level of abstraction, that is, without explicitly handling parallel threads. Here we introduce the library via some basic examples; for more details see RcppParallel’s website.

Thread-safe accessors

Consider the problem of computing the error function. Using the identity \(\text{erf}(x) = 2\Phi(x\sqrt{2}) - 1\), where \(\Phi\) is the standard normal c.d.f., in R this can be done by:

x <- rnorm(1e5)
2 * pnorm(x * sqrt(2)) - 1

An Rcpp function for doing this is:

library(Rcpp)
sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp::depends(BH)]]

// [[Rcpp::export(erf)]]
NumericVector erf(NumericVector x)
{
 size_t n = x.size();
 NumericVector out(n);
 
 for(size_t ii = 0; ii < n; ii++)
 {
  out[ii] = boost::math::erf(x[ii]);
 }
 
 return out;
}
')

Note that we use the error function defined in the Boost C++ library, which can be accessed via the BH package. Let’s see whether it works:

x <- rnorm(1e6)
max(abs( (2 * pnorm(x * sqrt(2)) - 1) - erf(x)))
## [1] 4.440892e-16

The numerical difference seems tolerable, however:

library(microbenchmark)
options(microbenchmark.unit="relative")
microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1, 
               erf(x), 
               times = 100)
## Unit: relative
##    expr     min       lq     mean   median       uq     max neval
##       R 1.00000 1.000000 1.000000 1.000000 1.000000 1.00000   100
##  erf(x) 1.82931 1.776034 1.742589 1.753452 1.744764 1.14656   100

our Rcpp function does not seem very efficient. Let’s see whether we can do any better by parallelising the code via RcppParallel and OpenMP. In particular, consider the function:

library(Rcpp)
sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;

// [[Rcpp::depends(BH)]]
// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::plugins(openmp)]]

// [[Rcpp::export(erfOmp)]]
NumericVector erfOmp(NumericVector x, int ncores)
{
 size_t n = x.size();
 NumericVector out(n);
 RcppParallel::RVector<double> wo(out);
 RcppParallel::RVector<double> wx(x);
 
 #if defined(_OPENMP)
  #pragma omp parallel for num_threads(ncores)
 #endif
 for(size_t ii = 0; ii < n; ii++)
 {
  wo[ii] = boost::math::erf(wx[ii]);
 }
 
 return out;
}
')

Note that the main loop has been parallelised via the OpenMP directive:

#pragma omp parallel for num_threads(ncores)

which we have already seen in a previous section. However, within the parallel for loop we access the input (x) and output (out) vectors via two wrappers of class RVector<double>. The RVector class wraps an Rcpp vector so that it can be accessed in a thread-safe way and, importantly, no copy of the data is taken. Let us test the function:

x <- rnorm(1e6)
max(abs( (2 * pnorm(x * sqrt(2)) - 1) - erfOmp(x, 4)))
## [1] 4.440892e-16

Looks close enough. On my Intel i7-3820 3.60GHz CPU, with 4 cores and 8 threads, I get the following relative performance:

microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1, 
               erfOmp(x, 1), 
               erfOmp(x, 4),
               erfOmp(x, 8),
               times = 100)
## Unit: relative
##          expr      min       lq     mean   median       uq       max neval
##             R 3.951898 3.717663 3.388519 3.683268 3.173645 1.8356637   100
##  erfOmp(x, 1) 5.690604 5.098202 4.574459 5.023652 4.261512 1.5492959   100
##  erfOmp(x, 4) 1.462215 1.462516 1.471631 1.668728 1.501872 0.5660757   100
##  erfOmp(x, 8) 1.000000 1.000000 1.000000 1.000000 1.000000 1.0000000   100

So our Rcpp code is not very efficient, but using 8 threads reduces the computational time by a factor of over 3, relative to the basic R code.

Here we have only used the RVector class from RcppParallel, but the package also offers an RMatrix class, which is a thread-safe accessor for Rcpp matrices (e.g., NumericMatrix). A sketch of its usage follows.
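The following is a minimal sketch (rowSumsOmp is our own illustrative example, not taken from the package documentation), which computes the row sums of a matrix, accessing it via RMatrix and parallelising over the rows with OpenMP:

library(Rcpp)
sourceCpp(code = '
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;

// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::plugins(openmp)]]

// [[Rcpp::export(rowSumsOmp)]]
NumericVector rowSumsOmp(NumericMatrix x, int ncores)
{
 size_t nr = x.nrow(), nc = x.ncol();
 NumericVector out(nr);
 RcppParallel::RMatrix<double> wx(x);   // thread-safe accessor, no copy taken
 RcppParallel::RVector<double> wo(out);
 
 #if defined(_OPENMP)
  #pragma omp parallel for num_threads(ncores)
 #endif
 for(size_t ii = 0; ii < nr; ii++)
 {
  double acc = 0.0;
  for(size_t jj = 0; jj < nc; jj++) acc += wx(ii, jj); // RMatrix uses (row, col) indexing
  wo[ii] = acc;
 }
 
 return out;
}
')

For a numeric matrix X, the result should agree with rowSums, e.g. max(abs(rowSumsOmp(X, 4) - rowSums(X))) should be (numerically) zero.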

Parallel for loops with RcppParallel

So far we have exploited only the thread-safe accessors provided by RcppParallel; we now turn to its parallelisation tools. Before doing that, consider the following function:

sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
#include <algorithm> // For std::transform

// [[Rcpp::depends(BH)]]

double myFun(double y){
  return boost::math::erf(y);
} 

// [[Rcpp::export(erfStd)]]
Rcpp::NumericVector erfStd(Rcpp::NumericVector x) {
  
  Rcpp::NumericVector out(x.length());
  
  std::transform(x.begin(), x.end(), out.begin(), myFun);
  
  return out;
}
')

which is analogous to our original erf function, but now we use std::transform in place of the explicit for loop. The performance is not much different:

microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1, 
               erfStd(x),
               times = 1000)
## Unit: relative
##       expr      min       lq     mean  median     uq      max neval
##          R 1.000000 1.000000 1.000000 1.00000 1.0000 1.000000  1000
##  erfStd(x) 1.441561 1.441671 1.413499 1.44087 1.4398 1.244145  1000

but now it’s easier to explain the next step, which entails using RcppParallel to parallelise the computation. In particular, consider the following function:

sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
#include <RcppParallel.h>
#include <algorithm> // For std::transform
using namespace RcppParallel;

// [[Rcpp::depends(BH)]]
// [[Rcpp::depends(RcppParallel)]]

double myFun(double y){
  return boost::math::erf(y);
} 

struct ErfVec : public Worker
{
   const RVector<double> in;
   
   RVector<double> out;
   
   ErfVec(const Rcpp::NumericVector in_, Rcpp::NumericVector out_) 
      : in(in_), out(out_) {}
   
   void operator()(std::size_t begin, std::size_t end) {
      std::transform(in.begin() + begin, 
                     in.begin() + end, 
                     out.begin() + begin, 
                     myFun);
   }
};

// [[Rcpp::export(erfPar)]]
Rcpp::NumericVector erfPar(Rcpp::NumericVector x) {
  
  Rcpp::NumericVector out(x.length());
  
  ErfVec obj(x, out);
  
  parallelFor(0, x.length(), obj);
  
  return out;
}
')

Before explaining how this works, let’s check whether it gives correct results:

max(abs( (2 * pnorm(x * sqrt(2)) - 1) - erfPar(x)))
## [1] 4.440892e-16

and let’s check its computational performance:

microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1, 
               erfOmp(x, 4),
               erfPar(x),
               times = 100)
## Unit: relative
##          expr      min       lq     mean   median       uq      max neval
##             R 3.865710 4.034473 3.667210 3.697507 3.548906 1.734676   100
##  erfOmp(x, 4) 1.428727 1.425952 1.413645 1.384761 1.364873 1.092303   100
##     erfPar(x) 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000   100

which seems quite good! Now, let’s examine the key steps of erfPar. The parallel for loop is performed by the function:

void parallelFor(std::size_t begin,
                 std::size_t end, 
                 Worker& worker,
                 std::size_t grainSize = 1)

defined in the RcppParallel namespace. The first two arguments define the beginning and end of the for loop, the third is an object of type Worker (which we will discuss below) and the last argument determines the minimal chunk size for the parallelisation. That is, if we set grainSize = 10, the iterations are divided into chunks of at least 10, each of which is processed sequentially by a single thread. In the code above, an object of type ErfVec is passed to parallelFor via the Worker argument. ErfVec is a data structure which inherits from the RcppParallel::Worker type, and it has two members:

const RVector<double> in;
RVector<double> out;

which will be used to store the input and output vectors. The members are initialised by the constructor:

ErfVec(const Rcpp::NumericVector in_, Rcpp::NumericVector out_) 
      : in(in_), out(out_) {}

which is called when we do:

Rcpp::NumericVector out(x.length());
ErfVec obj(x, out);

Hence, we allocate memory for the output vector and then pass it to the ErfVec constructor, which wraps it in an RVector. The structure also contains the call operator:

void operator()(std::size_t begin, std::size_t end) {
    std::transform(in.begin() + begin, 
                   in.begin() + end, 
                   out.begin() + begin, 
                   myFun);
}

which will be called by parallelFor on a section of the input vector delimited by begin and end. For example, with four threads TBB might call obj(0, 250000), obj(250000, 500000), and so on; the exact partition is determined at run time.
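Relatedly, had we wanted to guarantee that each chunk contains at least, say, 10000 iterations, we could have increased grainSize in the call to parallelFor (a hypothetical variant of the call in erfPar):

parallelFor(0, x.length(), obj, 10000); // chunks of at least 10000 iterations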

If we look at the source code in RcppParallel, we see that parallelFor is itself a wrapper around a call to:

tbb::parallel_for(tbb::blocked_range<size_t>(begin, end, grainSize), 
                     tbbWorker);

which is defined in Intel’s Threading Building Blocks (TBB) C++ library and shipped with RcppParallel. TBB is available on Windows, OS X, Linux, and Solaris x86, while on other platforms parallelFor will fall back on:

void ttParallelFor(std::size_t begin, std::size_t end, 
                          Worker& worker, std::size_t grainSize = 1)

which is a less performant version, based on the (more portable) TinyThread++ library (also shipped with RcppParallel).
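Incidentally, when the TBB backend is in use, the number of threads exploited by parallelFor (and by parallelReduce, introduced below) can be controlled from R via RcppParallel’s setThreadOptions function, for example:

library(RcppParallel)
defaultNumThreads()              # Number of threads TBB would use by default
setThreadOptions(numThreads = 2) # Limit subsequent parallelFor/parallelReduce calls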

Parallel reductions with RcppParallel

Consider the problem of calculating the log-likelihood of a sample \(x_1, \dots, x_n\) under a Gaussian model. In R we would do it by:

x <- rnorm(10)
sum(dnorm(x, 0.5, 2, log = TRUE))
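That is, we are evaluating \[ \ell(m, s) = -\frac{n}{2}\log(2\pi s^2) - \frac{1}{2s^2}\sum_{i=1}^n (x_i - m)^2, \] with \(m = 0.5\) and \(s = 2\). Note that the first term does not depend on the individual observations, so it can be added after the sum has been accumulated; the code below exploits this.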

The following code provides an Rcpp implementation:

sourceCpp(code = ' 
#include <Rcpp.h>
#include <cmath>
#include <numeric> // For std::accumulate
using namespace Rcpp;

// [[Rcpp::plugins(cpp14)]] 

// [[Rcpp::export]]
double dnormSeq(const NumericVector x, const double m, const double s) {

  auto gld = [m_ = m, s_ = s] (const double x1, const double x2) { 
   return x1 - (x2-m_) * (x2-m_) / (s_*s_*2.0); 
  };

  double out = std::accumulate(x.begin(), x.end(), 0.0, gld);
  
  out -= x.length() * ::log(2 * M_PI * s * s) / 2;
  
  return out;
}
')

Here, the lines:

 auto gld = [m_ = m, s_ = s] (const double x1, const double x2) { 
   return x1 - (x2-m_) * (x2-m_) / (s_*s_*2.0); 
  };

define a parametrised function, gld, with parameters m_ and s_, which takes the running total x1 and adds to it the log-density of a normal evaluated at x2 (as in dnorm(x2, m_, s_, log = TRUE)), up to the additive constant \(-\log(2\pi s^2)/2\), which is added only after the accumulation. In particular, here we are using a C++ lambda function to fix the parameters m_ and s_ and to create a function with explicit arguments x1 and x2, which can then be passed to std::accumulate. The equivalent R code is:

# Create closure where m_ and s_ are fixed
funCreator <- function(m_, s_){
  function(x1, x2)  x1 - (x2-m_) * (x2-m_) / (s_*s_*2.0)
}
gld <- funCreator(0.5, 2) # Analogous to the lambda function in C++

# Perform reduction
x <- rnorm(10)
Reduce(gld, x, 0) # Analogous to std::accumulate, starting the sum at 0
## [1] -6.119262

Some of the features we are using to define the lambda function (in particular, the initialised captures m_ = m and s_ = s) are part of the C++14 standard, hence we added the Rcpp::plugins(cpp14) attribute. Let’s see whether the Rcpp function works:

x <- rnorm(1e6)
dnormSeq(x, 0.5, 2)
## [1] -1768386
sum(dnorm(x, 0.5, 2, log = TRUE))
## [1] -1768386

it seems so, and it is much quicker than base R:

microbenchmark(R = sum(dnorm(x, 0.5, 2, log = TRUE)), 
               dnormSeq(x, 0.5, 2))
## Unit: relative
##                 expr      min       lq     mean   median       uq      max neval
##                    R 12.60742 12.38649 12.55583 12.25376 12.39503 11.30485   100
##  dnormSeq(x, 0.5, 2)  1.00000  1.00000  1.00000  1.00000  1.00000  1.00000   100

Now, consider the following parallel version:

sourceCpp(code = ' 
#include <Rcpp.h>
#include <RcppParallel.h>
#include <numeric> // For std::accumulate
using namespace RcppParallel;

// [[Rcpp::depends(RcppParallel)]]

// [[Rcpp::plugins(cpp14)]] 
  
struct Gau : public Worker
{
   const RVector<double> input;
   
   const double m;
   const double s;
  
   double value;
   
   Gau(const Rcpp::NumericVector input, const double m_, const double s_) : input(input), m(m_), s(s_), value(0) { }
   Gau(const Gau& obj, Split) : input(obj.input), m(obj.m), s(obj.s), value(0) { }

   void operator()(std::size_t begin, std::size_t end) {
      auto gld = [m1 = m, s1 = s](const double x1, const double x2) { 
       return x1 - (x2-m1) * (x2-m1) / (s1*s1*2.0); 
      };
      value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0, gld);
   }

   void join(const Gau& rhs) {
      value += rhs.value;
   }
};

// [[Rcpp::export]]
double dnormPar(Rcpp::NumericVector x, double m, double s) {

   Gau obj(x, m, s);

   parallelReduce(0, x.length(), obj);

   return obj.value - x.length() * ::log(2 * M_PI * s * s) / 2;
}

')

Before explaining how this works, let’s see whether it produces correct results:

x <- rnorm(1e6)
dnormPar(x, 0.5, 2)
## [1] -1768325
sum(dnorm(x, 0.5, 2, log = TRUE))
## [1] -1768325

which seems to be the case. Let’s look at the speed-up, which on the CPU described above is:

microbenchmark(R = sum(dnorm(x, 0.5, 2, log = TRUE)), 
               RcppSeq = dnormSeq(x, 0.5, 2), 
               RcppPar = dnormPar(x, 0.5, 2))
## Unit: relative
##     expr       min        lq      mean    median        uq       max neval
##        R 48.132613 47.623084 47.100746 47.014235 46.438556 43.741062   100
##  RcppSeq  3.820691  3.851518  3.677526  3.635321  3.570393  3.351935   100
##  RcppPar  1.000000  1.000000  1.000000  1.000000  1.000000  1.000000   100

So on a 4-core machine the parallel version is over 3 times faster than the sequential Rcpp version, and almost 50 times faster than base R. Let’s try to understand how this works. The dnormPar function is fairly similar to the erfPar example described above. In particular, std::accumulate has been substituted by

parallelReduce(0, x.length(), obj);

which is defined in the RcppParallel namespace and has the same arguments as parallelFor. However, here the structure of the worker obj is a bit more complex, so let us examine it. Objects of type Gau have the following members:

const RVector<double> input;
const double m;
const double s;
double value;

The first is the input vector, the second and third are the mean and standard deviation of the Gaussian, and the last is the value of the log-likelihood accumulated so far. The object has two constructors; the first is:

Gau(const Rcpp::NumericVector input, const double m_, const double s_) : 
  input(input), m(m_), s(s_), value(0) { }

This is the constructor being called when we do Gau obj(x, m, s); in dnormPar. The second constructor is:

Gau(const Gau& obj, Split) : input(obj.input), m(obj.m), s(obj.s), value(0) { }

and will be called by parallelReduce. Note that it takes a reference to an object of type Gau as input, and uses its members (e.g., obj.input) to initialise its own members. The Split argument is a tag which parallelReduce uses to create a fresh copy of the worker (with value reset to zero) whenever the computation is split across threads. Then, we have the operator:

void operator()(std::size_t begin, std::size_t end) {
 auto gld = [m1 = m, s1 = s](const double x1, const double x2) { 
  return x1 - (x2-m1) * (x2-m1) / (s1*s1*2.0); 
 };
 value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0, gld);
}

which performs the evaluation and accumulation of the log-likelihood (as done in the sequential version above), but only on a subset of the input vector. Note that we create the lambda function gld at this point, with parameters m1 and s1 fixed to their values (m and s) within the Gau object. The join operator:

void join(const Gau& rhs) {
    value += rhs.value;
}

is used by parallelReduce to perform the reduction.
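To fix ideas, here is a purely sequential R sketch (our own illustration, reusing the gld closure created via funCreator above) of the split/accumulate/join pattern implemented by parallelReduce:

# Conceptual analogue of parallelReduce (sequential, for illustration only)
nChunks <- 4
chunks <- split(x, cut(seq_along(x), nChunks))             # One worker copy per chunk (Split)
partial <- sapply(chunks, function(xc) Reduce(gld, xc, 0)) # operator() on each chunk
sum(partial)                                               # join(): combine partial results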

It might be possible to accelerate dnormPar further. In fact, if we substitute

auto gld = [m1 = m, s1 = s]

with

auto gld = [m1 = 0.5, s1 = 2.0]

and recompile, we get a much better performance:

## Unit: relative
##     expr       min       lq       mean    median         uq       max neval
##        R 167.53804 152.9173 123.351021 131.14373 114.322668 29.540746   100
##  RcppSeq  13.31754  12.3467   9.513437  10.03514   8.662867  1.953358   100
##  RcppPar   1.00000   1.0000   1.000000   1.00000   1.000000  1.000000   100

Understanding why this is the case might be a good exercise (I don’t know the answer).

Here we have illustrated some basic tools provided by RcppParallel; see the package webpage for more details.
