您的位置:首页 > 大数据 > 人工智能

Spark PairRDD 转化一

2016-02-01 20:51 337 查看
package edu.berkeley.simple_project;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

/**
* Hello world!
*
*/
public class App {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);

// convert from other RDD
JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"));
JavaPairRDD<String, String> prdd = line1.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x) throws Exception {
return new Tuple2(x.split(" ")[0], x);
}
});
System.out.println("111111111111mapToPair:");
prdd.foreach(new VoidFunction<Tuple2<String, String>>() {
public void call(Tuple2<String, String> x) throws Exception {
System.out.println(x);
}
});

// parallelizePairs
Tuple2 t1 = new Tuple2(1, 5);
Tuple2 t2 = new Tuple2(1, 3);
Tuple2 t3 = new Tuple2(3, 7);
List list1 = new ArrayList<Tuple2>();
list1.add(t1);
list1.add(t2);
list1.add(t3);
JavaPairRDD<Integer, Integer> line2 = sc.parallelizePairs(list1);
line2.persist(StorageLevel.MEMORY_ONLY());

System.out.println("22222222222222222parallelize:");
line2.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
public void call(Tuple2<Integer, Integer> x) throws Exception {
System.out.println(x);
}
});

// reduceByKey
JavaPairRDD<Integer, Integer> line3 = line2.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) throws Exception {
return (x + y);
}
});
System.out.println("333333333333reduceByKey:");
line3.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
public void call(Tuple2<Integer, Integer> x) throws Exception {
System.out.println(x);
}
});

// groupByKey
JavaPairRDD<Integer, Iterable<Integer>> line4 = line2.groupByKey();

System.out.println("44444444444444groupByKey:");
line4.foreach(new VoidFunction<Tuple2<Integer, Iterable<Integer>>>() {
public void call(Tuple2<Integer, Iterable<Integer>> x) throws Exception {
System.out.println(x);
}
});

// mapValues
JavaPairRDD<Integer, Integer> line5 = line2.mapValues(new Function<Integer, Integer>() {
public Integer call(Integer x) throws Exception {
return x * x;
}
});

System.out.println("555555555555555mapValues:");
line5.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
public void call(Tuple2<Integer, Integer> x) throws Exception {
System.out.println(x);
}
});

// flatMapValues
JavaPairRDD<Integer, Integer> line6 = line2.flatMapValues(new Function<Integer, Iterable<Integer>>() {
public Iterable<Integer> call(Integer x) throws Exception {
ArrayList list = new ArrayList();
list.add(x);
return list;
}
});

System.out.println("666666666666flatMapValues:");
line6.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
public void call(Tuple2<Integer, Integer> x) throws Exception {
System.out.println(x);
}
});

// keys
JavaRDD<Integer> line7 = line2.keys();
System.out.println("777777777777keys:");
line7.foreach(new VoidFunction<Integer>() {
public void call(Integer x) throws Exception {
System.out.println(x);
}
});

// values
JavaRDD<Integer> line8 = line2.values();
System.out.println("888888888888888values:");
line8.foreach(new VoidFunction<Integer>() {
public void call(Integer x) throws Exception {
System.out.println(x);
}
});

// sortByKey
JavaPairRDD<Integer, Integer> line9 = line2.sortByKey(false);
// JavaPairRDD<Integer, Integer> line9 = line2.sortByKey(new
// MyComparator(), true);  MyComparator必须实现Comparator接口和Serializable接口
System.out.println("9999999999999sortByKey:");
line9.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
public void call(Tuple2<Integer, Integer> x) throws Exception {
System.out.println(x);
}
});

// filter
JavaPairRDD<Integer, Integer> line10 = line2.filter(new Function<Tuple2<Integer, Integer>,Boolean>(){
public Boolean call(Tuple2<Integer, Integer> x) throws Exception {
return (x._1>2);
}
});
System.out.println("aaaaaaaaaaaaaaaafilter:");
line10.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
public void call(Tuple2<Integer, Integer> x) throws Exception {
System.out.println(x);
}
});

}
}


package edu.berkeley.simple_project;

import java.util.Comparator;

/**
 * Orders {@link Integer} values in ascending numeric order.
 *
 * <p>The class is {@link java.io.Serializable} so that an instance can be
 * handed to APIs that need to serialize the comparator they are given.
 */
public class MyComparator implements Comparator<Integer>, java.io.Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Compares two integers numerically.
     *
     * @param x the first value
     * @param y the second value
     * @return a negative number, zero, or a positive number when {@code x} is
     *         less than, equal to, or greater than {@code y}
     */
    @Override
    public int compare(Integer x, Integer y) {
        // Subtraction (x - y) can overflow and flip the sign, e.g. for
        // Integer.MIN_VALUE and 1; compareTo has no such failure mode.
        return x.compareTo(y);
    }

}


111111111111mapToPair:
[Stage 0:>                                                          (0 + 0) / 4]16/02/01 18:23:35 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
(4,4 cc)
(3,3 dd)
(1,1 aa)
(2,2 bb)
22222222222222222parallelize:
(3,7)
(1,3)
(1,5)
333333333333reduceByKey:
(3,7)
(1,8)
44444444444444groupByKey:
(1,[5, 3])
(3,[7])
555555555555555mapValues:
(3,49)
(1,25)
(1,9)
666666666666flatMapValues:
(1,3)
(3,7)
(1,5)
777777777777keys:
1
1
3
888888888888888values:
5
7
3
9999999999999sortByKey:
(1,5)
(1,3)
(3,7)
aaaaaaaaaaaaaaaafilter:
(3,7)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: