您的位置:首页 > 编程语言

Spark源码分析(1) 从WordCount示例看Spark延迟计算原理

2014-08-22 12:54 811 查看
WordCount示例:

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

在执行counts.saveAsTextFile("hdfs://...")前,spark其实没有进行真正的运算,只是在构造计算的过程——有点类似Scala中的lazy运算。当执行counts.saveAsTextFile才真正开始了整个计算。

源代码分析:

从Spark源码看,这种延迟计算的实现原理是如此的简单、明了!简单来说,原理如下:
每个运算(如flatMap、map)其实返回的都是一个RDD对象,可以认为最后形成了一个RDD对象的队列;直到最后需要计算时(例如调用了saveAsTextFile)才开始逐一调用各个RDD对象的compute方法,完成实际的运算。

具体分析:

1. spark.textFile("hdfs://...")返回了什么?
文件:/core/src/main/scala/org/apache/spark/SparkContext.scala

def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minSplits).map(pair => pair._2.toString)
}

def hadoopFile[K, V](...): RDD[(K, V)] = {
new HadoopRDD(...)
}


可见,执行spark.textFile("hdfs://...")其实并没有真正的打开文件或做什么实际的操作,而只是返回了一个HadoopRDD的对象。

2. map、flatMap做了什么?
文件:core/src/main/scala/org/apache/spark/rdd/RDD.scala

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))


就是这么简单,map返回一个MappedRDD,flatMap返回一个FlatMappedRDD对象。他们都是RDD对象的子类。

3. reduceByKey做了什么?
reduceByKey不是RDD的一个成员函数,它定义在PairRDDFunctions类中。RDD会被隐式转换为PairRDDFunctions。

注:隐式转换的方法定义在文件:/core/src/main/scala/org/apache/spark/SparkContext.scala中。
implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)

reduceByKey的定义在文件:/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializerClass: String = null): RDD[(K, C)] = {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (getKeyClass().isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}


这个过程有点复杂,但简单的从函数值上我们可以看到,reduceByKey最终返回的结果是RDD[(K, V)]。其实,细致分析下去,最终返回的是个MapPartitionsRDD对象:上面的代码最终都是生成一个ShuffledRDD对象;然后调用该对象的mapPartitionsWithContext方法,这个方法返回MapPartitionsRDD对象。如下:

def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}


4. 是时候开始真正的计算了!——compute()
通过上面的几个步骤,我们可以看到,其实我们还什么都没有做,只是构建了一些列的RDD对象!每个RDD对象都有一个Parent,如下:

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))


通过这个Parent,实际上我们把一个个RDD对象串联了起来!这一串RDD,就是我们搭建起来的计算过程!具体如下:
HadoopRDD->FlatMappedRDD->MappedRDD->MapPartitionsRDD

在执行saveAsTextFile最终会调用runJob,在runJob的过程中会调用RDD的compute(具体过程另行分析)。它是RDD的一个abstract函数,由各个子类具体实现。我们以MappedRDD为例,看一下compute都做了什么事:

override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)


它首先调用了parent RDD的iterator(split, context),然后就执行了map运算。可见,在这里我们才真正执行了map运算——这就实现了所谓的延迟计算!

等等,前面还有个firstParent[T].iterator(split, context),猜猜它是干什么的? ——很简单,我们要想让前面搭建起来的运算过程顺序执行,必须得让“parent”RDD先compute完成,然后才执行用我们当前这个RDD要做的运算啊。所以他就是迭代的调用之前所有parent RDD的compute方法。如下:

/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}


总结:

这类延迟计算过程在Spark中称为“Transformation操作”。Transformation操作主要有:

map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter(func) : 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多个结果
mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition
mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index
sample(withReplacement,faction,seed):抽样
union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合
distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型
join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数
cartesian(otherDataset):笛卡尔积就是m*n
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 源代码 lazy