Spark算子[13]:sortByKey、sortBy、二次排序 源码实例详解
2017-12-14 16:33
761 查看
sortBy是对标准的RDD进行排序。[在scala语言中,RDD与PairRDD没有太严格的界限]。
sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。
该函数最多可以传三个参数:
第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。
从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的元素,也就变成了Key-Value类型的RDD了,它的实现如下:
Scala版本案例1
针对scala语言的pairRDD操作
结果:(d,5) (d,2) (b,2) (c,1) (a,0) (b,0)
Scala版本案例2
针对scala的rdd操作
结果:12 9 5 1
上面的实例对rdd中的元素进行降序排序。并对排序后的RDD的分区个数进行了修改,上面的res就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。
Java版本案例
c837
org.apache.spark.api.java.JavaRDD源码如下:
需要注意的是,此处和scala不同,必须是三个参数!
结果:10 7 6 5 5 3 1 1 0
它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的。
源码
从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。
Scala版本案例
结果:(4,2) (6,1) (8,0) (10,5) (11,0) (20,2)
上面对Key进行了排序。细心的读者可能会问,sortBy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:
他就是默认的排序规则,我们可以对它进行重写,如下:
结果:(10,5) (11,0) (20,2) (4,2) (6,1) (8,0)
Java版本案例
源码
def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(): JavaPairRDD[K, V]
自定义比较器:
不使用比较器结果:(1,90) (2,60) (21,60) (3,50)
使用比较器结果:(1,90) (2,60) (21,60) (3,50)
Spark Scala 二次排序
sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。
sortBy
源码/** * RDD.scala * Return this RDD sorted by the given key function. */ def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values }
该函数最多可以传三个参数:
第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。
从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的元素,也就变成了Key-Value类型的RDD了,它的实现如下:
def keyBy[K](f: T => K): RDD[(K, T)] = withScope { val cleanedF = sc.clean(f) map(x => (cleanedF(x), x)) }
Scala版本案例1
针对scala语言的pairRDD操作
def sortBy(): Unit = { val conf = new SparkConf().setMaster("local").setAppName("sortBy") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(("a",0),("d",5),("d",2),("c",1),("b",2),("b",0)),4) // 按照tuple2的第二个元素降序排列 val res =rdd.sortBy(_._2,false,2) res.foreach(x=> print(x+ " ")) }
结果:(d,5) (d,2) (b,2) (c,1) (a,0) (b,0)
Scala版本案例2
针对scala的rdd操作
def sortBy(): Unit = { val conf = new SparkConf().setMaster("local").setAppName("sortBy") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(5,1,9,12),2) val res =rdd.sortBy(x=>x,false,1) //其中的x=>x相当于是函数,不能用_代替 res.foreach(x=> print(x+ " ")) }
结果:12 9 5 1
上面的实例对rdd中的元素进行降序排序。并对排序后的RDD的分区个数进行了修改,上面的res就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。
Java版本案例
c837
org.apache.spark.api.java.JavaRDD源码如下:
/** * Return this RDD sorted by the given key function. */ def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = { def fn: (T) => S = (x: T) => f.call(x) import com.google.common.collect.Ordering // shadows scala.math.Ordering implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]] implicit val ctag: ClassTag[S] = fakeClassTag wrapRDD(rdd.sortBy(fn, ascending, numPartitions)) }
需要注意的是,此处和scala不同,必须是三个参数!
private static void sortBy() { SparkConf conf = new SparkConf().setAppName("sortBy").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Integer> numList = Arrays.asList(1, 5, 3, 5, 6, 7, 0, 1, 10); JavaRDD<Integer> numRdd = sc.parallelize(numList); Function<Integer, Integer> fun1 = new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1; } }; JavaRDD<Integer> res = numRdd.sortBy(fun1, false, 1); res.foreach(x -> System.out.println(x)); sc.close(); }
结果:10 7 6 5 5 3 1 1 0
sortByKey
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的。
源码
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }
从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。
Scala版本案例
def sortByKey(): Unit = { val conf = new SparkConf().setMaster("local").setAppName("sortBy") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List((11,0),(10,5),(4,2),(6,1),(20,2),(8,0))) val res =rdd.sortByKey(true,1) res.foreach(x => print(x + " ")) }
结果:(4,2) (6,1) (8,0) (10,5) (11,0) (20,2)
上面对Key进行了排序。细心的读者可能会问,sortBy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:
private val ordering = implicitly[Ordering[K]]
他就是默认的排序规则,我们可以对它进行重写,如下:
def sortByKey(): Unit = { val conf = new SparkConf().setMaster("local").setAppName("sortBy") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List((11,0),(10,5),(4,2),(6,1),(20,2),(8,0))) //重写ordering implicit val sortIntegersByString = new Ordering[Int]{ override def compare(x: Int, y: Int): Int = x.toString.compareTo(y.toString) } val res =rdd.sortByKey(true,1) res.foreach(x => print(x + " ")) }
结果:(10,5) (11,0) (20,2) (4,2) (6,1) (8,0)
Java版本案例
源码
def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = { implicit val ordering = comp // 允许比较器隐式转换为排序。 fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) }
def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(): JavaPairRDD[K, V]
public static void sortByKey() { SparkConf conf = new SparkConf().setAppName("sortBy").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("1", 90), new Tuple2<String, Integer>("2", 60), new Tuple2<String, Integer>("21", 60), new Tuple2<String, Integer>("3", 50) ); JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(scoreList); // 自定义比较器:直接这样在内部定义是不正确的 // 抛出异常Task not serializable: java.io .NotSerializableException: // 解决办法,另外定义一个小class ,implements Comparator<String>,Serializable /* Comparator<String> comparator = new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.valueOf(o1).compareTo(Integer.valueOf(o2)); } }; */ JavaPairRDD<String, Integer> res = pairRDD.sortByKey(); //JavaPairRDD<String, Integer> res = pairRDD.sortByKey(new MyComparator()); res.foreach(x -> System.out.print(x + " ")); sc.close(); }
自定义比较器:
public class MyComparator implements Comparator<String>,Serializable { @Override public int compare(String o1, String o2) { return Integer.valueOf(o1).compareTo(Integer.valueOf(o2)); } }
不使用比较器结果:(1,90) (2,60) (21,60) (3,50)
使用比较器结果:(1,90) (2,60) (21,60) (3,50)
二次排序
Spark Java 二次排序Spark Scala 二次排序
相关文章推荐
- Spark算子[12]:groupByKey、cogroup、join、lookup 源码实例详解
- Spark算子[10]:foldByKey、fold 源码实例详解
- 大数据:Spark 算子(一)排序算子sortByKey来看大数据平台下如何做排序
- spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey
- Spark: sortBy sortByKey 二次排序
- Spark 使用sortByKey进行二次排序
- Spark 使用sortByKey进行二次排序
- Spark算子详解之reduceByKey_sample_take_takeSample_distinct_sortByKey_saveAsTextFile_intersection
- 【spark】sortByKey实现二次排序
- spark中算子详解:combineByKey
- Spark: sortBy和sortByKey函数详解
- spark 算子combineByKey 详解
- spark中算子详解:combineByKey
- Spark groupByKey,reduceByKey,sortByKey算子的区别
- Spark算子[18]:saveAsTextFile、saveAsObjectFile 源码实例详解
- spark:sortByKey按年龄排序--20
- Spark算子[14]:top、takeOrdered 源码实例详解
- Spark:sortBy和sortByKey的函数详解
- Spark-RDD 之 排序sortBy 和sortByKey
- Spark: sortBy和sortByKey函数详解