Spark Source Code Reading (1): Execution Flow of the WordCount Program
2018-03-05 17:21
1. RDD Source Code Analysis
Main methods and properties (from the RDD class's doc comment):
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

RDD is an abstract class, so subclasses can provide many different implementations. The first constructor parameter is the SparkContext; @transient marks it as not needing serialization. The second parameter, deps, describes the dependencies on parent RDDs.
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
// Called only once. Implemented by subclasses; returns all the partitions of this RDD.
protected def getPartitions: Array[Partition]
// Called only once. Returns how this RDD depends on its parent RDDs.
protected def getDependencies: Seq[Dependency[_]] = deps
// Computes one partition and returns an iterator over the results.
def compute(split: Partition, context: TaskContext): Iterator[T]
// Optional: preferred locations; takes a split (partition) and returns the preferred node locations for it.
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// Optional: the partitioning method (property 4 above), similar to MapReduce's Partitioner interface, which controls which reducer a key goes to.
@transient val partitioner: Option[Partitioner] = None
}
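To make the contract concrete, here is a minimal sketch of a custom RDD, assuming a made-up class RangeLikeRDD that exposes the numbers 0 until count split across numSlices partitions (the names and data are purely illustrative). It implements only the two required members, getPartitions and compute:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: only the index is needed here.
case class SlicePartition(index: Int) extends Partition

// Hypothetical RDD exposing 0 until count, split into numSlices partitions.
class RangeLikeRDD(sc: SparkContext, count: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {  // Nil: no parent RDDs, hence no dependencies

  // Property 1: one Partition object per slice.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => SlicePartition(i))

  // Property 2: how to compute the elements of one slice.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val start = split.index * count / numSlices
    val end = (split.index + 1) * count / numSlices
    (start until end).iterator
  }
}

A real data-source RDD would also override getPreferredLocations and possibly partitioner, which is exactly what HadoopRDD does below.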
2. Using the WordCount Program as an Example
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("source-code")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("hdfs://...")
    val counts = textFile.flatMap(line => line.split(" "))
      .filter(_.length >= 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://...")
  }
}
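Nothing is actually computed until saveAsTextFile triggers a job; the transformations above only build a lineage of RDDs. A hedged sketch for inspecting that lineage from the driver, assuming the counts RDD defined above:

// toDebugString is public RDD API; it prints the lineage before any action runs.
// For the chain above it typically shows a ShuffledRDD (from reduceByKey) on top of
// MapPartitionsRDDs (from flatMap/filter/map) on top of a HadoopRDD (from textFile).
println(counts.toDebugString)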
2.1 textFile Source
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}

path: any Hadoop-supported file system URI. minPartitions: the minimum number of partitions to create.
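A small usage sketch (the path and partition count are placeholders): minPartitions is only a hint for the minimum number of partitions, since the actual splits are decided by the InputFormat.

// Ask for at least 8 partitions; the InputFormat may create more, depending on block and split sizes.
val lines = sc.textFile("hdfs://namenode:8020/data/input.txt", minPartitions = 8)
println(lines.partitions.length)  // number of partitions actually created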
2.2 hadoopFile Source
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

It returns a HadoopRDD; note that the Hadoop configuration is broadcast once because it can be fairly large (about 10 KB).
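As a hedged sketch (the path is a placeholder), calling hadoopFile directly returns the raw (LongWritable, Text) pairs that textFile then maps to plain strings:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Equivalent to what textFile does internally: read (byte offset, line) pairs...
val pairs = sc.hadoopFile("hdfs://namenode:8020/data/input.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions = 4)
// ...then keep only the line text.
val lines = pairs.map(pair => pair._2.toString)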
2.3 HadoopRDD Source
The main methods in HadoopRDD:

getPartitions
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
// Create the input splits
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
// Build the partition array from the splits and return it
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}

compute
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
// Cast theSplit to a HadoopPartition
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
// Find a function that will return the file system bytes read by this thread.
// Do this before creating the RecordReader, because the RecordReader's constructor may already read some bytes.
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
}
inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register a task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
(key, value)
}
// The full source also overrides close() here, which closes the reader and updates the bytes-read metrics (omitted in this excerpt).
}
new InterruptibleIterator[(K, V)](context, iter)
}
getPreferredLocations
override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
  val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
    case Some(c) =>
      try {
        val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
        val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
        Some(HadoopRDD.convertSplitLocationInfo(infos))
      } catch {
        case e: Exception =>
          logDebug("Failed to use InputSplitWithLocations.", e)
          None
      }
    case None => None
  }
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
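These preferred locations are what the DAGScheduler uses to place tasks close to the HDFS blocks. A hedged sketch for inspecting them from the driver (the path is a placeholder); hadoopFile is used here because it returns the HadoopRDD itself, whereas textFile's result is a MapPartitionsRDD that does not override getPreferredLocations (its locality is instead propagated up the lineage by the scheduler):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val hadoopRdd = sc.hadoopFile("hdfs://namenode:8020/data/input.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
// For each partition, print the hosts that hold the corresponding HDFS block(s).
hadoopRdd.partitions.foreach { p =>
  println(s"partition ${p.index} -> ${hadoopRdd.preferredLocations(p).mkString(", ")}")
}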