您的位置:首页 > 其它

Spark源码解读之RDD构建和转换过程

2016-12-15 18:46 495 查看
上一节讲了Spark源码解读之Context的初始化过程,发现其实一行简单的new SparkContext(sparkConf)代码,spark内部会去做很多事情。这节主要讲RDD的构建和转换过程。

一、 RDD概述

    RDD (Resilient Distributed Dataset) ,一个弹性分布式数据集,Spark中的基本抽象。代表一个不变(只读)的、可以并行操作的元素的分区集合。Spark中原生的RDD支持从以下三种方式创建:从scala集合中创建、从文件系统中创建、现有RDD的transform操作创建。RDD主要有以下五个特点:

1. 分区集合

    RDD是一个分区(partition)的集合,一个RDD有一个或多个分区。分区的数量决定了并行度。使用textFile创建RDD时可以不指定分区数(采用默认的分区数),也可以自己指定分区数。




2. 计算函数以分区为单位

    RDD在任务计算时是以分区为单位的,计算函数为compute函数:def compute(split: Partition, context: TaskContext): Iterator[T]。输入参数分别为RDD对应的分区以及task运行环境。不同的RDD子类可以去实现自己的compute方法。

3. RDD依赖于其他RDD

   每个RDD都有依赖关系(源RDD的依赖关系为空),这些依赖关系成为lineage,可以通过toDebugString方法来获得lineage。



使用textFile创建的RDD的lineage为HadoopRDD -> MapPartitionsRDD。

4. key-value 类型RDD的 Partitioner

    对于非key-value类型的RDD,Partitioner为None,对应key-value类型的RDD,Partitioner默认为HashPartitioner。在进行shuffle操作时,如reduceByKey,sortByKey,Partitioner决定了父RDD shuffle的输出时对应的分区中的数据是如何进行map的。

5. 分区支持数据本地性

    Spark在进行任务调度时,会尝试将任务分配到数据所在的机器上,从而避免了机器间的数据传输。RDD获取优先位置的方法为getPreferredLocations。

    一般只有涉及到从外部存储结构中读取数据时才会有优先位置,比如HadoopRDD,ShuffledRDD。

二、 实例讲解RDD构建和转换

在idea中对

val WordCounts = sc.textFile("/hosts.txt")
.flatMap(text => text.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)


进行debug。

1.  textFile

    SparkContext的textFile方法会从HDFS或本地读取文件,然后创建一个String类型的MapPartitionsRDD。方法如下:

def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}


path用于指定文件的路径,minPartitions用于指定最小的分区数,如果不指定minPartitions,会使用defaultMinPartitions方法获得minPartitions。之后使用hadoopFile方法创建一个HadoopRDD,由于HadoopRDD是一个key-value类型的RDD,key表示偏移量,value表示具体的内容,所以需要使用RDD的map方法来得到具体的数据(这样会将HadoopRDD转为
MapPartitionsRDD)。

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)


1.1
defaultMinPartitions方法 

defaultMinPartitions方法会取defaultParallelism和2最小值作为默认的minPartitions。defaultParallelism方法如下:

def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}


仔细一看,发现调用的是TaskScheduler的defaultParallelism方法,TaskScheduler是一个trait,实际上调用的是TaskSchedulerImpl的defaultParallelism方法:

override def defaultParallelism(): Int = backend.defaultParallelism()


由于采用本地运行模式,所以调用的是LocalBackend类的defaultParallelism方法:

override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)


1.2 hadoopFile方法

    hadoopFile用于创建HadoopRDD,实现如下:

def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}


创建HadoopRDD之前,先将hadoopConfiguration进行广播,然后创建一个setInputPathsFunc方法。HadoopRDD类介绍详见1.3。

1.3 HadoopRDD类

     HadoopRDD的主构造器如下:

class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {

if (initLocalJobConfFuncOpt.isDefined) {
sparkContext.clean(initLocalJobConfFuncOpt.get)
}


    由于所有类型的RDD都继承最原始的org.apache.spark.rdd.RDD这个抽象类,所有在调用HadoopRDD的主构造器时, 会调用RDD这个类,HadoopRDD在初始化时会调用RDD的主构造器,将SparkContext对象和Dependency类型的序列(此处为Nil,这是由于HadoopRDD是一个源RDD,没有依赖)传入。RDD抽象类的介绍详见1.4。

    创建了HadoopRDD之后,会调用SparkContext的clean方法,实现如下:

private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}


这里实际上调用了ClosureCleaner的clean方法,目的是为了清楚闭包中的不能序列化的对象,防止RDD在网络传输中反序列化失败。

1.4 RDD抽象类

    RDD类在初始化时,会初始化以下的一些变量。

abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

// shuffle操作时,会用到partitioner
@transient val partitioner: Option[Partitioner] = None

// 一个唯一的ID,标识这个RDD
val id: Int = sc.newRddId()

// RDD的名字
@transient var name: String = null

// Our dependencies and partitions will be gotten by calling subclass's methods below, and will
// be overwritten when we're checkpointed
// dependencies和partitions会在checkpoint时重写
private var dependencies_ : Seq[Dependency[_]] = null
@transient private var partitions_ : Array[Partition] = null

private var storageLevel: StorageLevel = StorageLevel.NONE
@transient private[spark] val creationSite = sc.getCallSite()

@transient private[spark] val scope: Option[RDDOperationScope] = {
Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
}

private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

}


此外,在Spark 1.3版本之后,有一个RDD的伴生对象,伴生对象实现了一些隐式转换的方法,如:implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToOrderedRDDFunctions[K : Ordering
: ClassTag, V: ClassTag](rdd: RDD[(K, V)])等等。

1.5 RDD map方法

    在使用hadoopFile方法得到HadoopRDD之后,会调用RDD的map方法,将HadoopRDD的value提取出来,作为一个新的RDD(MappartitionsRDD)的数据。map方法实现如下:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}


先调用SparkContext的clean方法,之后使用MapPartitionsRDD的主构造器来创建一个MapPartitionsRDD对象。this表示调用者,这儿指的是HadoopRDD。

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {


初始化MapPartitionsRDD,调用的是RDD抽象类的一个辅助构造器,如下:

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))


辅助构造器再调用主构造器,如下:

abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
)


从这儿可以追溯,deps->List(new OneToOneDependency(oneParent)),oneParent->prev->this->HadoopRDD。也就是说在创建MapPartitionsRDD时,会将HadoopRDD作为它的依赖。这个依赖关系存在MapPartitionsRDD,RDD中的getDependencies可以得到deps。(关于Spark中Dependency的讲解详见Spark源码解读之RDD依赖Dependency)

protected def getDependencies: Seq[Dependency[_]] = deps


使用map方法后,会将HadoopRDD转为MapPartitionsRDD。

2. RDD flatMap方法

   flatMap方法会将上一步的MapPartitionsRDD进行变换,得到一个新的MapPartitionsRDD。flatMap方法实现如下:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}


3. RDD map方法

    map方法在1.5部分已经详细讲解了。

4. RDD reduceByKey方法

    reduceByKey方法是PairRDDFunctions类型RDD特有的方法,由于上一步map方法产生的是一个MapPartitionsRDD,在这可以使用 toDebugString方法来看下RDD之间的依赖关系。



上图可以看出使用textFile方法生先后生成了HadoopRDD和MapPartitionsRDD,调用flatMap和map方法都新生成了MapPartitionsRDD,所以在调用reduceByKey方法时,会先调用所有RDD都有的一个隐式转换方法rddToPairRDDFunctions,将非PairRDDFunctions类型的RDD转为PairRDDFunctions类型的RDD。rddToPairRDDFunctions方法如下:

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}


class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging
with SparkHadoopMapReduceUtil
with Serializable
{


在构建PairRDDFunctions类时,传入的self即MapPartitionsRDD。

接下来看下reduceByKey的实现:

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
// 将传入的RDD序列的按照partitions大小进行降序排序
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
// 任何一个RDD有partitioner,则返回该partitioner
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
// 返回一个HashPartitioner,如果配置了"spark.default.parallelism"参数,
// 该HashPartitioner的partitions为配置参数的值,否则partitions为所传入RDD序列中最大的partitions
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
}
}
}


reduceByKey方法最终会调用combineByKeyWithClassTag方法,其处理步骤如下:

1. 创建Aggregator 

2. self.partitioner为MapPartitionsRDD的partitioner,而创建MapPartitionsRDD时,它的partitioner为None,因而创建ShuffledRDD。

def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}


ShuffledRDD的构造方法如下:

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {


从这儿可以看出,在构造ShuffledRDD时,并没有直接创建它的依赖(因为继承RDD时,使用RDD的主构造器,传入的deps参数为Nil)。那么ShuffledRDD的依赖是什么时候被创建的呢?其实是在getDependencies方法被调用时才创建的。

override def getDependencies: Seq[Dependency[_]] = {
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}


由此可见,本例中改ShuffledRDD的依赖为MapPartitionsRDD,依赖类型为ShuffledDependency。

下一篇将介绍Spark源码解读之Job提交
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息