
Spark Source Code Reading (1): Execution Flow of a WordCount Program

2018-03-05 17:21

1. RDD Source Code Analysis

Key properties and methods (from the RDD class documentation):
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

RDD is an abstract class, so concrete RDDs can provide many different implementations. The first constructor parameter is the SparkContext (marked @transient, so it is not serialized); the second parameter, deps, describes the dependencies on parent RDDs.
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // Called only once. Implemented by subclasses to return all partitions of this RDD.
  protected def getPartitions: Array[Partition]

  // Called only once. Returns how this RDD depends on its parent RDDs.
  protected def getDependencies: Seq[Dependency[_]] = deps

  // Computes a given partition and returns an iterator over its elements.
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // Optional: the preferred locations for a split, given as a list of preferred node addresses.
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  // Optional: the partitioner (property 4 above), similar to MapReduce's Partitioner,
  // which controls which reducer a key goes to.
  @transient val partitioner: Option[Partitioner] = None
}
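To make these hooks concrete, here is a minimal sketch of a custom RDD (the names SimpleRangeRDD and RangePartition are invented for illustration and are not part of Spark): a subclass only has to supply getPartitions and compute, the two abstract members above.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition covering the half-open range [start, end).
class RangePartition(val index: Int, val start: Int, val end: Int) extends Partition

// An RDD that produces the integers [0, total), split across numSlices partitions.
class SimpleRangeRDD(sc: SparkContext, total: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {  // Nil: no parent RDDs, hence no dependencies

  // Property 1: the list of partitions.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices) { i =>
      new RangePartition(i, i * total / numSlices, (i + 1) * total / numSlices)
    }

  // Property 2: how to compute a single partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
}

Everything else (dependency tracking, caching, scheduling) is inherited from the RDD base class; the HadoopRDD examined below follows exactly the same pattern, just with more machinery.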

2. Using WordCount as an Example

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("source-code")
    val sc = new SparkContext(conf)

    val textFile = sc.textFile("hdfs://...")
    val counts = textFile.flatMap(line => line.split(" "))
      .filter(_.length >= 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://...")
  }
}

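Note that the transformations above do not run anything by themselves; they only build up an RDD lineage, and a job is submitted only when the action saveAsTextFile is called. A quick way to see the chain of RDDs that the following sections construct is toDebugString (the input path here is hypothetical, and the exact output format varies across Spark versions):

// textFile -> HadoopRDD wrapped by a MapPartitionsRDD; flatMap/filter/map -> MapPartitionsRDD; reduceByKey -> ShuffledRDD
val counts = sc.textFile("hdfs://namenode:8020/input/words.txt")
  .flatMap(line => line.split(" "))
  .filter(_.length >= 2)
  .map(word => (word, 1))
  .reduceByKey(_ + _)

println(counts.toDebugString)  // prints the lineage down to the HadoopRDD created by textFile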



2.1 textFile Source Code

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}
path: any Hadoop-supported file system URI. minPartitions: the minimum number of partitions to create for the resulting RDD.
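For example (the path is hypothetical, and minPartitions is only a lower bound: the InputFormat makes the final decision about the number of splits, as getPartitions below shows):

// Ask for at least 8 partitions; for a large, splittable file the actual count is typically >= 8.
val lines = sc.textFile("hdfs://namenode:8020/input/words.txt", minPartitions = 8)
println(lines.partitions.length)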

2.2 hadoopFile Source Code

def hadoopFile[K, V](
     path: String,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
   assertNotStopped()
   // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
   val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
   val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
   new HadoopRDD(
     this,
     confBroadcast,
     Some(setInputPathsFunc),
     inputFormatClass,
     keyClass,
     valueClass,
     minPartitions).setName(path)
}
This returns a HadoopRDD.
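textFile is therefore a thin wrapper: hadoopFile produces a HadoopRDD of (LongWritable, Text) pairs, where the key is the byte offset of a line and the value is the line itself, and textFile keeps only the text via pair._2.toString. Ignoring RDD naming and scoping, it behaves roughly like this sketch (the path is hypothetical):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val pairs = sc.hadoopFile("hdfs://namenode:8020/input/words.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])  // a HadoopRDD[(LongWritable, Text)]
val lines = pairs.map(pair => pair._2.toString)                    // a MapPartitionsRDD[String]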

2.3 HadoopRDD Source Code

The main methods in HadoopRDD are getPartitions, compute, and getPreferredLocations.

getPartitions:
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  if (inputFormat.isInstanceOf[Configurable]) {
    inputFormat.asInstanceOf[Configurable].setConf(jobConf)
  }
  // Ask the InputFormat to compute the input splits
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  // Create one HadoopPartition per input split and return the array
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}

compute:

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
   val iter = new NextIterator[(K, V)] {
      // Cast theSplit to a HadoopPartition
     val split = theSplit.asInstanceOf[HadoopPartition]
     logInfo("Input split: " + split.inputSplit)
     val jobConf = getJobConf()

     val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

      // Find a function that will return the FileSystem bytes read by this thread. Do this before
      // creating the RecordReader, because the RecordReader's constructor might read some bytes.
     val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
       split.inputSplit.value match {
         case _: FileSplit | _: CombineFileSplit =>
           SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
         case _ => None
      }
    }
     inputMetrics.setBytesReadCallback(bytesReadCallback)

     var reader: RecordReader[K, V] = null
     val inputFormat = getInputFormat(jobConf)
     HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
       context.stageId, theSplit.index, context.attemptNumber, jobConf)
     reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

      // Register an on-task-completion callback to close the input stream.
     context.addTaskCompletionListener{ context => closeIfNeeded() }
     val key: K = reader.createKey()
     val value: V = reader.createValue()

     override def getNext(): (K, V) = {
       try {
         finished = !reader.next(key, value)
      } catch {
         case eof: EOFException =>
           finished = true
      }
       if (!finished) {
         inputMetrics.incRecordsRead(1)
      }
      (key, value)
    }
      // (the close() override that releases the RecordReader is omitted here)
    }
    new InterruptibleIterator[(K, V)](context, iter)
  }
getPreferredLocations:

override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
  // Try, via reflection, to use the richer location info (InputSplitWithLocationInfo)
  // that newer Hadoop versions expose.
  val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
    case Some(c) =>
      try {
        val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
        val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
        Some(HadoopRDD.convertSplitLocationInfo(infos))
      } catch {
        case e: Exception =>
          logDebug("Failed to use InputSplitWithLocations.", e)
          None
      }
    case None => None
  }
  // Fall back to the split's own block locations, dropping the dummy "localhost" entry.
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
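These locations are what gives Spark its data locality: when the job runs, the scheduler tries to launch the task for each HadoopPartition on a node that holds the corresponding HDFS block. A small sketch for inspecting this from the driver (hypothetical path; preferredLocations is the public wrapper around getPreferredLocations and may return an empty list, for example for a local file whose only location is "localhost"):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// hadoopFile returns the HadoopRDD itself, so preferredLocations reflects
// the block locations computed by getPreferredLocations above.
val pairs = sc.hadoopFile("hdfs://namenode:8020/input/words.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
pairs.partitions.foreach { p =>
  println(s"partition ${p.index} -> ${pairs.preferredLocations(p).mkString(", ")}")
}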