
spark scala Apriori

2016-08-18 10:36 · 337 views
// Sample transaction database: each inner array is one transaction
val mydata = Array(Array(1,3,4,5), Array(2,3,5), Array(1,2,3,4,5), Array(2,3,4,5))
val pamydata = sc.parallelize(mydata)

// C1: candidate 1-itemsets (every distinct item as a singleton Set)
val C1 = pamydata.flatMap(_.toSet).distinct().collect().map(Set(_))

// D: transactions as sets, broadcast once to all executors
val D = mydata.map(_.toSet)
val D_bc = sc.broadcast(D)
val length = mydata.length
val limit = 0.70  // minimum support threshold

// Keep a candidate itemset only if its support in the database meets the threshold
def f1(a: Set[Int], B: Array[Set[Int]], length: Int, limit: Double): Option[(Set[Int], Double)] = {
  val support = B.count(b => a.subsetOf(b)) / length.toDouble
  if (support >= limit) Some((a, support)) else None
}

// Frequent 1-itemsets with their supports (use `length` instead of a hard-coded 4)
var suppdata = sc.parallelize(C1).flatMap(f1(_, D_bc.value, length, limit)).collect()
var L = Array[Array[Set[Int]]]()
L = L :+ suppdata.map(_._1)

// Generate candidate k-itemsets by joining frequent (k-1)-itemsets that
// share their first k-2 items (compared in sorted order, as Apriori requires)
var k = 2
while (L(k-2).length > 0) {
  var CK = Array[Set[Int]]()
  for ((var1, index) <- L(k-2).zipWithIndex;
       var2 <- L(k-2).drop(index+1)
       if var1.toList.sorted.take(k-2) == var2.toList.sorted.take(k-2)) {
    CK = CK :+ (var1 | var2)
  }
  val suppdata_temp = sc.parallelize(CK).flatMap(f1(_, D_bc.value, length, limit)).collect()
  suppdata = suppdata ++ suppdata_temp  // concatenate (`:+` would nest the whole array as one element)
  L = L :+ suppdata_temp.map(_._1)
  k += 1
}
L = L.filter(_.nonEmpty)  // drop the final empty level
L         // frequent itemsets, grouped by size
suppdata  // (itemset, support) pairs
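The Spark snippet above needs a live SparkContext (`sc`), so it is hard to check by hand. Below is a minimal, Spark-free sketch of the same Apriori loop on plain Scala collections (object and method names such as `AprioriLocal` and `frequentItemsets` are my own, not from the original); with the sample data and a 0.70 support threshold it reproduces the frequent itemsets the distributed version should find.

```scala
object AprioriLocal {
  type Itemset = Set[Int]

  // Support of a candidate = fraction of transactions that contain it
  def support(a: Itemset, db: Array[Itemset]): Double =
    db.count(t => a.subsetOf(t)) / db.length.toDouble

  def frequentItemsets(db: Array[Itemset], minSup: Double): List[List[Itemset]] = {
    // L1: frequent 1-itemsets, built from all distinct items
    var levels = List(db.flatten.distinct.map(Set(_)).filter(support(_, db) >= minSup).toList)
    var k = 2
    var done = false
    while (!done) {
      val prev = levels.head
      // Join step: union pairs of (k-1)-itemsets sharing their first k-2 items (sorted)
      val candidates = for {
        (a, i) <- prev.zipWithIndex
        b      <- prev.drop(i + 1)
        if a.toList.sorted.take(k - 2) == b.toList.sorted.take(k - 2)
      } yield a | b
      val lk = candidates.distinct.filter(support(_, db) >= minSup)
      if (lk.isEmpty) done = true
      else { levels = lk :: levels; k += 1 }
    }
    levels.reverse  // smallest itemsets first
  }

  def main(args: Array[String]): Unit = {
    val db: Array[Itemset] = Array(Set(1,3,4,5), Set(2,3,5), Set(1,2,3,4,5), Set(2,3,4,5))
    frequentItemsets(db, 0.70).zipWithIndex.foreach { case (lk, i) =>
      println(s"L${i + 1}: ${lk.map(_.toList.sorted.mkString("{", ",", "}")).mkString(" ")}")
    }
  }
}
```

On this data the result has three levels: the frequent 1-itemsets {2},{3},{4},{5}; the 2-itemsets {2,3},{2,5},{3,4},{3,5},{4,5}; and the 3-itemsets {2,3,5},{3,4,5}.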
Only frequent-itemset mining is implemented here; the code for deriving association rules can be written by referring to the pyspark version.
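As a starting point for that rule-generation step, here is a hedged local sketch (names such as `AprioriRules`, `genRules`, and the `minConf` threshold are my own assumptions, not from the original): for each frequent itemset, every non-empty proper subset A defines a candidate rule A => itemset \ A, and it is kept when its confidence supp(itemset) / supp(A) reaches the threshold.

```scala
object AprioriRules {
  type Itemset = Set[Int]

  def support(a: Itemset, db: Array[Itemset]): Double =
    db.count(t => a.subsetOf(t)) / db.length.toDouble

  // Rules A => B with A ∪ B = itemset, A ∩ B = ∅, confidence >= minConf
  def genRules(freq: List[Itemset], db: Array[Itemset],
               minConf: Double): List[(Itemset, Itemset, Double)] =
    for {
      itemset <- freq
      if itemset.size >= 2
      a <- itemset.subsets.toList
      if a.nonEmpty && a.size < itemset.size
      conf = support(itemset, db) / support(a, db)
      if conf >= minConf
    } yield (a, itemset -- a, conf)

  def main(args: Array[String]): Unit = {
    val db: Array[Itemset] = Array(Set(1,3,4,5), Set(2,3,5), Set(1,2,3,4,5), Set(2,3,4,5))
    // A few frequent itemsets from the run above, with a 100% confidence cutoff
    genRules(List(Set(2,5), Set(3,5), Set(2,3,5)), db, 1.0).foreach { case (a, b, c) =>
      println(s"${a.mkString("{", ",", "}")} => ${b.mkString("{", ",", "}")}  conf=$c")
    }
  }
}
```

In a real run one would feed it every frequent itemset of size >= 2 from the previous step; the pyspark version additionally prunes consequents level by level, which this sketch skips for brevity.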