您的位置:首页 > 其它

Spark源码系列(二)RDD详解

2014-06-12 00:05 549 查看

1、什么是RDD?

上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。

RDD的全名是Resilient Distributed Dataset,意思是容错的分布式数据集,每一个RDD都会有5个特征:

1、有一个分片列表。就是能被切分,和hadoop一样的,能够切分的数据才能并行计算。

2、有一个函数计算每一个分片,这里指的是下面会提到的compute函数。

3、对其他的RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。

4、可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。

5、可选:每一个分片的优先计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置。

对应着上面这几点,我们在RDD里面能找到这4个方法和1个属性,别着急,下面我们会慢慢展开说这5个东东。

//只计算一次
protected def getPartitions: Array[Partition]
//对一个分片进行计算,得出一个可遍历的结果
def compute(split: Partition, context: TaskContext): Iterator[T]
//只计算一次,计算RDD对父RDD的依赖
protected def getDependencies: Seq[Dependency[_]] = deps
//可选的,分区的方法,针对第4点,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce
@transient val partitioner: Option[Partitioner] = None
//可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil


2、多种RDD之间的转换

下面用一个实例讲解一下吧,就拿我们常用的一段代码来讲吧,然后会把我们常用的RDD都会讲到。

val hdfsFile = sc.textFile(args(1))
val flatMapRdd = hdfsFile.flatMap(s => s.split(" "))
val filterRdd = flatMapRdd.filter(_.length == 2)
val mapRdd = filterRdd.map(word => (word, 1))
val reduce = mapRdd.reduceByKey(_ + _)


这里涉及到很多个RDD,textFile是一个HadoopRDD经过map后的MappredRDD,经过flatMap是一个FlatMappedRDD,经过filter方法之后生成了一个FilteredRDD,经过map函数之后,变成一个MappedRDD,通过隐式转换成 PairRDD,最后经过reduceByKey。

我们首先看textFile的这个方法,进入SparkContext这个方法,找到它。

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString)
}


看它的输入参数,path,TextInputFormat,LongWritable,Text,同志们联想到什么?写过mapreduce的童鞋都应该知道哈。

1、hdfs的地址

2、InputFormat的类型

3、Mapper的第一个类型

4、Mapper的第二类型

这就不难理解为什么立马就对hadoopFile后面加了一个map方法,取pair的第二个参数了,最后在shell里面我们看到它是一个MappredRDD了。

那么现在如果大家要用的不是textFile,而是一个别的hadoop文件类型,大家会不会使用hadoopFile来得到自己要得到的类型呢,不要告诉我不会哈,不会的赶紧回去复习mapreduce。

言归正传,默认的defaultMinPartitions的2太小了,我们用的时候还是设置大一点吧。

2.1 HadoopRDD

我们继续追杀下去,看看hadoopFile方法,里面我们看到它做了3个操作。

1、把hadoop的配置文件保存到广播变量里。

2、设置路径的方法

3、new了一个HadoopRDD返回

好,我们接下去看看HadoopRDD这个类吧,我们重点看看它的getPartitions、compute、getPreferredLocations。

先看getPartitions,它的核心代码如下:

val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}


它调用的是inputFormat自带的getSplits方法来计算分片,然后把分片HadoopPartition包装到到array里面返回。

这里顺便顺带提一下,因为1.0又出来一个NewHadoopRDD,它使用的是mapreduce新api的inputformat,getSplits就不要有minPartitions了,别的逻辑都是一样的,只是使用的类有点区别。

我们接下来看compute方法,它的输入值是一个Partition,返回是一个Iterator[(K, V)]类型的数据,这里面我们只需要关注2点即可。

1、把Partition转成HadoopPartition,然后通过InputSplit创建一个RecordReader

2、重写Iterator的getNext方法,通过创建的reader调用next方法读取下一个值。

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
// 是否使用外部排序,是由参数spark.shuffle.spill,默认是true
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
// 用map来去重,用update方法来更新值,如果没值的时候,返回值,如果有值的时候,通过mergeValue方法来合并
// mergeValue方法就是我们在reduceByKey里面写的那个匿名函数,在这里就是(_ + _)
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
} else {
// 用了一个外部排序的map来去重,就不停的往里面插入值即可,基本原理和上面的差不多,区别在于需要外部排序
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
combiners.iterator
}


View Code
这个就是一个很典型的按照key来做合并的方法了,我们继续看ShuffledRDD吧。

ShuffledRDD和之前的RDD很明显的特征是

1、它的依赖传了一个Nil(空列表)进去,表示它没有依赖。

2、它的compute计算方式比较特别,这个在之后的文章说,过程比较复杂。

3、它的分片默认是采用HashPartitioner,数量和前面的RDD的分片数量一样,也可以不一样,我们可以在reduceByKey的时候多传一个分片数量即可。

在new完ShuffledRDD之后又来了一遍mapPartitionsWithContext,不过调用的匿名函数变成了combineCombinersByKey。

combineCombinersByKey和combineValuesByKey的逻辑基本相同,只是输入输出的类型有区别。combineCombinersByKey只是做单纯的合并,不会对输入输出的类型进行改变,combineValuesByKey会把iter[K, V]的V值变成iter[K, C]。

case class Aggregator[K, V, C] (
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C)
  ......
}


这个方法会根据我们传进去的匿名方法的参数的类型做一个自动转换。

到这里,作业都没有真正执行,只是将RDD各种嵌套,我们通过RDD的id和类型的变化观测到这一点,RDD[1]->RDD[2]->RDD[3]......



3其它RDD

平常我们除了从hdfs上面取数据之后,我们还可能从数据库里面取数据,那怎么办呢?没关系,有个JdbcRDD!

val rdd = new JdbcRDD(
sc,
() => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1, 100, 3,
(r: ResultSet) => { r.getInt(1) }
).cache()


前几个参数大家都懂,我们重点说一下后面1, 100, 3是咋回事?

在这个JdbcRDD里面它默认我们是会按照一个long类型的字段对数据进行切分,(1,100)分别是最小值和最大值,3是分片的数量。

比如我们要一次查ID为1-1000,000的的用户,分成10个分片,我们就填(1, 1000,000, 10)即可,在sql语句里面还必须有"? <= ID AND ID <= ?"的句式,别尝试着自己造句哦!

最后是怎么处理ResultSet的方法,自己爱怎么处理怎么处理去吧。不过确实觉着用得不方便的可以自己重写一个RDD。

小结:

这一章重点介绍了各种RDD那5个特征,以及RDD之间的转换,希望大家可以对RDD有更深入的了解,下一章我们将要讲作业的运行过程,敬请关注!

岑玉海

转载请注明出处,谢谢!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: