
Spark PairRDD Actions and Data Partitioning

2016-02-03 19:55

A walk through the common PairRDD actions (countByKey, collectAsMap, lookup) and data-partitioning facilities (HashPartitioner, a custom Partitioner, and a partition-aware join) in Spark's Java API, followed by the program output.
App.java:

package com.fei.simple_project;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * PairRDD actions and data partitioning examples.
 */
public class App {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Build a pair RDD from another RDD: key each line by its first token.
        JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"));
        JavaPairRDD<String, String> prdd = line1.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String x) throws Exception {
                return new Tuple2<String, String>(x.split(" ")[0], x);
            }
        });
        System.out.println("111111111111mapToPair:");
        prdd.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> x) throws Exception {
                System.out.println(x);
            }
        });

        // parallelizePairs: build a pair RDD directly from a list of tuples.
        Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(1, 2);
        Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(3, 4);
        Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(3, 6);
        List<Tuple2<Integer, Integer>> list1 = new ArrayList<Tuple2<Integer, Integer>>();
        list1.add(t1);
        list1.add(t2);
        list1.add(t3);
        JavaPairRDD<Integer, Integer> line2 = sc.parallelizePairs(list1);
        line2.persist(StorageLevel.MEMORY_ONLY());

        Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(3, 9);
        List<Tuple2<Integer, Integer>> list2 = new ArrayList<Tuple2<Integer, Integer>>();
        list2.add(t4);
        JavaPairRDD<Integer, Integer> line3 = sc.parallelizePairs(list2);
        line3.persist(StorageLevel.MEMORY_ONLY());

        // countByKey: count the number of elements for each key.
        System.out.println("222222222222222countByKey:");
        Map<Integer, Object> ma = line2.countByKey();
        for (Entry<Integer, Object> e : ma.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue() + " ");
        }

        // collectAsMap: if a key occurs more than once, later values overwrite earlier ones.
        System.out.println("3333333333333collectAsMap:");
        Map<Integer, Integer> ca = line2.collectAsMap();
        for (Entry<Integer, Integer> e : ca.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue() + " ");
        }

        // lookup: return all values associated with one key.
        System.out.println("4444444444444lookup:");
        List<Integer> la = line2.lookup(3);
        for (Integer i : la) {
            System.out.println(i + " ");
        }

        // partitionBy, partitioner, join + filter
        Tuple2<Integer, String> ta = new Tuple2<Integer, String>(1, "sina");
        Tuple2<Integer, String> tb = new Tuple2<Integer, String>(2, "taobao");
        Tuple2<Integer, String> td = new Tuple2<Integer, String>(2, "126");
        List<Tuple2<Integer, String>> lista = new ArrayList<Tuple2<Integer, String>>();
        lista.add(ta);
        lista.add(tb);
        lista.add(td);
        // Built-in hash partitioner; Spark also ships a range partitioner
        // (see the sortByKey sketch after this listing).
        JavaPairRDD<Integer, String> linea = sc.parallelizePairs(lista).partitionBy(new HashPartitioner(2));
        linea.persist(StorageLevel.MEMORY_ONLY());
        Optional<Partitioner> op = linea.partitioner();
        System.out.println("5555555555partitioner: " + op);
        System.out.println("66666666666present: " + op.isPresent());
        if (op.isPresent()) {
            System.out.println("77777777value:" + op.get().numPartitions());
        }

        Tuple2<Integer, String> tc = new Tuple2<Integer, String>(2, "126");
        Tuple2<Integer, String> te = new Tuple2<Integer, String>(2, "baidu");
        Tuple2<Integer, String> tf = new Tuple2<Integer, String>(1, "dangdang");
        List<Tuple2<Integer, String>> listc = new ArrayList<Tuple2<Integer, String>>();
        listc.add(tc);
        listc.add(te);
        listc.add(tf);
        // Custom partitioner (MyPartitioner, defined below).
        JavaPairRDD<Integer, String> linec = sc.parallelizePairs(listc).partitionBy(new MyPartitioner(2));
        linec.persist(StorageLevel.MEMORY_ONLY());
        System.out.println("888888888888partitioner: " + linec.partitioner());

        System.out.println("9999999999join:");
        JavaPairRDD<Integer, Tuple2<String, String>> js = linea.join(linec);
        js.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, String>>>() {
            public void call(Tuple2<Integer, Tuple2<String, String>> x) throws Exception {
                System.out.println(x);
            }
        });
        // Drop pairs whose two sides carry the same value, e.g. (2,(126,126)).
        JavaPairRDD<Integer, Tuple2<String, String>> fs = js.filter(new Function<Tuple2<Integer, Tuple2<String, String>>, Boolean>() {
            public Boolean call(Tuple2<Integer, Tuple2<String, String>> y) throws Exception {
                return !y._2._1.equals(y._2._2);
            }
        });
        System.out.println("aaaaaaaaaaafilter:");
        fs.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, String>>>() {
            public void call(Tuple2<Integer, Tuple2<String, String>> x) throws Exception {
                System.out.println(x);
            }
        });
    }
}
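The comment on linea above notes that besides HashPartitioner Spark also ships a range partitioner. Constructing a RangePartitioner directly from Java is clumsy (it needs a Scala Ordering and a ClassTag), so the easiest way to see range partitioning from the Java API is sortByKey, which shuffles with a RangePartitioner internally. A minimal sketch, reusing sc and lista from above (the variable name ranged is mine):

// sortByKey repartitions with a RangePartitioner, so each of the
// 2 partitions holds a contiguous, sorted range of keys.
JavaPairRDD<Integer, String> ranged = sc.parallelizePairs(lista).sortByKey(true, 2);
System.out.println("range partitioner: " + ranged.partitioner());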


MyPartitioner.java, the custom partitioner used above:

package com.fei.simple_project;

import org.apache.spark.Partitioner;

/**
 * A simple custom partitioner: route each key to hashCode() modulo the
 * partition count.
 */
public class MyPartitioner extends Partitioner {
    public int num;

    public MyPartitioner(int N) {
        num = N;
    }

    @Override
    public int getPartition(Object x) {
        // hashCode() can be negative in Java; partition ids must be >= 0.
        int ret = x.hashCode() % num;
        if (ret < 0)
            ret = -ret;
        return ret;
    }

    @Override
    public int numPartitions() {
        return num;
    }
}
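One detail worth adding: Spark decides whether two pair RDDs are co-partitioned by comparing their partitioners with equals(), which is what lets a join between them skip the shuffle. A custom Partitioner should therefore normally override equals and hashCode as well; a minimal sketch of what that could look like inside MyPartitioner:

@Override
public boolean equals(Object other) {
    // Two MyPartitioner instances route keys identically whenever they
    // use the same number of partitions.
    return other instanceof MyPartitioner && ((MyPartitioner) other).num == num;
}

@Override
public int hashCode() {
    return num;
}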


Program output:

111111111111mapToPair:
(2,2 bb)
(4,4 cc)
(3,3 dd)
(1,1 aa)
222222222222222countByKey:
1 1
3 2
3333333333333collectAsMap:
1 2
3 6
4444444444444lookup:
4
6
5555555555partitioner: Optional.of(org.apache.spark.HashPartitioner@2)
66666666666present: true
77777777value:2
888888888888partitioner: Optional.of(com.fei.simple_project.MyPartitioner@4b78b)
9999999999join:
(1,(sina,dangdang))
(2,(taobao,126))
(2,(taobao,baidu))
(2,(126,126))
(2,(126,baidu))
aaaaaaaaaaafilter:
(1,(sina,dangdang))
(2,(taobao,126))
(2,(taobao,baidu))
(2,(126,baidu))
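A closing observation on the join above: linea uses HashPartitioner(2) while linec uses MyPartitioner(2), so the two RDDs are not co-partitioned and Spark still has to shuffle one side to co-locate keys. Giving both RDDs the same partitioner lets join reuse the existing layout; a minimal sketch reusing sc, lista, and listc from above (variable names are mine):

// With an identical partitioner on both sides, join can use narrow
// dependencies instead of re-shuffling either RDD.
HashPartitioner hp = new HashPartitioner(2);
JavaPairRDD<Integer, String> left = sc.parallelizePairs(lista).partitionBy(hp);
JavaPairRDD<Integer, String> right = sc.parallelizePairs(listc).partitionBy(hp);
JavaPairRDD<Integer, Tuple2<String, String>> joined = left.join(right);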