6.job触发流程原理剖析与源码分析
2017-06-22 18:54
375 查看
从我们编写的一个小的spark demo程序开始 :
在SparkContext中textFile方法源码如下:
这其中的原因已经在代码中说明 , 这其中会调用hadoopFile方法 , 源码如下 :
对于创建好的HadoopRDD并经过flatMap算子操作之后形成的算子最后调用reduceByKey的时候会经过一到隐式转换 , 因为在RDD中是没有reduceByKey方法的,
因此在调用reduceByKey时候其实是调用如下的方法 :
从上面的源码中可以看出最后真正的reduceBykey的方法在PairRDDFunctions中 , 源码如下 :
最后 , 不管经过多少transformation算子操作 , 若是没有action的算子操作的话, 那么是不会运行transformation算子的 , 因此foreach方法其实就是触发上面所有算子的操作 ,
而RDD的fereach的源码如下 :
最后真正调用的就是SparkContext的runjob方法 进行真正的任务运行 :
其实SparkContext中运行job的组件就是之前讲到的DAGScheduler了 , 运行一个job就需要stage的划分了 , 下面就是stage算法划分了
val lines = sparkContext.textFile("")
val words = lines.flatMap(line => line.split("\t"))
val pairs = words.map(word => (word,1))
val counts = pairs.reduceByKey(_+_)
counts.foreach(count +. println(count_1+"_"+count_2))
在SparkContext中textFile方法源码如下:
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*
*首先HadoopFile方法的调用会创建一个HadoopRDD , 该RDD中的元素就是一个一个的(key,value),其中key就是文本行的行号,value就是一行的文本值
*然后调用HadoopRDD的map()方法 , 该方法就会将HadoopRDD中的key移除掉 , 只保留value
*最后就形成一个只包含文本行的MapPartitionRDD
*/
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
这其中的原因已经在代码中说明 , 这其中会调用hadoopFile方法 , 源码如下 :
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
*'''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
*record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
*operation will create many references to the same object.
*If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
*copy them using a `map` function.
*/
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
// 将Hadoop的配置信息作为广播变量 , 方便在每个节点上都可以读取到相同的Hadoop配置信息
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
// 创建HadoopRDD对象
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
对于创建好的HadoopRDD并经过flatMap算子操作之后形成的算子最后调用reduceByKey的时候会经过一到隐式转换 , 因为在RDD中是没有reduceByKey方法的,
因此在调用reduceByKey时候其实是调用如下的方法 :
/**
*其实在RDD里面是没有reduceByKey的 , 因此对RDD调用reduceByKey()方法时会触发scala的隐式转换;此时就会在作用域内寻找隐式转换
*因此就会在RDD中找到rddToPairRDDFunction()方法,然后调用RDD转换为PairRDDFunction , 在PairRDDFunction中就会调用reduceByKey方法
*/
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
从上面的源码中可以看出最后真正的reduceBykey的方法在PairRDDFunctions中 , 源码如下 :
/**
*Merge the values for each key using an associative reduce function. This will also perform
*the merging locally on each mapper before sending results to a reducer, similarly to a
*"combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
/**
*Merge the values for each key using an associative reduce function. This will also perform
*the merging locally on each mapper before sending results to a reducer, similarly to a
*"combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
reduceByKey(new HashPartitioner(numPartitions), func)
}
/**
*Merge the values for each key using an associative reduce function. This will also perform
*the merging locally on each mapper before sending results to a reducer, similarly to a
*"combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
*parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
最后 , 不管经过多少transformation算子操作 , 若是没有action的算子操作的话, 那么是不会运行transformation算子的 , 因此foreach方法其实就是触发上面所有算子的操作 ,
而RDD的fereach的源码如下 :
/**
*Applies a function f to all elements of this RDD.
*若是一个RDD调用了action的算子 , 比如foreach方法 , 那么其实就会触发job的运行 , 最后其实就会调用SparkContext的runjob方法
*/
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
// 调用SparkContext的runjob方法
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
最后真正调用的就是SparkContext的runjob方法 进行真正的任务运行 :
/**
*Run a function on a given set of partitions in an RDD and pass the results to the given
*handler function. This is the main entry point for all actions in Spark. The allowLocal
*flag specifies whether the scheduler can run the computation on the driver rather than
*shipping it out to the cluster, for short actions like first().
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
//最重要的就是这一行代码了 , 调用SparkContext中的DAGScheduler组件运行一个job
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
其实SparkContext中运行job的组件就是之前讲到的DAGScheduler了 , 运行一个job就需要stage的划分了 , 下面就是stage算法划分了
相关文章推荐
- job触发流程原理剖析与源码分析
- Spark2.2 job触发流程原理剖析与源码分析
- 阿里巴巴的Vlayout框架源码原理详解(第一篇流程分析)
- spring Quartz 源码分析--触发器类SimpleTriggerBean源码剖析
- Hbase 源码分析4 - Get 流程及rpc原理
- PureMVC学习系列-从源码深度剖析PureMVC(核心组件工作流程及原理)
- boost.asio源码剖析(三) ---- 流程分析
- Spark内核源码深度剖析:SparkContext原理剖析与源码分析
- BlockManager原理剖析与源码分析
- Master原理剖析与源码分析:主备切换机制原理剖析与源码分析
- Master原理剖析与源码分析:资源调度机制源码分析(schedule(),两种资源调度算法)
- Spring MVC原理(二)请求处理流程源码分析
- CacheManager原理剖析与源码分析
- Spark的job触发流程原理与stage划分算法分析
- Spark内核源码深度剖析:SparkContext原理剖析与源码分析
- Master原理剖析与源码分析:注册机制原理剖析与源码分析
- DAGScheduler原理剖析与源码分析
- Spark源码之路(二):Master原理剖析与源码分析
- spark ml 算法原理剖析以及具体的源码实现分析
- BlockManager原理剖析与源码分析