flume+kafka+storm整合01bak
In flume+kafka+storm整合00 a KafkaSpout was set up to supply the data. Next we configure the Bolts that process that data.
1.1. The first Bolt processes the raw data, discarding records that do not contain enough fields. Its declared output fields are:
declarer.declare(new Fields("srcIp", "destIp"));
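The post only shows the declaration above; below is a minimal sketch of such a filter bolt, assuming the raw Kafka message is a whitespace-separated log line with the source IP and destination IP in the first two positions (the class name FilterBolt and the field positions are assumptions, not taken from the original series).

package com.chb.test.bolt.idc;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Hypothetical filter bolt: parses each raw Kafka line, drops records
 * with too few fields, and emits only srcIp and destIp.
 */
public class FilterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // KafkaSpout with a StringScheme emits the raw message as the first field
        String line = input.getString(0);
        collector.ack(input);
        if (line == null) {
            return;
        }
        // assumed record layout: whitespace-separated fields,
        // with srcIp at index 0 and destIp at index 1
        String[] fields = line.split("\\s+");
        if (fields.length < 2) {
            return; // discard records with missing fields
        }
        collector.emit(new Values(fields[0], fields[1]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("srcIp", "destIp"));
    }
}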
1.2. The second Bolt, MiddleSumBolt, counts the IPs seen in each 10-second window. This partial pre-aggregation is done first so that the final summary Bolt carries less load when it combines the overall totals.
@Override
public void execute(Tuple input) {
    String srcIp = input.getStringByField("srcIp");
    String destIp = input.getStringByField("destIp");
    collector.ack(input);
    Long curTime = System.currentTimeMillis();
    if (curTime - lastTime >= 1000 * 10) {
        // every 10 seconds, emit the partial counts collected so far
        System.out.println("MiddleSumBolt emit ..." + new SimpleDateFormat("HH:mm:ss").format(new Date()));
        for (String f : countMap.keySet()) {
            Map<String, Integer> map = countMap.get(f);
            for (String ip : map.keySet()) {
                collector.emit(new Values(f, ip, map.get(ip)));
            }
        }
        countMap.clear();
        lastTime = curTime;
    } else {
        Map<String, Integer> srcIpMap = countMap.get("srcIp");
        if (srcIpMap == null) {
            srcIpMap = new HashMap<String, Integer>();
        }
        // count occurrences of the source IP
        Integer srcIpNum = srcIpMap.get(srcIp);
        if (srcIpNum == null) {
            srcIpNum = 0;
        }
        srcIpNum++;
        srcIpMap.put(srcIp, srcIpNum);
        countMap.put("srcIp", srcIpMap);
        // count occurrences of the destination IP
        Map<String, Integer> destIpMap = countMap.get("destIp");
        if (destIpMap == null) {
            destIpMap = new HashMap<String, Integer>();
        }
        Integer destIpNum = destIpMap.get(destIp);
        if (destIpNum == null) {
            destIpNum = 0;
        }
        destIpNum++;
        destIpMap.put(destIp, destIpNum);
        countMap.put("destIp", destIpMap);
    }
}
The fields emitted by MiddleSumBolt, where src-dest indicates whether the count belongs to a source IP or a destination IP:
declarer.declare(new Fields("src-dest", "ip", "num"));
1.3. The last Bolt performs the final aggregation, combining the partial counts once every 60 seconds.
package com.chb.test.bolt.idc;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * SummaryBolt: aggregates the partial counts from MiddleSumBolt and
 * prints the top-5 source/destination IPs every 60 seconds.
 * @author 12285
 */
public class SummaryBolt extends BaseRichBolt {
    OutputCollector collector;
    Map<String, Map<String, Integer>> countMap = null;
    /**
     * time of the last emit
     */
    private Long lastTime = System.currentTimeMillis();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        countMap = new HashMap<String, Map<String, Integer>>();
    }

    @Override
    public void execute(Tuple input) {
        String src_dest = input.getString(0);
        String ip = input.getString(1);
        int num = input.getInteger(2);
        Long curTime = System.currentTimeMillis();
        if (curTime - lastTime >= 1000 * 60) {
            // every 60 seconds, print the top-5 IPs for each direction
            for (String sd : countMap.keySet()) {
                Map<String, Integer> maps = countMap.get(sd);
                List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>();
                list.addAll(maps.entrySet());
                Collections.sort(list, new ValueComparator());
                System.out.println(sd + "-top5:" + "{");
                // there may be fewer than 5 entries
                for (int i = 0; i < list.size() && i < 5; i++) {
                    System.out.println("(" + list.get(i).getKey() + "," + list.get(i).getValue() + ")");
                }
                System.out.println("}" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
            countMap.clear();
            lastTime = curTime;
        } else {
            Map<String, Integer> srcIpMap = countMap.get(src_dest);
            if (srcIpMap == null) {
                srcIpMap = new HashMap<String, Integer>();
            }
            // add the partial count received for this ip
            Integer srcIpNum = srcIpMap.get(ip);
            if (srcIpNum == null) {
                srcIpNum = 0;
            }
            srcIpNum = srcIpNum + num;
            srcIpMap.put(ip, srcIpNum);
            countMap.put(src_dest, srcIpMap);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}
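SummaryBolt sorts with a ValueComparator that the post does not show; a minimal sketch, assuming it simply orders map entries by their count in descending order so the largest counts come first:

package com.chb.test.bolt.idc;

import java.util.Comparator;
import java.util.Map;

/**
 * Hypothetical implementation: orders entries by value, largest first,
 * so the top-5 loop in SummaryBolt prints the biggest counts.
 */
public class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
    @Override
    public int compare(Map.Entry<String, Integer> e1, Map.Entry<String, Integer> e2) {
        return e2.getValue().compareTo(e1.getValue()); // descending by count
    }
}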
Output: every 60 seconds SummaryBolt prints, for both source and destination IPs, the top-5 addresses with their counts, followed by a timestamp.
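For reference, a hedged sketch of how the whole topology might be wired together, reusing the KafkaSpout approach from flume+kafka+storm整合00. The ZooKeeper address, topic name, component ids, the FilterBolt class and the parallelism hints are assumptions, not values taken from the original posts.

package com.chb.test.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import com.chb.test.bolt.idc.FilterBolt;
import com.chb.test.bolt.idc.MiddleSumBolt;
import com.chb.test.bolt.idc.SummaryBolt;

public class IpCountTopology {
    public static void main(String[] args) throws Exception {
        // KafkaSpout configuration; ZooKeeper address and topic name are placeholders
        ZkHosts zkHosts = new ZkHosts("localhost:2181");
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "flume-topic", "/kafka", "ip-count-spout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);

        // drop malformed records, keep only srcIp/destIp
        builder.setBolt("filter-bolt", new FilterBolt(), 2)
               .shuffleGrouping("kafka-spout");

        // MiddleSumBolt emits partial counts every 10 seconds
        builder.setBolt("middle-sum-bolt", new MiddleSumBolt(), 2)
               .fieldsGrouping("filter-bolt", new Fields("srcIp"));

        // a single summary task so the 60-second top-5 sees all partial counts
        builder.setBolt("summary-bolt", new SummaryBolt(), 1)
               .globalGrouping("middle-sum-bolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("ip-count", new Config(), builder.createTopology());
    }
}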