Flume NG + HBase: Collecting and Storing Logs
2015-07-31 10:15
Processing logs with Flume NG and storing them in HBase:
Flume NG's built-in SimpleHbaseEventSerializer only supports the most basic insertion of data into HBase. If you need anything beyond that, you have to write your own serializer class implementing Flume's HbaseEventSerializer interface. A simple example follows.
1. Prerequisites
A working Hadoop + HBase + ZooKeeper + Flume NG deployment.
2. Log-parsing classes
①AccessLog.java
```java
package com.tcloud.flume;

// Plain holder for the fields of one parsed access-log line.
public class AccessLog {
    private String clientIp;
    private String clientIndentity;
    private String remoteUser;
    private String dateTime;
    private String request;
    private String httpStatusCode;
    private String bytesSent;
    private String referer;
    private String userAgent;

    public String getClientIp() { return clientIp; }
    public void setClientIp(String clientIp) { this.clientIp = clientIp; }
    public String getClientIndentity() { return clientIndentity; }
    public void setClientIndentity(String clientIndentity) { this.clientIndentity = clientIndentity; }
    public String getRemoteUser() { return remoteUser; }
    public void setRemoteUser(String remoteUser) { this.remoteUser = remoteUser; }
    public String getDateTime() { return dateTime; }
    public void setDateTime(String dateTime) { this.dateTime = dateTime; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getHttpStatusCode() { return httpStatusCode; }
    public void setHttpStatusCode(String httpStatusCode) { this.httpStatusCode = httpStatusCode; }
    public String getBytesSent() { return bytesSent; }
    public void setBytesSent(String bytesSent) { this.bytesSent = bytesSent; }
    public String getReferer() { return referer; }
    public void setReferer(String referer) { this.referer = referer; }
    public String getUserAgent() { return userAgent; }
    public void setUserAgent(String userAgent) { this.userAgent = userAgent; }
}
```
②AccessLogParser.java
```java
package com.tcloud.flume;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessLogParser {
    /**
     * Log format, e.g.:
     * 11.52.10.49 - - [17/Sep/2015:11:35:21 +0800] "GET /webapp HTTP/1.1" 302 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36"
     */
    private static String pattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"";
    private static Pattern p = Pattern.compile(pattern);

    // Returns the parsed record, or null when the line does not match the pattern.
    public static AccessLog parse(String line) {
        Matcher matcher = p.matcher(line);
        if (matcher.matches()) {
            AccessLog accessLog = new AccessLog();
            accessLog.setClientIp(matcher.group(1));
            accessLog.setClientIndentity(matcher.group(2));
            accessLog.setRemoteUser(matcher.group(3));
            accessLog.setDateTime(matcher.group(4));
            accessLog.setRequest(matcher.group(5));
            accessLog.setHttpStatusCode(matcher.group(6));
            accessLog.setBytesSent(matcher.group(7));
            accessLog.setReferer(matcher.group(8));
            accessLog.setUserAgent(matcher.group(9));
            return accessLog;
        }
        return null;
    }
}
```
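To see the regex in action without standing up the whole pipeline, here is a minimal standalone sketch (the class name `LogParseDemo` and the shortened sample line are ours, not part of the project) that applies the same pattern to a sample access-log line:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone check of the access-log regex used by AccessLogParser.
public class LogParseDemo {
    static final Pattern P = Pattern.compile(
        "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"");

    // Returns the nine captured fields, or null when the line does not match.
    static String[] parse(String line) {
        Matcher m = P.matcher(line);
        if (!m.matches()) return null;
        String[] fields = new String[9];
        for (int i = 0; i < 9; i++) fields[i] = m.group(i + 1);
        return fields;
    }

    public static void main(String[] args) {
        String line = "11.52.10.49 - - [17/Sep/2015:11:35:21 +0800] "
                + "\"GET /webapp HTTP/1.1\" 302 - \"-\" \"Mozilla/5.0\"";
        String[] f = parse(line);
        // Fields come out in the same order as the capture groups:
        // ip, identity, user, timestamp, request, status, bytes, referer, user agent
        System.out.println("client ip = " + f[0] + ", status = " + f[5]);
    }
}
```

Note that `matches()` anchors the pattern to the whole line, so any log line with extra trailing content will be rejected and `parse` returns null.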
③AsyncHbaseLogEventSerializer.java
```java
package com.tcloud.flume;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncHbaseLogEventSerializer implements HbaseEventSerializer {
    // Column family (replaced by the value configured on the sink in initialize())
    private byte[] colFam = "cf".getBytes();
    private Event currentEvent;

    @Override
    public void initialize(Event event, byte[] colFam) {
        this.currentEvent = event;
        this.colFam = colFam;
    }

    @Override
    public void configure(Context context) {}

    @Override
    public void configure(ComponentConfiguration conf) {}

    @Override
    public List<Row> getActions() {
        // Split the event body and get the values for the columns
        String eventStr = new String(currentEvent.getBody());
        AccessLog cols = AccessLogParser.parse(eventStr);
        List<Row> puts = new ArrayList<Row>();
        if (cols == null) {
            // The line did not match the log pattern; skip it rather than throw an NPE.
            return puts;
        }
        String req = cols.getRequest();
        String reqPath = req.split(" ")[1];
        int pos = reqPath.indexOf("?");
        if (pos > 0) {
            reqPath = reqPath.substring(0, pos);
        }
        if (reqPath.length() > 1 && reqPath.trim().endsWith("/")) {
            reqPath = reqPath.substring(0, reqPath.length() - 1);
        }
        String req_ts_str = cols.getDateTime();
        Long currTime = System.currentTimeMillis();
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (req_ts_str != null && !req_ts_str.equals("")) {
            try {
                currTime = df.parse(req_ts_str).getTime();
            } catch (ParseException e) {
                System.out.println("parse req time error, using System.currentTimeMillis().");
            }
        }
        // Fall back to the current time when the log timestamp is missing or unparseable.
        String currTimeStr = df2.format(new Date(currTime));
        long revTs = Long.MAX_VALUE - currTime;
        // Design the row key to suit your own needs.
        byte[] currentRowKey = (UUIDGenerator.getUUID() + Long.toString(revTs) + reqPath).getBytes();
        Put putReq = new Put(currentRowKey);
        // One column per parsed field.
        putReq.add(colFam, "clientip".getBytes(), Bytes.toBytes(cols.getClientIp()));
        putReq.add(colFam, "clientindentity".getBytes(), Bytes.toBytes(cols.getClientIndentity()));
        putReq.add(colFam, "remoteuser".getBytes(), Bytes.toBytes(cols.getRemoteUser()));
        putReq.add(colFam, "httpstatuscode".getBytes(), Bytes.toBytes(cols.getHttpStatusCode()));
        putReq.add(colFam, "bytessent".getBytes(), Bytes.toBytes(cols.getBytesSent()));
        putReq.add(colFam, "request".getBytes(), Bytes.toBytes(cols.getRequest()));
        putReq.add(colFam, "referer".getBytes(), Bytes.toBytes(cols.getReferer()));
        putReq.add(colFam, "datetime".getBytes(), Bytes.toBytes(currTimeStr));
        putReq.add(colFam, "useragent".getBytes(), Bytes.toBytes(cols.getUserAgent()));
        puts.add(putReq);
        return puts;
    }

    @Override
    public List<Increment> getIncrements() {
        return new ArrayList<Increment>();
    }

    @Override
    public void close() {
        colFam = null;
        currentEvent = null;
    }
}
```
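The serializer builds its row key from a UUID, a reversed timestamp (`Long.MAX_VALUE - eventTime`), and the request path. A small sketch (the class name `RowKeyDemo` is ours, for illustration only) of that key layout and of why the reversal flips the ordering:

```java
// Illustration of the reverse-timestamp row-key idea used in getActions().
public class RowKeyDemo {
    // Mirrors the serializer's key layout: UUID + reversed timestamp + request path.
    static String rowKey(String uuid, long eventTimeMillis, String reqPath) {
        long revTs = Long.MAX_VALUE - eventTimeMillis; // later events yield smaller values
        return uuid + Long.toString(revTs) + reqPath;
    }

    public static void main(String[] args) {
        long earlier = 1_000L, later = 2_000L;
        // The reversed value of the later event is numerically smaller, so keys
        // built from it compare ahead of keys built from older events.
        System.out.println((Long.MAX_VALUE - later) < (Long.MAX_VALUE - earlier));
        System.out.println(rowKey("abc", Long.MAX_VALUE, "/x"));
    }
}
```

Note that because the UUID comes first in this particular key, scans are effectively randomized across regions (which spreads write load); if time-ordered scans matter more than write distribution, the reversed timestamp would need to lead the key instead.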
④UUIDGenerator.java
```java
package com.tcloud.flume;

import java.util.UUID;

public class UUIDGenerator {

    public UUIDGenerator() {
    }

    /**
     * Get one UUID.
     * @return String UUID with the "-" separators removed
     */
    public static String getUUID() {
        String s = UUID.randomUUID().toString();
        // Strip the "-" separators.
        return s.substring(0, 8) + s.substring(9, 13) + s.substring(14, 18)
                + s.substring(19, 23) + s.substring(24);
    }

    /**
     * Get the specified number of UUIDs.
     * @param number how many UUIDs to generate
     * @return String[] array of UUIDs, or null when number < 1
     */
    public static String[] getUUID(int number) {
        if (number < 1) {
            return null;
        }
        String[] ss = new String[number];
        for (int i = 0; i < number; i++) {
            ss[i] = getUUID();
        }
        return ss;
    }
}
```
Export the classes above as a jar file and place it in the lib directory of the Flume NG installation.
3. Create the access_log table, with column family cf, via the HBase shell
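A minimal HBase shell session for this step (table and column-family names as used throughout this article) might look like:

```
create 'access_log', 'cf'
describe 'access_log'   # optional: confirm the table and its column family
```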
4. Configure Flume NG
(1) Source side: watch for newly produced log lines and forward them to the collecting agent.
Create tomcatToHbase.conf under the conf directory of the Flume NG installation:
```
agent.sources = baksrc
agent.channels = memoryChannel
agent.sinks = remotesink

agent.sources.baksrc.type = exec
agent.sources.baksrc.command = tail -F /home/test/data/data.txt
agent.sources.baksrc.checkperiodic = 1000
agent.sources.baksrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinks.remotesink.type = avro
agent.sinks.remotesink.hostname = spider-agent
agent.sinks.remotesink.port = 9999
agent.sinks.remotesink.channel = memoryChannel
```
(2) Sink side: receive the collected events and write them into HBase.
Create tomcatToHbase.conf under the conf directory of the Flume NG installation on the receiving machine:
```
agent.sources = avrosrc
agent.channels = memoryChannel
agent.sinks = fileSink

agent.sources.avrosrc.type = avro
agent.sources.avrosrc.bind = spider-agent
agent.sources.avrosrc.port = 9999
agent.sources.avrosrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinks.fileSink.type = hbase
agent.sinks.fileSink.channel = memoryChannel
agent.sinks.fileSink.table = access_log
agent.sinks.fileSink.columnFamily = cf
agent.sinks.fileSink.batchSize = 5
agent.sinks.fileSink.serializer = com.tcloud.flume.AsyncHbaseLogEventSerializer
```
5. Start Flume NG
Start the Flume agent process on the master and node1 machines respectively:
```
[root@master flume]$ bin/flume-ng agent \
    --conf conf \
    --conf-file conf/tomcatToHbase.conf \
    --name agent \
    -Dflume.root.logger=INFO,console

[root@node1 flume]$ bin/flume-ng agent \
    --conf conf \
    --conf-file conf/tomcatToHbase.conf \
    --name agent \
    -Dflume.root.logger=INFO,console
```
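With both agents running, the pipeline can be sanity-checked end to end by appending a line in the parser's format to the tailed file and then looking for it in HBase (the file path is the one configured on the exec source above):

```
# On the source machine: emit one well-formed access-log line
echo '11.52.10.49 - - [17/Sep/2015:11:35:21 +0800] "GET /webapp HTTP/1.1" 302 - "-" "Mozilla/5.0"' >> /home/test/data/data.txt

# Then, from the HBase shell, after a few seconds:
#   scan 'access_log', {LIMIT => 10}
```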