您的位置:首页 > 编程语言 > Java开发

Spark:Java实现 二次排序

2017-12-06 19:47 357 查看
测试数据

1 5
2 4
3 6
1 3
2 1


输出结果

1 3
1 5
2 1
2 4
3 6


实现思路:

1.实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法

2.将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD

3.使用sortByKey算子按照自定义的key进行排序

4.再次映射,剔除自定义的key,而只保留文本行

自定义的key:SecondarySortKey_12

package cn.spark.study.core;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Composite key for a secondary sort: keys are ordered by {@code first},
 * and ties are broken by {@code second}.
 *
 * <p>Implements {@link Ordered} so the key can be compared against other keys
 * (e.g. when used as the key of a pair RDD passed to {@code sortByKey}), and
 * {@link Serializable} so instances can be serialized.
 *
 * <p>All ordering methods ({@code <}, {@code <=}, {@code >}, {@code >=},
 * {@code compare}, {@code compareTo}) are derived from a single comparison so
 * they can never disagree with each other. The ordering is consistent with
 * {@link #equals(Object)}.
 *
 * <p>Note: the key is mutable (it exposes setters). Changing a field after the
 * key has been placed in a hash-based or sorted structure invalidates its
 * position there.
 */
public class SecondarySortKey_12 implements Ordered<SecondarySortKey_12>, Serializable {

    private static final long serialVersionUID = 1L;

    // The columns that take part in the ordering.
    private int first;
    private int second;

    /**
     * Creates a key from the two sort columns.
     *
     * @param first  primary sort column
     * @param second secondary sort column, used only when {@code first} ties
     */
    public SecondarySortKey_12(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** Returns {@code true} when this key sorts strictly after {@code other}. */
    @Override
    public boolean $greater(SecondarySortKey_12 other) {
        return compare(other) > 0;
    }

    /** Returns {@code true} when this key sorts after, or equal to, {@code other}. */
    @Override
    public boolean $greater$eq(SecondarySortKey_12 other) {
        return compare(other) >= 0;
    }

    /** Returns {@code true} when this key sorts strictly before {@code other}. */
    @Override
    public boolean $less(SecondarySortKey_12 other) {
        return compare(other) < 0;
    }

    /** Returns {@code true} when this key sorts before, or equal to, {@code other}. */
    @Override
    public boolean $less$eq(SecondarySortKey_12 other) {
        return compare(other) <= 0;
    }

    /**
     * Compares this key with {@code other}, first by {@code first} and then by
     * {@code second}.
     *
     * @param other the key to compare against
     * @return a negative value, zero, or a positive value when this key is
     *         less than, equal to, or greater than {@code other}
     */
    @Override
    public int compare(SecondarySortKey_12 other) {
        // Integer.compare instead of subtraction: "a - b" overflows when the
        // operands are far apart (e.g. Integer.MIN_VALUE - 1) and would then
        // report the wrong sign.
        int byFirst = Integer.compare(this.first, other.getFirst());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, other.getSecond());
    }

    /**
     * Same ordering as {@link #compare(SecondarySortKey_12)}.
     *
     * @param other the key to compare against
     * @return a negative value, zero, or a positive value when this key is
     *         less than, equal to, or greater than {@code other}
     */
    @Override
    public int compareTo(SecondarySortKey_12 other) {
        return compare(other);
    }

    // Getters and setters for the sort columns, followed by hashCode and equals.

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /** Hash combining both sort columns; equal keys produce equal hashes. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    /**
     * Two keys are equal when they are of the same class and both sort
     * columns match.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        SecondarySortKey_12 other = (SecondarySortKey_12) obj;
        return first == other.first && second == other.second;
    }
}


SecondarySort_12 类:

package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Driver for the secondary-sort example.
 *
 * <p>Reads a text file whose lines each hold two whitespace-separated
 * integers, sorts the lines by the first integer and then by the second
 * (using {@link SecondarySortKey_12} as the sort key), and prints the sorted
 * lines to standard output.
 */
public class SecondarySort_12 {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "E://BigData//sparkdata//sort.txt";

    /**
     * Runs the job with a local master.
     *
     * @param args optional; {@code args[0]} overrides the input file path,
     *             otherwise {@link #DEFAULT_INPUT_PATH} is read
     */
    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        SparkConf conf = new SparkConf().setAppName("SecondarySort").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // try/finally so the context is closed even when a stage fails
        // (for example on a malformed input line).
        try {
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Step 1: pair every line with a composite key built from its two columns.
            JavaPairRDD<SecondarySortKey_12, String> pairs = lines.mapToPair(
                    new PairFunction<String, SecondarySortKey_12, String>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<SecondarySortKey_12, String> call(String line) throws Exception {
                            // Split on any run of whitespace so tabs or repeated
                            // spaces between the columns do not break parsing.
                            String[] columns = line.trim().split("\\s+");
                            if (columns.length < 2) {
                                throw new IllegalArgumentException(
                                        "Expected two integer columns but got: \"" + line + "\"");
                            }
                            // parseInt yields primitives directly; valueOf would box
                            // an Integer only to unbox it again.
                            SecondarySortKey_12 key = new SecondarySortKey_12(
                                    Integer.parseInt(columns[0]),
                                    Integer.parseInt(columns[1]));

                            return new Tuple2<SecondarySortKey_12, String>(key, line);
                        }
                    });

            // Step 2: sort by the composite key.
            JavaPairRDD<SecondarySortKey_12, String> sortedPairs = pairs.sortByKey();

            // Step 3: drop the key and keep only the original text line.
            JavaRDD<String> sortedLines = sortedPairs.map(
                    new Function<Tuple2<SecondarySortKey_12, String>, String>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public String call(Tuple2<SecondarySortKey_12, String> pair) throws Exception {
                            return pair._2;
                        }
                    });

            // Step 4: print each sorted line.
            sortedLines.foreach(new VoidFunction<String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(String sortedLine) throws Exception {
                    System.out.println(sortedLine);
                }
            });
        } finally {
            sc.close();
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 排序算法 java