【Flume】【源码分析】flume中拦截器的源码分析,以TimestampInterceptor为例
2015-01-21 10:34
423 查看
本文将以TimestampInterceptor为例来分析一下flume中拦截器的工作原理
首先来看下改拦截器的实现结构
该接口的方法定义如下:
下面来看TimestampInterceptor的具体实现
并且在configure方法中读取了preserveExisting属性,默认值为false
该配置的作用表明
This interceptor can preserve an existing timestamp if it is already present in the configuration.
再来看intercept方法
批量的
2、否则的话,我们生成一个时间戳,并将这个时间戳放到event的header中,作为一个属性保存,再返回给event
综上所述:
Flume中拦截器的作用就是对于event中header的部分可以按需塞入一些属性,当然你如果想要处理event的body内容,也是可以的,但是event的body内容是系统下游阶段真正处理的内容,如果让Flume来修饰body的内容的话,那就是强耦合了,这就违背了当初使用Flume来解耦的初衷了。
做法可以有,但是合不合适是另一回事了!!!!balance一直是一个世界性难题!!!
首先来看下改拦截器的实现结构
1、实现了Interceptor接口
该接口的方法定义如下:public void initialize();
public Event intercept(Event event);
public List<Event> intercept(List<Event> events);
public void close();
/** Builder implementations MUST have a no-arg constructor */ public interface Builder extends Configurable { public Interceptor build(); }
2、接口中定义了一个内部接口Builder
该接口又继承自Configurable接口【接口只能继承接口,不能实现接口】该接口的方法定义如下:
public void configure(Context context);该方法很容易理解,就是用来读取flume的配置文件内容的。
下面来看TimestampInterceptor的具体实现
public static class Builder implements Interceptor.Builder { private boolean preserveExisting = PRESERVE_DFLT; @Override public Interceptor build() { return new TimestampInterceptor(preserveExisting); } @Override public void configure(Context context) { preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT); } }该内部类实现了Interceptor的接口Builder,必须得有一个无参的构造方法,通过该构造方法就实例化了一个拦截器对象
并且在configure方法中读取了preserveExisting属性,默认值为false
该配置的作用表明
This interceptor can preserve an existing timestamp if it is already present in the configuration.
再来看intercept方法
批量的
@Override public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; }简单的循环调用了intercept对event逐一处理
public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); if (preserveExisting && headers.containsKey(TIMESTAMP)) { // we must preserve the existing timestamp } else { long now = System.currentTimeMillis(); headers.put(TIMESTAMP, Long.toString(now)); } return event; }
该方法即拦截器的核心内容
1、如果拿到的event的header中本身包括timestamp这个key并且预留保存属性为true,我们就直接返回该event就行了2、否则的话,我们生成一个时间戳,并将这个时间戳放到event的header中,作为一个属性保存,再返回给event
综上所述:
Flume中拦截器的作用就是对于event中header的部分可以按需塞入一些属性,当然你如果想要处理event的body内容,也是可以的,但是event的body内容是系统下游阶段真正处理的内容,如果让Flume来修饰body的内容的话,那就是强耦合了,这就违背了当初使用Flume来解耦的初衷了。
做法可以有,但是合不合适是另一回事了!!!!balance一直是一个世界性难题!!!
相关文章推荐
- Mybatis Interceptor 拦截器原理 源码分析
- Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行
- Mybatis Interceptor 拦截器原理 源码分析
- flume【源码分析】分析Flume的拦截器
- flume【源码分析】分析Flume的拦截器
- Mybatis Interceptor 拦截器原理 源码分析
- flume【源码分析】分析Flume的拦截器
- OKHttp源码分析拦截器-RetryAndFollowUpInterceptor
- 日志收集之flume-ng源码分析
- Java程序员从笨鸟到菜鸟之(三十八)细谈struts2(三)struts2拦截器源码分析
- Flume-NG启动过程源码分析(二)(原创)
- 【Flume】【源码分析】flume中FailoverSinkProcessor容错处理机制源码分析
- 【Java】【Flume】Flume-NG启动过程源码分析(三)
- 【Flume】【源码分析】flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用?
- Struts2默认拦截器(ExceptionMappingInterceptor)的使用及源码阅读
- struts2(三)struts2拦截器源码分析
- struts2拦截器源码分析
- Flume 实战(2)--Flume-ng-sdk源码分析
- 日志收集之flume-ng源码分析
- Flume-NG源码阅读之Interceptor(原创)