您的位置:首页 > 其它

spark graphx实现共同好友的聚合

2018-01-20 16:01 337 查看
spark graphx是一款优秀的图计算框架,对于批量计算图计算借助于spark的计算引擎,实现数据的快速聚合。
对于最基本的 共同好友推荐可以很方便的实现,一下为实现代码:
数据源的数据格式:
 1 2

2 4
。。。

package mob

import org.apache.spark.graphx.{GraphLoader, VertexRDD}
import org.apache.spark.{SparkConf, SparkContext}

object GraphRale {
/**
* 数据列表的笛卡尔乘积:{1,2,3,4}=>{(1,2),(1,3),(1,4),(2,3),(2,4),(3,4)}
* @param input
* @return
*/
def ciculate(input:List[Long]):Set[String]={
var result = Set[String]()
input.foreach(x=>{
input.foreach(y=>{
if(x<y){
result += s"${x}|${y}"
}else if(x>y){
result += s"${y}|${x}"
}
})
})
return result;
}
def twoDegree()={
val conf = new SparkConf().setMaster("local").setAppName("graph")
val sc = new SparkContext(conf)
val graph = GraphLoader.edgeListFile(sc,"D:\\grap.txt")
val relate: VertexRDD[List[Long]] = graph.aggregateMessages[List[Long]](
triplet=>{
triplet.sendToDst(List(triplet.srcId))
},
(a,b)=>(a++b)
).filter(x=>x._2.length>1)

val re = relate.flatMap(x=>{
for{temp <- ciculate(x._2)}yield (temp,1)
}).reduceByKey(_+_)

re.foreach(println(_))
}
def main(args: Array[String]): Unit = {
twoDegree()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: