Spark Streaming: counting lines and computing Top N from Kafka over a custom time interval, and storing the results in HBase
2017-08-02 15:23
1. Count the number of lines produced to a Kafka topic in each 10-second interval and store the result in HBase
First create the corresponding table in HBase:
create 'linecount','count'
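The create command above keeps the default of one version per cell, which is exactly what the streaming job below relies on (it repeatedly overwrites the same rowkey and column). If you prefer to create or verify the table programmatically, here is a minimal sketch using the same old-style HBase client API as the job below; the class name is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateLineCountTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.8.71"); // same quorum as the streaming job below
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("linecount")) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("linecount"));
            HColumnDescriptor family = new HColumnDescriptor("count");
            family.setMaxVersions(1); // the job overwrites the same cell, so one version is enough
            desc.addFamily(family);
            admin.createTable(desc);
        }
        admin.close();
    }
}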
Start the Kafka cluster and create the topic:
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper h71:2181,h72:2181,h73:2181 --replication-factor 3 --partitions 2 --topic test
Start a console producer:
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list h71:9092,h72:9092,h73:9092 --topic test
Java code:
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaDirectWordCountPersistHBase {

    private static String beginTime = null;
    private static int cishu = 0;        // number of completed batches
    private static int interval = 0;     // batch interval in seconds
    private static String rowkey = null;

    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    public static void insert(String tableName, String rowKey, String family, String quailifer, String value) {
        HTable table = null;
        try {
            table = new HTable(getConfiguration(), tableName);
            Put put = new Put(rowKey.getBytes());
            put.add(family.getBytes(), quailifer.getBytes(), value.getBytes());
            table.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try { table.close(); } catch (Exception e) { e.printStackTrace(); } // avoid leaking HBase connections
            }
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
        // How often to compute, in seconds; 10 seconds here
        interval = 10;
        // JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000)); // milliseconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(interval)); // seconds

        // Kafka parameters: the direct stream talks to the brokers, not ZooKeeper, so we pass broker.list
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "192.168.8.71:9092,192.168.8.72:9092,192.168.8.73:9092");

        // Topics to read; the direct stream can read several topics in parallel
        Set<String> topics = new HashSet<String>();
        topics.add("test");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc,
                String.class,          // key type
                String.class,          // value type
                StringDecoder.class,   // key decoder
                StringDecoder.class,   // value decoder
                kafkaParams,
                topics);

        // The first interval is not necessarily a full 10 seconds; it can be shorter
        SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = new java.util.Date();
        System.out.println("StreamingContext started->" + time.format(new Date()));
        beginTime = time.format(date);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
                return Arrays.asList(tuple._2.split("\n"));  // split by line
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>("line", 1);  // every line counts under the same key
            }
        });

        JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordcounts.print();

        wordcounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(JavaPairRDD<String, Integer> wordcountsRDD) throws Exception {
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                java.util.Date date = new java.util.Date();
                System.out.println("endTime1-->" + time.format(new Date()));  // yyyy-MM-dd HH:mm:ss form
                final long endTime1 = System.currentTimeMillis();
                System.out.println("endTime1-->" + endTime1);                 // epoch milliseconds
                final String endTime = time.format(date);
                cishu++;
                System.out.println("cishu-->" + cishu);
                if (cishu == 1) {
                    // first batch: the window starts when the StreamingContext started
                    rowkey = beginTime + "__" + endTime;
                    insert("linecount", rowkey, "count", "sum", "0");
                } else {
                    // later batches: the window starts interval seconds before the batch end time
                    SimpleDateFormat hh1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date date1 = hh1.parse(endTime);
                    long hb = date1.getTime();
                    long a2 = hb - interval * 1000;
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date date2 = new Date(a2);
                    String beginTime1 = simpleDateFormat.format(date2);
                    rowkey = beginTime1 + "__" + endTime;
                    insert("linecount", rowkey, "count", "sum", "0");
                }
                // foreachPartition runs once per partition; with a two-partition topic it runs twice
                wordcountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {
                        Tuple2<String, Integer> wordcount = null;
                        // Note: this relies on HBase overwriting the previous value when the same rowkey and
                        // column are written again, so the linecount table must keep only one version
                        // (the default if you do not change it at creation time)
                        while (wordcounts.hasNext()) {
                            wordcount = wordcounts.next();
                            insert("linecount", rowkey, "count", "sum", wordcount._2.toString());
                        }
                    }
                });
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
After running this code in MyEclipse, type some data into the Kafka producer terminal:
hello world
ni hao a
hello spark
Note: if you paste these three lines, press Enter once more at the end; otherwise only two lines are actually sent.
After a while, enter some more data:
i
love
you
baby
,
come
on
Check the linecount table:
hbase(main):187:0> scan 'linecount'
ROW                                        COLUMN+CELL
 2017-07-26 17:27:56__2017-07-26 17:28:00  column=count:sum, timestamp=1501061244619, value=0
 2017-07-26 17:28:00__2017-07-26 17:28:10  column=count:sum, timestamp=1501061252476, value=3
 2017-07-26 17:28:10__2017-07-26 17:28:20  column=count:sum, timestamp=1501061262405, value=0
 2017-07-26 17:28:20__2017-07-26 17:28:30  column=count:sum, timestamp=1501061272420, value=7
4 row(s) in 0.3150 seconds
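If you want to read these per-window counts back in code rather than from the HBase shell, a minimal sketch along the same lines as the job above (the table, family, and qualifier names are assumed to match the create statement; the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadLineCounts {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        HTable table = new HTable(conf, "linecount");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            // rowkey is the time window, e.g. 2017-07-26 17:28:00__2017-07-26 17:28:10
            String window = Bytes.toString(r.getRow());
            String sum = Bytes.toString(r.getValue(Bytes.toBytes("count"), Bytes.toBytes("sum")));
            System.out.println(window + " -> " + sum);
        }
        scanner.close();
        table.close();
    }
}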
2. Compute the Top N of the data produced to a Kafka topic in each 10-second interval and store the result in HBase
Create the corresponding Top 3 table in HBase:
create 'KafkaTop','TopN'
Java code:
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import kafka.serializer.StringDecoder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

/**
 * @author huiqiang
 * 2017-7-28 11:24
 */
public class KafkaSparkTopN {

    private static String beginTime = null;
    private static String hbasetable = "KafkaTop"; // target HBase table; it must exist before the job starts
    private static int cishu = 0;                  // number of completed batches
    private static int interval = 10;              // how often to compute, in seconds; 10 seconds here
    private static int n = 0;
    private static String rowkey = null;
    public static int K = 3;                       // set this to whatever Top N you want

    // TreeMap keeps the running result; since TreeMap sorts by key ascending,
    // a custom Comparator is supplied to get descending (Top N) order
    public static TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
        @Override
        public int compare(Integer x, Integer y) {
            return y.compareTo(x);
        }
    });

    // HBase connection
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    public static void insert2(String tableName, String rowKey, String family, String quailifer, String value) {
        HTable table1 = null;
        try {
            table1 = new HTable(getConfiguration(), tableName);
            Put put = new Put(rowKey.getBytes());
            put.add(family.getBytes(), quailifer.getBytes(), value.getBytes());
            table1.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (table1 != null) {
                try { table1.close(); } catch (Exception e) { e.printStackTrace(); } // avoid leaking HBase connections
            }
        }
    }

    public static void insert3(String tableName, String rowKey, String family) {
        HTable table1 = null;
        try {
            table1 = new HTable(getConfiguration(), tableName);
            Put put = new Put(rowKey.getBytes());
            for (int i = 1; i <= K; i++) {
                put.add(family.getBytes(), ("Top" + i).getBytes(), "null".getBytes());
            }
            table1.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (table1 != null) {
                try { table1.close(); } catch (Exception e) { e.printStackTrace(); } // avoid leaking HBase connections
            }
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
        // JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000)); // milliseconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(interval)); // seconds

        // Kafka parameters: the direct stream talks to the brokers, not ZooKeeper, so we pass broker.list
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "192.168.8.71:9092,192.168.8.72:9092,192.168.8.73:9092");

        // Topics to read; several topics can be read in parallel
        Set<String> topics = new HashSet<String>();
        topics.add("test");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc,
                String.class,          // key type
                String.class,          // value type
                StringDecoder.class,   // key decoder
                StringDecoder.class,   // value decoder
                kafkaParams,
                topics);

        // The first interval is not necessarily a full 10 seconds; it can be shorter
        SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = new java.util.Date();
        System.out.println("StreamingContext started->" + time.format(new Date()));
        beginTime = time.format(date);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
                return Arrays.asList(tuple._2.split(" "));  // split by space
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordcounts.print();

        wordcounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(JavaPairRDD<String, Integer> wordcountsRDD) throws Exception {
                n = 0;
                treeMap.clear();
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                java.util.Date date = new java.util.Date();
                System.out.println("endTime1-->" + time.format(new Date()));  // yyyy-MM-dd HH:mm:ss form
                final long endTime1 = System.currentTimeMillis();
                System.out.println("endTime1-->" + endTime1);                 // epoch milliseconds
                final String endTime = time.format(date);
                cishu++;
                System.out.println("cishu-->" + cishu);
                if (cishu == 1) {
                    // first batch: the window starts when the StreamingContext started
                    rowkey = beginTime + "__" + endTime;
                    insert3(hbasetable, rowkey, "TopN");
                } else {
                    // later batches: the window starts interval seconds before the batch end time
                    SimpleDateFormat hh1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date date1 = hh1.parse(endTime);
                    long hb = date1.getTime();
                    long a2 = hb - interval * 1000;
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date date2 = new Date(a2);
                    String beginTime1 = simpleDateFormat.format(date2);
                    rowkey = beginTime1 + "__" + endTime;
                    insert3(hbasetable, rowkey, "TopN");
                }
                // foreachPartition runs once per partition; with a two-partition topic it runs twice
                wordcountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {
                        Tuple2<String, Integer> wordcount = null;
                        while (wordcounts.hasNext()) {
                            n++;
                            wordcount = wordcounts.next();
                            if (treeMap.containsKey(wordcount._2)) {
                                // words with the same count share one entry, joined by commas
                                String value = treeMap.get(wordcount._2) + "," + wordcount._1;
                                treeMap.remove(wordcount._2);
                                treeMap.put(wordcount._2, value);
                            } else {
                                treeMap.put(wordcount._2, wordcount._1);
                            }
                            if (treeMap.size() > K) {
                                treeMap.remove(treeMap.lastKey());  // drop the smallest count once we have more than K entries
                            }
                        }
                    }
                });
                if (n != 0) {
                    int y = 0;
                    for (int num : treeMap.keySet()) {
                        y++;
                        // Note: this relies on HBase overwriting the previous value when the same rowkey and
                        // column are written again, so the KafkaTop table must keep only one version
                        // (the default if you do not change it at creation time)
                        insert2(hbasetable, rowkey, "TopN", "Top" + y, treeMap.get(num) + " " + num);
                    }
                }
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
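The core of the Top N logic is the descending TreeMap: word counts become keys, words with equal counts are folded into one comma-separated entry, and anything beyond the Kth key is dropped. A standalone sketch of just that data structure, with made-up per-batch counts for illustration (class name hypothetical):

import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class TreeMapTopKDemo {
    public static void main(String[] args) {
        int K = 3;
        // TreeMap sorted by count in descending order
        TreeMap<Integer, String> top = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });

        // hypothetical per-batch word counts, as reduceByKey would produce them
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        counts.put("hello", 6);
        counts.put("world", 3);
        counts.put("hadoop", 2);
        counts.put("hive", 2);
        counts.put("hbase", 1);

        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            String existing = top.get(e.getValue());
            // words with the same count are joined with a comma, as in the streaming job
            top.put(e.getValue(), existing == null ? e.getKey() : existing + "," + e.getKey());
            if (top.size() > K) {
                top.remove(top.lastKey());  // lastKey() is the smallest count because of the descending comparator
            }
        }

        int rank = 0;
        for (Map.Entry<Integer, String> e : top.entrySet()) {
            System.out.println("Top" + (++rank) + ": " + e.getValue() + " " + e.getKey());
        }
        // Expected output:
        // Top1: hello 6
        // Top2: world 3
        // Top3: hadoop,hive 2
    }
}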
After running KafkaSparkTopN in MyEclipse, type some data into the Kafka producer terminal:
hello world
hello hadoop
hello hive
hello hadoop
hello world
hello world
hbase hive
The MyEclipse console will print:
-------------------------------------------
Time: 1501214340000 ms
-------------------------------------------
(hive,2)
(hello,6)
(world,3)
(hadoop,2)
(hbase,1)

endTime1-->2017-07-28 11:59:00
endTime1-->1501214340455
cishu-->1
...... (omitted)
-------------------------------------------
Time: 1501214350000 ms
-------------------------------------------

endTime1-->2017-07-28 11:59:10
endTime1-->1501214350090
cishu-->2
Check the HBase table:
hbase(main):018:0> scan 'KafkaTop'
ROW                                        COLUMN+CELL
 2017-07-28 11:58:55__2017-07-28 11:59:00  column=TopN:Top1, timestamp=1501101768643, value=hello 6
 2017-07-28 11:58:55__2017-07-28 11:59:00  column=TopN:Top2, timestamp=1501101768661, value=world 3
 2017-07-28 11:58:55__2017-07-28 11:59:00  column=TopN:Top3, timestamp=1501101768679, value=hadoop,hive 2
 2017-07-28 11:59:00__2017-07-28 11:59:10  column=TopN:Top1, timestamp=1501101770921, value=null
 2017-07-28 11:59:00__2017-07-28 11:59:10  column=TopN:Top2, timestamp=1501101770921, value=null
 2017-07-28 11:59:00__2017-07-28 11:59:10  column=TopN:Top3, timestamp=1501101770921, value=null
2 row(s) in 0.3140 seconds
3. The following example is not Spark Streaming; it is an example from the web that does the Top N analysis offline, included for reference only
Source: http://blog.csdn.net/accptanggang/article/details/52924970
Below is the source data hui.txt, which I put in the spark folder on my Windows desktop; the task is to pick out the 3 largest numbers:
2
4
1
6
8
10
34
89
Java code:
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkTop {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Top3").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //JavaRDD<String> lines = sc.textFile("hdfs://tgmaster:9000/in/nums2");
        JavaRDD<String> lines = sc.textFile("C:\\Users\\huiqiang\\Desktop\\spark\\hui.txt");

        // Map each line into a (number, number) key/value pair
        JavaPairRDD<Integer, Integer> mapToPairRDD = lines.mapToPair(new PairFunction<String, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Tuple2<Integer, Integer> call(String num) throws Exception {
                int numObj = Integer.parseInt(num);
                return new Tuple2<Integer, Integer>(numObj, numObj);
            }
        });

        /**
         * 1. sortByKey(false) sorts by key in descending order
         * 2. map() then extracts the sorted numbers
         */
        JavaRDD<Integer> resultRDD = mapToPairRDD.sortByKey(false).map(new Function<Tuple2<Integer, Integer>, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(Tuple2<Integer, Integer> v1) throws Exception {
                return v1._1;
            }
        });

        // take() returns the first 3 numbers after the sort
        List<Integer> nums = resultRDD.take(3);
        for (Integer num : nums) {
            System.out.println(num);
        }
        sc.close();
    }
}
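For this particular task the sort-then-take pattern can be shortened: JavaRDD also offers top(), which returns the largest elements directly without a full sortByKey. A minimal alternative sketch under the same assumptions (same input path; class name hypothetical):

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class SparkTop3WithTop {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Top3").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> nums = sc.textFile("C:\\Users\\huiqiang\\Desktop\\spark\\hui.txt")
                .map(new Function<String, Integer>() {
                    private static final long serialVersionUID = 1L;
                    public Integer call(String line) throws Exception {
                        return Integer.parseInt(line.trim());
                    }
                });

        // top(3) uses the natural ordering of Integer, so it returns the 3 largest values
        List<Integer> top3 = nums.top(3);
        for (Integer n : top3) {
            System.out.println(n);  // prints 89, 34, 10 for the sample data
        }
        sc.close();
    }
}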
Running SparkTop in MyEclipse prints:
89
34
10