Spark Source Code Analysis: RDD Creation and Essence
2014-02-16 21:21
1 HadoopRDD
class HadoopRDD[K, V](
sc: SparkContext,
@transient conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
//Returns the array of partition handles; if they have not been computed yet, the partitions are created here
override def getPartitions: Array[Partition] = {
val env = SparkEnv.get
env.hadoop.addCredentials(conf)
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
}
val inputSplits = inputFormat.getSplits(conf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
def createInputFormat(conf: JobConf): InputFormat[K, V] = {
ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
}
//Given a partition, returns an iterator over that partition's records
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
if (fmt.isInstanceOf[Configurable]) {
fmt.asInstanceOf[Configurable].setConf(conf)
}
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
//Functional style: compute returns an iterator over this partition's (key, value) records
override def getNext() = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}
(key, value)
}
override def close() {
try {
reader.close()
} catch {
case e: Exception => logWarning("Exception in RecordReader.close()", e)
}
}
}
override def getPreferredLocations(split: Partition): Seq[String] = {
// TODO: Filtering out "localhost" in case of file:// URLs
val hadoopSplit = split.asInstanceOf[HadoopPartition]
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
}
override def checkpoint() {
// Do nothing. Hadoop RDD should not be checkpointed.
}
def getConf: Configuration = confBroadcast.value.value
}
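In practice a HadoopRDD is not built by hand; it comes from SparkContext. Below is a minimal sketch, assuming a local master, a placeholder HDFS path, and the 0.8/0.9-era API that the excerpt above comes from (exact helper signatures differ slightly between Spark versions):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

object HadoopRddExample {
  def main(args: Array[String]) {
    // Local master and a placeholder input path; both are assumptions for illustration.
    val sc = new SparkContext("local", "HadoopRddExample")

    // High-level helper: RDD[String] of text lines (wraps hadoopFile + TextInputFormat).
    val lines = sc.textFile("hdfs://namenode:9000/data/input.txt", 4)

    // Lower-level form that constructs the HadoopRDD shown above directly:
    // RDD[(LongWritable, Text)], one partition per InputSplit (at least minSplits = 4).
    val records = sc.hadoopFile[LongWritable, Text, TextInputFormat](
      "hdfs://namenode:9000/data/input.txt", 4)

    println(lines.count())
    println(records.map(_._2.toString).first())
    sc.stop()
  }
}

Either way the call ends up in the getPartitions/compute code shown above: getSplits decides the partitions, and getRecordReader feeds the NextIterator that compute returns.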
2 NextIterator (the partition iterator)
/** Provides a basic/boilerplate Iterator implementation. */
private[spark] abstract class NextIterator[U] extends Iterator[U] {
private var gotNext = false
private var nextValue: U = _
private var closed = false
protected var finished = false
/**
* Method for subclasses to implement to provide the next element.
*
* If no next element is available, the subclass should set `finished`
* to `true` and may return any value (it will be ignored).
*
* This convention is required because `null` may be a valid value,
* and using `Option` seems like it might create unnecessary Some/None
* instances, given some iterators might be called in a tight loop.
*
* @return U, or set 'finished' when done
*/
protected def getNext(): U
/**
* Method for subclasses to implement when all elements have been successfully
* iterated, and the iteration is done.
*
* <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
* called because it has no control over what happens when an exception
* happens in the user code that is calling hasNext/next.
*
* Ideally you should have another try/catch, as in HadoopRDD, that
* ensures any resources are closed should iteration fail.
*/
protected def close()
/**
* Calls the subclass-defined close method, but only once.
*
* Usually calling `close` multiple times should be fine, but historically
* there have been issues with some InputFormats throwing exceptions.
*/
def closeIfNeeded() {
if (!closed) {
close()
closed = true
}
}
override def hasNext: Boolean = {
if (!finished) {
if (!gotNext) {
nextValue = getNext()
if (finished) {
closeIfNeeded()
}
gotNext = true
}
}
!finished
}
override def next(): U = {
if (!hasNext) {
throw new NoSuchElementException("End of stream")
}
gotNext = false
nextValue
}
}
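To make the getNext()/close() contract concrete, here is a small hypothetical subclass, not Spark code (NextIterator is actually private[spark], so imagine it living in the same package), that iterates over the lines of a file. It follows exactly the pattern HadoopRDD.compute uses with its RecordReader:

import java.io.{BufferedReader, FileReader}

// Hypothetical illustration of the NextIterator contract:
// getNext() either returns the next element or sets `finished = true`;
// close() releases the underlying resource exactly once via closeIfNeeded().
class LineIterator(path: String) extends NextIterator[String] {
  private val reader = new BufferedReader(new FileReader(path))

  override protected def getNext(): String = {
    val line = reader.readLine()
    if (line == null) {
      finished = true  // end of stream; the returned value is ignored
    }
    line
  }

  override protected def close() {
    reader.close()
  }
}

When hasNext sees finished == true it calls closeIfNeeded(), so the reader is closed as soon as the last line has been consumed.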
3 RDD
/**
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
* on RDD internals.
*/
abstract class RDD[T: ClassManifest](
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
// =======================================================================
// Methods that should be implemented by subclasses of RDD
// =======================================================================
/** Implemented by subclasses to compute a given partition. */
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getPartitions: Array[Partition]
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/** Optionally overridden by subclasses to specify placement preferences. */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
val partitioner: Option[Partitioner] = None
// =======================================================================
// Methods and fields available on all RDDs
// =======================================================================
/** The SparkContext that created this RDD. */
def sparkContext: SparkContext = sc
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
/** A friendly name for this RDD */
var name: String = null
/** Assign a name to this RDD */
def setName(_name: String) = {
name = _name
this
}
/**
* Get the array of partitions of this RDD, taking into account whether the
* RDD is checkpointed or not. This is where the partition array is obtained; see the HadoopRDD implementation above.
*/
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
partitions_ = getPartitions
}
partitions_
}
}
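Putting the five properties together: a custom RDD only has to supply getPartitions and compute (dependencies, preferred locations, and the partitioner are optional). Here is a toy sketch, assuming the org.apache.spark packages of the version quoted here; all names are made up for illustration:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: remembers its index and the sub-range it covers.
class RangePartition(val index: Int, val start: Int, val end: Int) extends Partition

// Hypothetical RDD producing the integers [0, total) split over numSlices partitions.
// Nil as the dependency list means it has no parent RDDs, just like HadoopRDD above.
class ToyRangeRDD(sc: SparkContext, total: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {

  // Property 1: the list of partitions.
  override def getPartitions: Array[Partition] =
    (0 until numSlices).map { i =>
      new RangePartition(i, i * total / numSlices, (i + 1) * total / numSlices): Partition
    }.toArray

  // Property 2: how to compute one partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
}

With this sketch, new ToyRangeRDD(sc, 100, 4).collect() would schedule four tasks, each one calling compute on its own RangePartition.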
4 The first kind of task: ShuffleMapTask (this is where partitions are used)
private[spark] class ShuffleMapTask(
stageId: Int,
var rdd: RDD[_],
var dep: ShuffleDependency[_,_], //the shuffle dependency passed in (to be studied further)
var partition: Int, //a task is created for each partition, so tasks and partitions correspond one to one
@transient private var locs: Seq[TaskLocation])
extends Task[MapStatus](stageId)
with Externalizable
with Logging {
var split = if (rdd == null) null else rdd.partitions(partition) //look up the partition handle by its partition index
override def run(attemptId: Long): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
val taskContext = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(taskContext.taskMetrics)
val blockManager = SparkEnv.get.blockManager
var shuffle: ShuffleBlocks = null
var buckets: ShuffleWriterGroup = null
try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
buckets = shuffle.acquireWriters(partition)
// Write the map output to its associated buckets. Key step: every element of this partition is written to the bucket it belongs to.
for (elem <- rdd.iterator(split, taskContext)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets.writers(bucketId).write(pair)
}
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.size()
totalBytes += size
MapOutputTracker.compressSize(size)
}
// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
return new MapStatus(blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (buckets != null) {
buckets.writers.foreach(_.revertPartialWrites())
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && buckets != null) {
shuffle.releaseWriters(buckets)
}
// Execute the callbacks on task completion.
taskContext.executeOnCompleteCallbacks()
}
}
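The decisive line in the loop above is dep.partitioner.getPartition(pair._1), which picks the bucket for each record. Here is a sketch of that choice, assuming the dependency uses hash partitioning with the same semantics as Spark's HashPartitioner (names are made up for the sketch):

object BucketSketch {
  // Bucket id = key.hashCode modulo numOutputSplits, folded into [0, numOutputSplits).
  def hashBucket(key: Any, numOutputSplits: Int): Int = {
    val raw = key.hashCode % numOutputSplits
    if (raw < 0) raw + numOutputSplits else raw  // % can be negative in Scala/Java
  }

  def main(args: Array[String]) {
    // With 3 reducers, the pair ("spark", 1) always lands in the same bucket,
    // i.e. it is appended to buckets.writers(hashBucket("spark", 3)).
    println(hashBucket("spark", 3))
  }
}

Each of the numOutputSplits writers therefore accumulates one shuffle block destined for one reduce partition, and compressedSizes reports their sizes back to the driver via MapStatus.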
5 The second kind of task: ResultTask
private[spark] class ResultTask[T, U](
stageId: Int,
var rdd: RDD[T],
var func: (TaskContext, Iterator[T]) => U, //the function passed in to apply to the partition's iterator
var partition: Int, //again shows the one-to-one correspondence between tasks and splits
@transient locs: Seq[TaskLocation],
var outputId: Int)
extends Task[U](stageId) with Externalizable {
def this() = this(0, null, null, 0, null, 0)
var split = if (rdd == null) {
null
} else {
rdd.partitions(partition)
}
@transient private val preferredLocs: Seq[TaskLocation] = {
if (locs == null) Nil else locs.toSet.toSeq
}
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
}
}
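So a ResultTask is nothing more than func applied to one partition's iterator. As an illustration (the function name is made up, but the (TaskContext, Iterator[T]) => U shape matches the constructor above), this is the kind of func an action like count() would hand to its ResultTasks:

import org.apache.spark.TaskContext

object ResultTaskFuncSketch {
  // One ResultTask per partition applies this to that partition's iterator;
  // the driver then sums the per-partition Longs to get the final count.
  val countPartition: (TaskContext, Iterator[String]) => Long = (context, iter) => {
    var n = 0L
    while (iter.hasNext) {
      iter.next()
      n += 1
    }
    n
  }

  def main(args: Array[String]) {
    // Stand-in for one partition's iterator, so the sketch runs without a cluster.
    println(countPartition(null, Iterator("a", "b", "c")))  // prints 3
  }
}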