基于Spark的巨型矩阵分布式LU计算求逆【第一篇】
2018-01-27 19:06
429 查看
概述
本文将介绍如何利用Spark解决巨型矩阵分布式的LU法求逆的问题。本篇则将对LU求逆的前半部分——分布式LU分解做介绍。【后半部分再我的队友整理完后,会以链接的形式补在这里】
正文
首先,我们看一下本地LU分解时有哪些特点,如下图常规LU分解式为 A = L*U,而我们将A、L、U都按照相同的规格划分,不难看出这个矩阵等式可以得到
① A11 = L11*U11
② A21 = L21*U11
③ A12 = L11*U12
④ A22 = L21*U12 + L22*U22
发现式子①也是一个LU分解,而②、③可以结合式子①得出L21和U12。
那么现在就剩下一个恶心的式子④ 了,整理一下 ⑤ A22 - L21*U12 = L22*U22,其实也是个LU分解。
当一个矩阵很大时,经常采用分块的方式进行计算(比如大型矩阵乘法),所以LU也可以像上图这样分块。
而LU分块后的求解又包含了LU计算,那么迭代就可以解决了。
那么迭代哪里呢?
当然是最恶心的部分啦!
按照本地可算的矩阵大小,从左上角开始截取,那么式子①对A11的LU则本地可解[1],
式子② -> L21 = A21*(U11^-1)
③ -> U12 = (L11^-1)*A12
对L11的求逆本地可解,对U11的逆我们采用 :上三角矩阵的转置的逆=他逆的转置
U11^-1 = ((U1^t)^-1)^t
所以他们同为下三角矩阵求逆[2] 。
之后的相乘,虽然A21、A12也很大但我们发现,因为你已经将整个矩阵分块,这两个乘法相当于
[1]
[2] * [5] , [5] * [1,2,3] 就是把A11这个单位块和A21、A12两个长条矩阵的每个单位块相乘。
[3]
这个Map一下就行了。
至于最后的A22 ,拿去迭代啦,不过要注意,你迭代的是A22 - L21*U12,
因为你下一次迭代是为了将剩下的大矩阵再分布式LU填充上一步,所以第一他满足LU,第二他的LU是现在缺的LU块。
激动人心的代码部分
本地矩阵的 LU 及 下三角求逆
def toLU(matrix:DenseMatrix): Tuple2[DenseMatrix,DenseMatrix] ={ //初始化必要元素 val N = matrix.numCols val A = new linalg.DenseMatrix[Double](N,N,matrix.toArray) if (N==1){ return (new DenseMatrix(N,N,A.data),new DenseMatrix(N,N,A.data)) } val LT = new linalg.DenseMatrix[Double](N,N) val UT = new linalg.DenseMatrix[Double](N,N) //初始化 LT对角线 for(i <- 0.to(N-1,1)) LT(i,i) = 1 //求 LT UT for (i <- 0.to(N-1,1)){ for(j <- i.to(N-1,1)){ var temp = 0.0 for(k <- 0.to(i-1,1)) temp += LT(i,k) * UT(k,j) UT(i,j) = A(i,j) - temp //UT } for(j <- (i+1).to(N-1,1)){ var temp2 = 0.0 for(k <- 0.to(i-1,1)) temp2 += LT(j,k) * UT(k,i) LT(j,i) = (A(j,i)-temp2) / UT(i,i)//LT } } (new DenseMatrix(N,N,LT.data),new DenseMatrix(N,N,UT.data)) }
//本地下三角矩阵求逆 def toINV(matrix:DenseMatrix): DenseMatrix = { //初始化必要元素 val N = matrix.numCols val L = new linalg.DenseMatrix[Double](N,N,matrix.toArray); val Linv = new linalg.DenseMatrix[Double](N,N) for(i <- 0.to(N-1,1)){ for (j <- 0.to(N-1,1)){ if(i<j) Linv(i,j)=0 else if(i==j) Linv(i,j) = 1.0 / L(i,i) else { var temp = 0.0 for(k <- j.to(i-1,1)) temp += L(i,k)*Linv(k,j) Linv(i, j) = (-1 / L(i, i)) * temp } } } new DenseMatrix(N,N,Linv.data) }
迭代部分
@annotation.tailrec def loop(Blocks_tmp:RDD[((Int,Int),DenseMatrix)],block_x:Int,block_y:Int, L:RDD[((Int,Int),DenseMatrix)], U:RDD[((Int,Int),DenseMatrix)]): Tuple2[RDD[((Int,Int),DenseMatrix)],RDD[((Int,Int),DenseMatrix)]]={ //获得矩阵 B11, B12, B21, B22 val _B11 = Blocks_tmp.filter(blockId=>{ (blockId._1._1==block_x)&&(blockId._1._2==block_y)}) .map(_._2).collect()(0) val _B21 = Blocks_tmp.filter(blockId=>{(blockId._1._1>block_x)&&(blockId._1._2==block_y)}) val _B12 = Blocks_tmp.filter(blockId=>{(blockId._1._1==block_x)&&(blockId._1._2>block_y)}) val _B22 = Blocks_tmp.filter(blockId=>{(blockId._1._1>block_x)&&(blockId._1._2>block_y)}) //求 B11的 LU -> L11, U11 val B11_LU = LocalMatrixApi.toLU(_B11) val L1 = L.union(LocalMatrixApi.getRDD(B11_LU._1,block_x,block_y,spark)) val U1 = U.union(LocalMatrixApi.getRDD(B11_LU._2,block_x,block_y,spark)) //求 U12 = L11^-1 * B12 val L11_inv = LocalMatrixApi.toINV(B11_LU._1) val U12 = _B12.map(m=>(m._1,L11_inv.multiply(m._2))) val U2 = U1.union(U12) //求 L21 = B21 * U11^-1 val U11_inv = LocalMatrixApi.toINV(B11_LU._2.transpose).transpose val L21 = _B21.map(m=>(m._1,m._2.multiply(U11_inv))) val L2 = L1.union(L21) // 求下次迭代的 L22U22 --> B22-L21U12 val L21_bm = new BlockMatrix(L21.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps) val U12_bm = new BlockMatrix(U12.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps) val B22_bm = new BlockMatrix(_B22.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps) val next_rdd = B22_bm.subtract(L21_bm.multiply(U12_bm)) .blocks .filter(block=>{(block._1._1>block_x)&&(block._1._2>block_y)}) .map(block=>(block._1,new DenseMatrix(block._2.numRows,block._2.numCols,block._2.toArray))) //跳出判断 if(step_now == total-1){ val B22 = next_rdd.map(_._2).collect()(0) val B22_LU = LocalMatrixApi.toLU(B22) val L3 = L2.union(LocalMatrixApi.getRDD(B22_LU._1,block_x+1,block_y+1,spark)) val U3 = U2.union(LocalMatrixApi.getRDD(B22_LU._2,block_x+1,block_y+1,spark)) (L3,U3) } else {step_now += 1;loop(next_rdd,block_x+1,bloc a55d k_y+1,L2,U2)} } //启动迭代 val LU_final = loop(blocks,0,0,L_1,U_1) val L = new BlockMatrix(LU_final._1.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps) val U = new BlockMatrix(LU_final._2.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps)
总结
到这里,剩下的问题就是优化了,针对LU当前的可优化点如下:1. 迭代中依然使用了大型分块矩阵BlockMatrix的运算——A22 - L21*U12。
我们发现,如果将迭代过程倒过来,从右下角向左上角扩散则能去掉所有的BlockMatrix的运算,
只有RDD的运算。【这个马上就能改出来,测试完就发】
2.将频繁使用的结果Persist到内存&硬盘,节省运算
3.减少Shuffle!!!【2、3我还在研究,研究完再上传吧】
相关文章推荐
- 【转】Spark是基于内存的分布式计算引擎
- GraphX:基于Spark的弹性分布式图计算系统
- Spark RDD编程(Python和Scala版本)----Spark中的RDD就是一个不可变的分布式对象集合,是一种具有兼容性的基于内存的集群计算抽象方法,Spark则是这个方法的抽象。 Spa
- 基于Spark框架的大型分布式矩阵求逆运算实现(二)——大型下三角矩阵求逆运算
- 基于Mesos和Docker的分布式计算平台
- 基于案例贯通 Spark Streaming 流计算框架的运行源码
- 安装基于hadoop集群的高可用完全分布式的spark高可用集群
- 如何基于 Spark Streaming 构建实时计算平台
- Hive数据分析——Spark是一种基于rdd(弹性数据集)的内存分布式并行处理框架,比于Hadoop将大量的中间结果写入HDFS,Spark避免了中间结果的持久化
- <一>基于Fourinone实现分布式计算上手指南和demo
- 如何在spark中读写cassandra数据 ---- 分布式计算框架spark学习之六
- 如何基于 Spark Streaming 构建实时计算平台
- 基于分布式计算平台的流数据挖掘框架设计
- Python基于泊松分布产生随机数及可视化显示,并将其形成矩阵对其相关计算
- Spark:一个高效的分布式计算系统
- Spark:一个高效的分布式计算系统
- 雅虎开源CaffeOnSpark:基于Hadoop/Spark的分布式深度学习
- Spark:一个高效的分布式计算系统
- Spark:一个高效的分布式计算系统
- 弹性分布式数据集RDDs:基于内存的集群计算的容错性抽象