Spark GraphX: Finding Shortest Paths and Their Intermediate Vertices

2015-11-26 17:49
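I found this shortest-path code on an English-language forum. Besides the distance, it also records the intermediate vertices along each path, so I am sharing it here.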

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{EdgeDirection, Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

object Pregel_SSSP {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Pregel_SSSP")
    val sc = new SparkContext(conf)

    // A random graph whose edge attributes are used as distances
    val graph: Graph[Long, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5).mapEdges(e => e.attr.toDouble)
    // Collect to the driver first so the edges are printed there, not on the executors
    graph.edges.collect.foreach(println)
    val sourceId: VertexId = 0 // The ultimate source

    // Each vertex holds (distance from the source, path from the source).
    // Initialize the graph such that all vertices except the root have distance infinity
    // and an empty path.
    val initialGraph: Graph[(Double, List[VertexId]), Double] = graph.mapVertices((id, _) =>
      if (id == sourceId) (0.0, List[VertexId](sourceId))
      else (Double.PositiveInfinity, List[VertexId]()))

    val sssp = initialGraph.pregel(
      (Double.PositiveInfinity, List[VertexId]()), Int.MaxValue, EdgeDirection.Out)(

      // Vertex program: keep whichever of the current value and the incoming message is shorter
      (id, dist, newDist) => if (dist._1 < newDist._1) dist else newDist,

      // Send message: if going through this edge shortens the destination's distance,
      // send the new distance together with the source's path extended by the destination
      triplet => {
        if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr) {
          Iterator((triplet.dstId,
            (triplet.srcAttr._1 + triplet.attr, triplet.srcAttr._2 :+ triplet.dstId)))
        } else {
          Iterator.empty
        }
      },

      // Merge message: of two messages for the same vertex, keep the shorter one
      (a, b) => if (a._1 < b._1) a else b)

    // Each line is (vertexId, (distance, path from the source))
    println(sssp.vertices.collect.mkString("\n"))
    sc.stop()
  }
}
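
To see how the path list gets built up round by round, the same three rules (vertex program, send message, merge message) can be tried on a tiny hand-written graph in plain Scala, without Spark. This is only a rough sketch for illustration: the names `LocalSssp`, `Edge`, `shorter` and `run` are made up here and are not part of GraphX, and it assumes non-negative edge weights. Unlike GraphX's Pregel, which only lets vertices that just received a message send in the next round, this sketch simply rescans all edges each round. The final result should be the same.

```scala
// Plain-Scala sketch of the same relaxation rules (no Spark involved).
// LocalSssp, Edge, shorter and run are hypothetical names used only for illustration.
object LocalSssp {
  type VertexId = Long
  // (distance from the source, path from the source), as in the GraphX listing
  type State = (Double, List[VertexId])

  final case class Edge(src: VertexId, dst: VertexId, weight: Double)

  // Shared by the vertex step and the merge step: keep the shorter distance
  def shorter(a: State, b: State): State = if (a._1 < b._1) a else b

  def run(vertices: Set[VertexId], edges: Seq[Edge], source: VertexId): Map[VertexId, State] = {
    // The source starts at distance 0 with path List(source); every other vertex
    // starts at distance infinity with an empty path
    val init: Map[VertexId, State] = vertices.map { id =>
      val start: State =
        if (id == source) (0.0, List(source))
        else (Double.PositiveInfinity, List.empty[VertexId])
      (id, start)
    }.toMap

    @annotation.tailrec
    def loop(state: Map[VertexId, State]): Map[VertexId, State] = {
      // Send step: an edge produces a message only if it shortens the destination's distance
      val messages: Seq[(VertexId, State)] = edges.collect {
        case Edge(s, d, w) if state(s)._1 < state(d)._1 - w =>
          (d, (state(s)._1 + w, state(s)._2 :+ d))
      }
      if (messages.isEmpty) state // nothing left to improve, so stop
      else {
        // Merge step: keep only the best message per destination vertex
        val merged: Map[VertexId, State] =
          messages.groupBy(_._1).map { case (id, ms) => (id, ms.map(_._2).reduce(shorter)) }
        // Vertex step: compare the merged message with the current value
        loop(state ++ merged.map { case (id, msg) => (id, shorter(state(id), msg)) })
      }
    }

    loop(init)
  }
}
```

For example, with edges 0→1 (1.0), 1→2 (1.0), 0→2 (5.0) and 2→3 (1.0) and source 0, vertex 2 first receives the path List(0, 2) at distance 5.0, and in the next round it is replaced by List(0, 1, 2) at distance 2.0. Vertex 3 ends at distance 3.0 with path List(0, 1, 2, 3). A vertex that cannot be reached from the source keeps distance infinity and an empty path.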