Spark 代码阅读笔记
2014-03-02 21:50
232 查看
Spark 执行层包括三个模块,master、worker、client。
master 负责管理 worker 进程,worker 负责任务的执行并将结果提交给 master,client 负责向 master 提交作业。其中,master 和 worker 是后台常驻进程。client 在作业运行过程中由 SparkContext 初始化的时候启动,然后,client 向 master 注册作业。master、worker、client 由事件驱动的 RPC 库来负责任务状态信息的交换,这个库由 akka 框架负责。
Spark 中 RDD 的 shuffle 操作涉及了中间数据的传输。在 map-reduce 过程中,每一个 mapper 为每一个 reducer 分配了一个 bucket 的数据结构用来缓存数据;reducer 通过两种方式获得数据,一种是使用 NIO 建立 socket 连接去 fetch 数据,这种方式是默认方式;一种是 OIO 通过 netty server 去 fetch 数据。与 Hadoop MapReduce 不同的是 Spark 在 Reduce 端没有强制的 merge-sort 操作,而是通过采用了 hashmap 数据结构建立了 key 和 value 之间的对应关系,通过消耗内存的方式减少了"不必要"的操作。
Spark kmeans example 分析
// the entrance of spark which contains the operations of creating RDD, accumulation, broadcast, etc.
val sc =
new SparkContext(args(0),
"SparkLocalKMeans",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// load the data from input path, such as HDFS or local disks, it is a RDD
val lines = sc.textFile(args(1))
// convert the string format of the data to the double, it is a RDD
val data = lines.map(parseVector _).cache()
// k clusters
val K = args(2).toInt
// termination threshold
val convergeDist = args(3).toDouble
// initialize the centers of k clusters
var kPoints = data.takeSample(false, K,
42).toArray
var tempDist =
1.0
while(tempDist > convergeDist)
{
// choose the closest center to label each vector, the output is structured of (label, (vector, 1))
var closest = data.map (p =>
(closestPoint(p, kPoints),
(p,
1)))
// calculate the sum of vectors in each center, the output is structured (label, (sum of vector, number of vectors))
var pointStats = closest.reduceByKey{case
((x1, y1),
(x2, y2))
=>
(x1 + x2, y1 + y2)}
// calculate the average vector in each cluster, the output is structured (label, new centers defined by average vector)
var newPoints = pointStats.map {pair =>
(pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
tempDist =
0.0
// calculate the delta between current and previous vectors
for
(i <-
0 until K)
{
tempDist += kPoints(i).squaredDist(newPoints(i))
}
for
(newP <- newPoints)
{
kPoints(newP._1)
= newP._2
}
println("Finished iteration (delta = "
+ tempDist +
")")
}
将 K-means 应用类比于 Hadoop MR 编程模型,我们主要观察执行 reduceByKey 操作时,Spark 是如何在分布式环境中进行计算的。
[1]. Spark源码解析 – Shuffle http://blog.csdn.net/mango_song/article/details/17933115
[2]. Spark源码分析 – Shuffle /article/4893472.html
[3]. Spark reference http://jerryshao.me/tags.html#spark-ref
master 负责管理 worker 进程,worker 负责任务的执行并将结果提交给 master,client 负责向 master 提交作业。其中,master 和 worker 是后台常驻进程。client 在作业运行过程中由 SparkContext 初始化的时候启动,然后,client 向 master 注册作业。master、worker、client 由事件驱动的 RPC 库来负责任务状态信息的交换,这个库由 akka 框架负责。
Spark 中 RDD 的 shuffle 操作涉及了中间数据的传输。在 map-reduce 过程中,每一个 mapper 为每一个 reducer 分配了一个 bucket 的数据结构用来缓存数据;reducer 通过两种方式获得数据,一种是使用 NIO 建立 socket 连接去 fetch 数据,这种方式是默认方式;一种是 OIO 通过 netty server 去 fetch 数据。与 Hadoop MapReduce 不同的是 Spark 在 Reduce 端没有强制的 merge-sort 操作,而是通过采用了 hashmap 数据结构建立了 key 和 value 之间的对应关系,通过消耗内存的方式减少了"不必要"的操作。
Spark kmeans example 分析
// the entrance of spark which contains the operations of creating RDD, accumulation, broadcast, etc.
val sc =
new SparkContext(args(0),
"SparkLocalKMeans",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// load the data from input path, such as HDFS or local disks, it is a RDD
val lines = sc.textFile(args(1))
// convert the string format of the data to the double, it is a RDD
val data = lines.map(parseVector _).cache()
// k clusters
val K = args(2).toInt
// termination threshold
val convergeDist = args(3).toDouble
// initialize the centers of k clusters
var kPoints = data.takeSample(false, K,
42).toArray
var tempDist =
1.0
while(tempDist > convergeDist)
{
// choose the closest center to label each vector, the output is structured of (label, (vector, 1))
var closest = data.map (p =>
(closestPoint(p, kPoints),
(p,
1)))
// calculate the sum of vectors in each center, the output is structured (label, (sum of vector, number of vectors))
var pointStats = closest.reduceByKey{case
((x1, y1),
(x2, y2))
=>
(x1 + x2, y1 + y2)}
// calculate the average vector in each cluster, the output is structured (label, new centers defined by average vector)
var newPoints = pointStats.map {pair =>
(pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
tempDist =
0.0
// calculate the delta between current and previous vectors
for
(i <-
0 until K)
{
tempDist += kPoints(i).squaredDist(newPoints(i))
}
for
(newP <- newPoints)
{
kPoints(newP._1)
= newP._2
}
println("Finished iteration (delta = "
+ tempDist +
")")
}
将 K-means 应用类比于 Hadoop MR 编程模型,我们主要观察执行 reduceByKey 操作时,Spark 是如何在分布式环境中进行计算的。
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
...
...
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializerClass: String = null): RDD[(K, C)] = {
if (getKeyClass().isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
}, preservesPartitioning = true)
}
}
[1]. Spark源码解析 – Shuffle http://blog.csdn.net/mango_song/article/details/17933115
[2]. Spark源码分析 – Shuffle /article/4893472.html
[3]. Spark reference http://jerryshao.me/tags.html#spark-ref
相关文章推荐
- Spark源码阅读笔记:Standalone模式集群核心角色代码浅析
- linux0.00 代码阅读笔记
- 代码阅读总结之ASP.NET StartKit TimeTracker(QueryString之改进笔记)
- 代码阅读总结之ASP.NET StartKit TimeTracker(数据绑定之困惑笔记)
- 梦断代码阅读笔记1
- Caffe 代码阅读笔记
- 梦断代码--阅读笔记2
- Spark学习笔记04:案例代码编写
- 第一行代码阅读笔记---AndroidMainfest.xml分析
- 第49讲:Scala中Variance代码实战及其在Spark中的应用源码解析学习笔记
- MySQL源码阅读笔记之代码结构
- Discuz!NT 代码阅读笔记(8)--DNT的几个分页存储过程解析
- 《第一行代码--Android》阅读笔记之广播
- spark官方文档阅读笔记1
- 梦断代码阅读笔记03
- Cocos2d-x创建和转换场景学习笔记 分类: cocos2d代码编写 2015-07-27 17:38 8人阅读 评论(0) 收藏
- Spark源码阅读笔记之任务调度(一)
- cocos2d-x系列笔记技巧篇—关于CREATE_FUNC宏的用法 分类: cocos2d代码编写 2015-07-29 08:50 7人阅读 评论(0) 收藏
- Typecho 代码阅读笔记(一) - 页面渲染及路由机制
- Kryo简介及代码阅读笔记