您的位置:首页 > 编程语言 > C语言/C++

用C++11优化矩阵运算的空间和时间效率

2014-08-28 00:21 731 查看
最近在segmentfault.com上看到一个问题,提问者想要利用C++11的移动语义,减少矩阵相加时临时对象的构造。受此启发,我实现了一个简单的矩阵类,利用C++11标准中的一些特性,对矩阵运算进行了时间和空间效率的优化。(完整代码:matrix.hmatrix_test.ccrun.sh

用C++11的移动语义,优化矩阵运算的空间效率

C++11的右值引用(T&&),可以将一个变量标记为“临时的”、“在此之后不再需要”,让该变量的使用者可以放心地将其内容“偷”走,而不用担心会有不良后果。利用该特性,可以减少矩阵运算中不必要的临时对象构造和拷贝,实现空间效率的优化。

矩阵加法、减法和数乘

先讨论矩阵加法。

常用的加法函数的声明格式为 T operator+(const T& lhs, const T& rhs); ,例如std::plus。这样符合我们对加法的常识,两个加数在相加前后都不变(不被修改),返回一个临时对象作为结果。这样做的代价是,每次加法都要构造一个临时对象。(这里有个插曲,返回值优化会在 T
v = a1 + a2; 中省去一次临时对象构造,但在 T v = a1 + a2 + ... + an; 中仍然要进行n-1次临时对象的构造,临时对象构造不可避免)

而在矩阵运算中,构造临时矩阵代价就更大。因此,除了Matrix<T> operator+(const Matrix<T>& lhs, const Matrix<T>& rhs); 这种常用形式,可以引入另外两种声明形式:

Matrix<T> operator+(const Matrix& lhs, Matrix&& rhs);
Matrix<T> operator+(Matrix&& lhs, const Matrix& rhs);

在连续相加中产生的临时矩阵,匹配到右值引用参数,然后其空间被“偷”走,作为函数返回值传出。再加上返回值优化,就可以在整个加法表达式中不构造任何临时矩阵,全部实现原地运算,优化矩阵加法的空间效率。

实现代码如下:

template <typename T>
Matrix<T> operator+(const Matrix<T>& lhs, Matrix<T>&& rhs) {
matrix::CheckDimensionMatches(lhs, rhs);
Matrix<T> result(std::move(rhs));
result += lhs;
return result;
}

template <typename T>
Matrix<T> operator+(Matrix<T>&& lhs, const Matrix<T>& rhs) {
return rhs + std::move(lhs);
}
矩阵减法和矩阵数乘原理相同,仅列出函数声明:

template <typename T>
Matrix<T> operator-(const Matrix<T>& lhs, Matrix<T>&& rhs);
template <typename T>
Matrix<T> operator-(Matrix<T>&& lhs, const Matrix<T>& rhs);
template <typename T>
Matrix<T> operator*(Matrix<T>&& mat, const T& scaler);
template <typename T>
Matrix<T> operator*(const T& scaler, Matrix<T>&& mat);
测试代码:

void TestArithmetrics_Temporaries() {
cout << "prepare a, b, c, scaler, expected" << endl;
Matrix<int> a = {
{1, 2},
{3, 4},
}, b = {
{2, 3},
{4, 5},
}, c = {
{1, 1},
{1, 1},
}, expected = {
{2, 2},
{2, 2},
};
int scaler = 3;
cout << "Matrix<int> d = std::move(c) * scaler + a - b;" << endl;
Matrix<int> d = std::move(c) * scaler + a - b;
assert(d.equal_to(expected));
cout << "done." << endl;
}
输出结果:

prepare a, b, c, scaler, expected
Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>)
Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>)
Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>)
Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>)
Matrix<int> d = std::move(c) * scaler + a - b;
Matrix::Matrix(Matrix&&)  // std::move(c) * scaler
Matrix::Matrix(Matrix&&)  // tmp + a
Matrix::Matrix(Matrix&&)  // tmp + b
done.  // because of RVO, d's ctor is not called

矩阵和矩阵相乘

这里涉及到我的矩阵实现细节,代码如下:

template <typename T>
class Matrix {
private:
std::vector<std::vector<T>> data_;
std::size_t column_size_;
};
其中,data_采用行优先存储,也即data_[row]存储矩阵的第row行。

与矩阵加法中的情况类似,矩阵乘法的常用声明形式为 Matrix<T> operator*(const Matrix& lhs, const Matrix& rhs); ,从而不可避免地要构造一个临时矩阵。为了消灭该临时矩阵构造的必要性,还是使用右值引用的声明形式:

template <typename T>
Matrix<T> operator*(Matrix<T>&& lhs, const Matrix<T>& rhs);
template <typename T>Matrix<T>
operator*(const Matrix<T>& lhs, Matrix<T>&& rhs);


但与矩阵加法、加法和数乘不同,矩阵相乘的结果矩阵与乘数矩阵维数未必相同,需要进行维数的修改。下面对以上两种声明形式分别讨论。

对于 Matrix<T> operator*(Matrix<T>&& lhs, const Matrix<T>& rhs); 而言,需要将lhs的存储“偷”走作为结果矩阵( Matrix<T> result(std::move(lhs); )。结果矩阵行数就是lhs的行数,故 result.data_.size()可以保持不变。

定义一个临时向量 vector<T> row_result(rhs.column_size()); ,计算结果矩阵的一行并存储到row_result中。考虑到result.data_[row]是std::vector<T>类型,可以利用 result.data_[row].swap(row_result); 进行原地置换,省去了复制的时间。继续以此方法计算并填充结果矩阵的所有行,完成矩阵相乘的计算逻辑。代码如下:

template <typename T>
Matrix<T> operator*(Matrix<T>&& lhs, const Matrix& rhs) {
matrix::CheckDimensionMultipliable(lhs, rhs);
typedef typename Matrix<T>::size_type size_type;
Matrix<T> result(std::move(lhs));
std::vector<T> row_result(rhs.column_size());
for (size_type row = 0; row < row_size(); ++row) {
row_result.resize(rhs.column_size());

// Calculates single row in result matrix.
for (size_type column = 0; column < rhs.column_size(); ++column) {
row_result[column] = std::inner_product(
result.row_begin(row), result.row_end(row),
rhs.column_begin(column), T());
}

// Inplace puts calculated row into result matrix.
result.data_[row].swap(row_result);
}
result.column_size_ = rhs.column_size();
return result;
}


对于 Matrix<T> operator*(const Matrix<T>& lhs, Matrix<T>&& rhs); 而言,需要将rhs的存储“偷”走作为结果矩阵(Matrix<T> result(std::move(rhs)); )。结果矩阵行数为lhs.row_size()(未必等于rhs.row_size()),因此需要改变result.data_.size()。

定义一个临时向量 std::vector<T> column_result(lhs.row_size()); ,计算结果矩阵的一列并存储到column_result中,然后 std::copy(column_result.begin(), column_result.end(), result.column_begin(column)); 。继续以此方法计算并填充结果矩阵的所有列,完成矩阵相乘的计算逻辑。代码如下:

template <typename T>
Matrix<T> operator*(const Matrix<T>& lhs, Matrix<T>&& rhs) {
matrix::CheckDimensionMultipliable(lhs, rhs);
typedef typename Matrix<T>::size_type size_type;
size_type rhs_row_size(rhs.row_size()), rhs_column_size(rhs.column_size());
Matrix<T> result(std::move(rhs));
result.row_resize(std::max(rhs_row_size, lhs.row_size()));
std::vector<T> column_result(lhs.row_size());
for (size_type column = 0; column < rhs_column_size; ++column) {
// Calculates single column in result matrix.
for (size_type row = 0; row < lhs.row_size(); ++row) {
column_result[row] = std::inner_product(
lhs.row_begin(row), lhs.row_end(row),
result.column_begin(column), T());
}

// Puts calculated column into result matrix.
std::copy(column_result.begin(), column_result.end(),
result.column_begin(column));
}
result.row_resize(lhs.row_size());
return result;
}


综上两种讨论,矩阵相乘的空间复杂度由经典方式的O(lhs.row_size() * rhs.column_size())降低到O(rhs.column_size())或O(lhs.row_size()),加上省去了临时结果矩阵的构造,实现了对矩阵相乘的空间效率的优化。

用C++11的并发支持,优化矩阵运算的时间效率

C++11引入了对并发编程的支持,在此使用其中的std::async(),配合lambda函数,对矩阵运算中的一些步骤进行并发处理,从而利用多核CPU,优化矩阵的时间效率。

并发处理工具函数

矩阵运算中很多地方都可以用到并发处理,因此我将并发处理抽象出来成为一个函数:
void ConcurrentProcess(std::size_t count,
std::function<void(std::size_t)> process) {
std::vector<std::future<void>> futures(count);
for (std::size_t index = 0; index < count; ++index) {
futures[index] = std::async(std::launch::async, process, index);
}
for (std::size_t index = 0; index < count; ++index) {
futures[index].wait();
}
}
在调用处,代码需要定义一个lambda函数,接收一个下标(index)作为参数,没有返回值,并且注意不同下标处理的数据不要有读写冲突。这样就实现了并发计算。

需要注意的一点是,并发是以后台线程为代价的,因此lambda函数的运行收益,需要能抵消线程的维护开销。lambda函数里应该放置比较耗时的操作,不要放置本来就很简短地操作,例如 result[row][column] = rhs[row][column] - result[row][column]; 。

矩阵运算的并发实现

下面以矩阵加法和矩阵相乘为例说明。
对于矩阵加法,每个元素的计算都不会互相干扰,可以将各行的求值并发计算(也可选择按列计算,在此略),代码如下:
template <typename T>
Matrix<T> operator+(Matrix<T>&& lhs, const Matrix<T>& rhs) {
matrix::CheckDimensionMatches(lhs, rhs);
typedef typename Matrix<T>::size_type size_type;
Matrix<T> result(std::move(lhs));
matrix::ConcurrentProcess(
lhs.row_size(), [&result, &rhs](size_type row) mutable {
for (size_type column = 0; column < result.column_size(); ++column) {
result.data_[row][column] += rhs.data_[row][column];
}
});
return result;
}
对于矩阵相乘,以前述第一种实现为例。因为临时向量row_result是共享的,所以各行不能并发处理。但是一行内各列可以并发处理。处理每列时都需要算一次内积,其时间复杂度为O(lhs.column_size()),是一种耗时操作,在此(不加证明地)认定其并发收益能够抵消线程维护代价。代码如下:
template <typename T>
Matrix<T> operator*(Matrix<T>&& lhs, const Matrix& rhs) {
matrix::CheckDimensionMultipliable(lhs, rhs);
typedef typename Matrix<T>::size_type size_type;
Matrix<T> result(std::move(lhs));
std::vector<T> row_result(rhs.column_size());
for (size_type row = 0; row < row_size(); ++row) {
row_result.resize(rhs.column_size());

// Concurrently calculates single row in result matrix.
matrix::ConcurrentProcess(
rhs.column_size(),
[
935b
&result, &rhs, row, &row_result](size_type column) mutable {
row_result[column] = std::inner_product(
lhs.row_begin(row), lhs.row_end(row),
rhs.column_begin(column), T());
});

// Inplace puts calculated row into result matrix.
result.data_[row].swap(row_result);
}
result.column_size_ = rhs.column_size();
return result;
}

编译选项设置

在我的Ubuntu上编译运行以上的并发代码时,会出现运行时错误:
terminate called after throwing an instance of 'std::system_error'
what():  Unknown error -1
我的G++版本为:
$ g++ -v
Using built-in specs.
COLLECT_GCC=g++
COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.6/lto-wrapper
Target: x86_64-linux-gnu
Configured with: ../src/configure -v --with-pkgversion='Ubuntu/Linaro 4.6.3-1ubuntu5' --with-bugurl=file:///usr/share/doc/gcc-4.6/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.6 --enable-shared --enable-linker-build-id --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.6 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --enable-plugin --enable-objc-gc --disable-werror --with-arch-32=i686 --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu
Thread model: posix
gcc version 4.6.3 (Ubuntu/Linaro 4.6.3-1ubuntu5)
Google了一下,找到一个解决方法:加上命令行参数-pthread就好了。不知道为什么会踩这个坑,也请大方之家帮忙解释下,先谢了。总之先跳过去这个坑再说。最终的编译命令行:
g++ -std=c++0x -pthread test.cc

总结

C++11标准已经出来很久了(C++14都快出来了),各家编译器对C++11的支持都基本到了可用的程度,是时候利用这些特性,来实现简洁、高效、优雅的代码了。本文通过对矩阵运算的实现,探讨了C++11中移动语义和并发处理的使用方法。希望能跟大牛们多多交流,看看C++11还有什么好玩的东西,可以拉出来遛一圈玩玩。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐