Spark PairRDD 行动与数据分区
2016-02-03 19:55
393 查看
package com.fei.simple_project;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * Walk-through of pair-RDD actions (countByKey, collectAsMap, lookup) and of
 * data partitioning (partitionBy with the built-in hash partitioner and with
 * the custom {@code MyPartitioner}), ending with a join followed by a filter.
 *
 * <p>Every step prints a numbered banner followed by its results to standard
 * output.
 */
public class App {

    /**
     * Creates the Spark context, runs the demo, and always stops the context
     * afterwards, even when a step throws.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            runDemo(sc);
        } finally {
            // Release the context's resources regardless of how the demo ended.
            sc.stop();
        }
    }

    /** Runs all demo steps in order against the given context. */
    private static void runDemo(JavaSparkContext sc) {
        // convert from other RDD: key each line by its first space-separated token
        JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"));
        JavaPairRDD<String, String> prdd = line1.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String x) throws Exception {
                return new Tuple2<String, String>(x.split(" ")[0], x);
            }
        });
        System.out.println("111111111111mapToPair:");
        printPairs(prdd);

        // parallelizePairs
        List<Tuple2<Integer, Integer>> list1 = new ArrayList<Tuple2<Integer, Integer>>();
        list1.add(new Tuple2<Integer, Integer>(1, 2));
        list1.add(new Tuple2<Integer, Integer>(3, 4));
        list1.add(new Tuple2<Integer, Integer>(3, 6));
        JavaPairRDD<Integer, Integer> line2 = sc.parallelizePairs(list1);
        line2.persist(StorageLevel.MEMORY_ONLY());

        List<Tuple2<Integer, Integer>> list2 = new ArrayList<Tuple2<Integer, Integer>>();
        list2.add(new Tuple2<Integer, Integer>(3, 9));
        JavaPairRDD<Integer, Integer> line3 = sc.parallelizePairs(list2);
        line3.persist(StorageLevel.MEMORY_ONLY());

        // countByKey
        System.out.println("222222222222222countByKey:");
        Map<Integer, Object> ma = line2.countByKey();
        for (Entry<Integer, Object> e : ma.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue() + " ");
        }

        // collectAsMap: if a key occurs more than once, the later value overwrites the earlier one
        System.out.println("3333333333333collectAsMap:");
        Map<Integer, Integer> ca = line2.collectAsMap();
        for (Entry<Integer, Integer> e : ca.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue() + " ");
        }

        // lookup
        System.out.println("4444444444444lookup:");
        List<Integer> la = line2.lookup(3);
        for (Integer i : la) {
            System.out.println(i + " ");
        }

        // partitionBy, partitioner, join+filter
        List<Tuple2<Integer, String>> lista = new ArrayList<Tuple2<Integer, String>>();
        lista.add(new Tuple2<Integer, String>(1, "sina"));
        lista.add(new Tuple2<Integer, String>(2, "taobao"));
        lista.add(new Tuple2<Integer, String>(2, "126"));
        // built-in hash partitioner; a range partitioner is also available
        JavaPairRDD<Integer, String> linea = sc.parallelizePairs(lista).partitionBy(new HashPartitioner(2));
        linea.persist(StorageLevel.MEMORY_ONLY());
        Optional<Partitioner> op = linea.partitioner();
        System.out.println("5555555555partitioner: " + op);
        System.out.println("66666666666present: " + op.isPresent());
        if (op.isPresent()) {
            System.out.println("77777777value:" + op.get().numPartitions());
        }

        List<Tuple2<Integer, String>> listc = new ArrayList<Tuple2<Integer, String>>();
        listc.add(new Tuple2<Integer, String>(2, "126"));
        listc.add(new Tuple2<Integer, String>(2, "baidu"));
        listc.add(new Tuple2<Integer, String>(1, "dangdang"));
        // custom partitioner
        JavaPairRDD<Integer, String> linec = sc.parallelizePairs(listc).partitionBy(new MyPartitioner(2));
        linec.persist(StorageLevel.MEMORY_ONLY());
        System.out.println("888888888888partitioner: " + linec.partitioner());

        System.out.println("9999999999join:");
        JavaPairRDD<Integer, Tuple2<String, String>> js = linea.join(linec);
        printPairs(js);

        // Keep only the joined rows whose left and right values differ.
        JavaPairRDD<Integer, Tuple2<String, String>> fs =
                js.filter(new Function<Tuple2<Integer, Tuple2<String, String>>, Boolean>() {
                    public Boolean call(Tuple2<Integer, Tuple2<String, String>> y) throws Exception {
                        return !y._2._1.equals(y._2._2);
                    }
                });
        System.out.println("aaaaaaaaaaafilter:");
        printPairs(fs);
    }

    /** Prints every element of the pair RDD with {@code System.out.println} from inside a foreach. */
    private static <K, V> void printPairs(JavaPairRDD<K, V> rdd) {
        rdd.foreach(new VoidFunction<Tuple2<K, V>>() {
            public void call(Tuple2<K, V> x) throws Exception {
                System.out.println(x);
            }
        });
    }
}
package com.fei.simple_project;

import org.apache.spark.Partitioner;

/**
 * Custom partitioner that assigns a key to the partition given by the
 * absolute value of {@code key.hashCode() % num}.
 *
 * <p>Two instances with the same partition count compare equal, so Spark can
 * tell that RDDs partitioned by separate instances share the same layout.
 */
public class MyPartitioner extends Partitioner {

    /** Number of partitions; kept public for backward compatibility. */
    public int num;

    /**
     * @param N number of partitions, must be positive
     * @throws IllegalArgumentException if {@code N} is not positive
     */
    public MyPartitioner(int N) {
        if (N <= 0) {
            throw new IllegalArgumentException("Number of partitions must be positive, got " + N);
        }
        num = N;
    }

    /**
     * Returns the partition index for the given key.
     *
     * @param x the key; {@code null} is mapped to partition 0
     * @return an index in the range {@code [0, num)}
     */
    @Override
    public int getPartition(Object x) {
        if (x == null) {
            // Same convention as Spark's HashPartitioner: null keys go to partition 0.
            return 0;
        }
        int ret = x.hashCode() % num;
        // The remainder takes the sign of the hash code; its magnitude is
        // always below num, so negating a negative remainder cannot overflow.
        if (ret < 0) {
            ret = -ret;
        }
        return ret;
    }

    /** @return the number of partitions */
    @Override
    public int numPartitions() {
        return num;
    }

    /** Two instances are equal when they have the same partition count. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MyPartitioner)) {
            return false;
        }
        return ((MyPartitioner) obj).num == num;
    }

    /** Consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return num;
    }
}
111111111111mapToPair: [Stage 0:> (0 + 0) / 4]16/02/04 20:15:45 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes (2,2 bb) (4,4 cc) (3,3 dd) (1,1 aa) 222222222222222countByKey: 1 1 3 2 3333333333333collectAsMap: 1 2 3 6 4444444444444lookup: 4 6 5555555555partitioner: Optional.of(org.apache.spark.HashPartitioner@2) 66666666666present: true 77777777value:2 888888888888partitioner: Optional.of(com.fei.simple_project.MyPartitioner@4b78b) 9999999999join: (1,(sina,dangdang)) (2,(taobao,126)) (2,(taobao,baidu)) (2,(126,126)) (2,(126,baidu)) aaaaaaaaaaafilter: (1,(sina,dangdang)) (2,(taobao,126)) (2,(taobao,baidu)) (2,(126,baidu))
相关文章推荐
- Centos下apache启动时httpd: apr_sockaddr_info_get() failed for 报错
- Connection for controluser as defined in your configuration failed.
- Failed to initialize NVML: GPU access blocked by the operating system
- Jenkins进阶系列之——02email-ext邮件通知模板
- Jenkins进阶系列之——01使用email-ext替换Jenkins的默认邮件通知
- Gmail两步验证
- NSLayoutConstraint
- 简单的AIDL的使用
- UVa 442 && HDU 1082 Matrix Chain Multiplication【栈】
- DataInputStream
- An End-to-End System for Unconstrained Face Verification with Deep Convolutional Neural Networks
- pod setup》error: RPC failed; result=18, HTTP code = 200
- hdu acm 1151 Air Raid
- 一个基于MVVM的TableView组件化实现方案——AITableView
- 手表端开发遇到的错误 E/WearablePkgInstaller: Failed to add com.. 和Ignoring Google Play Services dependent app:
- [LeetCode]217. Contains Duplicate
- df 命令结果显示的 Size 不等于 Avail + Used
- poj3250Bad Hair Day【单调栈】
- UVA 11374 Airport Express
- HDU 1021 Fibonacci Again 数学题