
Spark Streaming: Real-Time Line Counts and TopN from Kafka over a Custom Time Interval, with Results Stored in HBase

2017-11-29 13:21
1. Count the number of lines produced to a Kafka topic within each 10-second interval and store the result in HBase

First, create the corresponding table in HBase:

create 'linecount','count'
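
Alternatively, the table can be created from Java with the old-style HBaseAdmin API (the same generation of client API used in the code below). This is only a sketch with a hypothetical class name CreateLineCountTable, assuming the same cluster addresses as the rest of the article; the important detail is that the count family keeps a single version, which the streaming job relies on when it overwrites the same cell:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateLineCountTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("linecount")) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("linecount"));
            HColumnDescriptor family = new HColumnDescriptor("count");
            // The streaming job overwrites the same rowkey/column, so one version is enough (also the default)
            family.setMaxVersions(1);
            desc.addFamily(family);
            admin.createTable(desc);
        }
        admin.close();
    }
}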

Start the Kafka cluster and create the corresponding topic:

[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper h71:2181,h72:2181,h73:2181 --replication-factor 3 --partitions 2 --topic test

Start a producer:
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list h71:9092,h72:9092,h73:9092 --topic test 
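
To confirm that messages actually reach the topic, you can optionally attach a console consumer in another terminal (a quick check under the same Kafka 0.8.2 setup as above):

[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper h71:2181,h72:2181,h73:2181 --topic test --from-beginning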

Java code:


import java.text.SimpleDateFormat;  
import java.util.Arrays;  
import java.util.Date;  
import java.util.HashMap;  
import java.util.HashSet;  
import java.util.Iterator;  
import java.util.Map;  
import java.util.Set;  
  
import kafka.serializer.StringDecoder;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.hbase.HBaseConfiguration;  
import org.apache.hadoop.hbase.client.HTable;  
import org.apache.hadoop.hbase.client.Put;  
import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.function.FlatMapFunction;  
import org.apache.spark.api.java.function.Function2;  
import org.apache.spark.api.java.function.PairFunction;  
import org.apache.spark.api.java.function.VoidFunction;  
import org.apache.spark.streaming.Durations;  
import org.apache.spark.streaming.api.java.JavaDStream;  
import org.apache.spark.streaming.api.java.JavaPairDStream;  
import org.apache.spark.streaming.api.java.JavaPairInputDStream;  
import org.apache.spark.streaming.api.java.JavaStreamingContext;  
import org.apache.spark.streaming.kafka.KafkaUtils;  
  
import scala.Tuple2;  
  
public class KafkaDirectWordCountPersistHBase {  
    private static String beginTime = null;  
    private static int cishu = 0;  
    private static int interval = 0;  
    private static String rowkey = null;  
    public static Configuration getConfiguration() {  
        Configuration conf = HBaseConfiguration.create();  
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");  
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");  
        return conf;  
    }  
    public static void insert(String tableName, String rowKey, String family,  
            String quailifer, String value) {  
        try {  
            HTable table = new HTable(getConfiguration(), tableName);  
            Put put = new Put(rowKey.getBytes());  
            put.add(family.getBytes(), quailifer.getBytes(), value.getBytes()) ;  
            table.put(put);
            // Close the table to release the connection; otherwise a long-running streaming job leaks resources
            table.close();
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
      
    public static void main(String[] args) {  
        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");  
        // Set how many seconds each batch covers; here the interval is 10 seconds
        interval = 10;
//      JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));    // milliseconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(interval)); // seconds
        // First, create a map of Kafka parameters
        Map<String, String> kafkaParams = new HashMap<String, String>();
        // The direct approach does not go through ZooKeeper, so we set broker.list here
        kafkaParams.put("metadata.broker.list", "192.168.8.71:9092,192.168.8.72:9092,192.168.8.73:9092");
        // Then create a set containing the topics to read; the direct stream can read multiple topics in parallel
        Set<String> topics = new HashSet<String>();
        topics.add("test");
        JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream(
            jssc,
            String.class, // key type
            String.class, // value type
            StringDecoder.class, // decoder
            StringDecoder.class,
            kafkaParams,
            topics);
        // The first interval is not necessarily exactly 10 seconds; it can be less than or equal to 10 seconds
        SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
        java.util.Date date=new java.util.Date();  
        System.out.println("StreamingContext started->"+time.format(new Date()));  
        beginTime=time.format(date);  
          
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String,String>, String>(){  
            private static final long serialVersionUID = 1L;  
            @Override  
            public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {  
                return Arrays.asList(tuple._2.split("\n")); // split by line
            }  
        });  
          
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){  
            private static final long serialVersionUID = 1L;  
            @Override  
            public Tuple2<String, Integer> call(String word) throws Exception {  
                return new Tuple2<String, Integer>("line", 1);  
            }  
        });  
          
        JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){  
            private static final long serialVersionUID = 1L;  
            @Override  
            public Integer call(Integer v1, Integer v2) throws Exception {  
                return v1 + v2;  
            }  
        });  
        wordcounts.print();  
        wordcounts.foreachRDD(new VoidFunction<JavaPairRDD<String,Integer>>() {  
            private static final long serialVersionUID = 1L;  
            @Override  
            public void call(JavaPairRDD<String, Integer> wordcountsRDD) throws Exception {  
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
                java.util.Date date=new java.util.Date();   
                System.out.println("endTime1-->"+time.format(new Date()));   //yyyy-MM-dd HH:mm:ss形式  
                final long endTime1 = System.currentTimeMillis();  
                System.out.println("endTime1-->"+endTime1);  //时间戳格式  
                final String endTime=time.format(date);  
                cishu++;  
                System.out.println("cishu-->"+cishu);  
                if(cishu == 1){  
                    rowkey = beginTime+"__"+endTime;  
                    insert("linecount", rowkey, "count", "sum", "0") ;  
                }else{  
                    SimpleDateFormat hh1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
                    Date date1 = hh1.parse(endTime);  
                    long hb=date1.getTime();  
                    long a2 = hb - interval*1000;  
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
                    Date date2 = new Date(a2);  
                    String beginTime1 = simpleDateFormat.format(date2);  
                    rowkey = beginTime1+"__"+endTime;  
                    insert("linecount", rowkey, "count", "sum", "0") ;  
                }  
                //foreachPartition appears to be tied to the number of partitions of the Kafka topic: if the topic has two partitions, this method runs twice
                wordcountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Integer>>>() {  
                    private static final long serialVersionUID = 1L;  
                    @Override  
                    public void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {  
                        Tuple2<String,Integer> wordcount = null;  
//Note: this relies on the fact that writing to the same rowkey and column in HBase overwrites the previous value, so the linecount table must keep only 1 version (the default if you do not change VERSIONS when creating the table)
                        while(wordcounts.hasNext()){  
                            wordcount = wordcounts.next();  
                            insert("linecount", rowkey, "count", "sum", wordcount._2.toString()) ;  
                        }  
                    }  
                });  
            }  
        });  
        jssc.start();  
        jssc.awaitTermination();  
        jssc.close();  
    }  
}  

After running this code in MyEclipse, enter the following data in the Kafka producer terminal:

hello world

ni hao a

hello spark
Note: if you copy and paste these three lines, press Enter once more at the end; otherwise only two lines are actually sent.

After a while, enter more data:

i

love

you

baby

,

come

on

Check the linecount table:


hbase(main):187:0> scan 'linecount'  
ROW                                                          COLUMN+CELL                                                                                                                                                                       
 2017-07-26 17:27:56__2017-07-26 17:28:00                    column=count:sum, timestamp=1501061244619, value=0                                                                                                                                
 2017-07-26 17:28:00__2017-07-26 17:28:10                    column=count:sum, timestamp=1501061252476, value=3                                                                                                                                
 2017-07-26 17:28:10__2017-07-26 17:28:20                    column=count:sum, timestamp=1501061262405, value=0                                                                                                                                
 2017-07-26 17:28:20__2017-07-26 17:28:30                    column=count:sum, timestamp=1501061272420, value=7                                                                                                                                
4 row(s) in 0.3150 seconds  
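
To read one interval's count back from Java instead of the shell, a minimal sketch using the same old HTable API might look like the following; the class name ReadLineCount is hypothetical, and the rowkey is simply one of the rows from the scan above:

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadLineCount {
    public static void main(String[] args) throws Exception {
        // getConfiguration() is the helper defined in KafkaDirectWordCountPersistHBase above
        HTable table = new HTable(KafkaDirectWordCountPersistHBase.getConfiguration(), "linecount");
        // Rowkey copied from the example scan output
        Get get = new Get(Bytes.toBytes("2017-07-26 17:28:20__2017-07-26 17:28:30"));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes("count"), Bytes.toBytes("sum"));
        System.out.println("sum = " + (value == null ? "null" : Bytes.toString(value)));
        table.close();
    }
}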

2. Compute the TopN of data produced to a Kafka topic within each 10-second interval and store the result in HBase

Create the corresponding Top3 table in HBase:

create 'KafkaTop','TopN'
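
As with linecount, the code below relies on writes to the same rowkey and column overwriting the previous value, so the TopN family should keep only one version (the default). If in doubt, this can be checked, and if necessary changed, in the HBase shell:

hbase(main):001:0> describe 'KafkaTop'
hbase(main):002:0> alter 'KafkaTop', {NAME => 'TopN', VERSIONS => 1}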

Java code:


import java.text.SimpleDateFormat;  
import java.util.Arrays;  
import java.util.Comparator;  
import java.util.Date;  
import java.util.HashMap;  
import java.util.HashSet;  
import java.util.Iterator;  
import java.util.Map;  
import java.util.Set;  
import java.util.TreeMap;  
  
import kafka.serializer.StringDecoder;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.hbase.HBaseConfiguration;  
import org.apache.hadoop.hbase.client.HTable;  
import org.apache.hadoop.hbase.client.Put;  
import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.function.FlatMapFunction;  
import org.apache.spark.api.java.function.Function2;  
import org.apache.spark.api.java.function.PairFunction;  
import org.apache.spark.api.java.function.VoidFunction;  
import org.apache.spark.streaming.Durations;  
import org.apache.spark.streaming.api.java.JavaDStream;  
import org.apache.spark.streaming.api.java.JavaPairDStream;  
import org.apache.spark.streaming.api.java.JavaPairInputDStream;  
import org.apache.spark.streaming.api.java.JavaStreamingContext;  
import org.apache.spark.streaming.kafka.KafkaUtils;  
  
import scala.Tuple2;  
  
/** 
 * @author huiqiang 
 * 2017-7-28 11:24 
 */  
public class KafkaSparkTopN {  
    private static String beginTime = null;  
    private static String hbasetable = "KafkaTop";  // name of the HBase table the results are stored in; it must exist before the program runs
    private static int cishu = 0;
    private static int interval = 10;   // how many seconds each batch covers; here the interval is 10 seconds
    private static int n = 0;
    private static String rowkey = null;
    public static int K = 3;    // set this to however many Top entries you want

    // A TreeMap holds the counts; since a TreeMap sorts by key in ascending order, a custom Comparator is supplied to reverse the order
    public static TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
        @Override
        public int compare(Integer x, Integer y) {
            return y.compareTo(x);
        }
    });

    // Connect to HBase
    public static Configuration getConfiguration() {  
        Configuration conf = HBaseConfiguration.create();  
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");  
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");  
        return conf;  
    }  
    public static void insert2(String tableName,String rowKey,String family,String quailifer,String value){  
        try {  
            HTable table1 = new HTable(getConfiguration(), tableName);  
            Put put = new Put(rowKey.getBytes());  
            put.add(family.getBytes(), quailifer.getBytes(), value.getBytes());  
            table1.put(put);
            table1.close();
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    public static void insert3(String tableName,String rowKey,String family){  
        try {  
            HTable table1 = new HTable(getConfiguration(), tableName);  
            Put put = new Put(rowKey.getBytes());  
            for (int i = 1; i <= K; i++) {  
                put.add(family.getBytes(), ("Top"+i).getBytes(), "null".getBytes());  
            }  
            table1.put(put);
            table1.close();
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
      
    public static void main(String[] args) {  
        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");  
//      JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));    // milliseconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(interval)); // seconds
        // First, create a map of Kafka parameters
        Map<String, String> kafkaParams = new HashMap<String, String>();
        // The direct approach does not go through ZooKeeper, so we set broker.list here
        kafkaParams.put("metadata.broker.list", "192.168.8.71:9092,192.168.8.72:9092,192.168.8.73:9092");
        // Then create a set containing the topics to read; the direct stream can read multiple topics in parallel
        Set<String> topics = new HashSet<String>();
        topics.add("test");
        JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream(
            jssc,
            String.class, // key type
            String.class, // value type
            StringDecoder.class, // decoder
            StringDecoder.class,
            kafkaParams,
            topics);
        // The first interval is not necessarily exactly 10 seconds; it can be less than or equal to 10 seconds
        SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
        java.util.Date date=new java.util.Date();  
        System.out.println("StreamingContext started->"+time.format(new Date()));  
        beginTime=time.format(date);  
          
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String,String>, String>(){  
            private static final long serialVersionUID = 1L;  
            @Override  
            public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {  
                return Arrays.asList(tuple._2.split(" "));  // split on spaces
            }  
        });  
          
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){  
            private static final long serialVersionUID = 1L;  
            @Override  
            public Tuple2<String, Integer> call(String word) throws Exception {  
                return new Tuple2<String, Integer>(word, 1);  
            }  
        });  
          
        JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){  
            private static final long serialVersionUID = 1L;  
            @Override  
            public Integer call(Integer v1, Integer v2) throws Exception {  
                return v1 + v2;  
            }  
        });  
        wordcounts.print();  
        wordcounts.foreachRDD(new VoidFunction<JavaPairRDD<String,Integer>>() {  
            private static final long serialVersionUID = 1L;  
            @Override  
            public void call(JavaPairRDD<String, Integer> wordcountsRDD) throws Exception {  
                n = 0;  
                treeMap.clear();  
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
                java.util.Date date=new java.util.Date();   
                System.out.println("endTime1-->"+time.format(new Date()));   //yyyy-MM-dd HH:mm:ss形式  
                final long endTime1 = System.currentTimeMillis();  
                System.out.println("endTime1-->"+endTime1);  //时间戳格式  
                final String endTime=time.format(date);  
                cishu++;  
                System.out.println("cishu-->"+cishu);  
                if(cishu == 1){  
                    rowkey = beginTime+"__"+endTime;  
                    insert3(hbasetable, rowkey, "TopN");  
                }else{  
                    SimpleDateFormat hh1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
                    Date date1 = hh1.parse(endTime);  
                    long hb=date1.getTime();  
                    long a2 = hb - interval*1000;  
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
                    Date date2 = new Date(a2);
                    String beginTime1 = simpleDateFormat.format(date2);
                    rowkey = beginTime1+"__"+endTime;  
                    insert3(hbasetable, rowkey, "TopN");  
                }  
                //foreachPartition appears to be tied to the number of partitions of the Kafka topic: if the topic has two partitions, this method runs twice
                wordcountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Integer>>>() {  
                    private static final long serialVersionUID = 1L;  
                    @Override  
                    public void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {  
                        Tuple2<String,Integer> wordcount = null;  
                        while(wordcounts.hasNext()){  
                            n++;  
                            wordcount = wordcounts.next();  
                            if (treeMap.containsKey(wordcount._2)){
                                String value = treeMap.get(wordcount._2) + "," + wordcount._1;
                                treeMap.remove(wordcount._2);
                                treeMap.put(wordcount._2, value);
                            }else {
                                treeMap.put(wordcount._2, wordcount._1);
                            }
                            if(treeMap.size() > K) {
                                treeMap.remove(treeMap.lastKey());
                            }
                        }
                    }  
                });  
                if(n!=0){
                    int y = 0;
                    for(int num : treeMap.keySet()) {
                        y++;
//Note: this relies on the fact that writing to the same rowkey and column in HBase overwrites the previous value, so the KafkaTop table must keep only 1 version (the default if you do not change VERSIONS when creating the table)
                        insert2(hbasetable, rowkey, "TopN", "Top"+y, treeMap.get(num)+" "+num);
                    }
                }
            }  
        });  
        jssc.start();  
        jssc.awaitTermination();  
        jssc.close();  
    }  
}  

After running this code in MyEclipse, enter the following data in the Kafka producer terminal:

hello world

hello hadoop

hello hive

hello hadoop

hello world

hello world

hbase hive

The MyEclipse console will print:


-------------------------------------------  
Time: 1501214340000 ms  
-------------------------------------------  
(hive,2)  
(hello,6)  
(world,3)  
(hadoop,2)  
(hbase,1)  
endTime1-->2017-07-28 11:59:00  
endTime1-->1501214340455  
cishu-->1  
...... (output omitted)
-------------------------------------------  
Time: 1501214350000 ms  
-------------------------------------------  
endTime1-->2017-07-28 11:59:10  
endTime1-->1501214350090  
cishu-->2  

Check the HBase table:


hbase(main):018:0> scan 'KafkaTop'  
ROW                                                          COLUMN+CELL                                                                                                                                                                       
 2017-07-28 11:58:55__2017-07-28 11:59:00                    column=TopN:Top1, timestamp=1501101768643, value=hello 6                                                                                                                          
 2017-07-28 11:58:55__2017-07-28 11:59:00                    column=TopN:Top2, timestamp=1501101768661, value=world 3                                                                                                                          
 2017-07-28 11:58:55__2017-07-28 11:59:00                    column=TopN:Top3, timestamp=1501101768679, value=hadoop,hive 2                                                                                                                    
 2017-07-28 11:59:00__2017-07-28 11:59:10                    column=TopN:Top1, timestamp=1501101770921, value=null                                                                                                                             
 2017-07-28 11:59:00__2017-07-28 11:59:10                    column=TopN:Top2, timestamp=1501101770921, value=null                                                                                                                             
 2017-07-28 11:59:00__2017-07-28 11:59:10                    column=TopN:Top3, timestamp=1501101770921, value=null                                                                                                                             
2 row(s) in 0.3140 seconds  
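
The heart of the TopN logic above is the TreeMap with a reversed comparator, capped at K entries. The following is a small self-contained sketch of just that technique, using hypothetical counts similar to the console output above; running it prints the same Top3 as the first row of the scan:

import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class TreeMapTopNDemo {
    public static void main(String[] args) {
        final int K = 3;
        // Keys are counts; the reversed comparator keeps the largest count first
        TreeMap<Integer, String> top = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });
        // Hypothetical word counts, similar to the console output above
        String[] words = {"hello", "world", "hadoop", "hive", "hbase"};
        int[] counts = {6, 3, 2, 2, 1};
        for (int i = 0; i < words.length; i++) {
            if (top.containsKey(counts[i])) {
                // Words with the same count share one entry, comma-separated
                top.put(counts[i], top.get(counts[i]) + "," + words[i]);
            } else {
                top.put(counts[i], words[i]);
            }
            if (top.size() > K) {
                // Drop the smallest count (the last key under the reversed ordering)
                top.remove(top.lastKey());
            }
        }
        int rank = 0;
        for (Map.Entry<Integer, String> e : top.entrySet()) {
            rank++;
            System.out.println("Top" + rank + ": " + e.getValue() + " " + e.getKey());
        }
    }
}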

3. The following example is not Spark Streaming; it is an example from the web that does the equivalent of offline TopN analysis, included for reference only

Source: http://blog.csdn.net/accptanggang/article/details/52924970

Below is the source data hui.txt, which I placed in the spark folder on my Windows desktop; the goal is to extract the 3 largest numbers:

2

4

1

6

8

10

34

89

Java code:


import java.util.List;  
import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.api.java.function.Function;  
import org.apache.spark.api.java.function.PairFunction;  
  
import scala.Tuple2;  
  
public class SparkTop {  
    public static void main(String[] args) {  
        SparkConf conf=new SparkConf().setAppName("Top3").setMaster("local");  
        JavaSparkContext sc=new JavaSparkContext(conf);  
  
        //JavaRDD<String> lines = sc.textFile("hdfs://tgmaster:9000/in/nums2");  
        JavaRDD<String> lines = sc.textFile("C:\\Users\\huiqiang\\Desktop\\spark\\hui.txt");  
  
        // Map each line into a key-value pair
        JavaPairRDD<Integer, Integer> mapToPairRDD = lines.mapToPair(new PairFunction<String, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Tuple2<Integer, Integer> call(String num) throws Exception {
                int numObj=Integer.parseInt(num);  
                Tuple2<Integer, Integer> tuple2 = new Tuple2<Integer, Integer>(numObj, numObj);  
                return tuple2;  
            }  
        });  
        /**
         * 1. Use the sortByKey() operator to sort by key in descending order
         * 2. After sorting, use the map() operator to extract the sorted numbers
         */
        JavaRDD<Integer> resultRDD = mapToPairRDD.sortByKey(false).map(new Function<Tuple2<Integer,Integer>, Integer>() {
            private static final long serialVersionUID = 1L;

            public Integer call(Tuple2<Integer, Integer> v1) throws Exception {
                return v1._1;  
            }  
        });  
        // Use the take() operator to get the first 3 numbers after sorting
        List<Integer> nums = resultRDD.take(3);  
        for (Integer num : nums) {  
            System.out.println(num);  
        }  
        sc.close();  
    }  
}  

The result of running it in MyEclipse is:

89

34

10
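
For reference, the same result can be obtained more directly with the RDD top() action, which returns the largest elements by natural ordering. A minimal sketch under the same assumptions (local mode, the same Windows file path) is shown below; the class name SparkTop3WithTopAction is hypothetical:

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class SparkTop3WithTopAction {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Top3").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("C:\\Users\\huiqiang\\Desktop\\spark\\hui.txt");
        // Parse each line into an Integer
        JavaRDD<Integer> nums = lines.map(new Function<String, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(String line) throws Exception {
                return Integer.parseInt(line.trim());
            }
        });
        // top(3) returns the 3 largest elements by natural ordering
        List<Integer> top3 = nums.top(3);
        for (Integer num : top3) {
            System.out.println(num);
        }
        sc.close();
    }
}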