flume开发-自定义拦截器(Interceptor)
2017-06-06 17:21
525 查看
拦截器是简单的插件式组件,设置在source和channel之间。source接收到的时间,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。flume官方实现了很多拦截器也可以自定义拦截器。通过实现自定义的拦截器可以对日志进行ETL。
自定义拦截器只需要实现Interceptor的继承类。具体步骤如下:
1. 实现一个Interceptor的继承类。下面的example实现了继承类ETLInterceptor. package 是com.test.flume .
[java] view
plain copy
//add begin
package com.test.flume;
//add end
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
clog.sources.source_log3.interceptors.i3.type=com.thextrader.dmp.streaming.flume.BidInfoLogUrlFilter$Builder
public class ETLInterceptor implements Interceptor{
private static Logger logger = LoggerFactory.getLogger(AuctionETLInterceptor.class);
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), Charsets.UTF_8);
String newBody=body;
try{
//add begin
String[] splits = body.split("\\^");
splits[6]="";
splits[9]="";
splits[10]="";
splits[11]="";
splits[17]="";
splits[25]="";
if(splits.length>28){
newBody = String.join("^", Arrays.copyOfRange(splits,0,28));
}else{
newBody = String.join("^", splits);
}
//add end
event.setBody(newBody.toString().getBytes());
}catch (Exception e){
logger.warn(body,e);
event=null;
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
for (Event event : events) {
Event interceptedEvent = intercept(event);
if (interceptedEvent != null) {
intercepted.add(interceptedEvent);
}
}
return intercepted;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
//使用Builder初始化Interceptor
@Override
public Interceptor build() {
//add begin
return new ETLInterceptor();
//add end
}
@Override
public void configure(Context context) {
}
}
}
public Event intercept(Event event) 函数中写你需要的ETL等逻辑。
public static class Builder implements Interceptor.Builder 函数中new 出继承类 ETLInterceptor。
2. 将上面的java代码打成jar包。
在flume的安装目录下的plugins.d 目录下新建文件夹ETLInterceptor.文件夹这种新建三个文件夹lib,libext,native。
将jar包放入lib文件夹中。
3. 配置flume source的interceptor type为com.test.flume.ETLInterceptor.$Builder
4.启动flume ,自定义的拦截器就生效了。
自定义拦截器只需要实现Interceptor的继承类。具体步骤如下:
1. 实现一个Interceptor的继承类。下面的example实现了继承类ETLInterceptor. package 是com.test.flume .
[java] view
plain copy
//add begin
package com.test.flume;
//add end
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
clog.sources.source_log3.interceptors.i3.type=com.thextrader.dmp.streaming.flume.BidInfoLogUrlFilter$Builder
public class ETLInterceptor implements Interceptor{
private static Logger logger = LoggerFactory.getLogger(AuctionETLInterceptor.class);
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), Charsets.UTF_8);
String newBody=body;
try{
//add begin
String[] splits = body.split("\\^");
splits[6]="";
splits[9]="";
splits[10]="";
splits[11]="";
splits[17]="";
splits[25]="";
if(splits.length>28){
newBody = String.join("^", Arrays.copyOfRange(splits,0,28));
}else{
newBody = String.join("^", splits);
}
//add end
event.setBody(newBody.toString().getBytes());
}catch (Exception e){
logger.warn(body,e);
event=null;
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
for (Event event : events) {
Event interceptedEvent = intercept(event);
if (interceptedEvent != null) {
intercepted.add(interceptedEvent);
}
}
return intercepted;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
//使用Builder初始化Interceptor
@Override
public Interceptor build() {
//add begin
return new ETLInterceptor();
//add end
}
@Override
public void configure(Context context) {
}
}
}
public Event intercept(Event event) 函数中写你需要的ETL等逻辑。
public static class Builder implements Interceptor.Builder 函数中new 出继承类 ETLInterceptor。
2. 将上面的java代码打成jar包。
在flume的安装目录下的plugins.d 目录下新建文件夹ETLInterceptor.文件夹这种新建三个文件夹lib,libext,native。
将jar包放入lib文件夹中。
3. 配置flume source的interceptor type为com.test.flume.ETLInterceptor.$Builder
4.启动flume ,自定义的拦截器就生效了。
相关文章推荐
- flume开发-自定义拦截器(Interceptor)
- flume开发-自定义拦截器(Interceptor)
- Flume-1.6.0自定义拦截器(Interceptor)
- 自定义flume 拦截器(interceptor)
- flume自定义拦截器(Interceptor)拼接header和body信息
- flume自定义interceptor和hbase sink
- struts2自定义拦截器(interceptor)
- 【Flume】【源码分析】flume中拦截器的源码分析,以TimestampInterceptor为例
- WebServices中使用cxf开发日志拦截器以及自定义拦截器
- Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行
- flume学习(八):自定义拦截器
- flume 拦截器(interceptor)
- 自定义interception(拦截器)(一般用不上!)就是继承Interceptor接口(action运行时间例子)
- S8.1_Struts2_Interceptor 拦截器的原理 拦截器与过滤器的区别 自定义拦截器 拦截器防止表单重复提交
- flume学习(九):自定义拦截器
- flume开发--自定义Sink
- Asp.net Web Api开发(第一篇) 自定义HTTP消息拦截器
- flume自定义Interceptor
- struts2的开发(自定义拦截器)
- Struts2开发自定义拦截器