史上最简单的spark教程第四章-Java操作SaprkApi常用案例大集合
spark+Java代码案例集合:
这一章节大部分都是代码实例,spark中大部分常见的转换操作和行动操作
包含特定数据类型的RDD还支持的一些附加操作,
比如数字型的RDD支持的统计型函数操作.键值对RDD的聚合和键值操作等
史上最简单的spark教程
所有代码示例地址:https://github.com/Mydreamandreality/sparkResearch
(提前声明:文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章,写作不易,转载请注明)
(文章参考:Elasticsearch权威指南,Spark快速大数据分析文档,Elasticsearch官方文档,实际项目中的应用场景)
(帮到到您请点点关注,文章持续更新中!)
Git主页 https://github.com/Mydreamandreality
在开始之前再次把上一章的Java日志案例解释一波
在上一章中的案例中,我们用到了filter()函数对日志进行筛选,做了转换操作,
然后又使用count()对筛选的日志进行聚合
而Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算.
这个是很好理解的,你需要发送指令,告诉Spark,我需要在这一堆杂乱的数据中,筛选哪些数据,生成怎么样的数据
所以在我们使用的filter()函数中,我们传入了一个匿名内部类的函数,
function,
函数需要作为实现了Spar 的org.apache.spark.api.java.function包中的任一函数接口的对象来传递
- 比如上一章的案例,筛选error的日志信息,我们就是用匿名内部类进行函数传递的
JavaRDD<String> inputRDD = sparkContext.textFile("/usr/local/log"); JavaRDD<String> errorRDD = inputRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { return s.contains("error"); } });
- 但是我们还可以使用具体类进行函数的传递
- 案例如下:
//定义具体类 class ContainsError implements Function<String,Boolean>{ @Override public Boolean call(String v1) throws Exception { return v1.contains("error"); } } JavaRDD<String> errorRDD = inputRDD.filter(new ContainsErrorDev("Error"));
- 至于到底用哪个,我个人还是觉得具体类比较好用一些,因为灵活
- 比如:你可以给构造器添加参数
- 案例如下:
class ContainsErrorDev implements Function<String,Boolean>{ private String query; public ContainsErrorDev(String query){ this.query = query; } public Boolean call(String v1) { return v1.contains(query); } }
以上就是如何给转换操作函数传递函数的解释,下面就开始我们的案例集合
-
刚才说了,此处的案例有针对特定数据类型的RDD操作的,也有支持任意类型RDD的案例
-
那就先看下哪些转换操作和行动操作接受任意数据类型的RDD支持
首先我们最经常用的两个,转换操作应该就是map()和filter()
- map()函数的话在第二章快速开发部署第一个Java+Spark程序中有使用的案例中有一个简单的讲解和栗子 map()接收一个函数,把这个函数用作于每个RDD的元素,把函数的返回结果作为新RDD的值
- map()的接收值和返回值不要要一致
-
filter()接收一个函数,把满足条件的RDD放入新的RDD中
- 先搞一个map()的案例
- 需求:使用map()对RDD中的所有数值求平方*
- 补充:
- 在第三章的时候,我有写过创建RDD的两种方式
- 之前只用过第一种方式,现在就使用第二种:驱动器程序中分发集合(第二种方式只适合测试或者调试使用)
/** * 计算RDD中各值的平方 */ public void map(JavaSparkContext sparkContext) { //这里我们直接创建RDD JavaRDD<Integer> num = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)); //新生成的RDD元素 JavaRDD<Integer> result = num.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1 * v1; } }); System.out.println(StringUtils.join(result.collect(),",")); }
- 搞一个flowmap()的案例
- flatMap()的解释: 对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()
/** * flatMap分割字符串 */ public void flatMap(JavaSparkContext sparkContext){ JavaRDD<String> lines = sparkContext.parallelize(Arrays.asList("hello world", "hi")); JavaRDD<String> flatMapResult = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(PATTERN.split(s)).iterator(); } }); flatMapResult.first(); //结果:hello }
map和flatmap之间的关系可以这么理解
目前常用的map和flatmap,filter已经写完了
这里写一些其他的案例
伪集合操作:如下
-
数据混洗:
因为常常有重复的元素.如果只要唯一的元素,我们可以使用RDD.distinct()
转化操作来生成一个只包含不同元素的新RDD - 不过distinct操作的开销很大,因为数据是通过网络混洗的,后续再继续了解下有没有优化的办法
集合操作:
-
union
返回一个包含两个 RDD 中所有元素的 RDD
-
只返回两个RDD中都有的元素
案例:
操作数据
对数据{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作
[注意此处是两个RDD元素]
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 生成一个包含两个RDD中所有元素的RDD | RDD.union() | {1, 2, 3, 3, 4, 5} |
intersection() | 求两个 RDD 共同的元素的 RDD | RDD.intersection | {3} |
subtract() | 移除一个RDD中的内容 (比如训练数据) | RDD.subtract() | {1,2} |
行动操作案例:
最常见的行动操作:reduce()
- reduce()的案例
- reduce()的解释: 接收一个函数作为参数,操作两个RDD的元素,并且返回一个相同类型的新元素
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); Integer sum =rdd.reduce( new Function2<Integer, Integer, Integer>() { public Integercall(Integer v1, Integer v2) throws Exception { return v1+v2; } } ); System.out.println(sum.intValue()); //最后输出:55
但是有些时候我们的输入参数和输出参数不一定一致,那就需要:aggregate
- aggregate()的案例
- aggregate()的解释: aggregate()函数则把我们从返回值类型必须与所操作的RDD类型相同的限制中解放出来
- 使用 aggregate() 时,需要提供我们期待返回的类型的初始值
- 然后通过一个函数把RDD中的元素合并起来放入累加器,考虑到每个节点是在本地进行累加的
- 最终.还需要提供第二个函数来将累加器两两合并
/** * Created by 張燿峰 * aggregate计算AVG * * @author 孤 * @date 2019/3/15 * @Varsion 1.0 */ public class RddAvg { private Integer total; private Integer num; public RddAvg(Integer total, Integer num) { this.total = total; this.num = num; } public double avg() { return total / num; } Function2<RddAvg, Integer, RddAvg> avgFunction2 = new Function2<RddAvg, Integer, RddAvg>() { @Override public RddAvg call(RddAvg v1, Integer v2) { v1.total += v2; v1.num += 1; return v1; } }; Function2<RddAvg,RddAvg,RddAvg> rddAvgFunction2 = new Function2<RddAvg, RddAvg, RddAvg>() { @Override public RddAvg call(RddAvg v1, RddAvg v2) { v1.total += v2.total; v1.num += v2.num; return v1; } }; }
package spark; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; /** * Created by 張燿峰 * 测试Avg * * @author 孤 * @date 2019/3/15 * @Varsion 1.0 */ public class Application { public static void main(String[] args) { rddAvg(new JavaSparkContext()); } public static void rddAvg(JavaSparkContext sparkContext) { JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); RddAvg rddAvg = new RddAvg(0, 0); RddAvg result = javaRDD.aggregate(rddAvg, rddAvg.avgFunction2, rddAvg.rddAvgFunction2); System.out.println(result.avg()); } }
常用的案例我暂时先写了这么多,周六日抽空更新下Springboot和Elasticsearch分布式搜索的案例吧,
下周继续Spark的更新
有什么不懂的可以留言交流一波,
代码案例地址https://github.com/Mydreamandreality/sparkResearch
- 史上最简单的spark教程第六章-键值对RDD聚合,分组,统计的Java案例实践-(下集)[核心基础完结篇章]
- 史上最简单的spark教程第十章-SparkSQL编程Java案例实践(二)
- 史上最简单的spark教程第六章-键值对RDD聚合,分组,统计的Java案例实践-(上集)
- 史上最简单的spark教程第七章-spark的数据读取与保存Java案例实践
- 史上最简单的spark教程第八章-spark的自定义累加器与广播变量Java案例实践
- 史上最简单的spark教程第九章-SparkSQL编程Java案例实践(一)斗图王来辣
- JAVA AJAX教程第四章—AJAX和MVC的简单结合
- POI实战-java开发excel详解(第四章 常用操作-单元格合并与数据读取)
- 对Java中常用集合的操作
- Git简单教程(日常操作最常用的命令)
- POI实战-java开发excel详解(第四章 常用操作-注释)
- java集合常用操作
- java操作Excel表格读取简单案例(jxl)
- Java中对List集合的常用操作
- java导入导出excel常用操作小结及简单示例
- java常用的数组、字符串、集合操作以及数据结构与算法基本知识
- Java中对List集合的常用操作
- spring boot进阶(一) springboot整合redis,可操作java对象。最完整、简单易懂、详细的spring boot教程。
- Java集合-Map(简单概述和案例)
- JAVA AJAX教程第四章—AJAX和MVC的简单结合