SPARK排序算法,使用Scala开发 二次排序 自定义KEY值,相比JAVA的罗嗦,Scala优雅简洁!!!
2016-02-28 21:20
585 查看
Spark使用Scala开发的二次排序
【数据文件Input】
2 3
4 1
3 2
4 3
8 7
2 1
【运行结果Output】倒排序
8 7
4 3
4 1
3 2
2 3
2 1
运行结果
【源代码文件】SecondarySortApp.scala SecondarySortKey.scala
class SecondarySortKey
定义排序方法compare
class SecondarySortKey(valfirst:Int,valsecond:Int)
extendsOrdered [SecondarySortKey]
withSerializable {
defcompare(other:SecondarySortKey):Int
={
if(this.first-other.first!=0)
{
this.first-other.first
} else{
this.second-other.second
}
}
SecondarySortApp
1、读入每行数据
vallines =sc.textFile("G://IMFBigDataSpark2016//tesdata//helloSpark.txt", 1) //读取本地文件并设置为一个Partion
2、对每行数据生成一个K,V元组,key值为SecondarySortKey(里面分别放第一个及第二个数据),value
为每一行的数据
val pairWithSortKey = lines.map(line => (
new SecondarySortKey(line.split("")(0).toInt, line.split(" ")(1).toInt),line
))
3、对pairWithSortKey排序,降序排序
val sorted = pairWithSortKey.sortByKey(false)
4、对排序以后的结果, sortedLine为k,v键值对,只输出sortedLine._2的value值,即每行的数据
val sortedResult = sorted.map(sortedLine =>sortedLine._2)
5、collect收集打印输出。
sortedResult.collect().foreach (println)
源代码:
package com.dt.spark
class SecondarySortKey(val first:Int,val second:Int) extends Ordered [SecondarySortKey] with Serializable {
def compare(other:SecondarySortKey):Int = {
if (this.first - other.first !=0) {
this.first - other.first
} else {
this.second - other.second
}
}
}
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/*
* *王家林老师授课 http://weibo.com/ilovepains
*/
object SecondarySortApp {
def main(args:Array[String]){
val conf = new SparkConf() //创建SparkConf对象
conf.setAppName("SecondarySortApp!") //设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群
val sc = new SparkContext(conf)
val lines = sc.textFile("G://IMFBigDataSpark2016//tesdata//helloSpark.txt", 1) //读取本地文件并设置为一个Partion
val pairWithSortKey = lines.map(line => (
// val splited = line.split(" ")
new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),line
))
val sorted = pairWithSortKey.sortByKey(false)
val sortedResult = sorted.map(sortedLine =>sortedLine._2)
sortedResult.collect().foreach (println)
}
}
【数据文件Input】
2 3
4 1
3 2
4 3
8 7
2 1
【运行结果Output】倒排序
8 7
4 3
4 1
3 2
2 3
2 1
运行结果
【源代码文件】SecondarySortApp.scala SecondarySortKey.scala
class SecondarySortKey
定义排序方法compare
class SecondarySortKey(valfirst:Int,valsecond:Int)
extendsOrdered [SecondarySortKey]
withSerializable {
defcompare(other:SecondarySortKey):Int
={
if(this.first-other.first!=0)
{
this.first-other.first
} else{
this.second-other.second
}
}
SecondarySortApp
1、读入每行数据
vallines =sc.textFile("G://IMFBigDataSpark2016//tesdata//helloSpark.txt", 1) //读取本地文件并设置为一个Partion
2、对每行数据生成一个K,V元组,key值为SecondarySortKey(里面分别放第一个及第二个数据),value
为每一行的数据
val pairWithSortKey = lines.map(line => (
new SecondarySortKey(line.split("")(0).toInt, line.split(" ")(1).toInt),line
))
3、对pairWithSortKey排序,降序排序
val sorted = pairWithSortKey.sortByKey(false)
4、对排序以后的结果, sortedLine为k,v键值对,只输出sortedLine._2的value值,即每行的数据
val sortedResult = sorted.map(sortedLine =>sortedLine._2)
5、collect收集打印输出。
sortedResult.collect().foreach (println)
源代码:
package com.dt.spark
class SecondarySortKey(val first:Int,val second:Int) extends Ordered [SecondarySortKey] with Serializable {
def compare(other:SecondarySortKey):Int = {
if (this.first - other.first !=0) {
this.first - other.first
} else {
this.second - other.second
}
}
}
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/*
* *王家林老师授课 http://weibo.com/ilovepains
*/
object SecondarySortApp {
def main(args:Array[String]){
val conf = new SparkConf() //创建SparkConf对象
conf.setAppName("SecondarySortApp!") //设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群
val sc = new SparkContext(conf)
val lines = sc.textFile("G://IMFBigDataSpark2016//tesdata//helloSpark.txt", 1) //读取本地文件并设置为一个Partion
val pairWithSortKey = lines.map(line => (
// val splited = line.split(" ")
new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),line
))
val sorted = pairWithSortKey.sortByKey(false)
val sortedResult = sorted.map(sortedLine =>sortedLine._2)
sortedResult.collect().foreach (println)
}
}
相关文章推荐
- Data Structures And Problem Solving Using Java (Fourth Edition)中译版(Java 修饰词与可见性)
- Java自定义比较器实现中文排序
- struts2 使用内置对象的方法
- spring框架学习(一)
- Data Structures And Problem Solving Using Java (Fourth Edition)中译版(Java 常见异常篇)
- Java程序生成exe可执行文件详细教程(图文说明)
- 随堂笔记160228表达式
- Java中基本数据类型和包装器类型的关系
- Java 学习笔记 ------第三章 基础语法
- Java泛型
- SpringMVC + Hibernate + Framewoker 示例
- APK中java代码反编译
- Java的单根继承结构--优点
- git 版本控制和SourceTree的安装使用 eclipse 的egit 插件安装使用
- Java 继承详解
- Java Cookie记录商品浏览历史
- Java规范
- java入门:反射
- Java基础知识 -- 多线程
- 求含有n个元素的集合的幂集