Flume中的拦截器(Interceptor)介绍与使用(二)
2017-03-24 17:26
411 查看
Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用,Flume-ng 1.6中目前提供了以下拦截器:
Timestamp Interceptor;
Host Interceptor;
Static Interceptor;
UUID Interceptor;
Morphline Interceptor;
Search and Replace Interceptor;
Regex Filtering Interceptor;
Regex Extractor Interceptor;
本文接上一篇《Flume中的拦截器(Interceptor)介绍与使用(一)》,继续对剩下几种拦截器进行学习和介绍,并附上使用示例。
具体配置示例如下:
## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = search_replace
agent_lxw1234.sources.sources1.interceptors.i1.searchPattern = [0-9]+
agent_lxw1234.sources.sources1.interceptors.i1.replaceString = lxw1234
agent_lxw1234.sources.sources1.interceptors.i1.charset = UTF-8
# sink 1 配置
##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel
该配置将events中的数字替换为lxw1234。
原始的events内容为:
实际的events内容为:
配置示例如下:
## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = regex_filter
agent_lxw1234.sources.sources1.interceptors.i1.regex = ^lxw1234.*
agent_lxw1234.sources.sources1.interceptors.i1.excludeEvents = false
# sink 1 配置
##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel
该配置表示过滤掉不是以lxw1234开头的events。
如果excludeEvents设为true,则表示过滤掉以lxw1234开头的events。
原始events内容为:
拦截后的events内容为:
配置示例如下:
## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = regex_extractor
agent_lxw1234.sources.sources1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*?)
agent_lxw1234.sources.sources1.interceptors.i1.serializers = s1 s2
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.type = default
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.name = cookieid
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.type = default
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.name = ip
# sink 1 配置
##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel
该配置从原始events中抽取出cookieid和ip,加入到events header中。
原始的events内容为:
events header中的内容为:
Flume的拦截器可以配合Sink完成许多业务场景需要的功能,
比如:按照时间及主机生成目标文件目录及文件名;
配合Kafka Sink完成多分区的写入等等。
Timestamp Interceptor;
Host Interceptor;
Static Interceptor;
UUID Interceptor;
Morphline Interceptor;
Search and Replace Interceptor;
Regex Filtering Interceptor;
Regex Extractor Interceptor;
本文接上一篇《Flume中的拦截器(Interceptor)介绍与使用(一)》,继续对剩下几种拦截器进行学习和介绍,并附上使用示例。
Search and Replace Interceptor
该拦截器用于将events中的正则匹配到的内容做相应的替换。具体配置示例如下:
## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = search_replace
agent_lxw1234.sources.sources1.interceptors.i1.searchPattern = [0-9]+
agent_lxw1234.sources.sources1.interceptors.i1.replaceString = lxw1234
agent_lxw1234.sources.sources1.interceptors.i1.charset = UTF-8
# sink 1 配置
##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel
该配置将events中的数字替换为lxw1234。
原始的events内容为:
实际的events内容为:
Regex Filtering Interceptor
该拦截器使用正则表达式过滤原始events中的内容。配置示例如下:
## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = regex_filter
agent_lxw1234.sources.sources1.interceptors.i1.regex = ^lxw1234.*
agent_lxw1234.sources.sources1.interceptors.i1.excludeEvents = false
# sink 1 配置
##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel
该配置表示过滤掉不是以lxw1234开头的events。
如果excludeEvents设为true,则表示过滤掉以lxw1234开头的events。
原始events内容为:
拦截后的events内容为:
Regex Extractor Interceptor
该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events header中。配置示例如下:
## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = regex_extractor
agent_lxw1234.sources.sources1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*?)
agent_lxw1234.sources.sources1.interceptors.i1.serializers = s1 s2
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.type = default
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.name = cookieid
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.type = default
agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.name = ip
# sink 1 配置
##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel
该配置从原始events中抽取出cookieid和ip,加入到events header中。
原始的events内容为:
events header中的内容为:
Flume的拦截器可以配合Sink完成许多业务场景需要的功能,
比如:按照时间及主机生成目标文件目录及文件名;
配合Kafka Sink完成多分区的写入等等。
相关文章推荐
- Flume中的拦截器(Interceptor)介绍与使用(二)
- Flume中的拦截器(Interceptor)介绍与使用(一)
- Flume中的拦截器(Interceptor)介绍与使用
- 【Flume】【源码分析】flume中拦截器的源码分析,以TimestampInterceptor为例
- flume 1.4的介绍及使用示例
- 在struts2中使用拦截器(Interceptor)控制登录和权限
- CXF拦截器(Interceptor)的使用
- 在struts2中使用拦截器(Interceptor)控制登录和权限
- struts2 MessageStoreInterceptor 拦截器的使用
- 使用Flume NG构建数据收集系统(第一部分 Flume介绍)
- 在struts2中使用拦截器(Interceptor)控制登录和权限
- Struts2默认拦截器(ExceptionMappingInterceptor)的使用及源码阅读
- 【摘】struts2教程- Interceptor(默认拦截器)介绍
- Struts 拦截器interceptor 的使用
- struts2中使用拦截器(Interceptor)控制登录和权限
- struts2中使用拦截器(Interceptor)控制登录和权限
- struts2- Interceptor(默认拦截器)介绍
- 在struts中使用拦截器(Interceptor)控制登录和权限
- struts2教程- Interceptor(默认拦截器)介绍
- Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行