
[1.5] Advanced sorting in Spark: implementing a secondary sort as an example

2016-06-13 13:51

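Scenario

Sort the data in the file
/home/pengyucheng/resource/hellospark.txt
in ascending order with a secondary sort: rows are ordered by the first number, and rows with the same first number are ordered by the second number.

Source data: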

1,1,spark

1,3,zookeeper

1,2,akka

3,1,hadoop

3,8,zookeeper

2,1,flink

Sorted data:

1,1,spark

1,2,akka

1,3,zookeeper

2,1,flink

3,1,hadoop

3,8,zookeeper


Using Spark's built-in sort function sortByKey, implement the secondary sort described above in both Java and Scala.
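Before bringing Spark in, the ordering rule itself can be checked on a local list. The following is only a sketch in plain Java (no Spark involved); the class name LocalSecondarySort and the helper field are made up for illustration and do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class LocalSecondarySort {
    // Parses the n-th comma-separated field of a line as an int.
    static int field(String line, int n) {
        return Integer.parseInt(line.split(",")[n]);
    }

    // Returns a new list ordered by the first field, then by the second.
    static List<String> sort(List<String> lines) {
        List<String> copy = new ArrayList<>(lines);
        copy.sort(Comparator
                .comparingInt((String l) -> field(l, 0))
                .thenComparingInt(l -> field(l, 1)));
        return copy;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList(
                "1,1,spark", "1,3,zookeeper", "1,2,akka",
                "3,1,hadoop", "3,8,zookeeper", "2,1,flink");
        // Prints the six lines in the "sorted data" order listed above.
        sort(source).forEach(System.out::println);
    }
}
```

The comparator chain (first field, then second field) is the whole idea of a secondary sort; the rest of the article is about handing the same rule to Spark.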

Analysis

How Spark core implements the built-in sortByKey:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}


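That is, by default sortByKey performs a single-level sort in ascending order of the key K, using the implicit Ordering of K, and returns a ShuffledRDD once the sort is set up.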
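To get a feel for what the RangePartitioner and the key ordering do together, here is a toy model in plain Java. It is not Spark's implementation: the class RangeSortSketch and its methods are hypothetical, and the partition bounds are hard-coded, whereas Spark chooses them by sampling the keys. The point is only that contiguous key ranges, each sorted locally, concatenate into a globally sorted result, and that a descending sort uses the reversed ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class RangeSortSketch {
    // Partition index of a key: the number of upper bounds the key exceeds.
    static int partitionOf(int key, int[] upperBounds) {
        int p = 0;
        while (p < upperBounds.length && key > upperBounds[p]) p++;
        return p;
    }

    // Toy model: range-partition the keys, sort each partition, concatenate.
    static List<Integer> sortByKey(List<Integer> keys, int[] upperBounds, boolean ascending) {
        Comparator<Integer> ordering = Comparator.naturalOrder();
        Comparator<Integer> effective = ascending ? ordering : ordering.reversed();

        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i <= upperBounds.length; i++) partitions.add(new ArrayList<>());
        for (int k : keys) partitions.get(partitionOf(k, upperBounds)).add(k);

        // For a descending sort, visit the highest key range first.
        if (!ascending) Collections.reverse(partitions);

        List<Integer> result = new ArrayList<>();
        for (List<Integer> part : partitions) {
            part.sort(effective);
            result.addAll(part);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(5, 1, 9, 3, 7, 2);
        int[] bounds = {3, 6}; // assumed bounds: three partitions (<=3, 4..6, >6)
        System.out.println(sortByKey(keys, bounds, true));  // [1, 2, 3, 5, 7, 9]
        System.out.println(sortByKey(keys, bounds, false)); // [9, 7, 5, 3, 2, 1]
    }
}
```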

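So how can a secondary sort, a tertiary sort, or an n-level sort be implemented? Since sortByKey orders records purely by the ordering of the key, the answer is to define a composite key whose comparison looks at each field in turn. The code below walks through the secondary sort.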
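The same idea extends to any number of levels. As a rough sketch in plain Java (the class NLevelKey is hypothetical and not part of the original code), a key can hold an array of int fields and compare them lexicographically. To use such a key with Spark it would additionally have to be Serializable and should define equals and hashCode, which are omitted here for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NLevelKey implements Comparable<NLevelKey> {
    private final int[] fields;

    public NLevelKey(int... fields) {
        this.fields = fields.clone();
    }

    // Lexicographic comparison: the first differing field decides.
    @Override
    public int compareTo(NLevelKey other) {
        int n = Math.min(fields.length, other.fields.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(fields[i], other.fields[i]);
            if (c != 0) return c;
        }
        // All shared fields are equal: the shorter key sorts first.
        return Integer.compare(fields.length, other.fields.length);
    }

    @Override
    public String toString() {
        return Arrays.toString(fields);
    }

    public static void main(String[] args) {
        List<NLevelKey> keys = new ArrayList<>(Arrays.asList(
                new NLevelKey(1, 3, 2), new NLevelKey(1, 3, 1),
                new NLevelKey(1, 2, 9), new NLevelKey(0, 9, 9)));
        Collections.sort(keys);
        System.out.println(keys); // [[0, 9, 9], [1, 2, 9], [1, 3, 1], [1, 3, 2]]
    }
}
```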

Experiment

Java version

Java bean

package cool.pengych.spark.core;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Bean that defines the secondary-sort key.
 * @author pengyucheng
 */
public class SecondarySort implements Ordered<SecondarySort>, Serializable
{
    /*
     * Fields of the sort key
     */
    private int first;
    private int second;

    public SecondarySort(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        SecondarySort other = (SecondarySort) obj;
        return first == other.first && second == other.second;
    }

    // The four operator methods of Ordered are all derived from compare.
    @Override
    public boolean $greater(SecondarySort other) {
        return compare(other) > 0;
    }

    @Override
    public boolean $greater$eq(SecondarySort other) {
        return compare(other) >= 0;
    }

    @Override
    public boolean $less(SecondarySort other) {
        return compare(other) < 0;
    }

    @Override
    public boolean $less$eq(SecondarySort other) {
        return compare(other) <= 0;
    }

    // Compare by first; if equal, compare by second.
    // Integer.compare is used instead of subtraction so that large values cannot overflow.
    @Override
    public int compare(SecondarySort other) {
        if (this.first != other.getFirst()) {
            return Integer.compare(this.first, other.getFirst());
        } else {
            return Integer.compare(this.second, other.getSecond());
        }
    }

    @Override
    public int compareTo(SecondarySort other) {
        return compare(other);
    }

    public int getFirst() {
        return first;
    }

    public int getSecond() {
        return second;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}
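A compare method for int fields can be written either as a subtraction (a - b) or with Integer.compare(a, b). For small values such as the sample data the two agree, but the subtraction gives the wrong sign once the difference overflows. A small sketch of the difference follows; the class and method names are made up for illustration.

```java
public class CompareOverflow {
    // Comparison by subtraction: wrong sign when the difference overflows int.
    static int bySubtraction(int a, int b) {
        return a - b;
    }

    // Comparison without arithmetic on the operands.
    static int bySafeCompare(int a, int b) {
        return Integer.compare(a, b);
    }

    public static void main(String[] args) {
        int big = Integer.MAX_VALUE;
        int negative = -1;
        // big > negative, so a correct comparator must return a positive number.
        System.out.println(bySubtraction(big, negative) > 0); // false: the result wraps to Integer.MIN_VALUE
        System.out.println(bySafeCompare(big, negative) > 0); // true
        // For small values both approaches agree.
        System.out.println(bySubtraction(3, 1) > 0);          // true
    }
}
```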


Implementation

package cool.pengych.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * Secondary sort, implementation steps:
 * 1. Define a custom sort key that implements the Ordered and Serializable interfaces
 * 2. Load the file to be sorted and turn it into an RDD of <K,V> pairs
 * 3. Call sortByKey, which sorts by the custom key K
 * 4. Drop the sort key and keep only the sorted lines
 * @author pengyucheng
 */
public class SecondarySortApp {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Spark SecondarySort of java version").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("/home/pengyucheng/resource/hellospark.txt");

        // Step 2: key each line by (first number, second number).
        JavaPairRDD<SecondarySort, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySort, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<SecondarySort, String> call(String line) throws Exception {
                String[] splited = line.split(",");
                SecondarySort ss = new SecondarySort(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
                return new Tuple2<SecondarySort, String>(ss, line);
            }
        });

        // Step 3: sort by the custom key.
        JavaPairRDD<SecondarySort, String> sorted = pairs.sortByKey();

        // Step 4: drop the key, keep the original line.
        JavaRDD<String> results = sorted.map(new Function<Tuple2<SecondarySort, String>, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String call(Tuple2<SecondarySort, String> v1) throws Exception {
                return v1._2;
            }
        });

        // Collect to the driver before printing: foreach on the RDD runs per partition,
        // so with local[*] the lines could be printed out of order.
        for (String line : results.collect()) {
            System.out.println(line);
        }

        sc.stop();
    }
}


Scala version

case class

package cool.pengych.spark.core

/**
  * Created by pengyucheng on 16-6-13.
  */
case class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  override def compare(that: SecondarySortKey): Int =
    if (this.first != that.first) this.first - that.first else this.second - that.second
}


object

package cool.pengych.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by pengyucheng on 16-6-13.
  * Secondary sort, Scala version
  */
object SecondarySortKeyTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Secondary Sort ALG"))
    sc.textFile("/home/pengyucheng/resource/hellospark.txt").map(line => {
      val splited = line.split(",")
      (SecondarySortKey(splited(0).toInt, splited(1).toInt), line)
    }).sortByKey().map(pair => pair._2).collect().foreach(println)
  }
}


Execution result

16/06/13 13:34:15 INFO DAGScheduler: Job 1 finished: collect at SecondarySortKeyTest.scala:17, took 0.606013 s
1,1,spark
1,2,akka
1,3,zookeeper
2,1,flink
3,1,hadoop
3,8,zookeeper
16/06/13 13:34:15 INFO SparkContext: Invoking stop() from shutdown hook


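Summary

The Scala version takes less than half as much code as the Java version and is quite concise, thanks to the powerful Scala compiler. Let's disassemble the compiled class with javap to see what the Scala compiler did for us:

Disassembly command: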
$ javap -c -p -classpath . SecondarySortKey.class


Result:

public class cool.pengych.spark.core.SecondarySortKey implements scala.math.Ordered<cool.pengych.spark.core.SecondarySortKey>, scala.Serializable {
  private final int first;

  private final int second;

  public boolean $less(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #23 // Method scala/math/Ordered$class.$less:(Lscala/math/Ordered;Ljava/lang/Object;)Z
       5: ireturn

  public boolean $greater(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #30 // Method scala/math/Ordered$class.$greater:(Lscala/math/Ordered;Ljava/lang/Object;)Z
       5: ireturn

  public boolean $less$eq(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #33 // Method scala/math/Ordered$class.$less$eq:(Lscala/math/Ordered;Ljava/lang/Object;)Z
       5: ireturn

  public boolean $greater$eq(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #36 // Method scala/math/Ordered$class.$greater$eq:(Lscala/math/Ordered;Ljava/lang/Object;)Z
       5: ireturn

  public int compareTo(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #41 // Method scala/math/Ordered$class.compareTo:(Lscala/math/Ordered;Ljava/lang/Object;)I
       5: ireturn

  public int first();
    Code:
       0: aload_0
       1: getfield      #44 // Field first:I
       4: ireturn

  public int second();
    Code:
       0: aload_0
       1: getfield      #46 // Field second:I
       4: ireturn

  public int compare(cool.pengych.spark.core.SecondarySortKey);
    Code:
       0: aload_0
       1: invokevirtual #50 // Method first:()I
       4: aload_1
       5: invokevirtual #50 // Method first:()I
       8: if_icmpeq     23
      11: aload_0
      12: invokevirtual #50 // Method first:()I
      15: aload_1
      16: invokevirtual #50 // Method first:()I
      19: isub
      20: goto          32
      23: aload_0
      24: invokevirtual #52 // Method second:()I
      27: aload_1
      28: invokevirtual #52 // Method second:()I
      31: isub
      32: ireturn

  public int compare(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: checkcast     #2  // class cool/pengych/spark/core/SecondarySortKey
       5: invokevirtual #54 // Method compare:(Lcool/pengych/spark/core/SecondarySortKey;)I
       8: ireturn

  public cool.pengych.spark.core.SecondarySortKey(int, int);
    Code:
       0: aload_0
       1: iload_1
       2: putfield      #44 // Field first:I
       5: aload_0
       6: iload_2
       7: putfield      #46 // Field second:I
      10: aload_0
      11: invokespecial #59 // Method java/lang/Object."<init>":()V
      14: aload_0
      15: invokestatic  #63 // Method scala/math/Ordered$class.$init$:(Lscala/math/Ordered;)V
      18: return
}

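You see? You are never coding alone: the compiler is always there, giving you a hand. In Scala we wrote only compare; the compiler generated $less, $greater, $less$eq, $greater$eq and compareTo, each forwarding to the implementation in scala/math/Ordered$class. Those are exactly the methods that had to be written by hand in the Java version.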
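The pattern behind this is simple: one abstract method, compare, and every other operation derived from it. A rough Java 8 analogue can be sketched with default methods. MiniOrdered and Key below are hypothetical names used only for illustration; this is not scala.math.Ordered itself, only a model of how its operator methods relate to compare.

```java
public class OrderedSketch {
    // Hypothetical stand-in for scala.math.Ordered: one abstract method, the rest derived.
    interface MiniOrdered<A> extends Comparable<A> {
        int compare(A that);

        default boolean lt(A that)   { return compare(that) < 0; }
        default boolean gt(A that)   { return compare(that) > 0; }
        default boolean lteq(A that) { return compare(that) <= 0; }
        default boolean gteq(A that) { return compare(that) >= 0; }

        @Override
        default int compareTo(A that) { return compare(that); }
    }

    // The implementing class supplies only compare.
    static final class Key implements MiniOrdered<Key> {
        final int first;
        final int second;

        Key(int first, int second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compare(Key that) {
            return first != that.first
                    ? Integer.compare(first, that.first)
                    : Integer.compare(second, that.second);
        }
    }

    public static void main(String[] args) {
        Key a = new Key(1, 3);
        Key b = new Key(2, 1);
        System.out.println(a.lt(b));   // true
        System.out.println(a.gteq(b)); // false
    }
}
```

With an interface like this, a Java key class would be about as short as the Scala case class, apart from the constructor and fields.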

References

Wang Jialin, DT Big Data Dream Factory (DT大数据梦工厂)

Spark 1.6.0 source code