
Sorting grouped values in Spark (Java)

2017-03-03 13:24
Maven dependency (the _2.10 suffix in the artifact name is the Scala version this Spark build targets):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>

Input file groupsort.txt (one "name score" record per line):

spark 100
storm 90
kafka 75
hadoop 60
zookeeper 100
impala 80
hbase 65
hive 90
flume 95
elasticsearch 100
spark 80
storm 70
kafka 80
hadoop 75
zookeeper 90
impala 100
hbase 30
hive 70
flume 80
elasticsearch 90
spark 56
storm 88
kafka 44
hadoop 33
zookeeper 99
impala 88
hbase 63
hive 45
flume 89
elasticsearch 79

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class GroupSort {
    public static void main(String[] args) {
        /**
         * Create the SparkConf object, which holds the runtime configuration.
         * setMaster sets the URL of the cluster master to connect to;
         * "local" makes Spark run locally in a single JVM.
         */
        SparkConf conf = new SparkConf().setAppName("My first spark").setMaster("local");
        /**
         * Create the JavaSparkContext.
         * SparkContext is the single entry point to all Spark functionality:
         * it initializes the core components Spark needs at runtime and
         * registers the application with the master.
         */
        JavaSparkContext sc = new JavaSparkContext(conf);
        //sc.setLogLevel("OFF");
        /**
         * Create an RDD from the data source via the JavaSparkContext.
         */
        JavaRDD<String> lines = sc.textFile("E:/groupsort.txt");

        // Split each "name score" line into a (name, score) pair.
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] split = line.split(" ");
                return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1]));
            }
        });
        /**
         * Group the pairs by key.
         */
        JavaPairRDD<String, Iterable<Integer>> groups = pairs.groupByKey();
        /**
         * Sort the values of each group in descending order.
         */
        JavaPairRDD<String, Iterable<Integer>> groupsSort = groups.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupData) throws Exception {
                List<Integer> integers = new ArrayList<Integer>();
                String name = groupData._1;
                Iterator<Integer> it = groupData._2.iterator();
                while (it.hasNext()) {
                    integers.add(it.next());
                }
                integers.sort(new Comparator<Integer>() {
                    public int compare(Integer o1, Integer o2) {
                        // Integer.compare avoids the overflow risk of "o2 - o1".
                        return Integer.compare(o2, o1);
                    }
                });
                return new Tuple2<String, Iterable<Integer>>(name, integers);
            }
        });
        /**
         * Print the result.
         */
        groupsSort.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String, Iterable<Integer>> data) throws Exception {
                System.out.println(data._1 + "  " + data._2);
            }
        });
        /**
         * Shut down the JavaSparkContext.
         */
        sc.stop();
    }
}
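
The anonymous-inner-class style above is what the Spark 1.x Java examples typically show, but since the sorting step never changes the key, mapValues is a more natural fit than mapToPair, and Java 8 lambdas remove most of the boilerplate. Here is a minimal equivalent sketch; the class name GroupSortLambda and a Java 8+ runtime are assumptions, not from the original post:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class GroupSortLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("GroupSortLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, List<Integer>> groupsSort = sc.textFile("E:/groupsort.txt")
                .mapToPair(line -> {
                    String[] split = line.split(" ");
                    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
                })
                .groupByKey()
                // mapValues leaves the key (and the partitioner) untouched.
                .mapValues(values -> {
                    List<Integer> sorted = new ArrayList<>();
                    values.forEach(sorted::add);
                    sorted.sort(Comparator.reverseOrder());
                    return sorted;
                });

        groupsSort.foreach(data -> System.out.println(data._1 + "  " + data._2));
        sc.stop();
    }
}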

Output:

spark  [100, 80, 56]
hive  [90, 70, 45]
hadoop  [75, 60, 33]
flume  [95, 89, 80]
zookeeper  [100, 99, 90]
impala  [100, 88, 80]
storm  [90, 88, 70]
elasticsearch  [100, 90, 79]
kafka  [80, 75, 44]
hbase  [65, 63, 30]
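
One caveat worth noting: groupByKey ships every value for a key to a single executor and materializes the whole group in memory, which can hurt on skewed or very large datasets. When only the few largest values per key are actually needed, aggregateByKey can keep a small, pre-sorted accumulator per key instead. The sketch below is illustrative, not from the original post: the cap of 3 values and the insertTopN helper are assumptions.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class TopNPerKey {
    // Merge `more` into `acc`, keeping only the n largest values in
    // descending order. (insertTopN is an illustrative helper, not Spark API.)
    static List<Integer> insertTopN(List<Integer> acc, List<Integer> more, int n) {
        acc.addAll(more);
        acc.sort(Comparator.reverseOrder());
        return acc.size() > n ? new ArrayList<>(acc.subList(0, n)) : acc;
    }

    public static void main(String[] args) {
        final int n = 3; // keep only the 3 best scores per key
        SparkConf conf = new SparkConf().setAppName("TopNPerKey").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> pairs = sc.textFile("E:/groupsort.txt")
                .mapToPair(line -> {
                    String[] split = line.split(" ");
                    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
                });

        // aggregateByKey keeps a bounded accumulator per key, so no full
        // group ever has to fit in memory at once.
        JavaPairRDD<String, List<Integer>> topN = pairs.aggregateByKey(
                new ArrayList<Integer>(),
                (acc, v) -> insertTopN(acc, Collections.singletonList(v), n),
                (a, b) -> insertTopN(a, b, n));

        topN.foreach(data -> System.out.println(data._1 + "  " + data._2));
        sc.stop();
    }
}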