您的位置:首页 > 其它

基于Spark的巨型矩阵分布式LU计算求逆【第一篇】

2018-01-27 19:06 429 查看

概述

本文将介绍如何利用Spark解决巨型矩阵分布式的LU法求逆的问题。本篇则将对LU求逆的前半部分——分布式LU分解做介绍。

【后半部分再我的队友整理完后,会以链接的形式补在这里】

正文

首先,我们看一下本地LU分解时有哪些特点,如下图



常规LU分解式为 A = L*U,而我们将A、L、U都按照相同的规格划分,不难看出这个矩阵等式可以得到

① A11 = L11*U11

② A21 = L21*U11

③ A12 = L11*U12

④ A22 = L21*U12 + L22*U22

发现式子①也是一个LU分解,而②、③可以结合式子①得出L21和U12。

那么现在就剩下一个恶心的式子④ 了,整理一下 ⑤ A22 - L21*U12 = L22*U22,其实也是个LU分解。

当一个矩阵很大时,经常采用分块的方式进行计算(比如大型矩阵乘法),所以LU也可以像上图这样分块。

而LU分块后的求解又包含了LU计算,那么迭代就可以解决了。

那么迭代哪里呢?
当然是最恶心的部分啦!



按照本地可算的矩阵大小,从左上角开始截取,那么式子①对A11的LU则本地可解[1]

式子② -> L21 = A21*(U11^-1)

       ③ -> U12 = (L11^-1)*A12

对L11的求逆本地可解,对U11的逆我们采用 :上三角矩阵的转置的逆=他逆的转置

U11^-1 = ((U1^t)^-1)^t

所以他们同为下三角矩阵求逆[2]

之后的相乘,虽然A21、A12也很大但我们发现,因为你已经将整个矩阵分块,这两个乘法相当于

[1]

[2] * [5]    , [5] * [1,2,3]   就是把A11这个单位块和A21、A12两个长条矩阵的每个单位块相乘。

[3]

这个Map一下就行了。

至于最后的A22 ,拿去迭代啦,不过要注意,你迭代的是A22 - L21*U12,

因为你下一次迭代是为了将剩下的大矩阵再分布式LU填充上一步,所以第一他满足LU,第二他的LU是现在缺的LU块。

激动人心的代码部分
本地矩阵的 LU 及 下三角求逆

def toLU(matrix:DenseMatrix): Tuple2[DenseMatrix,DenseMatrix] ={
//初始化必要元素
val N = matrix.numCols
val A = new linalg.DenseMatrix[Double](N,N,matrix.toArray)
if (N==1){
return (new DenseMatrix(N,N,A.data),new DenseMatrix(N,N,A.data))
}
val LT = new linalg.DenseMatrix[Double](N,N)
val UT = new linalg.DenseMatrix[Double](N,N)

//初始化 LT对角线
for(i <- 0.to(N-1,1))
LT(i,i) = 1
//求 LT UT
for (i <- 0.to(N-1,1)){
for(j <- i.to(N-1,1)){
var temp = 0.0
for(k <- 0.to(i-1,1))
temp += LT(i,k) * UT(k,j)
UT(i,j) = A(i,j) - temp //UT
}
for(j <- (i+1).to(N-1,1)){
var temp2 = 0.0
for(k <- 0.to(i-1,1))
temp2 += LT(j,k) * UT(k,i)
LT(j,i) = (A(j,i)-temp2) / UT(i,i)//LT
}
}
(new DenseMatrix(N,N,LT.data),new DenseMatrix(N,N,UT.data))
}

//本地下三角矩阵求逆
def toINV(matrix:DenseMatrix): DenseMatrix = {
//初始化必要元素
val N = matrix.numCols
val L = new linalg.DenseMatrix[Double](N,N,matrix.toArray);
val Linv = new linalg.DenseMatrix[Double](N,N)

for(i <- 0.to(N-1,1)){
for (j <- 0.to(N-1,1)){
if(i<j) Linv(i,j)=0
else if(i==j) Linv(i,j) = 1.0 / L(i,i)
else {
var temp = 0.0
for(k <- j.to(i-1,1))
temp += L(i,k)*Linv(k,j)
Linv(i, j) = (-1 / L(i, i)) * temp
}
}
}
new DenseMatrix(N,N,Linv.data)
}

迭代部分

@annotation.tailrec
def loop(Blocks_tmp:RDD[((Int,Int),DenseMatrix)],block_x:Int,block_y:Int,
L:RDD[((Int,Int),DenseMatrix)],
U:RDD[((Int,Int),DenseMatrix)]):
Tuple2[RDD[((Int,Int),DenseMatrix)],RDD[((Int,Int),DenseMatrix)]]={
//获得矩阵 B11, B12, B21, B22
val _B11 = Blocks_tmp.filter(blockId=>{
(blockId._1._1==block_x)&&(blockId._1._2==block_y)})
.map(_._2).collect()(0)
val _B21  = Blocks_tmp.filter(blockId=>{(blockId._1._1>block_x)&&(blockId._1._2==block_y)})
val _B12  = Blocks_tmp.filter(blockId=>{(blockId._1._1==block_x)&&(blockId._1._2>block_y)})
val _B22  = Blocks_tmp.filter(blockId=>{(blockId._1._1>block_x)&&(blockId._1._2>block_y)})

//求 B11的 LU -> L11, U11
val B11_LU = LocalMatrixApi.toLU(_B11)
val L1 = L.union(LocalMatrixApi.getRDD(B11_LU._1,block_x,block_y,spark))
val U1 = U.union(LocalMatrixApi.getRDD(B11_LU._2,block_x,block_y,spark))

//求 U12 = L11^-1 * B12
val L11_inv = LocalMatrixApi.toINV(B11_LU._1)
val U12 = _B12.map(m=>(m._1,L11_inv.multiply(m._2)))
val U2 = U1.union(U12)

//求 L21 = B21 * U11^-1
val U11_inv = LocalMatrixApi.toINV(B11_LU._2.transpose).transpose
val L21 = _B21.map(m=>(m._1,m._2.multiply(U11_inv)))
val L2 = L1.union(L21)

// 求下次迭代的 L22U22 --> B22-L21U12
val L21_bm = new BlockMatrix(L21.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps)
val U12_bm = new BlockMatrix(U12.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps)
val B22_bm = new BlockMatrix(_B22.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps)
val next_rdd = B22_bm.subtract(L21_bm.multiply(U12_bm))
.blocks
.filter(block=>{(block._1._1>block_x)&&(block._1._2>block_y)})
.map(block=>(block._1,new DenseMatrix(block._2.numRows,block._2.numCols,block._2.toArray)))

//跳出判断
if(step_now == total-1){
val B22 = next_rdd.map(_._2).collect()(0)
val B22_LU = LocalMatrixApi.toLU(B22)
val L3 = L2.union(LocalMatrixApi.getRDD(B22_LU._1,block_x+1,block_y+1,spark))
val U3 = U2.union(LocalMatrixApi.getRDD(B22_LU._2,block_x+1,block_y+1,spark))
(L3,U3)
}
else {step_now += 1;loop(next_rdd,block_x+1,bloc
a55d
k_y+1,L2,U2)}
}
//启动迭代
val LU_final = loop(blocks,0,0,L_1,U_1)
val L = new BlockMatrix(LU_final._1.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps)
val U = new BlockMatrix(LU_final._2.map(block=>(block._1,Matrices.dense(block._2.numRows,block._2.numCols,block._2.toArray))),steps,steps)

总结

到这里,剩下的问题就是优化了,针对LU当前的可优化点如下:

1. 迭代中依然使用了大型分块矩阵BlockMatrix的运算——A22 - L21*U12。

    我们发现,如果将迭代过程倒过来,从右下角向左上角扩散则能去掉所有的BlockMatrix的运算,

    只有RDD的运算。【这个马上就能改出来,测试完就发】

2.将频繁使用的结果Persist到内存&硬盘,节省运算

3.减少Shuffle!!!【2、3我还在研究,研究完再上传吧】

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐