
Spark Source Code Reading Notes: RDD (Part 1)

2015-07-05 14:36


RDD (Resilient Distributed Dataset) is Spark's most basic abstraction, and is essentially an immutable collection. The elements of the collection are divided into partitions, which are the basic unit of storage and access and live in memory or on disk across the cluster or on the local machine; functions applied to an RDD can be computed on the different partitions in parallel.

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
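As a quick illustration (a minimal sketch, assuming a running SparkContext named sc), the snippet below builds an RDD from a local collection, splits it into four partitions, and applies a transformation that runs on each partition in parallel; note that the original RDD is never modified.

// Minimal sketch, assuming a running SparkContext named sc.
val numbers = sc.parallelize(1 to 1000, numSlices = 4)

println(numbers.partitions.length) // 4 partitions, the unit of parallelism

// map does not mutate `numbers`; it returns a new RDD (RDDs are immutable).
val squares = numbers.map(n => n * n)
println(squares.take(5).mkString(", ")) // 1, 4, 9, 16, 25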

RDD Construction

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  )


def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
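The primary constructor takes the SparkContext and the full dependency list; the auxiliary constructor covers the common case of a single one-to-one parent. A hypothetical subclass (the class name and logic are invented for illustration, not taken from Spark) might use it like this:

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical subclass for illustration only: by calling the one-parent
// auxiliary constructor it automatically gets a OneToOneDependency on `prev`.
class DoubledRDD(prev: RDD[Int]) extends RDD[Int](prev) {

  // Reuse the parent's partitioning: one child partition per parent partition.
  override protected def getPartitions: Array[Partition] = prev.partitions

  // Compute a child partition by streaming over the matching parent partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    prev.iterator(split, context).map(_ * 2)
}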


RDD Attributes

_sc

Type: SparkContext

Description: the Spark application context.

deps

Type: Seq[Dependency[_]]

Description: the parent RDDs this RDD depends on.

partitioner

Type: Option[Partitioner]

Description: how the keys of a key-value pair RDD are assigned to partitions; implementations include HashPartitioner, RangePartitioner, and user-defined partitioners.

id

Type: Int

Description: the RDD's id; monotonically increasing, generated by AtomicInteger.getAndIncrement().
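The id is handed out by the SparkContext using the standard AtomicInteger pattern; the sketch below shows that pattern in isolation (simplified, with names only approximating the SparkContext internals):

import java.util.concurrent.atomic.AtomicInteger

// Simplified sketch of the id-generation pattern; names approximate the
// SparkContext internals. Each new RDD asks for the next integer, so ids
// are unique and strictly increasing within one context.
object RddIdGenerator {
  private val nextRddId = new AtomicInteger(0)

  def newRddId(): Int = nextRddId.getAndIncrement()
}

// RddIdGenerator.newRddId() returns 0, 1, 2, ... on successive calls.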

RDD Attribute: Dependency

RDDs have two kinds of dependencies: narrow dependencies (NarrowDependency) and wide dependencies (ShuffleDependency).

A narrow dependency means that each partition of the child RDD depends on only a small number of partitions of the parent RDD, which allows the computation to be executed in a pipelined fashion. Essentially, a narrow dependency means the parent RDD can be transformed into the child RDD without a shuffle.

Each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.

A wide dependency (strictly speaking a shuffle dependency, ShuffleDependency) means the parent RDD must go through a shuffle to produce the child RDD; that is, each partition of the child RDD can depend on all partitions of the parent RDD.

Represents a dependency on the output of a shuffle stage.
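The difference is easy to observe from the API (a minimal sketch, assuming a running SparkContext named sc): a narrow transformation such as map yields a OneToOneDependency, while a transformation that has to shuffle, such as reduceByKey, yields a ShuffleDependency.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 4)

// map is narrow: expected to print something like
// List(org.apache.spark.OneToOneDependency@...)
val mapped = pairs.map { case (k, v) => (k, v * 10) }
println(mapped.dependencies)

// reduceByKey shuffles: expected to print something like
// List(org.apache.spark.ShuffleDependency@...)
val reduced = pairs.reduceByKey(_ + _)
println(reduced.dependencies)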

The Dependency abstract class

/**
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}


The NarrowDependency abstract class

/**
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}


NarrowDependency has several implementations: OneToOneDependency, RangeDependency, PruneDependency, etc.

OneToOneDependency: a dependency in which each partition of the child RDD corresponds to exactly one partition of the parent RDD (with the same partition index).

/**
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int) = List(partitionId)
}


RangeDependency: a one-to-one dependency between ranges of partitions in the parent and child RDDs; child partitions [outStart, outStart + length) map to parent partitions [inStart, inStart + length).

/**
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int) = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
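For example (a minimal sketch, assuming a running SparkContext named sc), union places the partitions of its inputs side by side, so each parent contributes a RangeDependency covering its slice of the child's partition index space:

val left = sc.parallelize(1 to 100, numSlices = 3)    // becomes child partitions 0..2
val right = sc.parallelize(101 to 200, numSlices = 2) // becomes child partitions 3..4

val unioned = left.union(right)
println(unioned.partitions.length) // 5

// Expected to print RangeDependency twice, once per parent.
unioned.dependencies.foreach(d => println(d.getClass.getSimpleName))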


Wide dependency (ShuffleDependency)

/**
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
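To see these fields in practice (a minimal sketch, assuming a running SparkContext named sc), the ShuffleDependency created by reduceByKey can be pulled out of the child RDD and inspected:

import org.apache.spark.{HashPartitioner, ShuffleDependency}

val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .reduceByKey(new HashPartitioner(4), _ + _)

val dep = counts.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
println(dep.partitioner.numPartitions) // 4
println(dep.mapSideCombine)            // true: reduceByKey combines on the map side
println(dep.shuffleId)                 // a unique, increasing shuffle id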


RDD Attribute: Partitioner

A Partitioner defines how the elements of a key-value pair RDD are assigned to partitions based on their keys; this is the algorithm used during the shuffle phase.

An object that defines how the elements in a key-value pair RDD are partitioned by key. Maps each key to a partition ID, from 0 to numPartitions - 1.

The Partitioner abstract class

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}


Partitioner has several implementations: HashPartitioner, RangePartitioner, user-defined (custom) partitioners, etc.

The HashPartitioner implementation

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}