您的位置:首页 > 编程语言 > Java开发

史上最简单的spark教程第四章-Java操作SaprkApi常用案例大集合

2019-03-15 18:50 411 查看
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/youbitch1/article/details/88581251

spark+Java代码案例集合:

这一章节大部分都是代码实例,spark中大部分常见的转换操作和行动操作
包含特定数据类型的RDD还支持的一些附加操作,
比如数字型的RDD支持的统计型函数操作.键值对RDD的聚合和键值操作等

史上最简单的spark教程
所有代码示例地址:https://github.com/Mydreamandreality/sparkResearch

(提前声明:文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章,写作不易,转载请注明)
(文章参考:Elasticsearch权威指南,Spark快速大数据分析文档,Elasticsearch官方文档,实际项目中的应用场景)
(帮到到您请点点关注,文章持续更新中!)
Git主页 https://github.com/Mydreamandreality

在开始之前再次把上一章的Java日志案例解释一波

在上一章中的案例中,我们用到了filter()函数对日志进行筛选,做了转换操作,
然后又使用count()对筛选的日志进行聚合

而Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算.
这个是很好理解的,你需要发送指令,告诉Spark,我需要在这一堆杂乱的数据中,筛选哪些数据,生成怎么样的数据

所以在我们使用的filter()函数中,我们传入了一个匿名内部类的函数,

function
,
函数需要作为实现了Spar 的
org.apache.spark.api.java.function
包中的任一函数接口的对象来传递

  • 比如上一章的案例,筛选error的日志信息,我们就是用匿名内部类进行函数传递的
JavaRDD<String> inputRDD = sparkContext.textFile("/usr/local/log");
JavaRDD<String> errorRDD = inputRDD.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("error");
}
});
  • 但是我们还可以使用具体类进行函数的传递
  • 案例如下:
//定义具体类
class ContainsError implements Function<String,Boolean>{
@Override
public Boolean call(String v1) throws Exception {
return v1.contains("error");
}
}

JavaRDD<String> errorRDD = inputRDD.filter(new ContainsErrorDev("Error"));
  • 至于到底用哪个,我个人还是觉得具体类比较好用一些,因为灵活
  • 比如:你可以给构造器添加参数
  • 案例如下:
class ContainsErrorDev implements Function<String,Boolean>{
private String query;
public ContainsErrorDev(String query){
this.query = query;
}
public Boolean call(String v1) {
return v1.contains(query);
}
}

以上就是如何给转换操作函数传递函数的解释,下面就开始我们的案例集合

  • 刚才说了,此处的案例有针对特定数据类型的RDD操作的,也有支持任意类型RDD的案例

  • 那就先看下哪些转换操作和行动操作接受任意数据类型的RDD支持

首先我们最经常用的两个,转换操作应该就是map()和filter()

  • filter()函数的话上一章节和这章节都有说到
      filter()接收一个函数,把满足条件的RDD放入新的RDD中
    • 先搞一个map()的案例
    • 需求:使用map()对RDD中的所有数值求平方*
    • 补充:
    • 在第三章的时候,我有写过创建RDD的两种方式

    • 之前只用过第一种方式,现在就使用第二种:驱动器程序中分发集合(第二种方式只适合测试或者调试使用)
    /**
    * 计算RDD中各值的平方
    */
    public void map(JavaSparkContext sparkContext) {
    
    //这里我们直接创建RDD
    JavaRDD<Integer> num = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
    
    //新生成的RDD元素
    JavaRDD<Integer> result = num.map(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer v1) throws Exception {
    return v1 * v1;
    }
    });
    System.out.println(StringUtils.join(result.collect(),","));
    }
    • 搞一个flowmap()的案例
    • flatMap()的解释: 对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()
  • 需求:使用flatMap()对RDD中的所有字符串切分为单词*
  • 代码示例
  • /**
    * flatMap分割字符串
    */
    public void flatMap(JavaSparkContext sparkContext){
    JavaRDD<String> lines = sparkContext.parallelize(Arrays.asList("hello world", "hi"));
    
    JavaRDD<String> flatMapResult  = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
    return Arrays.asList(PATTERN.split(s)).iterator();
    }
    });
    
    flatMapResult.first();
    
    //结果:hello
    }

    map和flatmap之间的关系可以这么理解

    目前常用的map和flatmap,filter已经写完了

    这里写一些其他的案例

    伪集合操作:如下

    • 数据混洗:

      因为常常有重复的元素.如果只要唯一的元素,我们可以使用
      RDD.distinct()
      转化操作来生成一个只包含不同元素的新RDD
    • 不过distinct操作的开销很大,因为数据是通过网络混洗的,后续再继续了解下有没有优化的办法
  • 集合操作:

      union 返回一个包含两个 RDD 中所有元素的 RDD
    • 如果输入的RDD中有重复数据,Spark的union()操作也会去除重复数据
  • intersection
      只返回两个RDD中都有的元素
    • intersection() 在运行时也会去掉所有重复的元素()单个RDD内的重复元素也会一起移除)
    • intersection性能比union差,它需要通过网络混洗数据来发现共有的元素

    案例:
    操作数据
    对数据{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作

    [注意此处是两个RDD元素]

    函数名 目的 示例 结果
    union() 生成一个包含两个RDD中所有元素的RDD RDD.union() {1, 2, 3, 3, 4, 5}
    intersection() 求两个 RDD 共同的元素的 RDD RDD.intersection {3}
    subtract() 移除一个RDD中的内容 (比如训练数据) RDD.subtract() {1,2}

    行动操作案例:

    最常见的行动操作:reduce()

    • reduce()的案例
    • reduce()的解释: 接收一个函数作为参数,操作两个RDD的元素,并且返回一个相同类型的新元素
  • 需求:使用reduce()对RDD中的元素进行累加*
  • 代码示例
  • JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
    Integer sum =rdd.reduce(
    new Function2<Integer, Integer, Integer>() {
    public Integercall(Integer v1, Integer v2) throws Exception {
    return v1+v2;
    }
    }
    );
    System.out.println(sum.intValue());
    
    //最后输出:55

    但是有些时候我们的输入参数和输出参数不一定一致,那就需要:aggregate

    • aggregate()的案例
    • aggregate()的解释: aggregate()函数则把我们从返回值类型必须与所操作的RDD类型相同的限制中解放出来
    • 使用 aggregate() 时,需要提供我们期待返回的类型的初始值
    • 然后通过一个函数把RDD中的元素合并起来放入累加器,考虑到每个节点是在本地进行累加的
    • 最终.还需要提供第二个函数来将累加器两两合并
  • 需求:使用aggregate() 来计算 RDD 的平均值
  • 代码示例
  • /**
    * Created by 張燿峰
    * aggregate计算AVG
    *
    * @author 孤
    * @date 2019/3/15
    * @Varsion 1.0
    */
    public class RddAvg {
    private Integer total;
    private Integer num;
    
    public RddAvg(Integer total, Integer num) {
    this.total = total;
    this.num = num;
    }
    
    public double avg() {
    return total / num;
    }
    
    Function2<RddAvg, Integer, RddAvg> avgFunction2 = new Function2<RddAvg, Integer, RddAvg>() {
    @Override
    public RddAvg call(RddAvg v1, Integer v2) {
    v1.total += v2;
    v1.num += 1;
    return v1;
    }
    };
    
    Function2<RddAvg,RddAvg,RddAvg> rddAvgFunction2 = new Function2<RddAvg, RddAvg, RddAvg>() {
    @Override
    public RddAvg call(RddAvg v1, RddAvg v2) {
    v1.total += v2.total;
    v1.num += v2.num;
    return v1;
    }
    };
    }
    package spark;
    
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import java.util.Arrays;
    
    /**
    * Created by 張燿峰
    * 测试Avg
    *
    * @author 孤
    * @date 2019/3/15
    * @Varsion 1.0
    */
    public class Application {
    public static void main(String[] args) {
    rddAvg(new JavaSparkContext());
    }
    
    public static void rddAvg(JavaSparkContext sparkContext) {
    JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    RddAvg rddAvg = new RddAvg(0, 0);
    RddAvg result = javaRDD.aggregate(rddAvg, rddAvg.avgFunction2, rddAvg.rddAvgFunction2);
    System.out.println(result.avg());
    }
    }

    常用的案例我暂时先写了这么多,周六日抽空更新下Springboot和Elasticsearch分布式搜索的案例吧,
    下周继续Spark的更新

    有什么不懂的可以留言交流一波,

    代码案例地址https://github.com/Mydreamandreality/sparkResearch

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: