Spark:使用Java实现所有的Transformation操作
2017-12-06 16:00
405 查看
完整代码如下
package cn.spark.study.core; import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; import akka.dispatch.Filter; import scala.Tuple2; /** * transformation操作实战 * @author Administrator * */ @SuppressWarnings(value = {"unused","unchecked"}) public class TransformationOperation_6 { public static void main(String[] args) { map(); filter(); flatMap(); groupByKey(); reduceByKey(); sortByKey(); join(); cogroup(); } /** * map算子案例:将集合中每一个元素都乘以2 */ private static void map(){ //创建sparkconf SparkConf conf = new SparkConf().setAppName("map").setMaster("local"); //创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); //构造集合 List<Integer> numbers = Arrays.asList(1,2,3,4,5); //并行化集合,创建初始rdd JavaRDD<Integer> numberRdd = sc.parallelize(numbers); //使用map算子,将集合中的每个元素都乘以2 //map算子是对任何类型的rdd都可以调用的 //在java中,map算子接受的参数是Function对象 //创建的function对象,一定会让你设置第二个泛型参数,这个泛型类型,就是返回的新元素的类型 //同时 call()方法的返回类型,也必须与第二个泛型类型同步 //在call()方法内部,就可以对原始rdd中的每一个元素进行各种处理和计算,并返回一个新的元素, //所有新的元素就会组成一个新的rdd JavaRDD<Integer> multipleNumberRdd = numberRdd.map(new Function<Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; //传入call()方法的就是1,2,3,4,5 //返回的就是2,4,6,8,10 @Override public Integer call(Integer v1) throws Exception { return v1*2; } }); //打印新的RDD multipleNumberRdd.foreach(new VoidFunction<Integer>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(Integer t) throws Exception { System.out.println(t); } }); //关闭JavaSparkContext sc.close(); } /** * filter 算子案例:过滤集合中的偶数 */ private static void filter(){ //创建sparkconf SparkConf conf = new SparkConf().setAppName("filter").setMaster("local"); //创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,14); //并行化集合,创建初始化rdd JavaRDD<Integer> numberRDD = sc.parallelize(numbers); //对初始rdd使用filter算子,过滤出集合中的偶数 //filter算子,传入的也是Function,其他的使用注意点,实际上和map是一样的 //但是唯一不同的就是call()方法的返回类型是boolean //每一个初始rdd中的元素,都会传入call()方法,此时你可以执行各种自定义的计算逻辑 //来判断这个元素是否是你想要的 //如果你想在新的rdd中保留这个元素,那么就返回true,苟泽返回false JavaRDD< Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() { /** * */ private static final long serialVersionUID = 1L; @Override public Boolean call(Integer v) throws Exception { return v % 2 == 0; } }); //打印新的RDD evenNumberRDD.foreach(new VoidFunction<Integer>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(Integer t) throws Exception { System.out.println(t); } }); //关闭JavaSparkContext sc.close(); } /** * flatMap 案例:将文本行拆分为多个单词 */ private static void flatMap(){ //创建sparkconf SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local"); //创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); //构造集合 List<String> lineList = Arrays.asList("hello you","hello me","hello world"); //并行化集合,创建RDD JavaRDD<String> lines = sc.parallelize(lineList); //对RDD执行flatMap算子,将每一行文本,拆分为多个单词 //flatMap算子,在java中,接受的参数是FlatMapFunction //我们需要自己定义FlatMapFunction的第二个泛型类型,即代表了返回的新元素的类型 //call()方法,返回的类型,不是U,而是Iterable<U>,这里的U也与第二个泛型类型相同 //flatMap其实就是,接受原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回多个元素 //多个元素,即封装在Iterable集合中,可以使用ArrayList等集合 //新的RDD中,即封装了所有的新元素,也就是说,新的RDD的大小一定是>=原始RDD的大小 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { /** * */ private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String v) throws Exception { return Arrays.asList(v.split(" ")); } }); //打印新的RDD words.foreach(new VoidFunction<String>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(String t) throws Exception { System.out.println(t); } }); //关闭JavaSparkContext sc.close(); } /** * groutByKey案例:安装班级对成绩进行分组 */ private static void groupByKey(){ //创建sparkConf SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local"); //创建sparkcontext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90), new Tuple2<String, Integer>("class2", 85)); //并行化集合,创建JavaPariRDD JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); //针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组 //groupByKey算子,返回的还是JavaPariRDD //但是JavaPariRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型 //也就是说,按照了Key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable //那么接下来,我们是不是就可以通过groupedScores这种JavaPariRDD,很方便地处理某种分组内的数据 JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey(); //打印新的RDD groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<Integer>> t) throws Exception { System.out.println("class:"+t._1); Iterator<Integer> iterable = t._2.iterator(); while(iterable.hasNext()){ System.out.println(iterable.next()); } System.out.println("==================="); } }); //关闭SparkContext sc.close(); } /** * reduceByKey案例:统计每个班级的总分 */ private static void reduceByKey(){ //创建sparkConf SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local"); //创建sparkcontext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90), new Tuple2<String, Integer>("class2", 85)); //并行化集合,创建JavaPariRDD JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); //针对scores RDD,执行reduceByKey算子 //reduceByKey,接收的参数是Function2类型,它有三个泛型类型,实际上代表了三个值 //第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型 //因此对每个key进行reduce,都会一次将第一个、第二个value传入,将值再与第三个value传入 //因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数类型 //第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的 //reduceByKey算子返回的RDD,还是JavaPariRDD<key,value> JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); //打印新的RDD totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._1+" :"+t._2); } }); //关闭SparkContext sc.clos c209 e(); } /** * sortByKey案例:按照学生分数进行排序 */ private static void sortByKey(){ //创建sparkConf SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local"); //创建sparkcontext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Tuple2<Integer, String>> scoreList = Arrays.asList( new Tuple2<Integer, String>(65, "leo"), new Tuple2<Integer, String>(80, "tom"), new Tuple2<Integer, String>(90, "marry"), new Tuple2<Integer, String>(70, "jack")); //并行化集合,创建JavaPariRDD JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList); //对scores RDD执行sortByKey算子 //sortByKey其实就是根据key进行排序,可以手动指定升序或者降序 //返回的,还是JavaPariRDD,其中的元素内容,都是和原始的RDD一模一样的 //但是就是RDD中的元素的顺序不同了 JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false); //打印新的RDD sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<Integer, String> t) throws Exception { System.out.println(t._1+":"+t._2); } }); //关闭SparkContext sc.close(); } /** * join案例:打印学生成绩 */ private static void join(){ //创建sparkConf SparkConf conf = new SparkConf().setAppName("join").setMaster("local"); //创建sparkcontext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "leo"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90), new Tuple2<Integer, Integer>(3, 70)); //并行化两个RDD JavaPairRDD<Integer,String> students = sc.parallelizePairs(studentList); JavaPairRDD<Integer,Integer> scores = sc.parallelizePairs(scoreList); //使用join算子关联两个RDD //join以后,还是会根据key进行join,并返回JavaPariRDD //但是JavaPariRDD的第一个泛型类型,之前两个JavaPariRDD的key的类型,因为是通过key进行join的 //第二个泛型类型,是Tuple2<v1,v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型 //join,就返回的RDD的每一个元素,就是通过key join上的一个pair //什么意思?比如(1,1)(1,2)(1,3)的一个RDD // 还有一个(1,4)(2,1)(2,2)的一个RDD //join以后,实际上会得到(1,(1,4))(1,(2,4))(1,(3,4)) JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores); //打印新的rdd studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception { System.out.println("student id:" + t._1); System.out.println("student name:"+t._2._1); System.out.println("student score:"+t._2._2); System.out.println("==========================="); } }); //关闭SparkContext sc.close(); } /** * cogroup案例:打印学生成绩 */ private static void cogroup(){ //创建sparkConf SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local"); //创建sparkcontext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "leo"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90), new Tuple2<Integer, Integer>(3, 70), new Tuple2<Integer, Integer>(1, 80), new Tuple2<Integer, Integer>(2, 95), new Tuple2<Integer, Integer>(3, 60)); //并行化两个RDD JavaPairRDD<Integer,String> students = sc.parallelizePairs(studentList); JavaPairRDD<Integer,Integer> scores = sc.parallelizePairs(scoreList); //使用cogroup算子关联两个RDD //cogroup和join不同 //相当于是一个key join上的所有value,都给放到一个Iteable里面去了 //cogroup,不太好讲解,希望多加理解 JavaPairRDD<Integer, Tuple2<Iterable<String>,Iterable<Integer>>> studentScores = students.cogroup(scores); //打印新的rdd studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception { System.out.println("student id:" + t._1); System.out.println("student name:"+t._2._1); System.out.println("student score:"+t._2._2); System.out.println("==========================="); } }); //关闭SparkContext sc.close(); }
}
相关文章推荐
- 学习之使用Java IO 来实现复制文件的操作
- 使用监听器实现JAVA代码对数据库的定时操作,求大神帮帮忙!!!
- 在不使用if,while,do...while,for,switch,?:等操作,实现函数void printLess(int k),返回小于k的所有整数
- Java中使用synchronized关键字实现简单同步操作示例
- JAVA操作XML一(读取):使用DOM读取XML数据的两种具体实现
- 使用JAVA的开源API-JExcelAPI来操作Excel,实现基本的功能
- [Java Web]ibatis使用queryForMap实现数据查找等操作
- spark中transformation操作的各种算子(java版)
- 【JAVA使用XPath、DOM4J解析XML文件,实现对XML文件的CRUD操作】
- 使用java实现,随机取4张牌,使用加减乘除得到24的所有可能情况
- SPARK 使用Java 在IDE中实战RDD和DataFrame动态转换操作
- 市面上所有号称"虚拟机","防火墙"的实时监控杀毒软件无一不是使用的IFSHOOK技术.但是同时也有一些朋友不断写MAIL给我打听如何实现读写的监控.下面给出用VTOOLSD写的代码.也就是所有实时杀毒软件的奥秘.同时,很多拦截文件操作的软件,例如对目录加
- echars java帮助类使用操作实现
- java使用Field实现通用数据库操作
- 使用java基本语法实现的小程序(从1000年1月1日到9999年12月31日所有的对称日)
- 通过代理类实现java连接数据库(使用dao层操作数据)实例分享
- [Java] 使用 Apache的 Commons-net库 实现FTP操作
- 使用Java中JTextArea实现类似命令行操作的界面
- java线程池的使用,实现大量数据的更新操作