Spark Code Reading Notes

2014-03-02 21:50

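The Spark execution layer consists of three modules: master, worker, and client.

The master manages the worker processes, the workers execute tasks and report their results to the master, and the client submits jobs to the master. The master and the workers are long-running background daemons. The client is started when SparkContext is initialized during a job run, after which it registers the job with the master. The master, workers, and client exchange task status information through an event-driven RPC library, which is provided by the Akka framework.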
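
To make the event-driven exchange concrete, here is a minimal sketch in plain Scala with no Akka dependency. The message types (`RegisterWorker`, `RegisterJob`, `TaskFinished`) and the `ToyMaster` class are hypothetical names chosen for illustration; they are not Spark's actual deploy messages, and the scheduling rule is an assumption.

```scala
// Hypothetical message types for illustration; Spark's real deploy messages differ.
sealed trait Message
case class RegisterWorker(workerId: String, cores: Int) extends Message
case class RegisterJob(jobName: String, coresWanted: Int) extends Message
case class TaskFinished(workerId: String, jobName: String) extends Message

// A toy master that reacts to one event at a time, the way an actor's receive loop would.
class ToyMaster {
  private var workers = Map.empty[String, Int]   // workerId -> cores offered
  private var finished = Map.empty[String, Int]  // jobName -> finished task count

  def receive(msg: Message): String = msg match {
    case RegisterWorker(id, cores) =>
      workers = workers + (id -> cores)
      s"worker $id registered"
    case RegisterJob(name, wanted) =>
      // Assumed rule: accept the job only if the registered workers offer enough cores.
      if (workers.values.sum >= wanted) s"job $name accepted" else s"job $name waiting"
    case TaskFinished(_, name) =>
      finished = finished + (name -> (finished.getOrElse(name, 0) + 1))
      s"job $name: ${finished(name)} task(s) done"
  }
}

object ToyMasterDemo {
  // A worker registers, a client registers a job, then the worker reports a finished task.
  def run(): Seq[String] = {
    val master = new ToyMaster
    Seq(RegisterWorker("w1", 4), RegisterJob("kmeans", 2), TaskFinished("w1", "kmeans"))
      .map(master.receive)
  }
}
```

Each component only sends and handles messages, so none of them blocks waiting on another; that is the property the Akka-based library gives the real system.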

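The shuffle operation on RDDs in Spark involves transferring intermediate data. In the map-reduce process, every mapper allocates one bucket data structure per reducer to buffer that reducer's data. A reducer obtains its data in one of two ways: by opening socket connections with NIO and fetching the data, which is the default, or by fetching it with OIO through a Netty server. Unlike Hadoop MapReduce, Spark does not force a merge-sort on the reduce side. Instead it uses a hash map to associate keys with values, spending memory to avoid "unnecessary" work.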
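
The two ideas in this paragraph, one bucket per reducer on the map side and hash-map aggregation without sorting on the reduce side, can be sketched in plain Scala. `ShuffleSketch`, `writeBuckets`, and `reduceSide` are hypothetical names for this note, in-memory lists stand in for the network fetch, and the modulo-of-hash bucket choice is an assumption in the spirit of a hash partitioner.

```scala
import scala.collection.mutable

object ShuffleSketch {
  // Map side: a mapper writes each record into the bucket of the reducer that owns its key.
  def writeBuckets[K, V](records: Seq[(K, V)], numReducers: Int): Vector[List[(K, V)]] = {
    val buckets = Vector.fill(numReducers)(mutable.ArrayBuffer.empty[(K, V)])
    for ((k, v) <- records) {
      // Non-negative modulo of the hash code selects the reducer (assumed partitioning rule).
      val r = ((k.hashCode % numReducers) + numReducers) % numReducers
      buckets(r) += ((k, v))
    }
    buckets.map(_.toList)
  }

  // Reduce side: take this reducer's bucket from every mapper and aggregate in a hash map.
  // Records are merged in arrival order; nothing is sorted.
  def reduceSide[K, V](fetched: Seq[Seq[(K, V)]], merge: (V, V) => V): Map[K, V] = {
    val table = mutable.HashMap.empty[K, V]
    for (bucket <- fetched; (k, v) <- bucket) {
      table(k) = table.get(k).map(merge(_, v)).getOrElse(v)
    }
    table.toMap
  }
}
```

With two mappers and two reducers, reducer `r` would call `reduceSide(Seq(mapper1(r), mapper2(r)), _ + _)`. The cost of skipping the sort is that the whole hash map for a reducer's keys has to fit in memory.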

Analysis of the Spark k-means example

// Entry point to Spark: creates RDDs, accumulators, broadcast variables, etc.
val sc = new SparkContext(args(0), "SparkLocalKMeans",
  System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

// Load the data from the input path (HDFS or local disk); the result is an RDD
val lines = sc.textFile(args(1))

// Parse each text line into a vector of doubles; the result is an RDD, cached for reuse across iterations
val data = lines.map(parseVector _).cache()

// Number of clusters
val K = args(2).toInt

// Convergence threshold
val convergeDist = args(3).toDouble

// Initialize the K cluster centers by sampling K points without replacement (seed 42)
var kPoints = data.takeSample(false, K, 42).toArray
var tempDist = 1.0

 

while (tempDist > convergeDist) {
  // Label each vector with its closest center; the output has the form (label, (vector, 1))
  var closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))

  // Sum the vectors assigned to each center; the output has the form (label, (sum of vectors, number of vectors))
  var pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

  // Average the vectors in each cluster; the output has the form (label, new center)
  var newPoints = pointStats.map { pair => (pair._1, pair._2._1 / pair._2._2) }.collectAsMap()

  tempDist = 0.0
  // Accumulate the squared distance between each current center and its replacement
  for (i <- 0 until K) {
    tempDist += kPoints(i).squaredDist(newPoints(i))
  }

  // Replace the old centers with the new ones
  for (newP <- newPoints) {
    kPoints(newP._1) = newP._2
  }
  println("Finished iteration (delta = " + tempDist + ")")
}
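
The listing relies on helpers it does not show (`parseVector`, `closestPoint`, the vector arithmetic). The following is a minimal local sketch of one loop iteration over ordinary Scala collections, with no Spark involved. The helper bodies are assumptions written for this note, not the code from the Spark examples, and `groupBy` plus `reduce` stands in for `reduceByKey`.

```scala
object LocalKMeansSketch {
  type Vec = Vector[Double]

  // Hypothetical stand-ins for the helpers the example calls but does not show.
  def parseVector(line: String): Vec = line.trim.split("\\s+").map(_.toDouble).toVector
  def squaredDist(a: Vec, b: Vec): Double = a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum
  def add(a: Vec, b: Vec): Vec = a.zip(b).map { case (x, y) => x + y }
  def closestPoint(p: Vec, centers: Array[Vec]): Int =
    centers.indices.minBy(i => squaredDist(p, centers(i)))

  // One pass of the loop body: returns the updated centers and the total squared movement.
  def step(data: Seq[Vec], centers: Array[Vec]): (Array[Vec], Double) = {
    // (label, (vector, 1)), as produced by data.map in the example
    val closest = data.map(p => (closestPoint(p, centers), (p, 1)))
    // Local equivalent of reduceByKey: sum vectors and counts per label
    val pointStats = closest.groupBy(_._1).map { case (label, pairs) =>
      label -> pairs.map(_._2).reduce((a, b) => (add(a._1, b._1), a._2 + b._2))
    }
    // The per-cluster average is the new center
    val newPoints = pointStats.map { case (label, (sum, n)) => label -> sum.map(_ / n) }
    val updated = centers.clone()
    var delta = 0.0
    // Only labels that received at least one point appear in newPoints
    for ((label, c) <- newPoints) {
      delta += squaredDist(centers(label), c)
      updated(label) = c
    }
    (updated, delta)
  }
}
```

The sketch iterates over `newPoints` rather than over `0 until K`, so a center that attracts no points simply stays where it is. In the listing, `newPoints(i)` would fail for such a label because the map has no entry for it.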

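Viewing the K-means application through the Hadoop MR programming model, the main point of interest is how Spark carries out the computation in a distributed environment when it executes the reduceByKey operation.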

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  reduceByKey(defaultPartitioner(self), func)
}

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}
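
The second overload shows that reduceByKey is just combineByKey with the combined type equal to the value type: the identity function creates the combiner, and `func` serves as both merge steps. A minimal local sketch of that relationship follows. `combineByKeyLocal` and `reduceByKeyLocal` are hypothetical helpers over an in-memory sequence (no partitions, no shuffle), not Spark API.

```scala
object ReduceByKeySketch {
  // Local stand-in for combineByKey over a single in-memory sequence.
  def combineByKeyLocal[K, V, C](records: Seq[(K, V)],
                                 createCombiner: V => C,
                                 mergeValue: (C, V) => C): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val c = acc.get(k) match {
        case Some(existing) => mergeValue(existing, v) // key seen before: fold the value in
        case None           => createCombiner(v)       // first value for this key
      }
      acc.updated(k, c)
    }

  // reduceByKey is the special case C = V: the first value is its own combiner.
  def reduceByKeyLocal[K, V](records: Seq[(K, V)], func: (V, V) => V): Map[K, V] =
    combineByKeyLocal[K, V, V](records, (v: V) => v, func)
}
```

For the k-means case the values are `(vector sum, count)` pairs, so `func` adds both components. With a different combined type, for example `createCombiner = v => List(v)` and `mergeValue = (c, v) => c :+ v`, the same helper groups values into a list instead.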


 

class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
...
...
/**
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 * Note that V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
 *
 * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 * - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 */
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializerClass: String = null): RDD[(K, C)] = {
  if (getKeyClass().isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
    }, preservesPartitioning = true)
  } else if (mapSideCombine) {
    val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
    val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
      .setSerializer(serializerClass)
    partitioned.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
    }, preservesPartitioning = true)
  } else {
    // Don't apply map-side combiner.
    // A sanity check to make sure mergeCombiners is not defined.
    assert(mergeCombiners == null)
    val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
    values.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
    }, preservesPartitioning = true)
  }
}
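
combineByKey takes one of three paths. If the RDD is already partitioned by the requested partitioner, values are combined inside each partition with no shuffle. If map-side combine is on, values are combined per partition, the partial combiners are shuffled, and the combiners are merged on the reduce side. Otherwise the raw values are shuffled and combined once after the shuffle. The sketch below models the last two paths over in-memory sequences. Every name in it is a hypothetical stand-in (`combineValues` for `Aggregator.combineValuesByKey`, `combineCombiners` for `Aggregator.combineCombinersByKey`, `shuffle` for `ShuffledRDD`), and the hash-modulo regrouping is an assumption.

```scala
object CombineByKeySketch {
  // Stand-in for Aggregator.combineValuesByKey: fold raw values into combiners.
  def combineValues[K, V, C](part: Seq[(K, V)], create: V => C, mergeValue: (C, V) => C): Map[K, C] =
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(create(v)))
    }

  // Stand-in for Aggregator.combineCombinersByKey: merge partial combiners.
  def combineCombiners[K, C](part: Seq[(K, C)], mergeCombiners: (C, C) => C): Map[K, C] =
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }

  // Stand-in for ShuffledRDD with a hash partitioner: regroup all records by key hash.
  def shuffle[K, T](parts: Seq[Seq[(K, T)]], numPartitions: Int): Seq[Seq[(K, T)]] = {
    val all = parts.flatten
    (0 until numPartitions).map { p =>
      all.filter { case (k, _) => ((k.hashCode % numPartitions) + numPartitions) % numPartitions == p }
    }
  }

  // mapSideCombine = true: combine locally, shuffle the combiners, then merge them.
  def withMapSideCombine[K, V, C](parts: Seq[Seq[(K, V)]], numPartitions: Int,
      create: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): Seq[Map[K, C]] = {
    val combined = parts.map(p => combineValues(p, create, mergeValue).toSeq)
    shuffle(combined, numPartitions).map(p => combineCombiners(p, mergeCombiners))
  }

  // mapSideCombine = false: shuffle the raw values, then combine once on the reduce side.
  def withoutMapSideCombine[K, V, C](parts: Seq[Seq[(K, V)]], numPartitions: Int,
      create: V => C, mergeValue: (C, V) => C): Seq[Map[K, C]] =
    shuffle(parts, numPartitions).map(p => combineValues(p, create, mergeValue))
}
```

Both paths yield the same per-key results for an associative merge. What differs is how much crosses the shuffle: with map-side combine at most one combiner per key leaves each input partition, whereas without it every raw record does.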


  
 


  