Spark源码解读之RDD构建和转换过程
2016-12-15 18:46
495 查看
上一节讲了Spark源码解读之Context的初始化过程,发现其实一行简单的new SparkContext(sparkConf)代码,spark内部会去做很多事情。这节主要讲RDD的构建和转换过程。
RDD在任务计算时是以分区为单位的,计算函数为compute函数:def compute(split: Partition, context: TaskContext): Iterator[T]。输入参数分别为RDD对应的分区以及task运行环境。不同的RDD子类可以去实现自己的compute方法。
使用textFile创建的RDD的lineage为HadoopRDD -> MapPartitionsRDD。
一般只有涉及到从外部存储结构中读取数据时才会有优先位置,比如HadoopRDD,ShuffledRDD。
进行debug。
path用于指定文件的路径,minPartitions用于指定最小的分区数,如果不指定minPartitions,会使用defaultMinPartitions方法获得minPartitions。之后使用hadoopFile方法创建一个HadoopRDD,由于HadoopRDD是一个key-value类型的RDD,key表示偏移量,value表示具体的内容,所以需要使用RDD的map方法来得到具体的数据(这样会将HadoopRDD转为
MapPartitionsRDD)。
1.1
defaultMinPartitions方法会取defaultParallelism和2最小值作为默认的minPartitions。defaultParallelism方法如下:
仔细一看,发现调用的是TaskScheduler的defaultParallelism方法,TaskScheduler是一个trait,实际上调用的是TaskSchedulerImpl的defaultParallelism方法:
由于采用本地运行模式,所以调用的是LocalBackend类的defaultParallelism方法:
创建HadoopRDD之前,先将hadoopConfiguration进行广播,然后创建一个setInputPathsFunc方法。HadoopRDD类介绍详见1.3。
由于所有类型的RDD都继承最原始的org.apache.spark.rdd.RDD这个抽象类,所有在调用HadoopRDD的主构造器时, 会调用RDD这个类,HadoopRDD在初始化时会调用RDD的主构造器,将SparkContext对象和Dependency类型的序列(此处为Nil,这是由于HadoopRDD是一个源RDD,没有依赖)传入。RDD抽象类的介绍详见1.4。
创建了HadoopRDD之后,会调用SparkContext的clean方法,实现如下:
这里实际上调用了ClosureCleaner的clean方法,目的是为了清楚闭包中的不能序列化的对象,防止RDD在网络传输中反序列化失败。
此外,在Spark 1.3版本之后,有一个RDD的伴生对象,伴生对象实现了一些隐式转换的方法,如:implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToOrderedRDDFunctions[K : Ordering
: ClassTag, V: ClassTag](rdd: RDD[(K, V)])等等。
先调用SparkContext的clean方法,之后使用MapPartitionsRDD的主构造器来创建一个MapPartitionsRDD对象。this表示调用者,这儿指的是HadoopRDD。
初始化MapPartitionsRDD,调用的是RDD抽象类的一个辅助构造器,如下:
辅助构造器再调用主构造器,如下:
从这儿可以追溯,deps->List(new OneToOneDependency(oneParent)),oneParent->prev->this->HadoopRDD。也就是说在创建MapPartitionsRDD时,会将HadoopRDD作为它的依赖。这个依赖关系存在MapPartitionsRDD,RDD中的getDependencies可以得到deps。(关于Spark中Dependency的讲解详见Spark源码解读之RDD依赖Dependency)
使用map方法后,会将HadoopRDD转为MapPartitionsRDD。
上图可以看出使用textFile方法生先后生成了HadoopRDD和MapPartitionsRDD,调用flatMap和map方法都新生成了MapPartitionsRDD,所以在调用reduceByKey方法时,会先调用所有RDD都有的一个隐式转换方法rddToPairRDDFunctions,将非PairRDDFunctions类型的RDD转为PairRDDFunctions类型的RDD。rddToPairRDDFunctions方法如下:
在构建PairRDDFunctions类时,传入的self即MapPartitionsRDD。
接下来看下reduceByKey的实现:
reduceByKey方法最终会调用combineByKeyWithClassTag方法,其处理步骤如下:
1. 创建Aggregator
2. self.partitioner为MapPartitionsRDD的partitioner,而创建MapPartitionsRDD时,它的partitioner为None,因而创建ShuffledRDD。
ShuffledRDD的构造方法如下:
从这儿可以看出,在构造ShuffledRDD时,并没有直接创建它的依赖(因为继承RDD时,使用RDD的主构造器,传入的deps参数为Nil)。那么ShuffledRDD的依赖是什么时候被创建的呢?其实是在getDependencies方法被调用时才创建的。
由此可见,本例中改ShuffledRDD的依赖为MapPartitionsRDD,依赖类型为ShuffledDependency。
下一篇将介绍Spark源码解读之Job提交
一、 RDD概述
RDD (Resilient Distributed Dataset) ,一个弹性分布式数据集,Spark中的基本抽象。代表一个不变(只读)的、可以并行操作的元素的分区集合。Spark中原生的RDD支持从以下三种方式创建:从scala集合中创建、从文件系统中创建、现有RDD的transform操作创建。RDD主要有以下五个特点:1. 分区集合
RDD是一个分区(partition)的集合,一个RDD有一个或多个分区。分区的数量决定了并行度。使用textFile创建RDD时可以不指定分区数(采用默认的分区数),也可以自己指定分区数。
2. 计算函数以分区为单位
RDD在任务计算时是以分区为单位的,计算函数为compute函数:def compute(split: Partition, context: TaskContext): Iterator[T]。输入参数分别为RDD对应的分区以及task运行环境。不同的RDD子类可以去实现自己的compute方法。3. RDD依赖于其他RDD
每个RDD都有依赖关系(源RDD的依赖关系为空),这些依赖关系成为lineage,可以通过toDebugString方法来获得lineage。使用textFile创建的RDD的lineage为HadoopRDD -> MapPartitionsRDD。
4. key-value 类型RDD的 Partitioner
对于非key-value类型的RDD,Partitioner为None,对应key-value类型的RDD,Partitioner默认为HashPartitioner。在进行shuffle操作时,如reduceByKey,sortByKey,Partitioner决定了父RDD shuffle的输出时对应的分区中的数据是如何进行map的。5. 分区支持数据本地性
Spark在进行任务调度时,会尝试将任务分配到数据所在的机器上,从而避免了机器间的数据传输。RDD获取优先位置的方法为getPreferredLocations。一般只有涉及到从外部存储结构中读取数据时才会有优先位置,比如HadoopRDD,ShuffledRDD。
二、 实例讲解RDD构建和转换
在idea中对val WordCounts = sc.textFile("/hosts.txt") .flatMap(text => text.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _)
进行debug。
1. textFile
SparkContext的textFile方法会从HDFS或本地读取文件,然后创建一个String类型的MapPartitionsRDD。方法如下:def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
path用于指定文件的路径,minPartitions用于指定最小的分区数,如果不指定minPartitions,会使用defaultMinPartitions方法获得minPartitions。之后使用hadoopFile方法创建一个HadoopRDD,由于HadoopRDD是一个key-value类型的RDD,key表示偏移量,value表示具体的内容,所以需要使用RDD的map方法来得到具体的数据(这样会将HadoopRDD转为
MapPartitionsRDD)。
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
1.1
defaultMinPartitions方法
defaultMinPartitions方法会取defaultParallelism和2最小值作为默认的minPartitions。defaultParallelism方法如下:def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism }
仔细一看,发现调用的是TaskScheduler的defaultParallelism方法,TaskScheduler是一个trait,实际上调用的是TaskSchedulerImpl的defaultParallelism方法:
override def defaultParallelism(): Int = backend.defaultParallelism()
由于采用本地运行模式,所以调用的是LocalBackend类的defaultParallelism方法:
override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)
1.2 hadoopFile方法
hadoopFile用于创建HadoopRDD,实现如下:def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
创建HadoopRDD之前,先将hadoopConfiguration进行广播,然后创建一个setInputPathsFunc方法。HadoopRDD类介绍详见1.3。
1.3 HadoopRDD类
HadoopRDD的主构造器如下:class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableConfiguration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { if (initLocalJobConfFuncOpt.isDefined) { sparkContext.clean(initLocalJobConfFuncOpt.get) }
由于所有类型的RDD都继承最原始的org.apache.spark.rdd.RDD这个抽象类,所有在调用HadoopRDD的主构造器时, 会调用RDD这个类,HadoopRDD在初始化时会调用RDD的主构造器,将SparkContext对象和Dependency类型的序列(此处为Nil,这是由于HadoopRDD是一个源RDD,没有依赖)传入。RDD抽象类的介绍详见1.4。
创建了HadoopRDD之后,会调用SparkContext的clean方法,实现如下:
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { ClosureCleaner.clean(f, checkSerializable) f }
这里实际上调用了ClosureCleaner的clean方法,目的是为了清楚闭包中的不能序列化的对象,防止RDD在网络传输中反序列化失败。
1.4 RDD抽象类
RDD类在初始化时,会初始化以下的一些变量。abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { // shuffle操作时,会用到partitioner @transient val partitioner: Option[Partitioner] = None // 一个唯一的ID,标识这个RDD val id: Int = sc.newRddId() // RDD的名字 @transient var name: String = null // Our dependencies and partitions will be gotten by calling subclass's methods below, and will // be overwritten when we're checkpointed // dependencies和partitions会在checkpoint时重写 private var dependencies_ : Seq[Dependency[_]] = null @transient private var partitions_ : Array[Partition] = null private var storageLevel: StorageLevel = StorageLevel.NONE @transient private[spark] val creationSite = sc.getCallSite() @transient private[spark] val scope: Option[RDDOperationScope] = { Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson) } private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None }
此外,在Spark 1.3版本之后,有一个RDD的伴生对象,伴生对象实现了一些隐式转换的方法,如:implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToOrderedRDDFunctions[K : Ordering
: ClassTag, V: ClassTag](rdd: RDD[(K, V)])等等。
1.5 RDD map方法
在使用hadoopFile方法得到HadoopRDD之后,会调用RDD的map方法,将HadoopRDD的value提取出来,作为一个新的RDD(MappartitionsRDD)的数据。map方法实现如下:def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
先调用SparkContext的clean方法,之后使用MapPartitionsRDD的主构造器来创建一个MapPartitionsRDD对象。this表示调用者,这儿指的是HadoopRDD。
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) extends RDD[U](prev) {
初始化MapPartitionsRDD,调用的是RDD抽象类的一个辅助构造器,如下:
/** Construct an RDD with just a one-to-one dependency on one parent */ def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent)))
辅助构造器再调用主构造器,如下:
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] )
从这儿可以追溯,deps->List(new OneToOneDependency(oneParent)),oneParent->prev->this->HadoopRDD。也就是说在创建MapPartitionsRDD时,会将HadoopRDD作为它的依赖。这个依赖关系存在MapPartitionsRDD,RDD中的getDependencies可以得到deps。(关于Spark中Dependency的讲解详见Spark源码解读之RDD依赖Dependency)
protected def getDependencies: Seq[Dependency[_]] = deps
使用map方法后,会将HadoopRDD转为MapPartitionsRDD。
2. RDD flatMap方法
flatMap方法会将上一步的MapPartitionsRDD进行变换,得到一个新的MapPartitionsRDD。flatMap方法实现如下:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }
3. RDD map方法
map方法在1.5部分已经详细讲解了。4. RDD reduceByKey方法
reduceByKey方法是PairRDDFunctions类型RDD特有的方法,由于上一步map方法产生的是一个MapPartitionsRDD,在这可以使用 toDebugString方法来看下RDD之间的依赖关系。上图可以看出使用textFile方法生先后生成了HadoopRDD和MapPartitionsRDD,调用flatMap和map方法都新生成了MapPartitionsRDD,所以在调用reduceByKey方法时,会先调用所有RDD都有的一个隐式转换方法rddToPairRDDFunctions,将非PairRDDFunctions类型的RDD转为PairRDDFunctions类型的RDD。rddToPairRDDFunctions方法如下:
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = { new PairRDDFunctions(rdd) }
class PairRDDFunctions[K, V](self: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) extends Logging with SparkHadoopMapReduceUtil with Serializable {
在构建PairRDDFunctions类时,传入的self即MapPartitionsRDD。
接下来看下reduceByKey的实现:
/** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) } /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) } def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { // 将传入的RDD序列的按照partitions大小进行降序排序 val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse // 任何一个RDD有partitioner,则返回该partitioner for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { return r.partitioner.get } // 返回一个HashPartitioner,如果配置了"spark.default.parallelism"参数, // 该HashPartitioner的partitions为配置参数的值,否则partitions为所传入RDD序列中最大的partitions if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { new HashPartitioner(bySize.head.partitions.size) } } }
reduceByKey方法最终会调用combineByKeyWithClassTag方法,其处理步骤如下:
1. 创建Aggregator
2. self.partitioner为MapPartitionsRDD的partitioner,而创建MapPartitionsRDD时,它的partitioner为None,因而创建ShuffledRDD。
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }
ShuffledRDD的构造方法如下:
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( @transient var prev: RDD[_ <: Product2[K, V]], part: Partitioner) extends RDD[(K, C)](prev.context, Nil) {
从这儿可以看出,在构造ShuffledRDD时,并没有直接创建它的依赖(因为继承RDD时,使用RDD的主构造器,传入的deps参数为Nil)。那么ShuffledRDD的依赖是什么时候被创建的呢?其实是在getDependencies方法被调用时才创建的。
override def getDependencies: Seq[Dependency[_]] = { List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)) }
由此可见,本例中改ShuffledRDD的依赖为MapPartitionsRDD,依赖类型为ShuffledDependency。
下一篇将介绍Spark源码解读之Job提交
相关文章推荐
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- spark源码解读3之RDD中top源码解读
- Spark源码解读--任务生成和提交过程
- Spark定制班第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- Spark源码解读(2)——Worker启动过程
- Spark源码解读之RDD依赖Dependency
- 4000 Spark源码解读(6)——Shuffle过程
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- 第127课: Spark Streaming源码经典解读系列之二:Spark Streaming生成RDD
- Spark 定制版:008~Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- Spark源码的编译过程详细解读(各版本)(博主推荐)
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- Spark源码的编译过程详细解读(各版本)(博主推荐)
- Spark任务提交与执行之RDD的创建、转换及DAG构建
- Spark源码解读(4)——RDD
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考