您的位置:首页 > 其它

用flume的intercepter作数据格式转换和清洗

2016-07-18 16:38 309 查看
         最近在工作中,用到spark streaming做数据解析和实时的数据计算,由于数据量比较大,而计算资源有限,spark的处理性能总是跟不上。观察之后发现,数据格式的解析占用了大量时间。整个数据的流程是nginx -> flume -> kafka -> spark,除了数据量较大,spark 无法及时处理之外,还存在kafka各个partition的数据分布不均衡,spark的job只有少数几个有数据,其他完全无数据输入。为改善这种情况,决定自定义flume的拦截器,由于flume的source读到的是nginx的原始日志,需要做解析后才能进行后续计算,而在spark中,此处又成了影响性能的最主要的因素,于是想到在拦截器中,将数据解析也完成。

              flume自定义拦截器只需要实现Interceptor接口即可,具体方法如下:

public class FormatInterceptor implements Interceptor {

@Override
public void initialize() {
// TODO Auto-generated method stub

}

@Override
public Event intercept(Event event) {
//此处做自己的处理

return event;
}

@Override
public List<Event> intercept(List<Event> events) {
List<Event> list = Lists.newArrayListWithCapacity(events.size());
for (Event event : events) {
Event e = intercept(event);
if (interceptedEvent != null) {
list.add(e);
}
}

return list;
}

@Override
public void close() {
// TODO Auto-generated method stub

}

public static class Builder implements Interceptor.Builder {
//使用Builder初始化Interceptor
@Override
public Interceptor build() {
return new FormatInterceptor();
}

@Override
public void configure(Context context) {
// TODO Auto-generated method stub

}
}

}


             打包并放到flume的lib目录下

            在配置文件里添加

          hdp2.sources.s1.interceptors = i1

          hdp2.sources.s1.interceptors.i1.type = com.xx.flume.interceptor.FormatInterceptor$Builder

          这样就可以了

           经测试发现,之前spark  2min为一个batch,数据解析放在后边,而且数据倾斜的情况下,要将近20min才能完成,改进之后,只要几秒就可以了

          

           除此之外,本来是有个疑问,这样会不会影响flume的性能,对数据解析的代码做了性能测试,是可以满足现在的数据处理要求的,并且查看源码后发现,flume本身就是多线程处理的,经过一段时间的观察,也确实没有发生数据阻塞之类的问题

        

    

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  flume