2. RcppParallel
Here we briefly introduce the RcppParallel
R
package. As explained in the previous section, Rcpp and R’s C API are not guaranteed to be thread-safe, hence calling them within parallel code is ‘for experts only’. RcppParallel
provides tools to access R vectors and matrices in a thread-safe way, thus making parallel coding easier. It also provides simple tools to parallelise your code at a higher level of abstraction, e.g. without explicitly handling parallel threads. Here we introduce the library via some basic examples, for more details see RcppParallel’s website.
Thread-safe accessors
Consider the problem of computing the error function. In R
this can be done by:
x <- rnorm(1e5)
2 * pnorm(x * sqrt(2)) - 1
An Rcpp function for doing this is:
library(Rcpp)
sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
using namespace Rcpp;
// [[Rcpp::depends(BH)]]
// [[Rcpp::export(erf)]]
NumericVector erf(NumericVector x)
{
size_t n = x.size();
NumericVector out(n);
for(size_t ii = 0; ii < n; ii++)
{
out[ii] = boost::math::erf(x[ii]);
}
return out;
}
')
Note that we use the error function defined in the Boost
C++ library, which can be accessed via the BH
package. Let’s see whether it works:
x <- rnorm(1e6)
max(abs( (2 * pnorm(x * sqrt(2)) - 1) - erf(x)))
## [1] 4.440892e-16
The numerical difference seems tolerable, however:
library(microbenchmark)
options(microbenchmark.unit="relative")
microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1,
erf(x),
times = 100)
## Unit: relative
## expr min lq mean median uq max neval
## R 1.00000 1.000000 1.000000 1.000000 1.000000 1.00000 100
## erf(x) 1.82931 1.776034 1.742589 1.753452 1.744764 1.14656 100
our Rcpp
function does not seem very efficient. Let’s see whether we can do any better by parallelising the code via RcppParallel
and OpenMP. In particular, consider the function:
library(Rcpp)
sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;
// [[Rcpp::depends(BH)]]
// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::export(erfOmp)]]
NumericVector erfOmp(NumericVector x, int ncores)
{
size_t n = x.size();
NumericVector out(n);
RcppParallel::RVector<double> wo(out);
RcppParallel::RVector<double> wx(x);
#if defined(_OPENMP)
#pragma omp parallel for num_threads(ncores)
#endif
for(size_t ii = 0; ii < n; ii++)
{
wo[ii] = boost::math::erf(wx[ii]);
}
return out;
}
')
Note that the main loop has been parallelised via the OpenMP
directive:
#pragma omp parallel for num_threads(ncores)
which we have already seen in a previous section. However, within the parallel for loop, we access the input (x
) and output (out
) vectors via two wrappers of class RVector<double>
. The RVector
class provides wrappers around Rcpp vectors, which can be accessed in a thread-safe way. Importantly, no copy is taken. Let us test the function:
x <- rnorm(1e6)
max(abs( (2 * pnorm(x * sqrt(2)) - 1) - erfOmp(x, 4)))
## [1] 4.440892e-16
Looks close enough. On my Intel i7-3820 3.60GHz CPU with 4 cores 8 threads I get the following relative performance:
microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1,
erfOmp(x, 1),
erfOmp(x, 4),
erfOmp(x, 8),
times = 100)
Unit: relative
expr min lq mean median uq max neval
R 3.951898 3.717663 3.388519 3.683268 3.173645 1.8356637 100
erfOmp(x, 1) 5.690604 5.098202 4.574459 5.023652 4.261512 1.5492959 100
erfOmp(x, 4) 1.462215 1.462516 1.471631 1.668728 1.501872 0.5660757 100
erfOmp(x, 8) 1.000000 1.000000 1.000000 1.000000 1.000000 1.0000000 100
So our Rcpp code is not very efficient, but using 8 threads reduces the computational time by a factor of over 3, relative to the basic R code.
Here we simply used the RVector
class from RcppParallel
, but the latter offers also a RMatrix
class, which is a thread-safe accessor for Rcpp matrices (e.g., NumericMatrix
). See here for an example.
Parallel for loops with RcppParallel
So far we simply used the RVector
wrapper provided by RcppParallel
, now we aim at exploiting also its parallelisation tools. Before doing that, consider the following function:
sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
// [[Rcpp::depends(BH)]]
double myFun(double y){
return boost::math::erf(y);
}
// [[Rcpp::export(erfStd)]]
Rcpp::NumericVector erfStd(Rcpp::NumericVector x) {
Rcpp::NumericVector out(x.length());
std::transform(x.begin(), x.end(), out.begin(), myFun);
return out;
}
')
which is analogous to our original erf
function, but now we use std::trasform
in place of the explicit for loop. The performance is not much different:
microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1,
erfStd(x),
times = 100)
Unit: relative
expr min lq mean median uq max neval
R 1.000000 1.000000 1.000000 1.00000 1.0000 1.000000 1000
erfStd(x) 1.441561 1.441671 1.413499 1.44087 1.4398 1.244145 1000
but now its easier to explain the next step, which entails using RcppParallel
to parallelise the computation. In particular, consider the following function:
sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace RcppParallel;
// [[Rcpp::depends(BH)]]
// [[Rcpp::depends(RcppParallel)]]
double myFun(double y){
return boost::math::erf(y);
}
struct ErfVec : public Worker
{
const RVector<double> in;
RVector<double> out;
ErfVec(const Rcpp::NumericVector in_, Rcpp::NumericVector out_)
: in(in_), out(out_) {}
void operator()(std::size_t begin, std::size_t end) {
std::transform(in.begin() + begin,
in.begin() + end,
out.begin() + begin,
myFun);
}
};
// [[Rcpp::export(erfPar)]]
Rcpp::NumericVector erfPar(Rcpp::NumericVector x) {
Rcpp::NumericVector out(x.length());
ErfVec obj(x, out);
parallelFor(0, x.length(), obj);
return out;
}
')
Before explaining how this works, let’s check whether it gives correct results:
max(abs( (2 * pnorm(x * sqrt(2)) - 1) - erfPar(x)))
## [1] 4.440892e-16
and let’s check its computational performance:
microbenchmark(R = 2 * pnorm(x * sqrt(2)) - 1,
erfOmp(x, 4),
erfPar(x),
times = 100)
Unit: relative
expr min lq mean median uq max neval
R 3.865710 4.034473 3.667210 3.697507 3.548906 1.734676 100
erfOmp(x, 4) 1.428727 1.425952 1.413645 1.384761 1.364873 1.092303 100
erfPar(x) 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000 100
which seems quite good! Now, let’s examine the key steps of erfPar
. The parallel for loop is performed by the function:
void parallelFor(std::size_t begin,
std::size_t end,
Worker& worker,
std::size_t grainSize = 1)
defined in the RcppParallel
namespace. The first two arguments define the beginning and end of the for
loop, the third is an object of type Worker
(which we will discuss below) and the last argument determines the minimal chunk size for parallelization. That is, if we set grainSize = 10
, then each thread will perform at least 10 iterations of the for loop. In the code above, an argument of type ErfVec
is passed to RcppParallel
, via the Worker
argument. Objects of type ErfVec
are data structures which inherit the RcppParallel::Worker
type. The structure has two elements or members:
const RVector<double> in;
RVector<double> out;
which will be used to store the input and output vectors. The members are initialised by the constructor:
ErfVec(const Rcpp::NumericVector in_, Rcpp::NumericVector out_)
: in(in_), out(out_) {}
which is called when we do:
Rcpp::NumericVector out(x.length());
ErfVec obj(x, out);
Hence, we allocate memory for the output vector and then we pass it to the ErfVec
constructor, which will wrap it within an RVector
. Then, the structure contains the operator:
void operator()(std::size_t begin, std::size_t end) {
std::transform(in.begin() + begin,
in.begin() + end,
out.begin() + begin,
myFun);
}
which will be called by parallelFor
on a section of the input vector delimited by begin
and end
.
If we look at the source code in RcppParallel
, we see that parallelFor
is itself a wrapper around a call to:
tbb::parallel_for(tbb::blocked_range<size_t>(begin, end, grainSize),
tbbWorker);
which is defined in Intel’s Threading Building Blocks (TBB) C++ library and shipped with RcppParallel
. TBB is available on Windows, OS X, Linux, and Solaris x86, while on other platforms parallelFor
will fall back on:
void ttParallelFor(std::size_t begin, std::size_t end,
Worker& worker, std::size_t grainSize = 1)
which is a less performing version, based on the (more portable) TinyThread++ library (also shipped via RcppParallel
).
Parallel reductions with RcppParallel
Consider the problem of calculating the log-likelihood of a sample \(x_1, \dots, x_n\) under a Gaussian model. In R
we would do it by:
x <- rnorm(10)
sum(dnorm(x, 0.5, 2, log = TRUE))
The following code provides an Rcpp
implementation:
sourceCpp(code = '
#include <Rcpp.h>
#include <cmath>
using namespace Rcpp;
// [[Rcpp::plugins(cpp14)]]
// [[Rcpp::export]]
double dnormSeq(const NumericVector x, const double m, const double s) {
auto gld = [m_ = m, s_ = s] (const double x1, const double x2) {
return x1 - (x2-m_) * (x2-m_) / (s_*s_*2.0);
};
double out = std::accumulate(x.begin(), x.end(), 0.0, gld);
out -= x.length() * ::log(2 * M_PI * s * s) / 2;
return out;
}
')
Here these lines:
auto gld = [m_ = m, s_ = s] (const double x1, const double x2) {
return x1 - (x2-m_) * (x2-m_) / (s_*s_*2.0);
};
define a parametrised function, gld
, with parameters m_
and s_
, which evaluates the log-density of a normal at \(x_2\) (as in dnorm(x2, m_, s_)
) and adds it to x1
. In particular, here we are using a C++ lambda function to fix parameters m_
and s_
and to create a function with explicit arguments x1
and x2
, which can then be passed to std::accumulate
. The equivalent R code is:
# Create closure where m_ and s_ are fixed
funCreator <- function(m_, s_){
function(x1, x2) x1 - (x2-m_) * (x2-m_) / (s_*s_*2.0)
}
gld <- funCreator(0.5, 1) # Analogous to lambda function in C++
# Perform reduction
x <- rnorm(10)
Reduce(gld, x) # Analogous to std::accumulate
## [1] -6.119262
Some of the features we are using to define the lambda function are defined in the C++14 standard, hence we added the Rcpp::plugins(cpp14)
attribute. Let’s see whether the Rcpp
function works:
x <- rnorm(1e6)
dnormSeq(x, 0.5, 2)
## [1] -1768386
sum(dnorm(x, 0.5, 2, log = TRUE))
## [1] -1768386
it seems so, and it is much quicker than base R
:
microbenchmark(R = sum(dnorm(x, 0.5, 2, log = TRUE)),
dnormSeq(x, 0.5, 2))
Unit: relative
expr min lq mean median uq max neval
R 12.60742 12.38649 12.55583 12.25376 12.39503 11.30485 100
dnormSeq(x, 0.5, 2) 1.00000 1.00000 1.00000 1.00000 1.00000 1.00000 100
Now, consider the following parallel version:
sourceCpp(code = '
#include <boost/math/special_functions/erf.hpp>
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace RcppParallel;
// [[Rcpp::depends(BH)]]
// [[Rcpp::depends(RcppParallel)]]
// [[Rcpp::plugins(cpp14)]]
struct Gau : public Worker
{
const RVector<double> input;
const double m;
const double s;
double value;
Gau(const Rcpp::NumericVector input, const double m_, const double s_) : input(input), m(m_), s(s_), value(0) { }
Gau(const Gau& obj, Split) : input(obj.input), m(obj.m), s(obj.s), value(0) { }
void operator()(std::size_t begin, std::size_t end) {
auto gld = [m1 = m, s1 = s](const double x1, const double x2) {
return x1 - (x2-m1) * (x2-m1) / (s1*s1*2.0);
};
value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0, gld);
}
void join(const Gau& rhs) {
value += rhs.value;
}
};
// [[Rcpp::export]]
double dnormPar(Rcpp::NumericVector x, double m, double s) {
Gau obj(x, m, s);
parallelReduce(0, x.length(), obj);
return obj.value - x.length() * ::log(2 * M_PI * s * s) / 2;
}
')
Before explaining how this works, let’s see whether it produces correct results:
x <- rnorm(1e6)
dnormPar(x, 0.5, 2)
## [1] -1768325
sum(dnorm(x, 0.5, 2, log = TRUE))
## [1] -1768325
which seems to be the case. Let’s look at the speed-up, which on the CPU described above is:
microbenchmark(R = sum(dnorm(x, 0.5, 2, log = TRUE)),
RcppSeq = dnormSeq(x, 0.5, 2),
RcppPar = dnormPar(x, 0.5, 2))
Unit: relative
expr min lq mean median uq max neval
R 48.132613 47.623084 47.100746 47.014235 46.438556 43.741062 100
RcppSeq 3.820691 3.851518 3.677526 3.635321 3.570393 3.351935 100
RcppPar 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000 100
So on a 4 cores machine we are over 3 times faster. Let’s try to understand how this works. The dnormPar
function is relatively similar to the erfPar
example, described above. In particular, std::accumulate
has been substituted by
parallelReduce(0, x.length(), obj);
which is defined in the RcppParallel
namespace and has the same arguments as parallelFor
. However, here the structure of the worker obj
is a bit more complex, let us examine it. Objects of type Gaus
have members:
const RVector<double> input;
const double m;
const double s;
double value;
The first is the input vector, the second and third are the mean and standard deviation of the Gaussian, and the last is the value of the log-likelihood accumulated so far. The object has two constructors, the first is:
Gau(const Rcpp::NumericVector input, const double m_, const double s_) :
input(input), m(m_), s(s_), value(0) { }
This is the constructor being called when we do Gau obj(x, m, s);
in dnormPar
. The second constructor is:
Gau(const Gau& obj, Split) : input(obj.input), m(obj.m), s(obj.s), value(0) { }
and will be called by parallelReduce
. Note that it takes a reference to an object of type Gau
as input, and it uses its members (e.g. obj.input
) to initialise its own members. The Split
argument is used by parallelReduce
to split the computation. Then, we have the operator:
void operator()(std::size_t begin, std::size_t end) {
auto gld = [m1 = m, s1 = s](const double x1, const double x2) {
return x1 - (x2-m1) * (x2-m1) / (s1*s1*2.0);
};
value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0, gld);
}
which is used to perform the evaluation and accumulation of the log-likelihood (as done in the sequential version, above), but only on a subset of the input vector. Note that we create the lambda function gld
at this point, with argument m1
and s1
fixed to their values (m
and s
) within the Gau
object. The join operator:
void join(const Gau& rhs) {
value += rhs.value;
}
is used by parallelReduce
to perform the reduction.
It might be possible to accelerate the dnormPar
further. In fact, if we substitute
auto gld = [m1 = m, s1 = s]
with
auto gld = [m1 = 0.5, s1 = 2.0]
and recompile, we get a much better performance:
Unit: relative
expr min lq mean median uq max neval
R 167.53804 152.9173 123.351021 131.14373 114.322668 29.540746 100
RcppSeq 13.31754 12.3467 9.513437 10.03514 8.662867 1.953358 100
RcppPar 1.00000 1.0000 1.000000 1.00000 1.000000 1.000000 100
Understanding why this is the case might be a good exercise (I don’t know the answer).
Here we illustrated some basic tools provided by RcppParallel
, see the package webpage for more details.
References
Allaire, J.J., François, R., Ushey, K., Vandenbrouck, G. and Geelnard, M., Intel (2018) RcppParallel: Parallel Programming Tools for Rcpp. R package version 4.4.2.
Chandra, R., Dagum, L., Kohr, D., Menon, R., Maydan, D. and McDonald, J., 2001. Parallel programming in OpenMP. Morgan kaufmann.
Chapman, B., Jost, G. and Van Der Pas, R., 2008. Using OpenMP: portable shared memory parallel programming (Vol. 10). MIT press.
Eddelbuettel, D., 2019. Parallel Computing With R: A Brief Review. arXiv preprint arXiv:1912.11144.