spark ReduceByKey操作
2016-05-15 16:58
423 查看
执行reduceByKey算子
// reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值
// 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型
// 因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入
// 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型
// 第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的
// reduceByKey算法返回的RDD,还是JavaPairRDD<key, value>
public static void myReduceByKey(){
SparkConf conf=new SparkConf()
.setMaster("local")
.setAppName("myGroupByKey");
JavaSparkContext sc=new JavaSparkContext(conf);
List list=Arrays.asList(new Tuple2<String,Integer>("c1",23),new Tuple2<String,Integer>("c2",33),
new Tuple2<String,Integer>("c1",23),new Tuple2<String,Integer>("c2",56));
JavaPairRDD<String, Integer> listRdd= sc.parallelizePairs(list);
JavaPairRDD<String, Integer> listReduce=listRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer x, Integer y) throws Exception {
// TODO Auto-generated method stub
return x+y;
}
});
listReduce.foreach(new VoidFunction<Tuple2<String,Integer>>(){
@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
// TODO Auto-generated method stub
System.out.println("key:"+tuple._1+",values:"+tuple._2);
}
});
}
计算结果:
key:c2,values:89
key:c1,values:46
// reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值
// 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型
// 因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入
// 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型
// 第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的
// reduceByKey算法返回的RDD,还是JavaPairRDD<key, value>
public static void myReduceByKey(){
SparkConf conf=new SparkConf()
.setMaster("local")
.setAppName("myGroupByKey");
JavaSparkContext sc=new JavaSparkContext(conf);
List list=Arrays.asList(new Tuple2<String,Integer>("c1",23),new Tuple2<String,Integer>("c2",33),
new Tuple2<String,Integer>("c1",23),new Tuple2<String,Integer>("c2",56));
JavaPairRDD<String, Integer> listRdd= sc.parallelizePairs(list);
JavaPairRDD<String, Integer> listReduce=listRdd.reduceByKey(new Function2<Integer,Integer,Integer>(){
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer x, Integer y) throws Exception {
// TODO Auto-generated method stub
return x+y;
}
});
listReduce.foreach(new VoidFunction<Tuple2<String,Integer>>(){
@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
// TODO Auto-generated method stub
System.out.println("key:"+tuple._1+",values:"+tuple._2);
}
});
}
计算结果:
key:c2,values:89
key:c1,values:46
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树