用C++11优化矩阵运算的空间和时间效率
2014-08-28 00:21
731 查看
最近在segmentfault.com上看到一个问题,提问者想要利用C++11的移动语义,减少矩阵相加时临时对象的构造。受此启发,我实现了一个简单的矩阵类,利用C++11标准中的一些特性,对矩阵运算进行了时间和空间效率的优化。(完整代码:matrix.h、matrix_test.cc、run.sh)
常用的加法函数的声明格式为 T operator+(const T& lhs, const T& rhs); ,例如std::plus。这样符合我们对加法的常识,两个加数在相加前后都不变(不被修改),返回一个临时对象作为结果。这样做的代价是,每次加法都要构造一个临时对象。(这里有个插曲,返回值优化会在 T
v = a1 + a2; 中省去一次临时对象构造,但在 T v = a1 + a2 + ... + an; 中仍然要进行n-1次临时对象的构造,临时对象构造不可避免)
而在矩阵运算中,构造临时矩阵代价就更大。因此,除了Matrix<T> operator+(const Matrix<T>& lhs, const Matrix<T>& rhs); 这种常用形式,可以引入另外两种声明形式:
在连续相加中产生的临时矩阵,匹配到右值引用参数,然后其空间被“偷”走,作为函数返回值传出。再加上返回值优化,就可以在整个加法表达式中不构造任何临时矩阵,全部实现原地运算,优化矩阵加法的空间效率。
实现代码如下:
与矩阵加法中的情况类似,矩阵乘法的常用声明形式为 Matrix<T> operator*(const Matrix& lhs, const Matrix& rhs); ,从而不可避免地要构造一个临时矩阵。为了消灭该临时矩阵构造的必要性,还是使用右值引用的声明形式:
但与矩阵加法、加法和数乘不同,矩阵相乘的结果矩阵与乘数矩阵维数未必相同,需要进行维数的修改。下面对以上两种声明形式分别讨论。
对于 Matrix<T> operator*(Matrix<T>&& lhs, const Matrix<T>& rhs); 而言,需要将lhs的存储“偷”走作为结果矩阵( Matrix<T> result(std::move(lhs); )。结果矩阵行数就是lhs的行数,故 result.data_.size()可以保持不变。
定义一个临时向量 vector<T> row_result(rhs.column_size()); ,计算结果矩阵的一行并存储到row_result中。考虑到result.data_[row]是std::vector<T>类型,可以利用 result.data_[row].swap(row_result); 进行原地置换,省去了复制的时间。继续以此方法计算并填充结果矩阵的所有行,完成矩阵相乘的计算逻辑。代码如下:
对于 Matrix<T> operator*(const Matrix<T>& lhs, Matrix<T>&& rhs); 而言,需要将rhs的存储“偷”走作为结果矩阵(Matrix<T> result(std::move(rhs)); )。结果矩阵行数为lhs.row_size()(未必等于rhs.row_size()),因此需要改变result.data_.size()。
定义一个临时向量 std::vector<T> column_result(lhs.row_size()); ,计算结果矩阵的一列并存储到column_result中,然后 std::copy(column_result.begin(), column_result.end(), result.column_begin(column)); 。继续以此方法计算并填充结果矩阵的所有列,完成矩阵相乘的计算逻辑。代码如下:
综上两种讨论,矩阵相乘的空间复杂度由经典方式的O(lhs.row_size() * rhs.column_size())降低到O(rhs.column_size())或O(lhs.row_size()),加上省去了临时结果矩阵的构造,实现了对矩阵相乘的空间效率的优化。
需要注意的一点是,并发是以后台线程为代价的,因此lambda函数的运行收益,需要能抵消线程的维护开销。lambda函数里应该放置比较耗时的操作,不要放置本来就很简短地操作,例如 result[row][column] = rhs[row][column] - result[row][column]; 。
对于矩阵加法,每个元素的计算都不会互相干扰,可以将各行的求值并发计算(也可选择按列计算,在此略),代码如下:
用C++11的移动语义,优化矩阵运算的空间效率
C++11的右值引用(T&&),可以将一个变量标记为“临时的”、“在此之后不再需要”,让该变量的使用者可以放心地将其内容“偷”走,而不用担心会有不良后果。利用该特性,可以减少矩阵运算中不必要的临时对象构造和拷贝,实现空间效率的优化。矩阵加法、减法和数乘
先讨论矩阵加法。常用的加法函数的声明格式为 T operator+(const T& lhs, const T& rhs); ,例如std::plus。这样符合我们对加法的常识,两个加数在相加前后都不变(不被修改),返回一个临时对象作为结果。这样做的代价是,每次加法都要构造一个临时对象。(这里有个插曲,返回值优化会在 T
v = a1 + a2; 中省去一次临时对象构造,但在 T v = a1 + a2 + ... + an; 中仍然要进行n-1次临时对象的构造,临时对象构造不可避免)
而在矩阵运算中,构造临时矩阵代价就更大。因此,除了Matrix<T> operator+(const Matrix<T>& lhs, const Matrix<T>& rhs); 这种常用形式,可以引入另外两种声明形式:
Matrix<T> operator+(const Matrix& lhs, Matrix&& rhs); Matrix<T> operator+(Matrix&& lhs, const Matrix& rhs);
在连续相加中产生的临时矩阵,匹配到右值引用参数,然后其空间被“偷”走,作为函数返回值传出。再加上返回值优化,就可以在整个加法表达式中不构造任何临时矩阵,全部实现原地运算,优化矩阵加法的空间效率。
实现代码如下:
template <typename T> Matrix<T> operator+(const Matrix<T>& lhs, Matrix<T>&& rhs) { matrix::CheckDimensionMatches(lhs, rhs); Matrix<T> result(std::move(rhs)); result += lhs; return result; } template <typename T> Matrix<T> operator+(Matrix<T>&& lhs, const Matrix<T>& rhs) { return rhs + std::move(lhs); }矩阵减法和矩阵数乘原理相同,仅列出函数声明:
template <typename T> Matrix<T> operator-(const Matrix<T>& lhs, Matrix<T>&& rhs); template <typename T> Matrix<T> operator-(Matrix<T>&& lhs, const Matrix<T>& rhs); template <typename T> Matrix<T> operator*(Matrix<T>&& mat, const T& scaler); template <typename T> Matrix<T> operator*(const T& scaler, Matrix<T>&& mat);测试代码:
void TestArithmetrics_Temporaries() { cout << "prepare a, b, c, scaler, expected" << endl; Matrix<int> a = { {1, 2}, {3, 4}, }, b = { {2, 3}, {4, 5}, }, c = { {1, 1}, {1, 1}, }, expected = { {2, 2}, {2, 2}, }; int scaler = 3; cout << "Matrix<int> d = std::move(c) * scaler + a - b;" << endl; Matrix<int> d = std::move(c) * scaler + a - b; assert(d.equal_to(expected)); cout << "done." << endl; }输出结果:
prepare a, b, c, scaler, expected Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>) Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>) Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>) Matrix::Matrix(std::initializer_list<std::initializer_list<value_type>>) Matrix<int> d = std::move(c) * scaler + a - b; Matrix::Matrix(Matrix&&) // std::move(c) * scaler Matrix::Matrix(Matrix&&) // tmp + a Matrix::Matrix(Matrix&&) // tmp + b done. // because of RVO, d's ctor is not called
矩阵和矩阵相乘
这里涉及到我的矩阵实现细节,代码如下:template <typename T> class Matrix { private: std::vector<std::vector<T>> data_; std::size_t column_size_; };其中,data_采用行优先存储,也即data_[row]存储矩阵的第row行。
与矩阵加法中的情况类似,矩阵乘法的常用声明形式为 Matrix<T> operator*(const Matrix& lhs, const Matrix& rhs); ,从而不可避免地要构造一个临时矩阵。为了消灭该临时矩阵构造的必要性,还是使用右值引用的声明形式:
template <typename T> Matrix<T> operator*(Matrix<T>&& lhs, const Matrix<T>& rhs); template <typename T>Matrix<T> operator*(const Matrix<T>& lhs, Matrix<T>&& rhs);
但与矩阵加法、加法和数乘不同,矩阵相乘的结果矩阵与乘数矩阵维数未必相同,需要进行维数的修改。下面对以上两种声明形式分别讨论。
对于 Matrix<T> operator*(Matrix<T>&& lhs, const Matrix<T>& rhs); 而言,需要将lhs的存储“偷”走作为结果矩阵( Matrix<T> result(std::move(lhs); )。结果矩阵行数就是lhs的行数,故 result.data_.size()可以保持不变。
定义一个临时向量 vector<T> row_result(rhs.column_size()); ,计算结果矩阵的一行并存储到row_result中。考虑到result.data_[row]是std::vector<T>类型,可以利用 result.data_[row].swap(row_result); 进行原地置换,省去了复制的时间。继续以此方法计算并填充结果矩阵的所有行,完成矩阵相乘的计算逻辑。代码如下:
template <typename T> Matrix<T> operator*(Matrix<T>&& lhs, const Matrix& rhs) { matrix::CheckDimensionMultipliable(lhs, rhs); typedef typename Matrix<T>::size_type size_type; Matrix<T> result(std::move(lhs)); std::vector<T> row_result(rhs.column_size()); for (size_type row = 0; row < row_size(); ++row) { row_result.resize(rhs.column_size()); // Calculates single row in result matrix. for (size_type column = 0; column < rhs.column_size(); ++column) { row_result[column] = std::inner_product( result.row_begin(row), result.row_end(row), rhs.column_begin(column), T()); } // Inplace puts calculated row into result matrix. result.data_[row].swap(row_result); } result.column_size_ = rhs.column_size(); return result; }
对于 Matrix<T> operator*(const Matrix<T>& lhs, Matrix<T>&& rhs); 而言,需要将rhs的存储“偷”走作为结果矩阵(Matrix<T> result(std::move(rhs)); )。结果矩阵行数为lhs.row_size()(未必等于rhs.row_size()),因此需要改变result.data_.size()。
定义一个临时向量 std::vector<T> column_result(lhs.row_size()); ,计算结果矩阵的一列并存储到column_result中,然后 std::copy(column_result.begin(), column_result.end(), result.column_begin(column)); 。继续以此方法计算并填充结果矩阵的所有列,完成矩阵相乘的计算逻辑。代码如下:
template <typename T> Matrix<T> operator*(const Matrix<T>& lhs, Matrix<T>&& rhs) { matrix::CheckDimensionMultipliable(lhs, rhs); typedef typename Matrix<T>::size_type size_type; size_type rhs_row_size(rhs.row_size()), rhs_column_size(rhs.column_size()); Matrix<T> result(std::move(rhs)); result.row_resize(std::max(rhs_row_size, lhs.row_size())); std::vector<T> column_result(lhs.row_size()); for (size_type column = 0; column < rhs_column_size; ++column) { // Calculates single column in result matrix. for (size_type row = 0; row < lhs.row_size(); ++row) { column_result[row] = std::inner_product( lhs.row_begin(row), lhs.row_end(row), result.column_begin(column), T()); } // Puts calculated column into result matrix. std::copy(column_result.begin(), column_result.end(), result.column_begin(column)); } result.row_resize(lhs.row_size()); return result; }
综上两种讨论,矩阵相乘的空间复杂度由经典方式的O(lhs.row_size() * rhs.column_size())降低到O(rhs.column_size())或O(lhs.row_size()),加上省去了临时结果矩阵的构造,实现了对矩阵相乘的空间效率的优化。
用C++11的并发支持,优化矩阵运算的时间效率
C++11引入了对并发编程的支持,在此使用其中的std::async(),配合lambda函数,对矩阵运算中的一些步骤进行并发处理,从而利用多核CPU,优化矩阵的时间效率。并发处理工具函数
矩阵运算中很多地方都可以用到并发处理,因此我将并发处理抽象出来成为一个函数:void ConcurrentProcess(std::size_t count, std::function<void(std::size_t)> process) { std::vector<std::future<void>> futures(count); for (std::size_t index = 0; index < count; ++index) { futures[index] = std::async(std::launch::async, process, index); } for (std::size_t index = 0; index < count; ++index) { futures[index].wait(); } }在调用处,代码需要定义一个lambda函数,接收一个下标(index)作为参数,没有返回值,并且注意不同下标处理的数据不要有读写冲突。这样就实现了并发计算。
需要注意的一点是,并发是以后台线程为代价的,因此lambda函数的运行收益,需要能抵消线程的维护开销。lambda函数里应该放置比较耗时的操作,不要放置本来就很简短地操作,例如 result[row][column] = rhs[row][column] - result[row][column]; 。
矩阵运算的并发实现
下面以矩阵加法和矩阵相乘为例说明。对于矩阵加法,每个元素的计算都不会互相干扰,可以将各行的求值并发计算(也可选择按列计算,在此略),代码如下:
template <typename T> Matrix<T> operator+(Matrix<T>&& lhs, const Matrix<T>& rhs) { matrix::CheckDimensionMatches(lhs, rhs); typedef typename Matrix<T>::size_type size_type; Matrix<T> result(std::move(lhs)); matrix::ConcurrentProcess( lhs.row_size(), [&result, &rhs](size_type row) mutable { for (size_type column = 0; column < result.column_size(); ++column) { result.data_[row][column] += rhs.data_[row][column]; } }); return result; }对于矩阵相乘,以前述第一种实现为例。因为临时向量row_result是共享的,所以各行不能并发处理。但是一行内各列可以并发处理。处理每列时都需要算一次内积,其时间复杂度为O(lhs.column_size()),是一种耗时操作,在此(不加证明地)认定其并发收益能够抵消线程维护代价。代码如下:
template <typename T> Matrix<T> operator*(Matrix<T>&& lhs, const Matrix& rhs) { matrix::CheckDimensionMultipliable(lhs, rhs); typedef typename Matrix<T>::size_type size_type; Matrix<T> result(std::move(lhs)); std::vector<T> row_result(rhs.column_size()); for (size_type row = 0; row < row_size(); ++row) { row_result.resize(rhs.column_size()); // Concurrently calculates single row in result matrix. matrix::ConcurrentProcess( rhs.column_size(), [ 935b &result, &rhs, row, &row_result](size_type column) mutable { row_result[column] = std::inner_product( lhs.row_begin(row), lhs.row_end(row), rhs.column_begin(column), T()); }); // Inplace puts calculated row into result matrix. result.data_[row].swap(row_result); } result.column_size_ = rhs.column_size(); return result; }
编译选项设置
在我的Ubuntu上编译运行以上的并发代码时,会出现运行时错误:terminate called after throwing an instance of 'std::system_error' what(): Unknown error -1我的G++版本为:
$ g++ -v Using built-in specs. COLLECT_GCC=g++ COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.6/lto-wrapper Target: x86_64-linux-gnu Configured with: ../src/configure -v --with-pkgversion='Ubuntu/Linaro 4.6.3-1ubuntu5' --with-bugurl=file:///usr/share/doc/gcc-4.6/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.6 --enable-shared --enable-linker-build-id --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.6 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --enable-plugin --enable-objc-gc --disable-werror --with-arch-32=i686 --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu Thread model: posix gcc version 4.6.3 (Ubuntu/Linaro 4.6.3-1ubuntu5)Google了一下,找到一个解决方法:加上命令行参数-pthread就好了。不知道为什么会踩这个坑,也请大方之家帮忙解释下,先谢了。总之先跳过去这个坑再说。最终的编译命令行:
g++ -std=c++0x -pthread test.cc
总结
C++11标准已经出来很久了(C++14都快出来了),各家编译器对C++11的支持都基本到了可用的程度,是时候利用这些特性,来实现简洁、高效、优雅的代码了。本文通过对矩阵运算的实现,探讨了C++11中移动语义和并发处理的使用方法。希望能跟大牛们多多交流,看看C++11还有什么好玩的东西,可以拉出来遛一圈玩玩。相关文章推荐
- 剑指offer-第5章优化时间和空间效率(丑数)
- 剑指offer-第五章优化时间和空间效率(把数组排列成最小的数)
- 剑指offer-第五章优化时间和空间效率(在字符串中第一次出现切只出现一次的字符)
- 《剑指offer》 第五章优化时间和空间效率 总结
- 剑指offer-第五章优化时间和空间效率(两个链表的第一个公共节点)
- 【剑指Offer-优化时间和空间效率】连续子数组最大和
- 剑指offer-5-面试34:丑数(优化时间和空间效率)
- 【重构】人人都来写算法 之 矩阵顺时针旋转90度,空间效率O(1),时间效率O(n*n)
- 剑指offer-第五章优化时间和空间效率(最小的k个数)
- 剑指offer-第五章优化时间和空间效率(从1到n的整数中1出现的次数)
- 剑指offer——优化时间和空间效率
- 剑指offer 第五章 优化时间和空间效率
- 【面试编程题】-9剑指offer之优化时间和空间效率
- 人人都来写算法 之 矩阵顺时针旋转90度,空间效率O(1),时间效率O(n*n)
- 剑指offer-第五章优化时间和空间效率(数组中的逆序对的总数)
- 剑指offer-第五章优化时间和空间效率(连续子数组的最大和)
- Ch5优化时间和空间效率——31:连续子数组的最大和
- 剑指offer-第五章优化时间和空间效率(数组中出现次数超过一半的数字)
- 数组最小剑指Offer读书笔记之第五章优化时间空间效率
- 优化时间效率的方法