
Flume source code study 5: the RegexExtractorInterceptor implementation

2015-03-11 23:28
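RegexExtractorInterceptor is an Interceptor implementation that matches a regular expression against the event body to extract substrings. It then uses serializers to store those substrings as header values.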

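Example:
Suppose an exec source collects logs with the command below. The interceptor can then set a different header depending on the file name, so that events from different files are handled differently.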

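#!/bin/sh
# Prefix every line of the followed file with "filename:hostname:" so the
# interceptor regex can recover the log directory from the event body.
filename="$1"
hostname=$(hostname -s)
tail -F "$filename" | awk -v filename="$filename" -v hostname="$hostname" '{print filename":"hostname":"$0}'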
Source configuration:

xxxx.sources.kafka1.interceptors = i1
xxxx.sources.kafka1.interceptors.i1.type = regex_extractor
xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/
xxxx.sources.kafka1.interceptors.i1.serializers = s1
xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename
xxxx.sources.kafka1.selector.type = multiplexing
xxxx.sources.kafka1.selector.header = logtypename
xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel
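Here is a small Java sketch that uses only java.util.regex. It shows roughly what the interceptor would extract in this setup. LogTypeExtractor and its methods are hypothetical names, not Flume API. The path /apps/logs/nginx/access.log and the host web01 are made-up sample values.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not Flume API) reproducing what the interceptor sees
// for the exec-source example above.
class LogTypeExtractor {
    // Same pattern as the interceptors.i1.regex setting
    private static final Pattern REGEX = Pattern.compile("/apps/logs/(.*?)/");

    // Builds a body the way the awk command does: filename:hostname:line
    static String buildBody(String filename, String hostname, String line) {
        return filename + ":" + hostname + ":" + line;
    }

    // Returns group 1 (the future logtypename header value), or null if nothing matches
    static String extractLogType(String body) {
        Matcher m = REGEX.matcher(body);
        return m.find() ? m.group(1) : null;
    }
}
```

The lazy (.*?) stops at the first slash after /apps/logs/, so only the directory name (nginx here) is captured. That value becomes the logtypename header, and the multiplexing selector maps it to nginx-channel.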

Parameters:
regex: the regular expression matched against the event body

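serializers: a whitespace-separated list of serializer IDs, one per capture group. The value captured by each group becomes a header value. For example, take the event body 1:2:3.4foobar5 with regex set to (\\d):(\\d):(\\d) and serializers set to a b c. If serializers.a.name is one, serializers.b.name is two, and serializers.c.name is three, the event gets the headers one->1, two->2 and three->3. Not every group needs a serializer: groups beyond the end of the serializer list are skipped.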

serializers.x.name: the header name under which the value handled by serializer x is stored in the event
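The example above can be checked with plain java.util.regex. ExampleGroups is a hypothetical helper, not part of Flume. In Java source the pattern is written "(\\d):(\\d):(\\d)", which is also how it appears in a Flume properties file, because properties-file loading turns \\d into \d.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: returns the text of every capture group of the first match
class ExampleGroups {
    static String[] groups(String body, String regex) {
        Matcher m = Pattern.compile(regex).matcher(body);
        if (!m.find()) {
            return new String[0]; // no match: the interceptor would add no headers
        }
        String[] out = new String[m.groupCount()];
        for (int i = 1; i <= m.groupCount(); i++) {
            out[i - 1] = m.group(i); // group numbering starts at 1
        }
        return out;
    }
}
```

Each (\d) matches exactly one digit, so group 3 is "3". The rest of the body (.4foobar5) is not part of the match, and the body itself is not modified.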

First, the inner class Builder:
1) The configureSerializers method parses the serializer settings, mainly by building a List<NameAndSerializer>. The static inner class NameAndSerializer is a container holding a headerName and a RegexExtractorInterceptorSerializer. Each serializers.x.name setting corresponds to one RegexExtractorInterceptorSerializer object. The default serializer is org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer, which returns its argument unchanged:

private List<NameAndSerializer> serializerList;
private final RegexExtractorInterceptorSerializer defaultSerializer =
    new RegexExtractorInterceptorPassThroughSerializer();
....
private void configureSerializers(Context context) {
  // Read the "serializers" setting, e.g. "s1" or "a b c"
  String serializerListStr = context.getString(SERIALIZERS);
  Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
      "Must supply at least one name and serializer");
  String[] serializerNames = serializerListStr.split("\\s+"); // split on whitespace
  Context serializerContexts =
      new Context(context.getSubProperties(SERIALIZERS + "."));
  serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
  for (String serializerName : serializerNames) { // handle each serializer ID in turn
    Context serializerContext = new Context(
        serializerContexts.getSubProperties(serializerName + "."));
    // serializers.x.type; DEFAULT means RegexExtractorInterceptorPassThroughSerializer
    String type = serializerContext.getString("type", "DEFAULT");
    String name = serializerContext.getString("name"); // serializers.x.name (the header name)
    Preconditions.checkArgument(!StringUtils.isEmpty(name),
        "Supplied name cannot be empty.");
    if ("DEFAULT".equals(type)) {
      // Pair the header name with the shared pass-through serializer
      serializerList.add(new NameAndSerializer(name, defaultSerializer));
    } else {
      // getCustomSerializer returns a RegexExtractorInterceptorSerializer based on type
      serializerList.add(new NameAndSerializer(name, getCustomSerializer(
          type, serializerContext)));
    }
  }
}
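The parsing logic above can be approximated outside Flume. This is a minimal sketch that assumes a plain Map<String, String> in place of Flume's Context. The names SerializerConfigSketch, subProperties and headerNames are hypothetical, and only the DEFAULT (pass-through) type is modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical re-creation of configureSerializers over a plain Map instead of
// Flume's Context; custom serializer types are not modelled.
class SerializerConfigSketch {
    // Stand-in for Context.getSubProperties: keys with the prefix, prefix stripped
    static Map<String, String> subProperties(Map<String, String> props, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    // Returns the header names in the order the serializer IDs are listed
    static List<String> headerNames(Map<String, String> props) {
        String list = props.get("serializers");
        if (list == null || list.trim().isEmpty()) {
            throw new IllegalArgumentException("Must supply at least one name and serializer");
        }
        Map<String, String> all = subProperties(props, "serializers.");
        List<String> names = new ArrayList<>();
        // trim() avoids an empty first token when the value starts with whitespace
        for (String id : list.trim().split("\\s+")) {
            Map<String, String> one = subProperties(all, id + ".");
            String type = one.getOrDefault("type", "DEFAULT");
            String name = one.get("name");
            if (name == null || name.isEmpty()) {
                throw new IllegalArgumentException("Supplied name cannot be empty.");
            }
            if (!"DEFAULT".equals(type)) {
                throw new UnsupportedOperationException("custom type not modelled: " + type);
            }
            names.add(name);
        }
        return names;
    }
}
```

With serializers = "a b c" and the three .name keys from the parameter example, headerNames returns [one, two, three]. The list order matters, because position i in the list later serves capture group i + 1.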
org.apache.flume.interceptor.RegexExtractorInterceptorSerializer is an interface that declares the method serialize. Its implementations include:

org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
// returns the value unchanged (the default)
org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
// converts the value to milliseconds using the configured formatting pattern
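At its core, the serializer contract is "string in, string out". The sketch below uses hypothetical names (ValueSerializer, PassThroughValueSerializer, MillisValueSerializer) rather than the Flume classes. The millis variant is an assumption: it uses java.time and interprets the text as UTC, while the real RegexExtractorInterceptorMillisSerializer may parse dates with a different library or time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical interface in the spirit of RegexExtractorInterceptorSerializer
interface ValueSerializer {
    String serialize(String value);
}

// Pass-through: return the matched text unchanged
class PassThroughValueSerializer implements ValueSerializer {
    @Override
    public String serialize(String value) {
        return value;
    }
}

// Millis-style: parse with a pattern and return epoch milliseconds as a string.
// Assumption: java.time parsing, text interpreted as UTC.
class MillisValueSerializer implements ValueSerializer {
    private final DateTimeFormatter formatter;

    MillisValueSerializer(String pattern) {
        this.formatter = DateTimeFormatter.ofPattern(pattern);
    }

    @Override
    public String serialize(String value) {
        LocalDateTime t = LocalDateTime.parse(value, formatter);
        return String.valueOf(t.toInstant(ZoneOffset.UTC).toEpochMilli());
    }
}
```

For example, new MillisValueSerializer("yyyy-MM-dd HH:mm:ss").serialize("1970-01-01 00:00:01") yields "1000". Header values stay strings either way, which is why the interface returns String.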

2) The build method returns a RegexExtractorInterceptor:

return new RegexExtractorInterceptor(regex, serializerList);
The main method of RegexExtractorInterceptor is intercept:

static final String REGEX = "regex";
static final String SERIALIZERS = "serializers";
...
public Event intercept(Event event) {
  // Run the regex over the event body, decoded as UTF-8
  Matcher matcher = regex.matcher(
      new String(event.getBody(), Charsets.UTF_8));
  Map<String, String> headers = event.getHeaders(); // the event's header map
  if (matcher.find()) { // find() succeeds if any substring of the body matches
    for (int group = 0, count = matcher.groupCount(); group < count; group++) {
      int groupIndex = group + 1; // capture groups are numbered from 1
      // Stop once there are more groups than entries in the List<NameAndSerializer>
      // built during configuration
      if (groupIndex > serializers.size()) {
        ....
        break;
      }
      NameAndSerializer serializer = serializers.get(group); // the NameAndSerializer for this group
      ....
      // Store headerName (the serializers.x.name setting) -> group value,
      // after passing the value through its RegexExtractorInterceptorSerializer
      headers.put(serializer.headerName,
          serializer.serializer.serialize(matcher.group(groupIndex)));
    }
  }
  return event;
}
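The intercept loop can be reproduced without Flume on the classpath. This is a minimal sketch under stated assumptions. SketchEvent stands in for Flume's Event as a hypothetical byte[] body plus a header map. RegexExtractorSketch and its add method are also hypothetical. Serializers are modelled as UnaryOperator<String>.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical minimal event: a byte[] body plus a mutable header map
class SketchEvent {
    final byte[] body;
    final Map<String, String> headers = new HashMap<>();

    SketchEvent(String text) {
        this.body = text.getBytes(StandardCharsets.UTF_8);
    }
}

// Mirrors the intercept loop above; serializers are plain UnaryOperator<String>
class RegexExtractorSketch {
    private final Pattern regex;
    private final List<String> headerNames = new ArrayList<>();
    private final List<UnaryOperator<String>> serializers = new ArrayList<>();

    RegexExtractorSketch(String regex) {
        this.regex = Pattern.compile(regex);
    }

    // Registers serializer number i (0-based), which handles capture group i + 1
    RegexExtractorSketch add(String headerName, UnaryOperator<String> serializer) {
        headerNames.add(headerName);
        serializers.add(serializer);
        return this;
    }

    SketchEvent intercept(SketchEvent event) {
        Matcher matcher = regex.matcher(new String(event.body, StandardCharsets.UTF_8));
        if (matcher.find()) {
            for (int group = 0, count = matcher.groupCount(); group < count; group++) {
                int groupIndex = group + 1; // group 0 is the whole match
                if (groupIndex > serializers.size()) {
                    break; // more groups than serializers: the rest are skipped
                }
                event.headers.put(headerNames.get(group),
                        serializers.get(group).apply(matcher.group(groupIndex)));
            }
        }
        return event;
    }
}
```

With regex (\d):(\d):(\d) and only two serializers registered, a body of 1:2:3.4foobar5 gets two headers, and the third group is skipped. A body with no match passes through with its headers untouched.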