用flume的intercepter作数据格式转换和清洗
2016-07-18 16:38
309 查看
最近在工作中,用到spark streaming做数据解析和实时的数据计算,由于数据量比较大,而计算资源有限,spark的处理性能总是跟不上。观察之后发现,数据格式的解析占用了大量时间。整个数据的流程是nginx -> flume -> kafka -> spark,除了数据量较大,spark 无法及时处理之外,还存在kafka各个partition的数据分布不均衡,spark的job只有少数几个有数据,其他完全无数据输入。为改善这种情况,决定自定义flume的拦截器,由于flume的source读到的是nginx的原始日志,需要做解析后才能进行后续计算,而在spark中,此处又成了影响性能的最主要的因素,于是想到在拦截器中,将数据解析也完成。
flume自定义拦截器只需要实现Interceptor接口即可,具体方法如下:
打包并放到flume的lib目录下
在配置文件里添加
hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.xx.flume.interceptor.FormatInterceptor$Builder
这样就可以了
经测试发现,之前spark 2min为一个batch,数据解析放在后边,而且数据倾斜的情况下,要将近20min才能完成,改进之后,只要几秒就可以了
除此之外,本来是有个疑问,这样会不会影响flume的性能,对数据解析的代码做了性能测试,是可以满足现在的数据处理要求的,并且查看源码后发现,flume本身就是多线程处理的,经过一段时间的观察,也确实没有发生数据阻塞之类的问题
flume自定义拦截器只需要实现Interceptor接口即可,具体方法如下:
public class FormatInterceptor implements Interceptor { @Override public void initialize() { // TODO Auto-generated method stub } @Override public Event intercept(Event event) { //此处做自己的处理 return event; } @Override public List<Event> intercept(List<Event> events) { List<Event> list = Lists.newArrayListWithCapacity(events.size()); for (Event event : events) { Event e = intercept(event); if (interceptedEvent != null) { list.add(e); } } return list; } @Override public void close() { // TODO Auto-generated method stub } public static class Builder implements Interceptor.Builder { //使用Builder初始化Interceptor @Override public Interceptor build() { return new FormatInterceptor(); } @Override public void configure(Context context) { // TODO Auto-generated method stub } } }
打包并放到flume的lib目录下
在配置文件里添加
hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.xx.flume.interceptor.FormatInterceptor$Builder
这样就可以了
经测试发现,之前spark 2min为一个batch,数据解析放在后边,而且数据倾斜的情况下,要将近20min才能完成,改进之后,只要几秒就可以了
除此之外,本来是有个疑问,这样会不会影响flume的性能,对数据解析的代码做了性能测试,是可以满足现在的数据处理要求的,并且查看源码后发现,flume本身就是多线程处理的,经过一段时间的观察,也确实没有发生数据阻塞之类的问题
相关文章推荐
- Flume环境部署和配置详解及案例大全
- Play! Akka Flume实现的完整数据收集
- log4j + flume 1.6 集成
- flume自定义Interceptor
- 使用Flume聚合Tomcat 日志
- #Note# Analyzing Twitter Data with Apache Hadoo...
- apache flume 配置存储在Linux本地服务器
- flume、kafka、storm常用命令
- 开源日志系统比较
- Flume Log4J Appender Flume收集Log4j日志
- flume1.6 install
- Spark Streaming结合Flume、Kafka最新最全日志分析
- Flume向HDFS写数据实例
- flume+log4j整合到web项目
- 详细图解 Flume介绍、安装配置-1
- flume部署
- flume实时抓取log数据,并传到kafka中
- flume NG 中文 Welcome to Apache Flume 第一页 醉了
- flume 高可用性 高可靠性 agent source
- flume介绍及扩展开发心得