apache flume-ng做agent拦截器可以输出IP
2016-12-26 12:09
363 查看
最近在做flume日志收集工作,需要将每台日机器上的日志收集,然后发送到kafka,接着有专门的日志处理程序从kafka中进行消费,其中有个问题就是在消费的时候需要知道当前的日志来自哪台机器,查阅了flume文档发现没有提供该功能,因此就自己定一个flume拦截器。基本的流程就是将日志内容从flume event中取出,然后拼接上自定义的IP,然后再将body设置到event中。
架构部署图如下
flume拦截器代码如下
AppendIPInterceptor
在上面的代码中,close和initialize很容易看懂分别是拦截器关闭需要做的事情和拦截器初始化需要做的事情,在日志中增加IP这个业务中,不需要在这个两个方法中写任何内容。
主要的业务代码还是写在intercept中,在flume系统中每一条日志都是一个event,enven中包含heard、和body,增加IP这个业务主要是将event中的body取出,然后拼接上IP,接着将拼接后的结果在设置到event中的body中。
AppendIPBuilder
上面代码是一个builder,是用来启动拦截器,在flume.conf中需要指定该类的绝对路径。
flume.conf
上面代码就是在flume.conf的配置,a1.sources.r1.interceptors.i1.type指定AppendIPBuilder的绝对路径,a1.sources.r1.interceptors.i1.serviceId配置一下机器的IP或者机器名字。
有同学可能会考虑性能问题,性能问题我确实还没有测试过,哈哈。
扫描下方Q群二维码快速加入Java学习交流群
架构部署图如下
flume拦截器代码如下
AppendIPInterceptor
package com.eju.ess; import java.util.List; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import com.google.common.base.Charsets; public class AppendIPInterceptor implements Interceptor { private String serviceId=null; public AppendIPInterceptor(String _serviceId){ serviceId=_serviceId; } public Event intercept(Event arg0) { String eventBody = new String(arg0.getBody(),Charsets.UTF_8); String fmt="%s - %s"; arg0.setBody(String.format(fmt, serviceId,eventBody).getBytes()); return arg0; } public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; } public void close() { //~ null } public void initialize() { //~ null } /*public static class Builder implements Interceptor.Builder { public void configure(Context context) { } public Interceptor build() { return new AppendIPInterceptor(); } } */ }
在上面的代码中,close和initialize很容易看懂分别是拦截器关闭需要做的事情和拦截器初始化需要做的事情,在日志中增加IP这个业务中,不需要在这个两个方法中写任何内容。
主要的业务代码还是写在intercept中,在flume系统中每一条日志都是一个event,enven中包含heard、和body,增加IP这个业务主要是将event中的body取出,然后拼接上IP,接着将拼接后的结果在设置到event中的body中。
AppendIPBuilder
package com.eju.ess; import org.apache.flume.Context; import org.apache.flume.interceptor.Interceptor; public class AppendIPBuilder implements Interceptor.Builder{ private String serviceId=null; public void configure(Context context) { String configServiceId=context.getString("serviceId"); serviceId=configServiceId; } public Interceptor build() { return new AppendIPInterceptor(serviceId); } }
上面代码是一个builder,是用来启动拦截器,在flume.conf中需要指定该类的绝对路径。
flume.conf
a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.eju.ess.AppendIPBuilder a1.sources.r1.interceptors.i1.serviceId = 10.99.70.51
上面代码就是在flume.conf的配置,a1.sources.r1.interceptors.i1.type指定AppendIPBuilder的绝对路径,a1.sources.r1.interceptors.i1.serviceId配置一下机器的IP或者机器名字。
有同学可能会考虑性能问题,性能问题我确实还没有测试过,哈哈。
扫描下方Q群二维码快速加入Java学习交流群
相关文章推荐
- Apache 使用localhost(127.0.01)可以访问 但是使用本机IP(局域网)不能访问
- Apache 127.0.0.1可以访问,使用本机IP(局域网)不能访问解决方法
- flume-ng 自定义拦截器,对header中的字段进行正则匹配分离出更多header
- log4j输出日志到flume-ng(个人记录)
- Apache 使用localhost(127.0.0.1)可以访问,使用本机IP(局域网)不能访问
- Apache 使用localhost(127.0.0.1)可以访问,使用本机IP(局域网)不能访问
- Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS
- Apache 使用localhost(127.0.0.1)可以访问,使用本机局域网IP(192.168.1.*)不能访问
- 用Maven编译Apache flume-ng 1.5.0源码及问题解决
- Apache使用localhost可以访问但使用本机IP(局域网)不能访问
- Flume-ng生产环境实践(三)实现文件sink,按照固定格式目录输出
- Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行
- Flume-ng agent配置说明
- flume NG 中文 Welcome to Apache Flume 第一页 醉了
- Apache Flume-ng 1.5.0正式发布
- Apache Flume – Architecture of Flume NG
- apache php localhost,127.0.0.1可以访问,ip不可以访问
- 【Apache Flume系列】Flume-ng failover 以及Load balance测试及注意事项
- 用Maven编译Apache flume-ng 1.5.0源码及问题解决
- apache flume agent安装