Spark 代码阅读笔记

Spark 执行层包括三个模块,master、worker、client。

master 负责管理 worker 进程,worker 负责任务的执行并将结果提交给 master,client 负责向 master 提交作业。其中,master 和 worker 是后台常驻进程。client 在作业运行过程中由 SparkContext 初始化的时候启动,然后,client 向 master 注册作业。master、worker、client 由事件驱动的 RPC 库来负责任务状态信息的交换,这个库由 akka 框架负责。

Spark 中 RDD 的 shuffle 操作涉及了中间数据的传输。在 map-reduce 过程中,每一个 mapper 为每一个 reducer 分配了一个 bucket 的数据结构用来缓存数据;reducer 通过两种方式获得数据,一种是使用 NIO 建立 socket 连接去 fetch 数据,这种方式是默认方式;一种是 OIO 通过 netty server 去 fetch 数据。与 Hadoop MapReduce 不同的是 Spark 在 Reduce 端没有强制的 merge-sort 操作,而是通过采用了 hashmap 数据结构建立了 key 和 value 之间的对应关系,通过消耗内存的方式减少了"不必要"的操作。

Spark kmeans example 分析

// the entrance of spark which contains the operations of creating RDD, accumulation, broadcast, etc.

val sc =
new SparkContext(args(0),

System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

// load the data from input path, such as HDFS or local disks, it is a RDD

val lines = sc.textFile(args(1))

// convert the string format of the data to the double, it is a RDD

val data = lines.map(parseVector _).cache()

// k clusters

val K = args(2).toInt

// termination threshold

val convergeDist = args(3).toDouble


// initialize the centers of k clusters

var kPoints = data.takeSample(false, K,

var tempDist =


while(tempDist > convergeDist)

// choose the closest center to label each vector, the output is structured of (label, (vector, 1))

var closest = data.map (p =>
(closestPoint(p, kPoints),

// calculate the sum of vectors in each center, the output is structured (label, (sum of vector, number of vectors))

var pointStats = closest.reduceByKey{case
((x1, y1),
(x2, y2))
(x1 + x2, y1 + y2)}

// calculate the average vector in each cluster, the output is structured (label, new centers defined by average vector)

var newPoints = pointStats.map {pair =>
(pair._1, pair._2._1 / pair._2._2)}.collectAsMap()


tempDist =

// calculate the delta between current and previous vectors

(i <-
0 until K)

tempDist += kPoints(i).squaredDist(newPoints(i))



(newP <- newPoints)

= newP._2


println("Finished iteration (delta = "
+ tempDist +


将 K-means 应用类比于 Hadoop MR 编程模型,我们主要观察执行 reduceByKey 操作时,Spark 是如何在分布式环境中进行计算的。


* Merge the values for each key using an associative reduce function. This will also perform

* the merging locally on each mapper before sending results to a reducer, similarly to a

* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/

* parallelism level.


def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {

reduceByKey(defaultPartitioner(self), func)




* Merge the values for each key using an associative reduce function. This will also perform

* the merging locally on each mapper before sending results to a reducer, similarly to a

* "combiner" in MapReduce.


def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {

combineByKey[V]((v: V) => v, func, func, partitioner)



class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])




* Generic function to combine the elements for each key using a custom set of aggregation

* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C

* Note that V and C can be different -- for example, one might group an RDD of type

* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:


* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)

* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)

* - `mergeCombiners`, to combine two C's into a single one.


* In addition, users can control the partitioning of the output RDD, and whether to perform

* map-side aggregation (if a mapper can produce multiple items with the same key).


def combineByKey[C](createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean = true,

serializerClass: String = null): RDD[(K, C)] = {

if (getKeyClass().isArray) {

if (mapSideCombine) {

throw new SparkException("Cannot use map-side combining with array keys.")


if (partitioner.isInstanceOf[HashPartitioner]) {

throw new SparkException("Default partitioner cannot partition array keys.")



val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

if (self.partitioner == Some(partitioner)) {

self.mapPartitionsWithContext((context, iter) => {

new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))

}, preservesPartitioning = true)

} else if (mapSideCombine) {

val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)

val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)


partitioned.mapPartitionsWithContext((context, iter) => {

new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))

}, preservesPartitioning = true)

} else {

// Don't apply map-side combiner.

// A sanity check to make sure mergeCombiners is not defined.

assert(mergeCombiners == null)

val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)

values.mapPartitionsWithContext((context, iter) => {

new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))

}, preservesPartitioning = true)




