大数据IMF传奇行动绝密课程第103课:动手实战Spark Streaming Broadcast、Accumulator实现在线黑名单过滤和计数
2017-04-03 22:16
633 查看
动手实战Spark Streaming Broadcast、Accumulator实现在线黑名单过滤和计数
1、自定义Receiver分析2、自定义Receiver实战
package com.tom.spark.SparkApps.sparkstreaming; import java.util.Arrays; import java.util.List; import org.apache.hadoop.hive.ql.parse.HiveParser.ifExists_return; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; public class SparkStreamingBroadcastAccumulator { private static volatile Broadcast<List<String>> broadcastList = null; private static volatile Accumulator<Integer> accumulator = null; /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub //好处:1、checkpoint 2、工厂 SparkConf conf = new SparkConf().setAppName("SparkStreamingBroadcastAccumulator").setMaster("hdfs://Master:7077/"); JavaStreamingContext javassc = new JavaStreamingContext(conf, Durations.seconds(15)); //没有action广播不会发出 //使用Broadcast广播黑名单到每个Executor中 broadcastList = javassc.sparkContext().broadcast(Arrays.asList("Hadoop","Mahout","Hive")); //全局计数器,用于统计在线过滤了多少个黑名单 accumulator = javassc.sparkContext().accumulator(0, "OnlineBlacklistCounter"); //创建Kafka元数据来让Spark Streaming这个Kafka Consumer利用 JavaReceiverInputDStream<String> lines = javassc.socketTextStream("Master", 9999); JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String t) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(t, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){ //对相同的key,进行Value的累加(包括Local和Reducer级别同时Reduce) public Integer call(Integer v1, Integer v2) throws Exception { // TODO Auto-generated method stub return v1 + v2; } }); wordsCount.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() { public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception { // TODO Auto-generated method stub rdd.filter(new Function<Tuple2<String,Integer>, Boolean>() { public Boolean call(Tuple2<String, Integer> wordPair) throws Exception { if(broadcastList.value().contains(wordPair._1)) { accumulator.add(wordPair._2); return false; } else { return true; } } }).collect(); System.out.println(broadcastList.value().toString() + " : " + accumulator.value()); return null; } }); wordsCount.print(); /** * Spark Streaming 执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于 * 接收应用程序本身或者Executor中的消息, */ javassc.start(); javassc.awaitTermination(); javassc.close(); } }
相关文章推荐
- 大数据IMF传奇行动绝密课程第94课:SparkStreaming实现广告计费系统中在线黑名单过滤实战
- 大数据IMF传奇行动绝密课程第82课:Spark Streaming案例动手实战并在电光石火间理解其工作原理
- 第103课:动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数
- 大数据IMF传奇行动绝密课程第102课:动手实战Spark Streaming自定义Receiver并进行调试和测试
- 第103课:动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数
- 大数据IMF传奇行动绝密课程第97课:使用SparkStreaming+SparkSQL实现在线动态计算出特定时间窗口
- 大数据IMF传奇行动绝密课程第95课:通过SparkStreaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战
- 第103讲: 动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数
- 大数据IMF传奇行动绝密课程第87课:Flume推送数据到Spark Streaming案例实战和内幕源码解密
- 动手实战联合使用Spark Streaming、Broadcast、Accumulator计数器实现在线黑名单过滤和计数
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第83课:透彻讲解使用Scala和Java两种方式实战Spark Streaming开发
- 大数据IMF传奇行动绝密课程第98-99课:使用Spark Streaming实战对论坛网站动态行为的多维度分析
- 大数据IMF传奇行动绝密课程第93课:SparkStreaming updateStateByKey案例实战和内置源码解密
- 大数据IMF传奇行动绝密课程第88课:SparkStreaming从Flume Poll数据案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第90课:SparkStreaming基于Kafka Receiver案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第89课:SparkStreaming On Kafka之kafka解析和安装实战
- 大数据IMF传奇行动绝密课程第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第61课:Spark SQL数据加载和保存内幕深度解密实战
- 大数据IMF传奇行动绝密课程第96课:通过SparkStreaming的foreachRDD把处理后的数据写入外部存储系统中