Spark PairRDD 转化一
2016-02-01 20:51
337 查看
package edu.berkeley.simple_project; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.storage.StorageLevel; import scala.Tuple2; /** * Hello world! * */ public class App { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); // convert from other RDD JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd")); JavaPairRDD<String, String> prdd = line1.mapToPair(new PairFunction<String, String, String>() { public Tuple2<String, String> call(String x) throws Exception { return new Tuple2(x.split(" ")[0], x); } }); System.out.println("111111111111mapToPair:"); prdd.foreach(new VoidFunction<Tuple2<String, String>>() { public void call(Tuple2<String, String> x) throws Exception { System.out.println(x); } }); // parallelizePairs Tuple2 t1 = new Tuple2(1, 5); Tuple2 t2 = new Tuple2(1, 3); Tuple2 t3 = new Tuple2(3, 7); List list1 = new ArrayList<Tuple2>(); list1.add(t1); list1.add(t2); list1.add(t3); JavaPairRDD<Integer, Integer> line2 = sc.parallelizePairs(list1); line2.persist(StorageLevel.MEMORY_ONLY()); System.out.println("22222222222222222parallelize:"); line2.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { public void call(Tuple2<Integer, Integer> x) throws Exception { System.out.println(x); } }); // reduceByKey JavaPairRDD<Integer, Integer> line3 = line2.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer x, Integer y) throws Exception { 
return (x + y); } }); System.out.println("333333333333reduceByKey:"); line3.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { public void call(Tuple2<Integer, Integer> x) throws Exception { System.out.println(x); } }); // groupByKey JavaPairRDD<Integer, Iterable<Integer>> line4 = line2.groupByKey(); System.out.println("44444444444444groupByKey:"); line4.foreach(new VoidFunction<Tuple2<Integer, Iterable<Integer>>>() { public void call(Tuple2<Integer, Iterable<Integer>> x) throws Exception { System.out.println(x); } }); // mapValues JavaPairRDD<Integer, Integer> line5 = line2.mapValues(new Function<Integer, Integer>() { public Integer call(Integer x) throws Exception { return x * x; } }); System.out.println("555555555555555mapValues:"); line5.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { public void call(Tuple2<Integer, Integer> x) throws Exception { System.out.println(x); } }); // flatMapValues JavaPairRDD<Integer, Integer> line6 = line2.flatMapValues(new Function<Integer, Iterable<Integer>>() { public Iterable<Integer> call(Integer x) throws Exception { ArrayList list = new ArrayList(); list.add(x); return list; } }); System.out.println("666666666666flatMapValues:"); line6.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { public void call(Tuple2<Integer, Integer> x) throws Exception { System.out.println(x); } }); // keys JavaRDD<Integer> line7 = line2.keys(); System.out.println("777777777777keys:"); line7.foreach(new VoidFunction<Integer>() { public void call(Integer x) throws Exception { System.out.println(x); } }); // values JavaRDD<Integer> line8 = line2.values(); System.out.println("888888888888888values:"); line8.foreach(new VoidFunction<Integer>() { public void call(Integer x) throws Exception { System.out.println(x); } }); // sortByKey JavaPairRDD<Integer, Integer> line9 = line2.sortByKey(false); // JavaPairRDD<Integer, Integer> line9 = line2.sortByKey(new // MyComparator(), true); MyComparator必须实现Comparator接口和Serializable接口 
System.out.println("9999999999999sortByKey:"); line9.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { public void call(Tuple2<Integer, Integer> x) throws Exception { System.out.println(x); } }); // filter JavaPairRDD<Integer, Integer> line10 = line2.filter(new Function<Tuple2<Integer, Integer>,Boolean>(){ public Boolean call(Tuple2<Integer, Integer> x) throws Exception { return (x._1>2); } }); System.out.println("aaaaaaaaaaaaaaaafilter:"); line10.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { public void call(Tuple2<Integer, Integer> x) throws Exception { System.out.println(x); } }); } }
package edu.berkeley.simple_project; import java.util.Comparator; public class MyComparator implements Comparator<Integer>{ public int compare(Integer x, Integer y) { // TODO Auto-generated method stub return (x-y); } }
111111111111mapToPair:
[Stage 0:> (0 + 0) / 4]16/02/01 18:23:35 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
(4,4 cc)
(3,3 dd)
(1,1 aa)
(2,2 bb)
22222222222222222parallelize:
(3,7)
(1,3)
(1,5)
333333333333reduceByKey:
(3,7)
(1,8)
44444444444444groupByKey:
(1,[5, 3])
(3,[7])
555555555555555mapValues:
(3,49)
(1,25)
(1,9)
666666666666flatMapValues:
(1,3)
(3,7)
(1,5)
777777777777keys:
1
1
3
888888888888888values:
5
7
3
9999999999999sortByKey:
(1,5)
(1,3)
(3,7)
aaaaaaaaaaaaaaaafilter:
(3,7)

注：foreach 在各个分区上并行执行，上面打印出的元素顺序只反映各个任务输出的先后，并不代表 RDD 中元素的实际顺序。例如 sortByKey(false) 的结果实际上是按 key 降序排列的（(3,7) 在前，两个 key 为 1 的元素在后）。若要在 driver 端按 RDD 的顺序查看结果，可以先调用 collect() 再打印。
相关文章推荐
- Firemonkey 调整 MainMenu 字型大小 (D10)
- synchronized/wait/notify 与 mutex/cond wait wake ~ 链表队列 生产消费问题
- UVA 11168 Airport(凸包+直线方程)
- hdu1022 Train Problem I
- jraiser模块加载执行简要总结
- command/usr/bin/codesign failed with exit code 1- code sign error
- spark的task调度器(FAIR公平调度算法)
- 关于 conversion to dalvik fail with error 1
- bug记录:Mybatis-error:Parameter 'xxx' not found. Available parameters are [list]
- RAID详解
- intel Baytrail 平台audio驱动match流程
- filco minila air在debian下的蓝牙适配安装
- waitpid函数实例
- ug-Assertion failure in [MyClass layoutSublayersOfLayer:]
- 剪发 Haircut
- hdoj--1151--Air Raid(最大独立集)
- hdoj--1151--Air Raid(最大独立集)
- Starting httpd: httpd: apr_sockaddr_info_get() failed
- 【转】UltraISO制作U盘启动盘安装Win7/9/10系统攻略
- 【Xamain 跨平台机制原理剖析】