Spark: Implementing Secondary Sort in Java
2017-12-06 19:47
Test data:
```
1 5
2 4
3 6
1 3
2 1
```
Output:
```
1 3
1 5
2 1
2 4
3 6
```
Approach:
1. Implement a custom key class that implements the Ordered and Serializable interfaces, and put the multi-column comparison logic inside the key.
2. Map the RDD of text lines into a JavaPairRDD whose key is the custom key and whose value is the original line.
3. Sort by the custom key with the sortByKey operator.
4. Map again, dropping the custom key and keeping only the text line.
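The ordering contract the steps above rely on — compare the first column, break ties on the second — can be checked outside Spark with a plain `java.util.Comparator`. The sketch below is not from the original post; it simply sorts the article's sample lines in ordinary Java to confirm the expected output:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TwoColumnOrderDemo {
    public static void main(String[] args) {
        // The article's sample lines, each holding "first second"
        List<String> lines = new ArrayList<>(List.of("1 5", "2 4", "3 6", "1 3", "2 1"));

        // Order by the first column, then by the second -- the same
        // contract the custom key's compareTo implements
        lines.sort(Comparator
                .comparingInt((String s) -> Integer.parseInt(s.split(" ")[0]))
                .thenComparingInt(s -> Integer.parseInt(s.split(" ")[1])));

        System.out.println(lines);  // [1 3, 1 5, 2 1, 2 4, 3 6]
    }
}
```

The result matches the output shown earlier, which is what `sortByKey` produces once the same comparison lives inside the key.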
The custom key, SecondarySortKey_12:
```java
package cn.spark.study.core;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Custom key for secondary sort
 * @author Administrator
 */
public class SecondarySortKey_12 implements Ordered<SecondarySortKey_12>, Serializable {

    private static final long serialVersionUID = 1L;

    // First, define the columns to sort by inside the custom key
    private int first;
    private int second;

    public SecondarySortKey_12(int first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public boolean $greater(SecondarySortKey_12 other) {
        if (this.first > other.getFirst()) {
            return true;
        } else if (this.first == other.getFirst() && this.second > other.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $greater$eq(SecondarySortKey_12 other) {
        if (this.$greater(other)) {
            return true;
        } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $less(SecondarySortKey_12 other) {
        if (this.first < other.getFirst()) {
            return true;
        } else if (this.first == other.getFirst() && this.second < other.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $less$eq(SecondarySortKey_12 other) {
        if (this.$less(other)) {
            return true;
        } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public int compare(SecondarySortKey_12 other) {
        // Compare by the first column; break ties on the second
        if (this.first - other.getFirst() != 0) {
            return this.first - other.getFirst();
        } else {
            return this.second - other.getSecond();
        }
    }

    @Override
    public int compareTo(SecondarySortKey_12 other) {
        if (this.first - other.getFirst() != 0) {
            return this.first - other.getFirst();
        } else {
            return this.second - other.getSecond();
        }
    }

    // Provide getters and setters for the sort columns, plus hashCode and equals
    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        SecondarySortKey_12 other = (SecondarySortKey_12) obj;
        if (first != other.first) return false;
        if (second != other.second) return false;
        return true;
    }
}
```
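One caveat about the subtraction-based comparison in `compare`/`compareTo` above: `this.first - other.getFirst()` can overflow when the operands are far apart, yielding the wrong sign. For the small values in this example it is harmless, but `Integer.compare` is the safer idiom. A standalone illustration (not part of the original article):

```java
public class SubtractionCompareDemo {
    public static void main(String[] args) {
        // Subtraction wraps around: MIN_VALUE - 1 overflows to MAX_VALUE,
        // so a subtraction-based compare would claim MIN_VALUE > 1
        System.out.println(Integer.MIN_VALUE - 1 > 0);                  // true (wrong sign)

        // Integer.compare gives the correct ordering without overflow
        System.out.println(Integer.compare(Integer.MIN_VALUE, 1) < 0); // true
    }
}
```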
*The SecondarySort_12 class:*
```java
package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class SecondarySort_12 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySort").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("E://BigData//sparkdata//sort.txt");

        // Step 2: map each line to a (SecondarySortKey_12, line) pair
        JavaPairRDD<SecondarySortKey_12, String> pairs = lines.mapToPair(
                new PairFunction<String, SecondarySortKey_12, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<SecondarySortKey_12, String> call(String line) throws Exception {
                        String[] lineSplited = line.split(" ");
                        SecondarySortKey_12 key = new SecondarySortKey_12(
                                Integer.valueOf(lineSplited[0]),
                                Integer.valueOf(lineSplited[1]));
                        return new Tuple2<SecondarySortKey_12, String>(key, line);
                    }
                });

        // Step 3: sort by the custom key
        JavaPairRDD<SecondarySortKey_12, String> sortedPairs = pairs.sortByKey();

        // Step 4: drop the key, keeping only the original text line
        JavaRDD<String> sortedLines = sortedPairs.map(
                new Function<Tuple2<SecondarySortKey_12, String>, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(Tuple2<SecondarySortKey_12, String> v) throws Exception {
                        return v._2;
                    }
                });

        sortedLines.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });

        sc.close();
    }
}
```