Flume-ng in Production (4): Implementing a Log-Formatting Interceptor
2014-08-05 17:55
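Continuing from the previous post: the file sink needs to write to paths of the form /data/log/%{dayStr}/log-%{hourStr}%{minStr}-. For the file sink to resolve tags such as %{dayStr}, the matching key-value pairs have to be added to each event's headers while the data is in transit. Flume-ng provides a convenient mechanism for this: the Interceptor.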
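To make the header idea concrete, here is a minimal standalone sketch of how dayStr, hourStr and minStr could be derived from an nginx timestamp, with the minute rounded down to a 5-minute bucket. The class name PathHeaderSketch, its method names, and the explicit time zone argument are assumptions made for illustration (the time zone is passed in only so the result does not depend on the machine's default); no Flume classes are used.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;

// Hypothetical helper, not part of Flume: computes the header values a
// path like /data/log/%{dayStr}/log-%{hourStr}%{minStr}- would need.
final class PathHeaderSketch {

    // Round a minute of the hour down to its 5-minute bucket, zero-padded.
    static String minuteBucket(int minute) {
        return String.format(Locale.ROOT, "%02d", (minute / 5) * 5);
    }

    // Parse an nginx timestamp such as "29/Dec/2012:15:07:00 +0800"
    // and return the three header values, formatted in the given zone.
    static Map<String, String> pathHeaders(String nginxTime, TimeZone zone) {
        SimpleDateFormat in = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
        Date date;
        try {
            date = in.parse(nginxTime);
        } catch (ParseException e) {
            throw new IllegalArgumentException("not an nginx timestamp: " + nginxTime, e);
        }

        SimpleDateFormat day = new SimpleDateFormat("yyyyMMdd", Locale.US);
        SimpleDateFormat hour = new SimpleDateFormat("HH", Locale.US);
        SimpleDateFormat minute = new SimpleDateFormat("mm", Locale.US);
        day.setTimeZone(zone);
        hour.setTimeZone(zone);
        minute.setTimeZone(zone);

        Map<String, String> headers = new HashMap<String, String>();
        headers.put("dayStr", day.format(date));
        headers.put("hourStr", hour.format(date));
        headers.put("minStr", minuteBucket(Integer.parseInt(minute.format(date))));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> h =
                pathHeaders("29/Dec/2012:15:07:00 +0800", TimeZone.getTimeZone("GMT+8"));
        // Prints /data/log/20121229/log-1505-
        System.out.println("/data/log/" + h.get("dayStr")
                + "/log-" + h.get("hourStr") + h.get("minStr") + "-");
    }
}
```

With 5-minute buckets, every event logged between 15:05:00 and 15:09:59 ends up with the same three header values and therefore in the same output file.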
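The interceptor implemented below first matches each nginx log line against a regular expression. If the match succeeds, it extracts the captured fields, parses the parameters in the URL, and stores all of the log information in a Map. It then looks up the value for each key that the configuration file lists for output and writes the values, in order, as one CSV line.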
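The listings below show, in order, the raw log format, the final result, the agent configuration, the contents of /usr/programs/flume/conf/logformat_vv.properties, and the interceptor code. With these in place, the whole pipeline is complete: reading the nginx log, formatting and cleaning it, shipping it to the collector machine, and writing it out under the formatted directory and file names.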
Raw log format:

    112.245.239.72 - - [29/Dec/2012:15:00:00 +0800] "GET /p.gif?a=1&b=2 HTTP/1.1" 200 0 "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; 4399Box.1357; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; AskTbPTV2/5.9.1.14019; 4399Box.1357)"
Final result:

    1,2
Configuration:

    agent.sources = source
    agent.channels = channel
    agent.sinks = sink

    agent.sources.source.type = exec
    #agent.sources.source.command = tail -n +0 -F /data/tmp/accesspvpb_2012-11-18.log
    agent.sources.source.command = cat /opt/nginx/logs/vvaccess_log_pipe
    agent.sources.source.interceptors = logformat
    agent.sources.source.interceptors.logformat.type = org.apache.flume.interceptor.LogFormatInterceptor$Builder
    agent.sources.source.interceptors.logformat.confpath = /usr/programs/flume/conf/logformat_vv.properties
    agent.sources.source.interceptors.logformat.dynamicprop = true
    agent.sources.source.interceptors.logformat.hostname = vv111
    agent.sources.source.interceptors.logformat.prop.monitor.rollInterval = 100000
    # The channel can be defined as follows.
    agent.sources.source.channels = channel

    agent.sinks.sink.type = avro
    agent.sinks.sink.hostname = 192.168.0.100
    agent.sinks.sink.port = 44444
    agent.sinks.sink.channel = channel

    # Each channel's type is defined.
    agent.channels.channel.type = file
    agent.channels.channel.checkpointDir = /data/tmpc/checkpoint
    agent.channels.channel.dataDirs = /data/tmpc/data
    agent.channels.channel.transactionCapacity = 15000
Contents of /usr/programs/flume/conf/logformat_vv.properties:

    keys=a,b
    regexp=([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})\\s-\\s-\\s\\[([^]]+)\\]\\s\"GET\\s/p.gif\\?(.+)\\s.*\"\\s[0-9]+\\s[0-9]+\\s\"(.+)\"
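How the raw line turns into 1,2 can be traced with a small standalone sketch. It swaps in java.util.regex for the Jakarta ORO classes the interceptor uses, so that it runs without extra libraries; that substitution, the class name LogFormatSketch and the method name format are assumptions for illustration. The pattern text is the regexp value from the properties file, with one deliberate change: the bracket class is written [^\]] instead of [^]], to avoid depending on how java.util.regex treats an unescaped ] at that position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: regex match -> Map -> CSV line in key order.
final class LogFormatSketch {

    // Groups: 1 = client IP, 2 = timestamp, 3 = query string, 4 = user agent.
    static final Pattern LINE = Pattern.compile(
            "([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})\\s-\\s-\\s\\[([^\\]]+)\\]"
            + "\\s\"GET\\s/p.gif\\?(.+)\\s.*\"\\s[0-9]+\\s[0-9]+\\s\"(.+)\"");

    static final String SAMPLE =
            "112.245.239.72 - - [29/Dec/2012:15:00:00 +0800] "
            + "\"GET /p.gif?a=1&b=2 HTTP/1.1\" 200 0 "
            + "\"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; "
            + "4399Box.1357; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; "
            + ".NET CLR 3.5.30729; AskTbPTV2/5.9.1.14019; 4399Box.1357)\"";

    // Returns the CSV line for the requested keys, or null if the line does not match.
    static String format(String line, List<String> keys) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            return null;
        }
        Map<String, String> index = new HashMap<String, String>();
        index.put("ip", m.group(1));
        // Commas would break the CSV output, so they are replaced with '|'.
        for (String param : m.group(3).replace(",", "|").split("&")) {
            String[] kv = param.split("=");
            if (kv.length == 2) {
                index.put(kv[0], kv[1]);
            }
        }
        index.put("browser", m.group(4).replace(",", "|"));

        List<String> out = new ArrayList<String>();
        for (String key : keys) {
            // A key with no value in this line is written as "~".
            out.add(index.containsKey(key) ? index.get(key) : "~");
        }
        return String.join(",", out);
    }

    public static void main(String[] args) {
        System.out.println(format(SAMPLE, Arrays.asList("a", "b"))); // prints 1,2
    }
}
```

Asking for a key that the line does not carry, for example format(SAMPLE, Arrays.asList("ip", "a", "c")), yields 112.245.239.72,1,~ in this sketch, which mirrors the "~" placeholder in the interceptor.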
Interceptor code:

    package org.apache.flume.interceptor;

    import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.CONF_PATH;
    import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP;
    import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP_DFLT;
    import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME;
    import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME_DFLT;
    import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL;
    import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL_DFLT;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.oro.text.regex.MalformedPatternException;
    import org.apache.oro.text.regex.MatchResult;
    import org.apache.oro.text.regex.Pattern;
    import org.apache.oro.text.regex.PatternCompiler;
    import org.apache.oro.text.regex.PatternMatcher;
    import org.apache.oro.text.regex.Perl5Compiler;
    import org.apache.oro.text.regex.Perl5Matcher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogFormatInterceptor implements Interceptor {

        private static final Logger logger = LoggerFactory
                .getLogger(LogFormatInterceptor.class);

        private String conf_path = null;
        private boolean dynamicProp = false;
        private String hostname = null;

        private long propLastModify = 0;
        private long propMonitorInterval;

        private String regexp = null;
        private List<String> keys = null;

        private Pattern pattern = null;
        private PatternCompiler compiler = null;
        private PatternMatcher matcher = null;
        private SimpleDateFormat sdf = null;
        private SimpleDateFormat sd = null;
        private SimpleDateFormat sh = null;
        private SimpleDateFormat sm = null;
        private SimpleDateFormat sdfAll = null;

        private long eventCount = 0L;

        public LogFormatInterceptor(String conf_path, boolean dynamicProp,
                String hostname, long propMonitorInterval) {
            this.conf_path = conf_path;
            this.dynamicProp = dynamicProp;
            this.hostname = hostname;
            this.propMonitorInterval = propMonitorInterval;
        }

        @Override
        public void close() {
        }

        @Override
        public void initialize() {
            try {
                // Read the config file; initialize the regular expression
                // and the list of output keys
                File file = new File(conf_path);
                propLastModify = file.lastModified();
                Properties props = new Properties();
                FileInputStream fis = new FileInputStream(file);
                try {
                    props.load(fis);
                } finally {
                    fis.close(); // always release the file handle
                }
                regexp = props.getProperty("regexp");
                String strKey = props.getProperty("keys");
                if (strKey != null) {
                    String[] strkeys = strKey.split(",");
                    keys = new LinkedList<String>();
                    for (String key : strkeys) {
                        keys.add(key);
                    }
                }
                if (keys == null) {
                    logger.error("====================keys is null====================");
                } else {
                    logger.info("keys=" + keys);
                }
                if (regexp == null) {
                    logger.error("====================regexp is null====================");
                } else {
                    logger.info("regexp=" + regexp);
                }

                // Initialize the regular expression and the date formatters
                compiler = new Perl5Compiler();
                pattern = compiler.compile(regexp);
                matcher = new Perl5Matcher();

                sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",
                        java.util.Locale.US);
                sd = new SimpleDateFormat("yyyyMMdd");
                sh = new SimpleDateFormat("HH");
                sm = new SimpleDateFormat("mm");
                sdfAll = new SimpleDateFormat("yyyyMMddHHmmss");

            } catch (MalformedPatternException e) {
                logger.error("Could not compile pattern!", e);
            } catch (FileNotFoundException e) {
                logger.error("conf file is not found!", e);
            } catch (IOException e) {
                logger.error("conf file can not be read!", e);
            }
        }

        @Override
        public Event intercept(Event event) {
            ++eventCount;
            try {
                if (dynamicProp && eventCount > propMonitorInterval) {
                    // Restart the count so the file is checked once per
                    // interval rather than on every later event
                    eventCount = 0;
                    File file = new File(conf_path);
                    if (file.lastModified() > propLastModify) {
                        propLastModify = file.lastModified();
                        Properties props = new Properties();
                        FileInputStream fis = new FileInputStream(file);
                        try {
                            props.load(fis);
                        } finally {
                            fis.close();
                        }
                        String strKey = props.getProperty("keys");
                        if (strKey != null) {
                            String[] strkeys = strKey.split(",");
                            List<String> keystmp = new LinkedList<String>();
                            for (String key : strkeys) {
                                keystmp.add(key);
                            }
                            // Accept the new list only if it has more keys
                            // than the current one
                            if (keystmp.size() > keys.size()) {
                                keys = keystmp;
                                logger.info("dynamicProp status updated = " + keys);
                            } else {
                                logger.error("dynamicProp status new keys size less than old,so status update fail = "
                                        + keys);
                            }
                        } else {
                            logger.error("dynamicProp status get keys fail ,so status update fail = "
                                    + keys);
                        }
                    }
                }

                Map<String, String> headers = event.getHeaders();
                headers.put("host", hostname);
                String body = new String(event.getBody());
                if (pattern != null) {
                    StringBuffer stringBuffer = new StringBuffer();
                    Date date = null;
                    Map<String, String> index = new HashMap<String, String>();
                    if (matcher.contains(body, pattern)) {
                        index.put("host", hostname);
                        MatchResult result = matcher.getMatch();
                        index.put("ip", result.group(1));
                        try {
                            date = sdf.parse(result.group(2));
                            index.put("loc_time", sdfAll.format(date));
                        } catch (ParseException e1) {
                            // date stays null; the event is tagged
                            // dayStr=errorLog below
                        }
                        // Commas would break the CSV output, so replace them
                        String url = result.group(3).replaceAll(",", "|");
                        String[] params = url.split("&");
                        for (String param : params) {
                            String[] p = param.split("=");
                            if (p.length == 2) {
                                index.put(p[0], p[1]);
                            }
                        }
                        index.put("browser", result.group(4).replaceAll(",", "|"));
                        // Emit the values in the configured key order;
                        // "~" marks a missing value
                        for (String key : keys) {
                            if (index.containsKey(key)) {
                                stringBuffer.append(index.get(key) + ",");
                            } else {
                                stringBuffer.append("~,");
                            }
                        }
                        if (stringBuffer.length() > 0) {
                            stringBuffer.deleteCharAt(stringBuffer.length() - 1);
                        } else {
                            stringBuffer.append("error=" + body);
                        }

                        if (date != null) {
                            headers.put("dayStr", sd.format(date));
                            headers.put("hourStr", sh.format(date));
                            // Round the minute down to a 5-minute bucket
                            Integer m = Integer.parseInt(sm.format(date));
                            String min = "";
                            if (m >= 0 && m < 10) {
                                min = "0" + (m / 5) * 5;
                            } else {
                                min = (m / 5) * 5 + "";
                            }
                            headers.put("minStr", min);
                        } else {
                            headers.put("dayStr", "errorLog");
                        }
                        Event e = EventBuilder.withBody(stringBuffer.toString()
                                .getBytes(), headers);
                        return e;
                    }
                }
            } catch (Exception e) {
                logger.error("LogFormat error!", e);
            }
            // Lines that do not match (or that fail) are dropped
            return null;
        }

        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> list = new LinkedList<Event>();
            for (Event event : events) {
                Event e = intercept(event);
                if (e != null) {
                    list.add(e);
                }
            }
            return list;
        }

        /**
         * Builder which builds new instances of the LogFormatInterceptor.
         */
        public static class Builder implements Interceptor.Builder {

            private String confPath;
            private boolean dynamicProp;
            private String hostname;
            private long propMonitorInterval;

            @Override
            public Interceptor build() {
                return new LogFormatInterceptor(confPath, dynamicProp, hostname,
                        propMonitorInterval);
            }

            @Override
            public void configure(Context context) {
                confPath = context.getString(CONF_PATH);
                dynamicProp = context.getBoolean(DYNAMICPROP, DYNAMICPROP_DFLT);
                hostname = context.getString(HOSTNAME, HOSTNAME_DFLT);
                propMonitorInterval = context.getLong(PROPMONITORINTERVAL,
                        PROPMONITORINTERVAL_DFLT);
            }
        }

        public static class Constants {

            public static final String CONF_PATH = "confpath";

            public static final String DYNAMICPROP = "dynamicprop";
            public static final boolean DYNAMICPROP_DFLT = false;

            public static final String HOSTNAME = "hostname";
            public static final String HOSTNAME_DFLT = "hostname";

            public static final String PROPMONITORINTERVAL = "prop.monitor.rollInterval";
            public static final long PROPMONITORINTERVAL_DFLT = 500000L;
        }
    }
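The dynamicprop option rereads the properties file while the agent is running, checking it only after a configured number of events. The idea can be sketched on its own as follows: count events, and once per interval compare the file's modification time with the last one seen. The class ReloadingProps and all of its method names are hypothetical and are not part of Flume or of the interceptor above; the demo forces a newer modification time with setLastModified only to keep the example deterministic.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Properties;

// Hypothetical sketch of "reload the properties file every N events if it changed".
final class ReloadingProps {
    private final File file;
    private final long checkEvery; // events between two modification-time checks
    private long lastModified;
    private long seen;
    private Properties current;

    ReloadingProps(File file, long checkEvery) {
        this.file = file;
        this.checkEvery = checkEvery;
        this.lastModified = file.lastModified();
        this.current = load(file);
    }

    // Call once per event; returns true when the file was reread.
    boolean onEvent() {
        if (++seen < checkEvery) {
            return false;
        }
        seen = 0; // start a new interval
        long mtime = file.lastModified();
        if (mtime <= lastModified) {
            return false; // unchanged since the last load
        }
        lastModified = mtime;
        current = load(file);
        return true;
    }

    String get(String key) {
        return current.getProperty(key);
    }

    private static Properties load(File f) {
        try (InputStream in = new FileInputStream(f)) {
            Properties p = new Properties();
            p.load(in);
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void write(File f, String text) {
        try (Writer w = new FileWriter(f)) {
            w.write(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo: change the file, then feed events until the change is picked up.
    static String demo() {
        File f;
        try {
            f = File.createTempFile("logformat", ".properties");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        f.deleteOnExit();
        write(f, "keys=a,b\n");
        ReloadingProps props = new ReloadingProps(f, 3);

        write(f, "keys=a,b,c\n");
        f.setLastModified(props.lastModified + 5000); // make the change visibly newer

        props.onEvent(); // 1st event: no check yet
        props.onEvent(); // 2nd event: no check yet
        props.onEvent(); // 3rd event: interval reached, file is checked
        return props.get("keys");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Checking the modification time once per interval keeps the per-event cost to a counter increment; the trade-off is that a change to the file is only noticed after up to checkEvery further events.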