spark对分组后value值进行排序(JAVA)
2017-03-03 13:24
316 查看
maven:
groupsort.txt:
spark 100
storm 90
kafka 75
hadoop 60
zookeeper 100
impala 80
hbase 65
hive 90
flume 95
elasticsearch 100
spark 80
storm 70
kafka 80
hadoop 75
zookeeper 90
impala 100
hbase 30
hive 70
flume 80
elasticsearch 90
spark 56
storm 88
kafka 44
hadoop 33
zookeeper 99
impala 88
hbase 63
hive 45
flume 89
elasticsearch 79
运行结果:
spark [100, 80, 56]
hive [90, 70, 45]
hadoop [75, 60, 33]
flume [95, 89, 80]
zookeeper [100, 99, 90]
impala [100, 88, 80]
storm [90, 88, 70]
elasticsearch [100, 90, 79]
kafka [80, 75, 44]
hbase [65, 63, 30]
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0</version> </dependency>
groupsort.txt:
spark 100
storm 90
kafka 75
hadoop 60
zookeeper 100
impala 80
hbase 65
hive 90
flume 95
elasticsearch 100
spark 80
storm 70
kafka 80
hadoop 75
zookeeper 90
impala 100
hbase 30
hive 70
flume 80
elasticsearch 90
spark 56
storm 88
kafka 44
hadoop 33
zookeeper 99
impala 88
hbase 63
hive 45
flume 89
elasticsearch 79
public class GroupSort { public static void main(String[] args) { /** * 创建spark配置对象SparkConf,设置spark运行时配置信息, * 例如通过setMaster来设置程序要连接的集群的Master的URL,如果设置为local, * spark为本地运行 */ SparkConf conf = new SparkConf().setAppName("My first spark").setMaster("local"); /** * 创建JavaSparkContext对象 * SparkContext是spark所有功能的唯一入口, * SparkContext核心作用,初始化spark运行所需要的核心组件,同时还会负责spark程序在master的注册。 * */ JavaSparkContext sc = new JavaSparkContext(conf); //sc.setLogLevel("OFF"); /** * 根据数据来源,通过JavaSparkContext来创建RDD */ JavaRDD<String> lines = sc.textFile("E:/groupsort.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String line) throws Exception { String[] split = line.split(" "); return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1])); } }); /** * 分组 */ JavaPairRDD<String, Iterable<Integer>> groups = pairs.groupByKey(); /** * 对分组结果排序 */ JavaPairRDD<String, Iterable<Integer>> groupsSort = groups.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() { public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupData) throws Exception { List<Integer> integers = new ArrayList<Integer>(); String name = groupData._1; Iterator<Integer> it = groupData._2.iterator(); while (it.hasNext()) { integers.add(it.next()); } integers.sort(new Comparator<Integer>() { public int compare(Integer o1, Integer o2) { return o2 - o1; } }); return new Tuple2<String, Iterable<Integer>>(name, integers); } }); /** * 打印 */ groupsSort.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() { public void call(Tuple2<String, Iterable<Integer>> data) throws Exception { System.out.println(data._1+" "+data._2); } }); /** * 关闭JavaSparkContext */ sc.stop(); } }
运行结果:
spark [100, 80, 56]
hive [90, 70, 45]
hadoop [75, 60, 33]
flume [95, 89, 80]
zookeeper [100, 99, 90]
impala [100, 88, 80]
storm [90, 88, 70]
elasticsearch [100, 90, 79]
kafka [80, 75, 44]
hbase [65, 63, 30]
相关文章推荐
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- spark对分组后value值进行排序(JAVA)
- 大数据IMF传奇 第19课 spark 二次排序 使用JAVA自定义key 进行二次排序
- 第20课 :SPARK Top N彻底解秘 TOPN 排序(Scala)SPARK分组TOPN 算法(JAVA) 必须掌握!