
flume+kafka+storm Integration 01bak

2018-03-07 23:21

In "flume+kafka+storm Integration 00", a KafkaSpout was used to supply the data.

The next step is to set up Bolts that process that data.

1.1 The first Bolt processes the raw data and drops records that have too few fields

declarer.declare(new Fields("srcIp", "destIp"));
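
The post shows only the fields this Bolt declares, not its filtering logic. The following is a minimal sketch of what "drop records with too few fields" could look like, written as plain Java so it runs without Storm. The delimiter, the minimum field count, and the column positions of the two IPs are all assumptions made for illustration; the real log layout is not given in the post.

```java
import java.util.Arrays;

class RecordFilter {
    // Hypothetical record layout (not from the original post):
    // comma-separated, at least 3 fields, source IP in column 0,
    // destination IP in column 1.
    static final String DELIMITER = ",";
    static final int MIN_FIELDS = 3;
    static final int SRC_INDEX = 0;
    static final int DEST_INDEX = 1;

    /** Returns {srcIp, destIp}, or null when the line should be dropped. */
    public static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        // A limit of -1 keeps trailing empty fields, so they still count
        String[] fields = line.split(DELIMITER, -1);
        if (fields.length < MIN_FIELDS) {
            return null; // too few fields: drop the record
        }
        String srcIp = fields[SRC_INDEX].trim();
        String destIp = fields[DEST_INDEX].trim();
        if (srcIp.isEmpty() || destIp.isEmpty()) {
            return null; // an IP column is blank: drop the record
        }
        return new String[] {srcIp, destIp};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("10.0.0.1,10.0.0.2,80")));
        System.out.println(parse("10.0.0.1"));
    }
}
```

Inside a Bolt, a non-null result would be emitted as new Values(srcIp, destIp), matching the declared fields, and a null result would simply be acked and skipped.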


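1.2 The second Bolt, MiddleSumBolt, counts the occurrences of each IP within every 10-second window. This small pre-aggregation reduces the load on the final summary Bolt.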

@Override
public void execute(Tuple input) {
    String srcIp = input.getStringByField("srcIp");
    String destIp = input.getStringByField("destIp");

    // Count every tuple first, so the tuple that triggers the emit is not lost
    // Count occurrences of srcIp
    Map<String, Integer> srcIpMap = countMap.get("srcIp");
    if (srcIpMap == null) {
        srcIpMap = new HashMap<String, Integer>();
        countMap.put("srcIp", srcIpMap);
    }
    Integer srcIpNum = srcIpMap.get(srcIp);
    srcIpMap.put(srcIp, srcIpNum == null ? 1 : srcIpNum + 1);

    // Count occurrences of destIp
    Map<String, Integer> destIpMap = countMap.get("destIp");
    if (destIpMap == null) {
        destIpMap = new HashMap<String, Integer>();
        countMap.put("destIp", destIpMap);
    }
    Integer destIpNum = destIpMap.get(destIp);
    destIpMap.put(destIp, destIpNum == null ? 1 : destIpNum + 1);

    long curTime = System.currentTimeMillis();
    if (curTime - lastTime >= 1000 * 10) {
        // Emit the partial counts for this 10-second window, then reset
        System.out.println("MiddleSumBolt emit ..." + new SimpleDateFormat("HH:mm:ss").format(new Date()));
        for (String f : countMap.keySet()) {
            Map<String, Integer> map = countMap.get(f);
            for (String ip : map.keySet()) {
                collector.emit(new Values(f, ip, map.get(ip)));
            }
        }
        countMap.clear();
        lastTime = curTime;
    }
    collector.ack(input);
}


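Meaning of the fields emitted by MiddleSumBolt:

src-dest indicates whether the IP is a source IP or a destination IP; ip is the address itself, and num is its count within the window.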
declarer.declare(new Fields("src-dest", "ip", "num"));
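
To make the windowing behaviour easier to check outside a running topology, here is a small sketch of the same pre-aggregation idea as a plain Java class. WindowedCounter is a hypothetical helper, not part of the original code; it takes the current time as a parameter instead of calling System.currentTimeMillis(), and it counts each record before checking the window so that the record arriving at the boundary is included. It assumes Java 8 or later for Map.merge and computeIfAbsent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowedCounter {
    private final long windowMillis;
    private long lastFlush;
    // kind ("srcIp" or "destIp") -> ip -> count in the current window
    private final Map<String, Map<String, Integer>> counts = new HashMap<>();

    public WindowedCounter(long windowMillis, long startMillis) {
        this.windowMillis = windowMillis;
        this.lastFlush = startMillis;
    }

    /**
     * Counts one occurrence. Returns the rows {kind, ip, num} to emit when
     * the window has elapsed, otherwise an empty list.
     */
    public List<Object[]> add(String kind, String ip, long nowMillis) {
        counts.computeIfAbsent(kind, k -> new HashMap<>()).merge(ip, 1, Integer::sum);

        List<Object[]> rows = new ArrayList<>();
        if (nowMillis - lastFlush >= windowMillis) {
            for (Map.Entry<String, Map<String, Integer>> byKind : counts.entrySet()) {
                for (Map.Entry<String, Integer> e : byKind.getValue().entrySet()) {
                    rows.add(new Object[] {byKind.getKey(), e.getKey(), e.getValue()});
                }
            }
            counts.clear();       // start a fresh window
            lastFlush = nowMillis;
        }
        return rows;
    }

    public static void main(String[] args) {
        WindowedCounter counter = new WindowedCounter(10_000, 0);
        counter.add("srcIp", "10.0.0.1", 1_000);
        counter.add("srcIp", "10.0.0.1", 5_000);
        for (Object[] row : counter.add("srcIp", "10.0.0.1", 10_000)) {
            System.out.println(row[0] + " " + row[1] + " " + row[2]);
        }
    }
}
```

Each returned row corresponds to one tuple with the fields ("src-dest", "ip", "num"). Note that, as in the Bolt, a flush only happens when a record arrives; an idle stream never emits its last partial window.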


1.3 The last Bolt performs the final aggregation, once every 60 seconds

package com.chb.test.bolt.idc;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * SummaryBolt
 * @author 12285
 *
 */
public class SummaryBolt extends BaseRichBolt {
    OutputCollector collector;
    Map<String, Map<String, Integer>> countMap = null;
    /**
     * Time of the last emit
     */
    private Long lastTime = System.currentTimeMillis();

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        countMap = new HashMap<String, Map<String, Integer>>();
    }

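    @Override
    public void execute(Tuple input) {
        String src_dest = input.getString(0);
        String ip = input.getString(1);
        int num = input.getInteger(2);

        // Accumulate first, so the tuple that triggers the report is not dropped
        Map<String, Integer> ipMap = countMap.get(src_dest);
        if (ipMap == null) {
            ipMap = new HashMap<String, Integer>();
            countMap.put(src_dest, ipMap);
        }
        Integer ipNum = ipMap.get(ip);
        ipMap.put(ip, ipNum == null ? num : ipNum + num);

        long curTime = System.currentTimeMillis();
        if (curTime - lastTime >= 1000 * 60) {
            // Report the top 5 source and destination IPs for this 60-second window
            for (String sd : countMap.keySet()) {
                Map<String, Integer> maps = countMap.get(sd);
                List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>();
                list.addAll(maps.entrySet());
                Collections.sort(list, new ValueComparator());
                System.out.println(sd + "-top5:" + "{");
                // There may be fewer than 5 entries
                for (int i = 0; i < list.size() && i < 5; i++) {
                    System.out.println("(" + list.get(i).getKey() + "," + list.get(i).getValue() + ")");
                }
                System.out.println("}" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
            countMap.clear();
            lastTime = curTime;
        }
        // BaseRichBolt does not ack automatically
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This is the last Bolt: it only prints results and emits nothing
    }

    /**
     * ValueComparator is not shown in the original post. This version assumes
     * it orders entries by count, largest first, as the top-5 output implies.
     */
    static class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
        @Override
        public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
            return b.getValue().compareTo(a.getValue());
        }
    }
}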
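
The report step above sorts the per-IP counts and prints at most five entries, but the ValueComparator it relies on is not shown in the post. The sketch below isolates that top-N selection in plain Java under the assumption that entries are ordered by count, largest first. TopN is a hypothetical helper, and the tie-break on the IP string is an addition that only serves to make the order deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TopN {
    /** Returns up to n entries with the largest counts, largest first. */
    public static List<Map.Entry<String, Integer>> top(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> list =
                new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
                // Descending by count; ties fall back to ascending IP string
                int byCount = b.getValue().compareTo(a.getValue());
                return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
            }
        });
        // The map may hold fewer than n entries
        return new ArrayList<Map.Entry<String, Integer>>(
                list.subList(0, Math.min(n, list.size())));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        counts.put("10.0.0.1", 3);
        counts.put("10.0.0.2", 7);
        counts.put("10.0.0.3", 3);
        for (Map.Entry<String, Integer> e : top(counts, 5)) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

Sorting the whole list is fine for the handful of distinct IPs seen in a 60-second window; for very large maps, a bounded priority queue would avoid the full sort.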


Topology output:
