您的位置:首页 > 编程语言 > Java开发

Spark二次排序(Java+Scala)

2017-06-01 14:07 253 查看
1.基础排序算法

// Word count ordered by descending frequency (run in the Spark shell, where `sc` is predefined).
// Each (word, count) pair is flipped to (count, word) so sortByKey can order by the count,
// then flipped back before the result is collected to the driver.
sc.textFile("/data/putfile.txt").
  flatMap(line => line.split(" ")).
  map(word => (word, 1)).
  reduceByKey((left, right) => left + right, 1).
  map { case (word, count) => (count, word) }.
  sortByKey(false).
  map { case (count, word) => (word, count) }.
  collect


2.二次排序算法(Java实现)

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Composite key used for secondary sorting: instances are ordered by
 * {@code first}, and ties are broken by {@code seconde}.
 *
 * Implements Scala's {@code Ordered} so it can be used as a sortable key, and
 * {@code Serializable} so it can be serialized.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

    // The two components of the sort key.
    private int first;
    private int seconde;

    /**
     * Creates a secondary-sort key.
     *
     * @param first   primary sort component
     * @param seconde secondary sort component, used only when {@code first} ties
     */
    public SecondarySortKey(int first, int seconde) {
        super();
        this.first = first;
        this.seconde = seconde;
    }

    /** @return true when this key sorts strictly after {@code other} */
    @Override
    public boolean $greater(SecondarySortKey other) {
        return compare(other) > 0;
    }

    /** @return true when this key sorts after, or equal to, {@code other} */
    @Override
    public boolean $greater$eq(SecondarySortKey other) {
        return compare(other) >= 0;
    }

    /** @return true when this key sorts strictly before {@code other} */
    @Override
    public boolean $less(SecondarySortKey other) {
        return compare(other) < 0;
    }

    /** @return true when this key sorts before, or equal to, {@code other} */
    @Override
    public boolean $less$eq(SecondarySortKey other) {
        return compare(other) <= 0;
    }

    /**
     * Compares by {@code first}, then by {@code seconde}.
     *
     * Uses {@link Integer#compare(int, int)} rather than subtraction: a
     * difference such as {@code Integer.MIN_VALUE - 1} overflows and would
     * report the wrong sign.
     *
     * @param other the key to compare against
     * @return a negative value, zero, or a positive value when this key is
     *         less than, equal to, or greater than {@code other}
     */
    @Override
    public int compare(SecondarySortKey other) {
        int byFirst = Integer.compare(this.first, other.getFirst());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.seconde, other.getSeconde());
    }

    /**
     * Same ordering as {@link #compare(SecondarySortKey)}.
     *
     * @param other the key to compare against
     * @return a negative value, zero, or a positive value
     */
    @Override
    public int compareTo(SecondarySortKey other) {
        return compare(other);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + seconde;
        return result;
    }

    /** Two keys are equal when they have the same class and both components match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        SecondarySortKey other = (SecondarySortKey) obj;
        return first == other.first && seconde == other.seconde;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSeconde() {
        return seconde;
    }

    public void setSeconde(int seconde) {
        this.seconde = seconde;
    }

}


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

/**
 * Secondary sort, step by step:
 * Step 1: implement a custom sort key based on the Ordered and Serializable interfaces.
 * Step 2: load the file to be sorted and turn it into an RDD of &lt;key, value&gt; pairs.
 * Step 3: call sortByKey, which sorts using the custom key.
 * Step 4: drop the sort key and keep only the sorted lines.
 *
 * @author Shuai.Zh
 *
 */
public class SecondarySortApp {

    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT = "C:\\Users\\Administrator\\Desktop\\1.txt";

    /**
     * Reads a text file whose lines start with two space-separated integers,
     * sorts the lines by those two integers in ascending order and prints them.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("SecondarySort");
        conf.setMaster("local");

        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Pair every line with a key built from its first two fields.
            JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(
                    new PairFunction<String, SecondarySortKey, String>() {

                        @Override
                        public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                            String[] splited = line.split(" ");
                            SecondarySortKey key = new SecondarySortKey(
                                    Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
                            return new Tuple2<SecondarySortKey, String>(key, line);
                        }
                    });

            // The secondary sort itself: ordering comes from SecondarySortKey.
            JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey();

            // Discard the helper key and keep only the original lines, now in sorted order.
            JavaRDD<String> secondarySorted = sorted.map(
                    new Function<Tuple2<SecondarySortKey, String>, String>() {

                        @Override
                        public String call(Tuple2<SecondarySortKey, String> sortedCount) throws Exception {
                            return sortedCount._2;
                        }
                    });

            // Collect to the driver before printing: RDD.foreach runs on the executors and
            // gives no guarantee about the order in which partitions are printed.
            for (String line : secondarySorted.collect()) {
                System.out.println(line);
            }
        } finally {
            // Always release the Spark context, even if the job fails.
            sc.stop();
        }
    }

}


3.二次排序算法(Scala实现)

import org.apache.spark.SparkConf

/**
 * Created by Administrator on 2017/6/1.
 *
 * Composite key for secondary sorting: ordered by `first`, with ties broken by `second`.
 *
 * @param first  primary sort component
 * @param second secondary sort component, consulted only when `first` is equal
 */
class SecondarySortkey(val first: Int, val second: Int) extends Ordered[SecondarySortkey] with Serializable {

  /**
   * Compares by `first`, then by `second`.
   *
   * Uses `Integer.compare` instead of subtraction, because a difference such as
   * `Int.MinValue - 1` overflows and would report the wrong sign.
   *
   * @param other the key to compare against
   * @return a negative value, zero, or a positive value when this key is less than,
   *         equal to, or greater than `other`
   */
  def compare(other: SecondarySortkey): Int = {
    val byFirst = java.lang.Integer.compare(this.first, other.first)
    if (byFirst != 0) byFirst
    else java.lang.Integer.compare(this.second, other.second)
  }
}


import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2017/6/1.
 *
 * Secondary-sort driver: reads a text file whose lines start with two space-separated
 * integers and prints the lines in descending order of (first, second).
 */
object SecondarySortApp {

  /**
   * @param args optional; `args(0)` overrides the default input path
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sort").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val inputPath = args.headOption.getOrElse("C:\\Users\\Administrator\\Desktop\\1.txt")
      val lines = sc.textFile(inputPath)

      // Pair every line with its sort key; split once and reuse the fields.
      val pairWithSortKey = lines.map { line =>
        val fields = line.split(" ")
        (new SecondarySortkey(fields(0).toInt, fields(1).toInt), line)
      }

      // Descending sort driven by SecondarySortkey's ordering.
      val sorted = pairWithSortKey.sortByKey(ascending = false)

      // Drop the helper key, then print the lines on the driver in sorted order.
      val sortedResult = sorted.map(sortedLine => sortedLine._2)
      sortedResult.collect().foreach(println)
    } finally {
      // Always release the Spark context, even if the job fails.
      sc.stop()
    }
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: