
Spark Source Code Analysis: RDD Creation and Essence

2014-02-16 21:21
1 HadoopRDD

class HadoopRDD[K, V](
    sc: SparkContext,
    @transient conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minSplits: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))

  // Returns the array of partition handles, building them if they do not exist yet
  override def getPartitions: Array[Partition] = {
    val env = SparkEnv.get
    env.hadoop.addCredentials(conf)
    val inputFormat = createInputFormat(conf)
    if (inputFormat.isInstanceOf[Configurable]) {
      inputFormat.asInstanceOf[Configurable].setConf(conf)
    }
    val inputSplits = inputFormat.getSplits(conf, minSplits)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  }

  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
      .asInstanceOf[InputFormat[K, V]]
  }

  // Given a partition, returns an iterator over that partition's records
  override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
    val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    var reader: RecordReader[K, V] = null

    val conf = confBroadcast.value.value
    val fmt = createInputFormat(conf)
    if (fmt.isInstanceOf[Configurable]) {
      fmt.asInstanceOf[Configurable].setConf(conf)
    }
    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)

    // Register an on-task-completion callback to close the input stream.
    context.addOnCompleteCallback{ () => closeIfNeeded() }

    val key: K = reader.createKey()
    val value: V = reader.createValue()

    // Functional style at work here: compute returns an iterator over the partition
    override def getNext() = {
      try {
        finished = !reader.next(key, value)
      } catch {
        case eof: EOFException =>
          finished = true
      }
      (key, value)
    }

    override def close() {
      try {
        reader.close()
      } catch {
        case e: Exception => logWarning("Exception in RecordReader.close()", e)
      }
    }
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    // TODO: Filtering out "localhost" in case of file:// URLs
    val hadoopSplit = split.asInstanceOf[HadoopPartition]
    hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
  }

  override def checkpoint() {
    // Do nothing. Hadoop RDD should not be checkpointed.
  }

  def getConf: Configuration = confBroadcast.value.value
}
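In practice you rarely construct a HadoopRDD by hand: SparkContext.hadoopFile (and textFile, which wraps it) builds one for you, with one HadoopPartition per input split, exactly as getPartitions above shows. The following is a minimal sketch, not taken from the post, assuming the old hadoopFile[K, V, F] signature this code is based on; the HDFS path is made up for illustration.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

object HadoopRddExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HadoopRddExample")
    // hadoopFile builds a HadoopRDD[LongWritable, Text]; each split reported
    // by TextInputFormat.getSplits becomes one HadoopPartition.
    val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input", 4)
    // The (key, value) pairs are (byte offset, line); keep only the text.
    val text = lines.map(pair => pair._2.toString)
    println(text.count())
    sc.stop()
  }
}

Calling count() here forces each partition's compute to open its RecordReader and iterate the split, which is exactly the NextIterator logic shown next.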

2 NextIterator (the partition iterator)

/** Provides a basic/boilerplate Iterator implementation. */
private[spark] abstract class NextIterator[U] extends Iterator[U] {

  private var gotNext = false
  private var nextValue: U = _
  private var closed = false
  protected var finished = false

  /**
   * Method for subclasses to implement to provide the next element.
   *
   * If no next element is available, the subclass should set `finished`
   * to `true` and may return any value (it will be ignored).
   *
   * This convention is required because `null` may be a valid value,
   * and using `Option` seems like it might create unnecessary Some/None
   * instances, given some iterators might be called in a tight loop.
   *
   * @return U, or set 'finished' when done
   */
  protected def getNext(): U

  /**
   * Method for subclasses to implement when all elements have been successfully
   * iterated, and the iteration is done.
   *
   * <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
   * called because it has no control over what happens when an exception
   * happens in the user code that is calling hasNext/next.
   *
   * Ideally you should have another try/catch, as in HadoopRDD, that
   * ensures any resources are closed should iteration fail.
   */
  protected def close()

  /**
   * Calls the subclass-defined close method, but only once.
   *
   * Usually calling `close` multiple times should be fine, but historically
   * there have been issues with some InputFormats throwing exceptions.
   */
  def closeIfNeeded() {
    if (!closed) {
      close()
      closed = true
    }
  }

  override def hasNext: Boolean = {
    if (!finished) {
      if (!gotNext) {
        nextValue = getNext()
        if (finished) {
          closeIfNeeded()
        }
        gotNext = true
      }
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) {
      throw new NoSuchElementException("End of stream")
    }
    gotNext = false
    nextValue
  }
}
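To see the getNext/finished/close contract in isolation, here is a hypothetical subclass (not part of Spark) that streams lines from a local file. Note that NextIterator is private[spark], so real code like this would have to live inside the Spark package; the class name and constructor are invented for illustration.

import java.io.{BufferedReader, FileReader}

class LineIterator(path: String) extends NextIterator[String] {
  private val reader = new BufferedReader(new FileReader(path))

  override protected def getNext(): String = {
    val line = reader.readLine()
    if (line == null) {
      finished = true   // end of stream; the returned value is ignored
    }
    line
  }

  override protected def close() {
    reader.close()      // invoked at most once, via closeIfNeeded()
  }
}

Driven through hasNext/next it behaves like any Scala Iterator, and close() fires exactly once after the last line has been read, which is the same pattern HadoopRDD.compute relies on for its RecordReader.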

3 RDD

/**
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
 * on RDD internals.
 */
abstract class RDD[T: ClassManifest](
    @transient private var sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /** Implemented by subclasses to compute a given partition. */
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /** Optionally overridden by subclasses to specify placement preferences. */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  var name: String = null

  /** Assign a name to this RDD */
  def setName(_name: String) = {
    name = _name
    this
  }

  /**
   * Get the array of partitions of this RDD, taking into account whether the
   * RDD is checkpointed or not. This is where the partition array is obtained;
   * see the getPartitions implementation in HadoopRDD above.
   */
  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
      }
      partitions_
    }
  }
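The doc comment above notes that users can implement custom RDDs by overriding these functions. As a hedged sketch of what that looks like (RangeRDD and RangePartition are invented names, and the imports assume the package layout of the Spark version this post covers), the class below exposes an in-memory integer range: getPartitions slices the range into Partition objects, which partitions above computes once and caches, and compute turns one such Partition back into an iterator.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class RangePartition(val rddId: Int, val index: Int, val start: Int, val end: Int)
  extends Partition

class RangeRDD(sc: SparkContext, n: Int, numSlices: Int) extends RDD[Int](sc, Nil) {

  // One Partition per contiguous slice of 0 until n.
  override protected def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](numSlices) { i =>
      new RangePartition(id, i, i * n / numSlices, (i + 1) * n / numSlices)
    }
  }

  // compute rebuilds the iterator for exactly one partition's slice.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
}

Any action on, say, new RangeRDD(sc, 100, 4) then schedules one task per RangePartition, which is exactly the task/partition correspondence the next two sections show.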

4 The first kind of task: ShuffleMapTask (where partitions are used)

private[spark] class ShuffleMapTask(
    stageId: Int,
    var rdd: RDD[_],
    var dep: ShuffleDependency[_,_],   // the shuffle dependency passed in; to be studied further
    var partition: Int,                // one task is created per partition: tasks and partitions correspond one-to-one
    @transient private var locs: Seq[TaskLocation])
  extends Task[MapStatus](stageId)
  with Externalizable
  with Logging {

  var split = if (rdd == null) null else rdd.partitions(partition)  // look up this task's partition by its index

  override def run(attemptId: Long): MapStatus = {
    val numOutputSplits = dep.partitioner.numPartitions
    val taskContext = new TaskContext(stageId, partition, attemptId, runningLocally = false)
    metrics = Some(taskContext.taskMetrics)

    val blockManager = SparkEnv.get.blockManager
    var shuffle: ShuffleBlocks = null
    var buckets: ShuffleWriterGroup = null

    try {
      // Obtain all the block writers for shuffle blocks.
      val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
      buckets = shuffle.acquireWriters(partition)

      // Write the map output to its associated buckets. This is the key step:
      // every element of this partition is written to the bucket it belongs to.
      for (elem <- rdd.iterator(split, taskContext)) {
        val pair = elem.asInstanceOf[Product2[Any, Any]]
        val bucketId = dep.partitioner.getPartition(pair._1)
        buckets.writers(bucketId).write(pair)
      }

      // Commit the writes. Get the size of each bucket block (total block size).
      var totalBytes = 0L
      val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
        writer.commit()
        writer.close()
        val size = writer.size()
        totalBytes += size
        MapOutputTracker.compressSize(size)
      }

      // Update shuffle metrics.
      val shuffleMetrics = new ShuffleWriteMetrics
      shuffleMetrics.shuffleBytesWritten = totalBytes
      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

      return new MapStatus(blockManager.blockManagerId, compressedSizes)
    } catch { case e: Exception =>
      // If there is an exception from running the task, revert the partial writes
      // and throw the exception upstream to Spark.
      if (buckets != null) {
        buckets.writers.foreach(_.revertPartialWrites())
      }
      throw e
    } finally {
      // Release the writers back to the shuffle block manager.
      if (shuffle != null && buckets != null) {
        shuffle.releaseWriters(buckets)
      }
      // Execute the callbacks on task completion.
      taskContext.executeOnCompleteCallbacks()
    }
  }
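The key line above is dep.partitioner.getPartition(pair._1), which decides which of the numOutputSplits buckets each record goes to. As a small illustration (not from the post), with a HashPartitioner the bucket id is essentially the key's hash code taken modulo the number of reduce partitions, so records with equal keys always land in the same bucket.

import org.apache.spark.HashPartitioner

object BucketIdExample {
  def main(args: Array[String]) {
    val partitioner = new HashPartitioner(4)   // stands in for dep.partitioner
    val records = Seq(("a", 1), ("b", 2), ("spark", 3), ("a", 4))
    for ((k, v) <- records) {
      val bucketId = partitioner.getPartition(k)   // key.hashCode mod numPartitions
      println("key=" + k + ", value=" + v + " -> bucket " + bucketId)
    }
  }
}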

5 The second kind of task: ResultTask

private[spark] class ResultTask[T, U](
    stageId: Int,
    var rdd: RDD[T],
    var func: (TaskContext, Iterator[T]) => U,   // the function passed in by the action
    var partition: Int,                          // again, tasks and splits correspond one-to-one
    @transient locs: Seq[TaskLocation],
    var outputId: Int)
  extends Task[U](stageId) with Externalizable {

  def this() = this(0, null, null, 0, null, 0)

  var split = if (rdd == null) {
    null
  } else {
    rdd.partitions(partition)
  }

  @transient private val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  override def run(attemptId: Long): U = {
    val context = new TaskContext(stageId, partition, attemptId, runningLocally = false)
    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(split, context))
    } finally {
      context.executeOnCompleteCallbacks()
    }
  }
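The func applied in run is supplied by whichever action triggered the job. As a hedged sketch (runJob has several overloads and its exact shape varies across Spark versions, so treat this as an illustration rather than the real count() implementation), this is roughly how a count-style action can be expressed: each per-partition function becomes the func of one ResultTask, there is one task per partition, and the driver combines the per-partition results.

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

object CountViaResultTasks {
  def countPartitions[T](sc: SparkContext, rdd: RDD[T]): Long = {
    // Each (TaskContext, Iterator[T]) => Long below becomes the func of one
    // ResultTask; runJob returns one result per partition.
    val perPartition: Array[Long] = sc.runJob(rdd, (context: TaskContext, iter: Iterator[T]) => {
      var count = 0L
      while (iter.hasNext) { iter.next(); count += 1 }
      count
    })
    perPartition.sum   // the driver combines the per-partition results
  }
}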