Custom HbaseSink: Writing Collected Logs to HBase with Flume
2015-03-03 16:28
Prerequisites:
HBase, Hadoop, and Flume must already be installed on the machines involved; without HBase and Hadoop, some dependencies will be missing. Copy core-site.xml, hdfs-site.xml, and hbase-site.xml into the conf directory of the Flume installation. When packaging the jar, only the Java class below needs to be included; no other dependencies are required.
1. Write the Serializer
package com.panguoyuan.hbase.sink;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;

public class RegexHbaseEventSerializer implements HbaseEventSerializer {

  // Config vars (names mirror Flume's stock RegexHbaseEventSerializer;
  // not every field is used by this variant).
  /** Regular expression used to parse groups from event data. */
  public static final String REGEX_CONFIG = "regex";
  public static final String REGEX_DEFAULT = " ";

  /** Whether to ignore case when performing regex matches. */
  public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
  public static final boolean IGNORE_CASE_DEFAULT = false;

  /** Comma separated list of column names to place matching groups in. */
  public static final String COL_NAME_CONFIG = "colNames";
  public static final String COLUMN_NAME_DEFAULT = "ip";

  /** Index of the row key in matched regex groups. */
  public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex";

  /** Placeholder in colNames for row key. */
  public static final String ROW_KEY_NAME = "ROW_KEY";

  /** Whether to deposit event headers into corresponding column qualifiers. */
  public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
  public static final boolean DEPOSIT_HEADERS_DEFAULT = false;

  /** What charset to use when serializing into HBase's byte arrays. */
  public static final String CHARSET_CONFIG = "charset";
  public static final String CHARSET_DEFAULT = "UTF-8";

  /*
   * This is a nonce used in HBase row-keys, such that the same row-key never
   * gets written more than once from within this JVM.
   */
  protected static final AtomicInteger nonce = new AtomicInteger(0);
  protected static String randomKey = RandomStringUtils.randomAlphanumeric(10);

  protected byte[] cf;
  private byte[] payload;
  private List<byte[]> colNames = Lists.newArrayList();
  private Map<String, String> headers;
  private boolean regexIgnoreCase;
  private boolean depositHeaders;
  private Pattern inputPattern;
  private Charset charset;
  private int rowKeyIndex;

  @Override
  public void configure(Context context) {
    String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);
    regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, IGNORE_CASE_DEFAULT);
    depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG, DEPOSIT_HEADERS_DEFAULT);
    inputPattern = Pattern.compile(regex,
        Pattern.DOTALL | (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
    charset = Charset.forName(context.getString(CHARSET_CONFIG, CHARSET_DEFAULT));

    // "columns" may be absent from the config; fall back to colNames / the default.
    String cols = context.getString("columns");
    String colNameStr;
    if (cols != null && !"".equals(cols)) {
      colNameStr = cols;
    } else {
      colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
    }
    for (String s : colNameStr.split(",")) {
      colNames.add(s.getBytes(charset));
    }
  }

  @Override
  public void configure(ComponentConfiguration conf) {}

  @Override
  public void initialize(Event event, byte[] columnFamily) {
    this.headers = event.getHeaders();
    this.payload = event.getBody();
    this.cf = columnFamily;
  }

  /** Row key = eventTime-clientIp-nonce, e.g. 2015-03-02-10:24:08-10.144.32.214-8. */
  protected byte[] getRowKey(Calendar cal) {
    String str = new String(payload, charset);
    String[] arr = str.replace("\"", "").split(" ");
    String clientIp = arr[0];
    String dataStr = arr[3].replace("[", "");
    String rowKey = getDate2Str(dataStr) + "-" + clientIp + "-" + nonce.getAndIncrement();
    return rowKey.getBytes(charset);
  }

  protected byte[] getRowKey() {
    return getRowKey(Calendar.getInstance());
  }

  @Override
  public List<Row> getActions() throws FlumeException {
    List<Row> actions = Lists.newArrayList();
    // Field positions assume the quoted "combined" access-log format,
    // split on single spaces after stripping the quotes.
    String body = new String(payload, charset);
    String[] arr = body.replace("\"", "").split(REGEX_DEFAULT);
    String clientIp = arr[0];
    String dataStr = arr[3].replace("[", "");
    String method = arr[5];
    String url = arr[10];
    String os = arr[12].replace("(", "");
    String browser = arr[18];
    try {
      byte[] rowKey = getRowKey();
      Put put = new Put(rowKey);
      put.add(cf, colNames.get(0), clientIp.getBytes(Charsets.UTF_8));
      put.add(cf, colNames.get(1), url.getBytes(Charsets.UTF_8));
      put.add(cf, colNames.get(2), method.getBytes(Charsets.UTF_8));
      // "misc" and "http" are extra column families; they must exist on the table.
      put.add("misc".getBytes(), colNames.get(3), os.getBytes(Charsets.UTF_8));
      put.add("http".getBytes(), colNames.get(4), browser.getBytes(Charsets.UTF_8));
      put.add(cf, colNames.get(5), dataStr.getBytes(Charsets.UTF_8));
      actions.add(put);
    } catch (Exception e) {
      throw new FlumeException("Could not get row key!", e);
    }
    return actions;
  }

  @Override
  public List<Increment> getIncrements() {
    return Lists.newArrayList();
  }

  @Override
  public void close() {}

  /** Converts "02/Mar/2015:10:24:08" into "2015-03-02-10:24:08". */
  public static String getDate2Str(String dataStr) {
    try {
      // HH (24-hour), not hh: access-log timestamps are 24-hour.
      SimpleDateFormat parser = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
      Date date = parser.parse(dataStr);
      return new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss").format(date);
    } catch (Exception e) {
      // Fail fast instead of hitting an NPE on an unparseable timestamp.
      throw new FlumeException("Could not parse date: " + dataStr, e);
    }
  }
}
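To sanity-check the hard-coded field positions, the snippet below (a hypothetical standalone class, not part of the sink) splits a sample line the same way getActions() does. The sample line is our reconstruction of the combined access-log format implied by the scan output in step 5.

package com.panguoyuan.hbase.sink;

public class SerializerFieldCheck {
  public static void main(String[] args) {
    // Hypothetical access-log line matching the format the serializer expects.
    String line = "10.144.32.214 - - [02/Mar/2015:10:24:08 +0800] "
        + "\"GET /gfk-rule/rule.jsp HTTP/1.1\" 200 1024 "
        + "\"http://192.168.232.100:8080/gfk-rule/rule.jsp\" "
        + "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:36.0) Gecko/20100101 Firefox/36.0\"";
    // Same preprocessing as getActions(): strip quotes, split on single spaces.
    String[] arr = line.replace("\"", "").split(" ");
    System.out.println("clientIp = " + arr[0]);                   // 10.144.32.214
    System.out.println("dataStr  = " + arr[3].replace("[", ""));  // 02/Mar/2015:10:24:08
    System.out.println("method   = " + arr[5]);                   // GET
    System.out.println("url      = " + arr[10]);                  // the referrer URL
    System.out.println("os       = " + arr[12].replace("(", "")); // Windows
    System.out.println("browser  = " + arr[18]);                  // Firefox/36.0
    // Row-key timestamp: 02/Mar/2015:10:24:08 -> 2015-03-02-10:24:08
    System.out.println("rowKey prefix = "
        + RegexHbaseEventSerializer.getDate2Str(arr[3].replace("[", "")));
  }
}

If your access logs use a different pattern (an extra field, a missing referrer), the indices 0, 3, 5, 10, 12, and 18 shift, so check them against a real line first.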
2. Write the log aggregation (collector) configuration, sink.type = hbase

collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = <IP address>
collector.sources.AvroIn.port = 33330
collector.sources.AvroIn.channels = mc1

collector.channels = mc1
collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 1000

collector.sinks = HbaseOut
collector.sinks.HbaseOut.type = hbase
#collector.sinks.HbaseOut.type = asynchbase
collector.sinks.HbaseOut.channel = mc1
collector.sinks.HbaseOut.table = access_log
collector.sinks.HbaseOut.columnFamily = common
collector.sinks.HbaseOut.batchSize = 500
collector.sinks.HbaseOut.timeout = 60000
collector.sinks.HbaseOut.serializer = com.panguoyuan.hbase.sink.RegexHbaseEventSerializer
#collector.sinks.HbaseOut.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
#collector.sinks.HbaseOut.serializer.columns = common:rowKey,common:hostname,common:remotehost,common:remoteuser,common:eventtimestamp,http:requestmethod,http:requeststatus,http:responsebytes,misc:referrer,misc:agent
#collector.sinks.HbaseOut.serializer.columns = common:clientIp,common:url,common:method,common:os,common:browser,common:dataStr
collector.sinks.HbaseOut.serializer.columns = clientIp,url,method,os,browser,dataStr
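The serializer writes into three column families: common (the sink's columnFamily above) plus http and misc, which are hard-coded in getActions(). The access_log table must therefore exist with all three families before the collector starts; in the HBase shell that is create 'access_log', 'common', 'http', 'misc'. A minimal sketch of the same step with the Java client API of that generation (class name and error handling are ours):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateAccessLogTable {
  public static void main(String[] args) throws Exception {
    // Picks up hbase-site.xml from the classpath (already copied into Flume's conf dir).
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists("access_log")) {
      HTableDescriptor table = new HTableDescriptor(TableName.valueOf("access_log"));
      table.addFamily(new HColumnDescriptor("common")); // the sink's columnFamily
      table.addFamily(new HColumnDescriptor("http"));   // hard-coded in the serializer
      table.addFamily(new HColumnDescriptor("misc"));   // hard-coded in the serializer
      admin.createTable(table);
    }
    admin.close();
  }
}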
3. Write the log collection (source) configuration

source_agent.sources = apache_server
source_agent.sources.apache_server.type = exec
source_agent.sources.apache_server.command = tail -F /root/install/tomcat7/logs/localhost_access_log.2015-03-02.txt
source_agent.sources.apache_server.channels = memoryChannel

source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 1000
source_agent.channels.memoryChannel.transactionCapacity = 1000

source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.hostname = <collector IP address>
source_agent.sinks.avro_sink.port = 33330
source_agent.sinks.avro_sink.channel = memoryChannel
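Before tailing a real log file, you can verify the collector end to end by pushing one synthetic event straight at its avro source with Flume's RPC client; a minimal sketch, with the collector host as a placeholder:

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroTestSender {
  public static void main(String[] args) throws Exception {
    // Host/port must match the avro source in the collector config above.
    RpcClient client = RpcClientFactory.getDefaultInstance("collector-host", 33330);
    // One event body = one access-log line (hypothetical sample).
    String line = "10.144.32.214 - - [02/Mar/2015:10:24:08 +0800] "
        + "\"GET /gfk-rule/rule.jsp HTTP/1.1\" 200 1024 "
        + "\"http://192.168.232.100:8080/gfk-rule/rule.jsp\" "
        + "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:36.0) Gecko/20100101 Firefox/36.0\"";
    Event event = EventBuilder.withBody(line, Charset.forName("UTF-8"));
    client.append(event);
    client.close();
  }
}

If the event arrives, a new row should appear in access_log with the fields extracted in step 1.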
4. Start the Flume source agent and the collector agent. While testing, print the logs to the console first so the output is easy to observe.

bin/flume-ng agent --conf conf --conf-file conf/send.conf --name source_agent -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/accepter.conf --name collector -Dflume.root.logger=INFO,console
5. Check the data in HBase with scan 'access_log'
2015-03-02-10:24:08-10.144.32.214-8  column=common:clientIp, timestamp=1425263455412, value=10.144.32.214
2015-03-02-10:24:08-10.144.32.214-8  column=common:dataStr, timestamp=1425263455412, value=02/Mar/2015:10:24:08
2015-03-02-10:24:08-10.144.32.214-8  column=common:method, timestamp=1425263455412, value=GET
2015-03-02-10:24:08-10.144.32.214-8  column=common:url, timestamp=1425263455412, value=http://192.168.232.100:8080/gfk-rule/rule.jsp
2015-03-02-10:24:08-10.144.32.214-8  column=http:browser, timestamp=1425263455412, value=Firefox/36.0
2015-03-02-10:24:08-10.144.32.214-8  column=misc:os, timestamp=1425263455412, value=Windows
2015-03-02-10:24:08-10.144.32.214-9  column=common:clientIp, timestamp=1425263455412, value=10.144.32.214
2015-03-02-10:24:08-10.144.32.214-9  column=common:dataStr, timestamp=1425263455412, value=02/Mar/2015:10:24:08
2015-03-02-10:24:08-10.144.32.214-9  column=common:method, timestamp=1425263455412, value=GET
2015-03-02-10:24:08-10.144.32.214-9  column=common:url, timestamp=1425263455412, value=http://192.168.232.100:8080/gfk-rule/rule.jsp
2015-03-02-10:24:08-10.144.32.214-9  column=http:browser, timestamp=1425263455412, value=Firefox/36.0
2015-03-02-10:24:08-10.144.32.214-9  column=misc:os, timestamp=1425263455412, value=Windows
181 row(s) in 1.3220 seconds
hbase(main):002:0>