Spark MLlib 之 数据类型与大规模数据集的相似度计算原理探索
转载请注明出处:http://blog.csdn.net/gamer_gyt
博主微博:http://weibo.com/234654758
Github:https://github.com/thinkgamer
公众号:搜索与推荐Wiki
个人网站:http://thinkgamer.github.io
本文出自「xingoo」在原文的基础上加以小编自己的理解形成的学习笔记,希望对读者有帮助。原文出自:Spark MLlib 之 大规模数据集的相似度计算原理探索
背景
最急小编在做的是计算两两用户的粉丝重合度,根据粉丝重合度去评估两个用户之间的相似度,根据条件进行过滤之后大概有3000个用户,但每个用户的粉丝量参差不齐,有上百万的,有几千的,这样在去构建笛卡尔积的时候,进行粉丝数据关联,得到的用户集就会特别大,spark运行的时候就会很慢,而且会出现很严重的数据倾斜。这个时候了解到了spark支持的数据类型,看到了CoordinateMatrix,然后深究其原理,便看到了这篇文章,经过整理形成了此文。
Spark支持的数据类型
官方文档地址:https://spark.apache.org/docs/latest/mllib-data-types.html
1.Local Vector(本地向量)
本地向量是从0开始的下标和double类型的数据组成,存储在本地机器上,所以称为Local Vector。它支持两种形式:
- Dense (密集的向量)
- Sparse (稀疏的向量)
比如一个向量[1.0,0.0,3.0],用Dense表示为:[1.0,0.0,3.0],用Sparse表示为:(3,[0,2],[1.0,3.0]),其中3为向量的长度,[0,2]表示元素[1.0,3.0]的位置,可见sparse形式下0.0是不存储的。
import org.apache.spark.mllib.linalg.Vectors val denseVector = Vectors.dense(1.0,0.0,3.0) val sparseVector1 = Vectors.sparse(3,Array(0,2),Array(1.0,3.0)) val sparseVector2 = Vectors.sparse(3,Seq((0,1.0),(2,3.0))) println(s"DenseVector is : $denseVector") println(s"DenseVector to Sparse is : ${denseVector.toSparse}") println(s"sparseVector1 is : $sparseVector1") println(s"sparseVector1 to Dense is : ${sparseVector1.toDense}") println(s"sparseVector2 is : $sparseVector2") println(s"sparseVector2 to Dense is : ${sparseVector2.toDense}")
输出为:
DenseVector is : [1.0,0.0,3.0] DenseVector to Sparse is : (3,[0,2],[1.0,3.0]) sparseVector1 is : (3,[0,2],[1.0,3.0]) sparseVector1 to Dense is : [1.0,0.0,3.0] sparseVector2 is : (3,[0,2],[1.0,3.0]) sparseVector2 to Dense is : [1.0,0.0,3.0]
2. Labeled point(带标签的点)
labeled point由本地向量组成,既可以是dense向量,也可以是sparse向量。在mllib中常用于监督类算法,使用double类型来保存该类型的数据,因为也可以用于回归和分类算法。例如二分类,label可以是0(负例)或1(正例),对于多分类,label可以是0,1,2…
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint val pos = LabeledPoint(1.0, Vectors.dense(1.0,0.0,3.0)) val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
sparse data
稀疏数据存储是非常普遍的现象,mllib支持读取libsvm格式的数据,其数据格式如下:
label index1:value1,index2:value2 ...
其读取方式包括:
import org.apache.spark.mllib.util.MLUtils // method 1 spark.read.format("libsvm") .load("libsvm data path") // method 2 MLUtils.loadLibSVMFile(spark.sparkContext, "libsvm data path")
3. Local Matrix(本地矩阵)
local matrix由行下标,列索引和double类型的值组成,存储在本地机器上,mllib支持密集矩阵和稀疏矩阵,其存储是按照列进行存储的。
例如下面的为密集矩阵:
通过数组存储的形式为: [1.0, 3.0, 5.0, 2.0, 4.0, 6.0],矩阵大小为[3,2]
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val denseMatrix = Matrices.dense(3,2, Array(1.0,3.0,5.0,2.0,4.0,6.0)) println(s"denseMatrix is : $denseMatrix") // Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) val sparseMatrix = Matrices.sparse(3,2, Array(0,1,3),Array(0,2,1),Array(9,6,8)) println(s"sparseMatrix is : $sparseMatrix")
注:稀疏矩阵解释,首先指定矩阵是3行2列,Array(0, 1, 3)是指,第0个非零元素在第一列,第一第二个非零元素在第二列。
Array(0, 2, 1)是指,第一个非零元素在第0行,第二个非零元素在第2行,第三个非零元素在第1行。
此处设计比较好,假设100个元素分两列,不需要把每个元素所在列都标出来,只需要记录3个数字即可。Array(9, 6, 8)表示按顺序存储非零元素.
Array(0,1,3)比较难理解,可以参考以下文章:
4. Distributed Matrix(分布式矩阵)
一个分布式矩阵由下标和double类型的数据组成,不过分布式的矩阵的下标不是int类型,而是long类型,数据保存在一个或多个rdd中,选择一个正确的格式去存储分布式矩阵是非常重要的。分布式矩阵转换成不同的格式需要一个全局的shuffle(global shuffle),而全局shuffle的代价会非常高。到目前为止,Spark MLlib中已经实现了三种分布式矩阵。
最基本的分布式矩阵是RowMatrix,它是一个行式的分布式矩阵,没有行索引。比如一系列特征向量的集合。RowMatrix由一个RDD代表所有的行,每一行是一个本地向量。假设一个RowMatrix的列数不是特别巨大,那么一个简单的本地向量能够与driver进行联系,并且数据可以在单个节点上保存或使用。IndexedRowMatrix与RowMatrix类似但是有行索引,行索引可以用来区分行并且进行连接等操作。CoordinateMatrix是一个以协同列表(coordinate list)格式存储数据的分布式矩阵,数据以RDD形式存储。
注意:因为我们需要缓存矩阵的大小,所以分布式矩阵的RDDs格式是需要确定的,使用非确定RDDs的话会报错。
Row Matrix
RowMatrix它是一个行式的分布式矩阵,没有行索引。比如一系列特征向量的集合。RowMatrix由一个RDD代表所有的行,每一行是一个本地向量。因为每一行代表一个本地向量,所以它的列数被限制在Integer.max的范围内,在实际应用中不会太大。
一个RowMatrix可以由一个RDD[Vector]的实例创建。因此我们可以计算统计信息或者进行分解。QR分解(QR decomposition)是A=QR,其中Q是一个矩阵,R是一个上三角矩阵。对sigular value decomposition(SVD和principal component analysis(PCA),可以去参考降维的部分。
// Row Matrix println("Row Matrix ...") val arr = Array(Vectors.dense(1,0),Vectors.dense(0,1)) val rows = spark.sparkContext.parallelize(arr) val mat: RowMatrix = new RowMatrix(rows) val m = mat.numRows() val n = mat.numCols() val qrResult = mat.tallSkinnyQR(true) println(s"m is: $m,n is $n,\nqrResult is :") qrResult.Q.rows.foreach(println) println() qrResult.R.rowIter.foreach(println)
输出为:
Row Matrix ... m is: 2,n is 2, qrResult is : [1.0,0.0] [0.0,1.0] [1.0,0.0] [0.0,1.0]
IndexedRowMatrix
IndexedRowMatrix与RowMatrix类似,但是它有行索引。由一个行索引RDD表示,索引每一行由一个long型行索引和一个本地向量组成。
一个IndexedRowMatrix可以由RDD[IndexedRow]的实例来生成,IndexedRow是一个(Long, Vector)的封装。去掉行索引,IndexedRowMatrix能够转换成RowMatrix。
// IndexedRowMatrix println("Indexed Row Matrix ...") val arr2 = Array( IndexedRow(0,Vectors.dense(1,0)), IndexedRow(1,Vectors.dense(0,1)) ) val rows2: RDD[IndexedRow] = spark.sparkContext.parallelize(arr2) val mat2 = new IndexedRowMatrix(rows2) val m2 = mat2.numRows() val n2 = mat2.numCols() // 去掉行索引,转换成RowMatrix val qrResult2 = mat2.toRowMatrix().tallSkinnyQR(true) println(s"m2 is: $m2,n2 is $n2,\nqrResult2 is :") qrResult2.Q.rows.foreach(println) println() qrResult2.R.rowIter.foreach(println)
输出为:
Indexed Row Matrix ... m2 is: 2,n2 is 2, qrResult2 is : [1.0,0.0] [0.0,1.0] [1.0,0.0] [0.0,1.0]
CoordinateMatrix
CoordinateMatrix是一个分布式矩阵,其实体集合是一个RDD,每一个是一个三元组(i:Long, j:Long, value:Double)。其中i是行索引,j是列索引,value是实体的值。当矩阵的维度很大并且是稀疏矩阵时,才使用CoordinateMatrix。
一个CoordinateMatrix可以通过一个RDD[MatrixEntry]的实例来创建,MatrixEntry是一个(Long, Long, Double)的封装。CoordinateMatrix可以通过调用toIndexedRowMatrix转换成一个IndexedRowMatrix。CoordinateMatrix的其他降维方法暂时还不支持(Spark-1.6.2)。
// CoordinateMatrix println("Coordinate Matrix ...") val arr3 = Array( MatrixEntry(0,0,1), MatrixEntry(1,1,1) ) val entries = spark.sparkContext.parallelize(arr3) val mat3 = new CoordinateMatrix(entries) val m3 = mat.numRows() val n3 = mat.numCols() val qrResult3 = mat3.toIndexedRowMatrix().toRowMatrix().tallSkinnyQR(true) println(s"m3 is: $m3,n3 is $n3,\nqrResult3 is :") qrResult3.Q.rows.foreach(println) println() qrResult3.R.rowIter.foreach(println)
输出为:
Coordinate Matrix ... m3 is: 2,n3 is 2, rowMat3 is : [1.0,0.0] [0.0,1.0] [1.0,0.0] [0.0,1.0]
BlockMatrix
一个BlockMatrix是一个分布式的矩阵,由一个MatrixBlocks的RDD组成。MatrixBlock是一个三元组((Int, Int), Matrix),其中(Int, Int)是block的索引,Matrix是一个在指定位置上的维度为rowsPerBlock * colsPerBlock的子矩阵。BlockMatrix支持与另一个BlockMatrix对象的add和multiply操作。BlockMatrix提供了一个帮助方法validate,这个方法可以用于检测该`BlockMatrix·是否正确。
可以通过IndexedRowMatrix或者CoordinateMatrix调用toBlockMatrix快速得到BlockMatrix对象。默认情况下toBlockMatrix方法会得到一个1024 x 1024的BlockMatrix。使用时可以通过手动传递维度值来设置维度,toBlockMatrix(rowsPerBlock, colsPerBlock)。
// BlockMatrix println("Block Matrix ...") val arr4 = Array( MatrixEntry(0,0,1), MatrixEntry(1,1,1) ) val entries4: RDD[MatrixEntry] = spark.sparkContext.parallelize(arr4) val coordMat: CoordinateMatrix = new CoordinateMatrix(entries4) val matA: BlockMatrix = coordMat.toBlockMatrix().cache() // 检测BlockMatrix格式是否正确,错误的话会抛出异常,正确的话无其他影响 matA.validate() matA.blocks.foreach(println) val m4 = matA.numRowBlocks val n4 = matA.numColBlocks println(s"m4 is: $m4,n4 is $n4") // 计算A^T * A. val ata = matA.transpose.multiply(matA) ata.blocks.foreach(println)
输出为:
Block Matrix ... ((0,0),2 x 2 CSCMatrix (0,0) 1.0 (1,1) 1.0) m4 is: 1,n4 is 1 ((0,0),1.0 0.0 0.0 1.0 )
相似度计算原理探索
无论是ICF基于物品的协同过滤、UCF基于用户的协同过滤、基于内容的推荐,最基本的环节都是计算相似度。如果样本特征维度很高或者<user, item, score>的维度很大,都会导致无法直接计算。设想一下100w*100w的二维矩阵,计算相似度怎么算?
在spark中RowMatrix提供了一种并行计算相似度的思路,下面就来看看其中的奥妙吧!
相似度计算
相似度有很多种,每一种适合的场景都不太一样。比如:
- 欧氏距离,在几何中最简单的计算方法
- 夹角余弦,通过方向计算相似度,通常在用户对商品评分、NLP等场景使用
- 杰卡德距离,在不考虑每一样的具体值时使用
- 皮尔森系数,与夹角余弦类似,但是可以去中心化。比如评分时,有人倾向于打高分,有人倾向于打低分,他们的最后效果在皮尔森中是一样的
- 曼哈顿距离,一般在路径规划、地图类中常用,比如A*算法中使用曼哈顿来作为每一步代价值的一部分(F=G+H, G是从当前点移动到下一个点的距离,H是距离目标点的距离,这个H就可以用曼哈顿距离表示)
上面两个向量(x1,y1)和(x2,y2)计算夹角的余弦值就是两个向量方向的相似度,其公式为:
cos(θ)=a⋅b∣∣a∣∣∗∣∣b∣∣=x1∗x2+y1∗y2x12+x22∗y12+y22
cos(\theta )=\frac { a\cdot b }{ ||a||\ast ||b|| } \\ =\quad \frac { { x }_{ 1 }\ast { x }_{ 2 }\quad +\quad { y }_{ 1 }\ast y_{ 2 } }{ \sqrt { { x }_{ 1 }^{ 2 }+{ x }_{ 2 }^{ 2 } } \ast \sqrt { { y }_{ 1 }^{ 2 }+{ y }_{ 2 }^{ 2 } } }
cos(θ)=∣∣a∣∣∗∣∣b∣∣a⋅b=x12+x22∗y12+y22x1∗x2+y1∗y2
其中,||a||表示a的模,即每一项的平方和再开方。
公式拆解
那么如果向量不只是两维,而是n维呢?比如有两个向量:
第一个向量:(x1,x2,x3,...,xn)第二个向量:(y1,y2,y3,...,yn)
第一个向量:({x}_{1}, {x}_{2}, {x}_{3}, ..., {x}_{n})\\
第二个向量:({y}_{1}, {y}_{2}, {y}_{3}, ..., {y}_{n})
第一个向量:(x1,x2,x3,...,xn)第二个向量:(y1,y2,y3,...,yn)
他们的相似度计算方法套用上面的公式为:
cos(θ)=∑i=1n(xi∗yi)∑i=1nxi2∗∑i=1nyi2=x1∗y1+x2∗y2+...+xn∗yn∑i=1nxi2∗∑i=1nyi2=x1∗y1∑i=1nxi2∗∑i=1nyi2+x2∗y2∑i=1nxi2∗∑i=1nyi2+...+xn∗yn∑i=1nxi2∗∑i=1nyi2=x1∑i=1nxi2∗y1∑i=1nyi2+x2∑i=1nxi2∗y2∑i=1nyi2+...+xn∑i=1nxi2∗yn∑i=1nyi2
cos(\theta )\quad =\quad \frac { \sum _{ i=1 }^{ n }{ ({ x }_{ i }\ast { y }_{ i }) } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 }\ast { y }_{ 1 }+{ x }_{ 2 }\ast { y }_{ 2 }+...+{ x }_{ n }\ast { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 }\ast { y }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +\frac { { x }_{ 2 }\ast { y }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +...+\frac { { x }_{ n }\ast { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +\frac { { x }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +...+\frac { { x }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } }
cos(θ)=∑i=1nxi2∗∑i=1nyi2∑i=1n(xi∗yi)=∑i=1nxi2∗∑i=1nyi2x1∗y1+x2∗y2+...+xn∗yn=∑i=1nxi2∗∑i=1nyi2x1∗y1+∑i=1nxi2∗∑i=1nyi2x2∗y2+...+∑i=1nxi2∗∑i=1nyi2xn∗yn=∑i=1nxi2x1∗∑i=1nyi2y1+∑i=1nxi2x2∗∑i=1nyi2y2+...+∑i=1nxi2xn∗∑i=1nyi2yn
通过上面的公式就可以发现,夹角余弦可以拆解成每一项与另一项对应位置的乘积x1∗y1,再除以每个向量自己的
∑i=1nxi2
\sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } }
i=1∑nxi2
就可以了。
矩阵并行
画个图看看,首先创建下面的矩阵:
注意,矩阵里面都是一列代表一个向量…上面是创建矩阵时的三元组,如果在spark中想要创建matrix,可以这样:
val df = spark.createDataFrame(Seq( (0, 0, 1.0), (1, 0, 1.0), (2, 0, 1.0), (3, 0, 1.0), (0, 1, 2.0), (1, 1, 2.0), (2, 1, 1.0), (3, 1, 1.0), (0, 2, 3.0), (1, 2, 3.0), (2, 2, 3.0), (0, 3, 1.0), (1, 3, 1.0), (3, 3, 4.0) )) val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD)
然后计算每一个向量的normL2,即平方和开根号。
那么在Spark如何快速并行处理呢?通过上面的例子,可以看到两个向量的相似度,需要把每一维度乘积后相加,但是一个向量一般都是跨RDD保存的,所以可以先计算所有向量的第一维,得出结果
(向量1的第1维,向量2的第1维,value)(向量1的第2维,向量2的第2维,value)...(向量1的第n维,向量2的第n维,value)(向量1的第1维,向量3的第1维,value)..(向量1的第n维,向量3的第n维,value)
(向量1的第1维,向量2的第1维,value)\\
(向量1的第2维,向量2的第2维,value)\\
...\\
(向量1的第n维,向量2的第n维,value)\\
(向量1的第1维,向量3的第1维,value)\\
..\\
(向量1的第n维,向量3的第n维,value)\\
(向量1的第1维,向量2的第1维,value)(向量1的第2维,向量2的第2维,value)...(向量1的第n维,向量2的第n维,value)(向量1的第1维,向量3的第1维,
7ff7
value)..(向量1的第n维,向量3的第n维,value)
最后对做一次reduceByKey累加结果即可…
阅读源码
首先创建dataframe形成matrix:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.sql.SparkSession object MatrixSimTest { def main(args: Array[String]): Unit = { // 创建dataframe,转换成matrix val spark = SparkSession.builder().master("local[*]").appName("sim").getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val df = spark.createDataFrame(Seq( (0, 0, 1.0), (1, 0, 1.0), (2, 0, 1.0), (3, 0, 1.0), (0, 1, 2.0), (1, 1, 2.0), (2, 1, 1.0), (3, 1, 1.0), (0, 2, 3.0), (1, 2, 3.0), (2, 2, 3.0), (0, 3, 1.0), (1, 3, 1.0), (3, 3, 4.0) )) val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD)// 调用sim方法 val x = matrix.toRowMatrix().columnSimilarities() // 得到相似度结果 x.entries.collect().foreach(println) } }
得到的结果为:
MatrixEntry(0,3,0.7071067811865476) MatrixEntry(0,2,0.8660254037844386) MatrixEntry(2,3,0.2721655269759087) MatrixEntry(0,1,0.9486832980505139) MatrixEntry(1,2,0.9128709291752768) MatrixEntry(1,3,0.596284793999944)
直接进入columnSimilarities方法看看是怎么个流程吧!
def columnSimilarities(): CoordinateMatrix = { columnSimilarities(0.0) }
内部调用了带阈值的相似度方法,这里的阈值是指相似度小于该值时,输出结果时,会自动过滤掉。
def columnSimilarities(threshold: Double): CoordinateMatrix = { //检查参数... val gamma = if (threshold < 1e-6) { Double.PositiveInfinity } else { 10 * math.log(numCols()) / threshold } columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) }
这里的gamma用于采样,具体的做法咱们来继续看源码。然后看一下computeColumnSummaryStatistics().normL2.toArray这个方法:
def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = { val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)( (aggregator, data) => aggregator.add(data), (aggregator1, aggregator2) => aggregator1.merge(aggregator2)) updateNumRows(summary.count) summary }
之前有介绍这个treeAggregate是一种带“预reduce”的map-reduce,返回的summary,里面帮我们统计了每一个向量的很多指标,比如
currMean 为 每一个向量的平均值 currM2 为 每个向量的每一维的平方和 currL1 为 每个向量的绝对值的和 currMax 为 每个向量的最大值 currMin 为 每个向量的最小值 nnz 为 每个向量的非0个数
这里我们只需要currM2,它是每个向量的平方和。summary调用的normL2方法:
override def normL2: Vector = { require(totalWeightSum > 0, s"Nothing has been added to this summarizer.") val realMagnitude = Array.ofDim[Double](n) var i = 0 val len = currM2.length while (i < len) { realMagnitude(i) = math.sqrt(currM2(i)) i += 1 } Vectors.dense(realMagnitude) }
上面这步就是对平方和开个根号,这样就求出来了每个向量的分母部分。
下面就是最关键的地方了:
private[mllib] def columnSimilaritiesDIMSUM( colMags: Array[Double], gamma: Double): CoordinateMatrix = { // 一些参数校验 // 对gamma进行开方 val sg = math.sqrt(gamma) // sqrt(gamma) used many times // 这里把前面算的平方根的值设置一个默认值,因为如果为0,除0会报异常,所以设置为1 val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x) // 把抽样概率数组 和 平方根数组进行广播 val sc = rows.context val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c)) val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c))) // 遍历每一行,计算每个向量该维的乘积,形成三元组 val sims = rows.mapPartitionsWithIndex { (indx, iter) => val p = pBV.value val q = qBV.value // 获得随机值 val rand = new XORShiftRandom(indx) val scaled = new Array[Double](p.size) iter.flatMap { row => row match { case SparseVector(size, indices, values) => // 如果是稀疏向量,遍历向量的每一维,除以平方根 val nnz = indices.size var k = 0 while (k < nnz) { scaled(k) = values(k) / q(indices(k)) k += 1 } // 遍历向量数组,计算每一个数值与其他数值的乘机。 // 比如向量(1, 2, 0 ,1) // 得到的结果为 (0,1,value)(0,3,value)(2,3,value) Iterator.tabulate (nnz) { k => val buf = new ListBuffer[((Int, Int), Double)]() val i = indices(k) val iVal = scaled(k) // 判断当前列是否符合采样范围,如果小于采样值,就忽略 if (iVal != 0 && rand.nextDouble() < p(i)) { var l = k + 1 while (l < nnz) { val j = indices(l) val jVal = scaled(l) if (jVal != 0 && rand.nextDouble() < p(j)) { // 计算每一维与其他维的值 buf += (((i, j), iVal * jVal)) } l += 1 } } buf }.flatten case DenseVector(values) => // 跟稀疏同理 val n = values.size var i = 0 while (i < n) { scaled(i) = values(i) / q(i) i += 1 } Iterator.tabulate (n) { i => val buf = new ListBuffer[((Int, Int), Double)]() val iVal = scaled(i) if (iVal != 0 && rand.nextDouble() < p(i)) { var j = i + 1 while (j < n) { val jVal = scaled(j) if (jVal != 0 && rand.nextDouble() < p(j)) { buf += (((i, j), iVal * jVal)) } j += 1 } } buf }.flatten } } // 最后再执行一个reduceBykey,累加所有的值,就是i和j的相似度 }.reduceByKey(_ + _).map { case ((i, j), sim) => MatrixEntry(i.toLong, j.toLong, sim) } new CoordinateMatrix(sims, numCols(), numCols()) }
这样把所有向量的平方和广播后,每一行都可以在不同的节点并行处理了。
总结来说,Spark提供的这个计算相似度的方法有两点优势:
- 通过拆解公式,使得每一行独立计算,加快速度
- 提供采样方案,以采样方式抽样固定的特征维度计算相似度
不过杰卡德目前并不能使用这种方法来计算,因为杰卡德中间有一项需要对向量求dot,这种方式就不适合了;如果杰卡德想要快速计算,可以去参考LSH局部敏感哈希算法,这里就不详细说明了。
注:《推荐系统开发实战》是小编的最新出版的技术图书,已经在京东,当当上线,感兴趣的朋友可以进行购买阅读!
- Spark MLlib 之 大规模数据集的相似度计算原理探索
- 大规模数据相似度计算时,解决数据倾斜的问题的思路之一(分块思想)
- 大规模数据相似度计算时,解决数据倾斜的问题的思路之一(分块思想)
- 一文详解大规模数据计算处理原理及操作重点 - 大数据
- 一文详解大规模数据计算处理原理及操作重点 - 大数据
- 大规模数据相似度计算时,解决数据倾斜的问题的思路之一(分块思想)
- JAVA基础学习笔记 day004_作业01(数据类型计算训练)
- java数据类型传递探索
- ALTER TABLE 失败,因为下列 SET 选项的设置不正确:'ARITHABORT'。请确保 SET 选项可正确用于计算列和/或查询通知和/或 xml 数据类型方法的索引视图和/或索引。
- JOSN传递对象数据类型的原理
- JOSN传递对象数据类型的原理
- 托管C++探索01-数据类型
- Spark Mllib里的如何对两组数据用皮尔逊计算相关系数
- 【Scikit-Learn 中文文档】大规模计算的策略: 更大量的数据 - 用户指南 | ApacheCN
- 第四周作业-键盘输入数据 和 数据类型----用键盘输入圆的半径计算圆的面积
- 利用知网数据计算词语相似度
- 徐海蛟:Matlab计算大规模图片数据集的L1距离
- 进制转换,计算 还有各种数据类型和字符
- 结合谷歌地图多边形(polygon)与Sql Server 2008的空间数据类型计算某个点是否在多边形内的注意事项
- 怎么计算c++中各个数据类型的取值范围