Spark Components: GraphX Study Notes 9 -- Computing Single-Source Shortest Paths with the pregel Function
2016-05-04 15:03
More code: https://github.com/xubo245/SparkLearning
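1. Explanation:
Computing single-source shortest paths with the pregel function
The single-source shortest path example in GraphX uses a Pregel-like approach.
The core consists of three functions:
1. The function a vertex uses to process incoming messages, vprog: (VertexId, VD, A) => VD, i.e. (vertex id, vertex attribute, message) => vertex attribute
2. The function that sends messages, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], i.e. (edge triplet) => Iterator[(destination vertex id, message)]
3. The function that merges messages, mergeMsg: (A, A) => A, i.e. (message, message) => message
See [3] for details.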
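To make the three signatures concrete, here is a minimal sketch in plain Scala with VD, ED and A all fixed to Double, as in the shortest-path case. It does not depend on Spark: Triplet is a hypothetical stand-in for GraphX's EdgeTriplet, and the object name SsspFunctions is only for illustration.

```scala
object SsspFunctions {
  type VertexId = Long // GraphX also defines VertexId as an alias for Long

  // Hypothetical stand-in for EdgeTriplet[Double, Double]: an edge plus
  // the attributes of its two endpoints.
  final case class Triplet(srcId: VertexId, dstId: VertexId,
                           srcAttr: Double, dstAttr: Double, attr: Double)

  // vprog: keep the smaller of the current distance and the incoming one.
  val vprog: (VertexId, Double, Double) => Double =
    (_, dist, newDist) => math.min(dist, newDist)

  // sendMsg: offer the destination a new distance only if it is an improvement.
  val sendMsg: Triplet => Iterator[(VertexId, Double)] = t =>
    if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr))
    else Iterator.empty

  // mergeMsg: of two candidate distances for one vertex, keep the smaller.
  val mergeMsg: (Double, Double) => Double = (a, b) => math.min(a, b)
}
```

For example, a triplet from a vertex at distance 0.0 to an unreached vertex (distance infinity) over an edge of weight 1.0 produces exactly one message carrying 1.0, while a triplet whose destination is already closer produces none.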
Main code:
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => { // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
Source code:
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages, or
* for `maxIterations` iterations.
*
* @tparam A the Pregel message type
*
* @param initialMsg the message each vertex will receive at the
* first iteration
*
* @param maxIterations the maximum number of iterations to run for
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] = {
  Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
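The doc comment requires mergeMsg to be commutative and associative, since messages for one vertex may be combined in any order. The following plain-Scala sketch (no Spark needed; the object and function names are hypothetical) tries every arrival order for a small set of messages and checks whether the merged result is always the same.

```scala
object MergeOrder {
  // True when merging the messages left to right gives one and the same
  // result for every possible arrival order. Assumes a non-empty list.
  def allOrdersAgree(messages: List[Double],
                     merge: (Double, Double) => Double): Boolean =
    messages.permutations.map(_.reduce(merge)).toSet.size == 1

  // math.min is commutative and associative, so it is a safe mergeMsg.
  val minMerge: (Double, Double) => Double = (a, b) => math.min(a, b)

  // Subtraction is neither, so the outcome would depend on message order.
  val badMerge: (Double, Double) => Double = (a, b) => a - b
}
```

With the messages 3.0, 1.0 and 2.0, minMerge always yields 1.0, whereas badMerge yields different values depending on the order, which is why a function like it would be unsuitable as mergeMsg.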
object Pregel extends Logging {

  /**
   * Execute a Pregel-like iterative vertex-parallel abstraction. The
   * user-defined vertex-program `vprog` is executed in parallel on
   * each vertex receiving any inbound messages and computing a new
   * value for the vertex. The `sendMsg` function is then invoked on
   * all out-edges and is used to compute an optional message to the
   * destination vertex. The `mergeMsg` function is a commutative
   * associative function used to combine messages destined to the
   * same vertex.
   *
   * On the first iteration all vertices receive the `initialMsg` and
   * on subsequent iterations if a vertex does not receive a message
   * then the vertex-program is not invoked.
   *
   * This function iterates until there are no remaining messages, or
   * for `maxIterations` iterations.
   *
   * @tparam VD the vertex data type
   * @tparam ED the edge data type
   * @tparam A the Pregel message type
   *
   * @param graph the input graph.
   *
   * @param initialMsg the message each vertex will receive at the first
   * iteration
   *
   * @param maxIterations the maximum number of iterations to run for
   *
   * @param activeDirection the direction of edges incident to a vertex that received a message in
   * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
   * out-edges of vertices that received a message in the previous round will run. The default is
   * `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message
   * in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where
   * *both* vertices received a message.
   *
   * @param vprog the user-defined vertex program which runs on each
   * vertex and receives the inbound message and computes a new vertex
   * value. On the first iteration the vertex program is invoked on
   * all vertices and is passed the default message. On subsequent
   * iterations the vertex program is only invoked on those vertices
   * that receive messages.
   *
   * @param sendMsg a user supplied function that is applied to out
   * edges of vertices that received messages in the current
   * iteration
   *
   * @param mergeMsg a user supplied function that takes two incoming
   * messages of type A and merges them into a single message of type
   * A. ''This function must be commutative and associative and
   * ideally the size of A should not increase.''
   *
   * @return the resulting graph at the end of the computation
   *
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] =
  {
    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
    // compute the messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop
    var prevG: Graph[VD, ED] = null
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      prevG = g
      g = g.joinVertices(messages)(vprog).cache()

      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = g.mapReduceTriplets(
        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
      // and the vertices of g).
      activeMessages = messages.count()

      logInfo("Pregel finished iteration " + i)

      // Unpersist the RDDs hidden by newly-materialized RDDs
      oldMessages.unpersist(blocking = false)
      prevG.unpersistVertices(blocking = false)
      prevG.edges.unpersist(blocking = false)
      // count the iteration
      i += 1
    }

    g
  } // end of apply
}
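The loop in apply can be imitated without Spark to see how the supersteps unfold. The sketch below is a simplification under stated assumptions, not the GraphX implementation: vertices live in an in-memory Map, vertex values and messages are fixed to Double, sendMsg receives the edge plus both endpoint values instead of an EdgeTriplet, only the EdgeDirection.Either behaviour is modelled, and every edge endpoint is assumed to be present in the vertex map. The names MiniPregel, run, gather and sssp are hypothetical.

```scala
object MiniPregel {
  type VertexId = Long
  final case class Edge(src: VertexId, dst: VertexId, attr: Double)

  def run(vertices: Map[VertexId, Double], edges: Seq[Edge], initialMsg: Double,
          maxIterations: Int = Int.MaxValue)(
      vprog: (VertexId, Double, Double) => Double,
      sendMsg: (Edge, Double, Double) => Iterator[(VertexId, Double)],
      mergeMsg: (Double, Double) => Double): Map[VertexId, Double] = {

    // Run sendMsg on edges where either endpoint is active, then merge the
    // messages addressed to the same destination vertex.
    def gather(state: Map[VertexId, Double],
               active: Set[VertexId]): Map[VertexId, Double] =
      edges.filter(e => active(e.src) || active(e.dst))
        .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

    // First iteration: every vertex receives initialMsg.
    var g = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
    var messages = gather(g, g.keySet)
    var i = 0
    while (messages.nonEmpty && i < maxIterations) {
      // Only vertices that received a message run vprog.
      g = g ++ messages.map { case (id, m) => id -> vprog(id, g(id), m) }
      messages = gather(g, messages.keySet)
      i += 1
    }
    g
  }

  // Single-source shortest paths expressed with the same three functions.
  def sssp(ids: Set[VertexId], edges: Seq[Edge],
           source: VertexId): Map[VertexId, Double] = {
    val init = ids.map(id =>
      id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap
    run(init, edges, Double.PositiveInfinity)(
      (_, dist, newDist) => math.min(dist, newDist),
      (e, srcDist, dstDist) =>
        if (srcDist + e.attr < dstDist) Iterator((e.dst, srcDist + e.attr))
        else Iterator.empty,
      (a, b) => math.min(a, b))
  }
}
```

On a three-vertex graph with edges 1→2 (weight 1.0), 2→3 (weight 2.0) and 1→3 (weight 5.0), calling MiniPregel.sssp with source 1 first gives vertex 3 the distance 5.0 and then improves it to 3.0 in the next superstep, after which no messages remain and the loop stops.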
2. Code:
/**
 * @author xubo
 * ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
 * time 20160503
 */
package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Graph.graphToGraphOps
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.util.GraphGenerators

object Pregeloperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CollectingNeighbors").setMaster("local[4]")
    // Create the SparkContext for a local run with 4 threads
    val sc = new SparkContext(conf)
    // A graph with edge attributes containing distances
    // (the graph is generated, so its edges may differ between runs)
    val graph: Graph[Long, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5).mapEdges(e => e.attr.toDouble)
    val sourceId: VertexId = 2 // The ultimate source

    println("graph:")
    println("vertices:")
    graph.vertices.collect.foreach(println)
    println("edges:")
    graph.edges.collect.foreach(println)
    println()

    // Initialize the graph such that all vertices except the root have distance infinity.
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    println("initialGraph:")
    println("vertices:")
    initialGraph.vertices.collect.foreach(println)
    println("edges:")
    initialGraph.edges.collect.foreach(println)

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
      triplet => { // Send Message
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b) // Merge Message
    )
    println()
    println("sssp:")
    println("vertices:")
    println(sssp.vertices.collect.mkString("\n"))
    println("edges:")
    sssp.edges.collect.foreach(println)

    // Release the local Spark resources
    sc.stop()
  }
}
3. Results:
graph:
vertices:
(4,3)
(0,3)
(1,2)
(2,3)
(3,4)
edges:
Edge(0,0,1.0)
Edge(0,0,1.0)
Edge(0,4,1.0)
Edge(1,1,1.0)
Edge(1,3,1.0)
Edge(2,1,1.0)
Edge(2,1,1.0)
Edge(2,1,1.0)
Edge(3,1,1.0)
Edge(3,2,1.0)
Edge(3,2,1.0)
Edge(3,4,1.0)
Edge(4,0,1.0)
Edge(4,2,1.0)
Edge(4,4,1.0)

initialGraph:
vertices:
(4,Infinity)
(0,Infinity)
(1,Infinity)
(2,0.0)
(3,Infinity)
edges:
Edge(0,0,1.0)
Edge(0,0,1.0)
Edge(0,4,1.0)
Edge(1,1,1.0)
Edge(1,3,1.0)
Edge(2,1,1.0)
Edge(2,1,1.0)
Edge(2,1,1.0)
Edge(3,1,1.0)
Edge(3,2,1.0)
Edge(3,2,1.0)
Edge(3,4,1.0)
Edge(4,0,1.0)
Edge(4,2,1.0)
Edge(4,4,1.0)
2016-05-04 14:43:01 WARN BlockManager:71 - Block rdd_23_1 already exists on this machine; not re-adding it

sssp:
vertices:
(4,3.0)
(0,4.0)
(1,1.0)
(2,0.0)
(3,2.0)
edges:
Edge(0,0,1.0)
Edge(0,0,1.0)
Edge(0,4,1.0)
Edge(1,1,1.0)
Edge(1,3,1.0)
Edge(2,1,1.0)
Edge(2,1,1.0)
Edge(2,1,1.0)
Edge(3,1,1.0)
Edge(3,2,1.0)
Edge(3,2,1.0)
Edge(3,4,1.0)
Edge(4,0,1.0)
Edge(4,2,1.0)
Edge(4,4,1.0)
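Analysis:
Drawing the graph from the results above gives the following picture:
The black parts are the vertices and edges of the initial graph. initialGraph sets the value of every vertex to infinity except the source, vertex 2, which is set to 0, and the Pregel processing then starts from that source. The red parts mark the routes that the single-source shortest-path computation can actually take, so the distance from vertex 2 to vertex 1 is 1, to vertex 3 is 2, to vertex 4 is 3, and to vertex 0 is 4.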
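The distances above can be cross-checked, and the route itself recovered, with a small plain-Scala sketch. It is not the Pregel computation: it repeatedly relaxes edges in Bellman-Ford style and additionally remembers each vertex's predecessor. The edge list is copied from the printed output with duplicates and self-loops dropped (they cannot shorten any path), every weight is taken to be 1.0 as printed, and the names SsspRoutes, shortest and route are hypothetical. route assumes the target is reachable from the source.

```scala
object SsspRoutes {
  // Distinct directed edges from the printed result; all weights are 1.0.
  val edges: Seq[(Long, Long)] = Seq(
    (0L, 4L), (1L, 3L), (2L, 1L), (3L, 1L),
    (3L, 2L), (3L, 4L), (4L, 0L), (4L, 2L))

  // For each reachable vertex: (distance from source, predecessor on the route).
  def shortest(source: Long): Map[Long, (Double, Option[Long])] = {
    var best: Map[Long, (Double, Option[Long])] =
      Map((source, (0.0, Option.empty[Long])))
    var changed = true
    while (changed) { // repeat until a full pass improves nothing
      changed = false
      for ((src, dst) <- edges) best.get(src).foreach { case (d, _) =>
        val candidate = d + 1.0
        if (best.get(dst).forall(_._1 > candidate)) {
          best = best.updated(dst, (candidate, Some(src)))
          changed = true
        }
      }
    }
    best
  }

  // Walk the predecessors back from target to source.
  def route(source: Long, target: Long): List[Long] = {
    val best = shortest(source)
    @annotation.tailrec
    def walk(v: Long, acc: List[Long]): List[Long] = best(v)._2 match {
      case Some(p) => walk(p, v :: acc)
      case None    => v :: acc
    }
    walk(target, Nil)
  }
}
```

SsspRoutes.route(2L, 0L) returns List(2, 1, 3, 4, 0), that is the route 2 → 1 → 3 → 4 → 0 of length 4, which agrees with the sssp vertex values (1,1.0), (3,2.0), (4,3.0) and (0,4.0).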
References
[1] http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
[2] https://github.com/xubo245/SparkLearning
[3] http://blog.csdn.net/li385805776/article/details/20487219