Spark核心RDD:Sort排序详解
2016-10-26 23:27
666 查看
1.sortByKey
无可非议sortByKey是Spark的最常用的排序,简单的案例暂且跳过,下面给一个非简单的案例,进入排序之旅对下面简单元祖,要求先按元素1升序,若元素1相同,则再按元素3升序
(1, 6, 3), (2, 3, 3), (1, 1, 2), (1, 3, 5), (2, 1, 2)
提示:sortByKey对于key是单个元素排序很简单,如果key是元组如(X1,X2,X3.....),它会先按照X1排序,若X1相同,则在根据X2排序,依次类推...
由上面的分析,我们可以给出如下的代码:
val conf = new SparkConf() val sc = new SparkContext(conf) val array = Array((1, 6, 3), (2, 3, 3), (1, 1, 2), (1, 3, 5), (2, 1, 2)) val rdd1 = sc.parallelize(array) //设置元素(e1,e3)为key,value为原来的整体 val rdd2 = rdd1.map(f => ((f._1, f._3), f)) //利用sortByKey排序的对key的特性 val rdd3 = rdd2.sortByKey() val rdd4 = rdd3.values.collect
结果:
rdd4: Array[(Int, Int, Int)] = Array((1,1,2), (1,6,3), (1,3,5), (2,1,2), (2,3,3))
2.sortBy
SortBy其实是SortBykey的加强版,比如上面的功能可以使用这个函数实现val rdd2=rdd1.sortBy(f=>(f._1,f._3)).collect看上去是不是很神奇,其实sortBy内部帮我们做的,就是我上面写的代码。下面看下SortBy的源码:
def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values }sortBy先调用keyBy函数,而keyBy的功能很简单,key为用户制定,比如上面的f => ((f._1, f._3),value为原始值:
def keyBy[K](f: T => K): RDD[(K, T)] = withScope { val cleanedF = sc.clean(f) map(x => (cleanedF(x), x)) }最后在调用sortByKey函数,和上面的神似有木有..
3.Ordering
Ordering在Spark的排序应用中随处可见,比如上面的SortBy它就有一个隐式参数implicit ord: Ordering[K],如下摘抄自Scala提供AIP对简单的类型排序, quickSort 这里使用了高阶函数的柯里化, 为第二个括号为Ordering类型的隐式参数
import scala.util.Sorting val pairs = Array(("a", 5, 2), ("c", 3, 1), ("b", 1, 3)) // sort by 2nd element Sorting.quickSort(pairs)(Ordering.by[(String, Int, Int), Int](_._2)) // sort by the 3rd element, then 1st Sorting.quickSort(pairs)(Ordering[(Int, String)].on(x => (x._3, x._1)))对复杂类型的排序
import scala.util.Sorting case class Person(name:String, age:Int) val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19)) // sort by age object AgeOrdering extends Ordering[Person] { def compare(a:Person, b:Person) = a.age compare b.age } Sorting.quickSort(people)(AgeOrdering)
因为sortByKey实现了Ordering的很多功能,下面以Spark中的top函数为例
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { takeOrdered(num)(ord.reverse) }在下面的元组中,以第2个元素基,取出前3大的元组
val array = Array((1, 3, 3), (2, 6, 3), (1, 1, 2), (1, 5, 4), (2, 1, 2)) val rdd1 = sc.parallelize(array) val result=rdd1.top(3)(Ordering.by[(Int, Int, Int), Int](_._2))
结果:
result: Array[(Int, Int, Int)] = Array((2,6,3), (1,5,4), (1,3,3))
相关文章推荐
- Spark核心RDD:Sort排序详解
- spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey
- “戏”说Spark-Spark核心-RDD 持久化机制详解
- Spark核心编程:RDD持久化详解
- Spark核心RDD:combineByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark 核心概念 RDD 详解
- “戏”说Spark-Spark核心-RDD转换操作算子详解(二)
- Spark-Sort排序详解
- Spark核心RDD:combineByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark核心RDD:foldByKey函数详解
- Spark算子[13]:sortByKey、sortBy、二次排序 源码实例详解
- “戏”说Spark-Spark核心-RDD转换操作算子详解(一)
- Spark核心RDD:combineByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark-RDD 之 排序sortBy 和sortByKey
- 理解Spark的核心RDD
- 理解Spark的核心RDD
- Spark RDD API详解(一) Map和Reduce (zhuan)