
Job Trigger Flow: Principles and Source Code Analysis

2017-10-14 11:35
A walk-through of the WordCount flow

val lines = sc.textFile()

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  // Call hadoopFile(): it picks up the Hadoop configuration (shipped as a broadcast
  // variable) and creates a HadoopRDD of (key, value) pairs.
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    // Then run a map over the pairs to drop the key and keep only the value,
    // producing a MapPartitionsRDD whose elements are the lines of text.
    minPartitions).map(pair => pair._2.toString).setName(path)
}
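As a quick sanity check (not part of the original walkthrough), printing the lineage of the returned RDD shows the two RDDs that textFile() builds; the exact output shape varies with the Spark version and the input:

// Print the lineage of the RDD returned by textFile(); the MapPartitionsRDD
// produced by map() should wrap the underlying HadoopRDD.
println(lines.toDebugString)
// Rough shape of the output (ids, paths and partition counts vary):
// (2) MapPartitionsRDD[1] at textFile at ...
//  |  HadoopRDD[0] at textFile at ...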


val words = lines.flatMap(line => line.split(" "))

val pairs = words.map(word => (word, 1))

// The RDD class itself does not define reduceByKey. Calling reduceByKey() on an RDD
// therefore triggers a Scala implicit conversion: the compiler searches the enclosing
// scope for an implicit conversion, finds rddToPairRDDFunctions(), and converts the
// RDD into a PairRDDFunctions.
// The reduceByKey() that actually runs is the one defined in PairRDDFunctions.


val counts = pairs.reduceByKey(_ + _)
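Since this implicit conversion is the key step, here is a minimal sketch of the same pattern. The names MyRDD, MyPairFunctions and toPairFunctions are illustrative only, not Spark APIs; Spark's real conversion is rddToPairRDDFunctions, which wraps an RDD[(K, V)] in PairRDDFunctions.

import scala.language.implicitConversions

// A tiny "RDD-like" container with no reduceByKey of its own.
class MyRDD[T](val data: Seq[T])

// Extra operations that only make sense for key/value data.
class MyPairFunctions[K, V](self: MyRDD[(K, V)]) {
  def reduceByKey(func: (V, V) => V): Map[K, V] =
    self.data.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(func) }
}

object MyRDD {
  // The compiler applies this conversion automatically when reduceByKey is called
  // on a MyRDD[(K, V)], just as Spark wraps an RDD[(K, V)] in PairRDDFunctions.
  implicit def toPairFunctions[K, V](rdd: MyRDD[(K, V)]): MyPairFunctions[K, V] =
    new MyPairFunctions(rdd)
}

object ImplicitDemo extends App {
  val pairs = new MyRDD(Seq(("a", 1), ("b", 1), ("a", 1)))
  println(pairs.reduceByKey(_ + _)) // e.g. Map(a -> 2, b -> 1)
}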

counts.foreach(count => println(count._1 + ": " + count._2))
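Why does a call to foreach end up in runJob()? foreach is an action: in the Spark source it is only a thin wrapper around SparkContext.runJob. Roughly (reconstructed from the 1.x RDD.scala; details may differ between versions):

def foreach(f: T => Unit): Unit = withScope {
  // Closure-clean the user function, then submit a job that applies it to
  // every element of every partition.
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

That call lands in the SparkContext.runJob overload shown next.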

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Hand the job over to the DAGScheduler that was created earlier, when the
  // SparkContext was initialized, by calling its runJob() method.
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
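Putting the whole flow together, a minimal self-contained WordCount driver might look like the sketch below. The master URL "local[*]" is a placeholder for illustration, and the input path is taken from the command line rather than hard-coded:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal WordCount driver tying the steps above together.
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc   = new SparkContext(conf)

    val lines  = sc.textFile(args(0))                    // HadoopRDD -> MapPartitionsRDD
    val words  = lines.flatMap(line => line.split(" "))  // transformation, no job yet
    val pairs  = words.map(word => (word, 1))            // transformation, no job yet
    val counts = pairs.reduceByKey(_ + _)                // still lazy

    // foreach is the action: it calls sc.runJob(), which hands the job to the
    // DAGScheduler for stage splitting and task scheduling.
    counts.foreach(count => println(count._1 + ": " + count._2))

    sc.stop()
  }
}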