flume自定义分流日志interceptor、source、sink,采用ganglia进行flume日志数据流监控
2020-06-29 04:57
1046 查看
1.Interceptor分流日志
1.1idea开发interceptor分流日志
1.2 创建自定义interceptor.jar
package com.cevent.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * Interceptor that tags each event with a {@code type} header based on the
 * first byte of its body: {@code "number"} when that byte is an ASCII digit
 * ({@code '0'}..{@code '9'}), {@code "not_number"} otherwise.
 *
 * <p>The original author describes this as an interceptor for splitting the
 * log stream; how the header is consumed downstream is decided by the agent
 * configuration, not by this class.
 *
 * @author cevent
 */
public class FlumeInterceptor implements Interceptor {

    /** Name of the header written by this interceptor. */
    private static final String TYPE_HEADER = "type";

    /** No resources to set up. */
    @Override
    public void initialize() {
    }

    /**
     * Adds the {@code type} header to a single event.
     *
     * @param event the event to tag; its header map is modified in place
     * @return the same event instance
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();

        // An empty or missing body has no first byte to inspect; classify it as
        // "not_number" instead of failing on body[0].
        boolean startsWithDigit = body != null
                && body.length > 0
                && body[0] >= '0'
                && body[0] <= '9';

        headers.put(TYPE_HEADER, startsWithDigit ? "number" : "not_number");
        return event;
    }

    /**
     * Tags every event of a batch by delegating to {@link #intercept(Event)}.
     *
     * @param eventlist the batch to tag
     * @return the same list instance, with every event tagged
     */
    @Override
    public List<Event> intercept(List<Event> eventlist) {
        for (Event event : eventlist) {
            intercept(event);
        }
        return eventlist;
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** Builder referenced from the agent configuration to create the interceptor. */
    public static class InterceptorBuilderMe implements Interceptor.Builder {

        /** @return a new {@link FlumeInterceptor} */
        @Override
        public Interceptor build() {
            return new FlumeInterceptor();
        }

        /** This interceptor has no configurable properties. */
        @Override
        public void configure(Context context) {
        }
    }
}
1.3导入jar
2.自定义Source
2.1source源码
链接:http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#thrift-legacy-source
package com.cevent.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

/**
 * Demo pollable source that simulates an upstream system: every call to
 * {@link #process()} emits five events whose bodies are
 * {@code prefix + 1 .. prefix + 5}, pausing {@code interval} milliseconds
 * after each one.
 *
 * <p>Configuration keys: {@code defined_prefix} (default {@code "LOG"}) and
 * {@code defined_interval} in milliseconds (default {@code 1000}).
 *
 * @author cevent
 */
public class CeventSource extends AbstractSource implements Configurable, PollableSource {

    /** Body prefix, read from {@code defined_prefix}. */
    private String prefix;

    /** Pause after each event in milliseconds, read from {@code defined_interval}. */
    private long interval;

    /**
     * Builds five events and hands each one to the channel processor.
     *
     * @return {@link Status#READY} when all five events were handed over,
     *         {@link Status#BACKOFF} when the thread was interrupted while pausing
     * @throws EventDeliveryException declared by the interface; not thrown here
     */
    @Override
    public Status process() throws EventDeliveryException {
        ChannelProcessor channelProcessor = getChannelProcessor();
        try {
            for (int i = 1; i <= 5; i++) {
                Event event = new SimpleEvent();
                event.setHeaders(new HashMap<String, String>());
                // Encode explicitly as UTF-8 so the bytes do not depend on the
                // JVM's platform default charset.
                event.setBody((prefix + i).getBytes(StandardCharsets.UTF_8));
                channelProcessor.processEvent(event);
                Thread.sleep(interval);
            }
            return Status.READY;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread running this source can
            // still observe the interruption after we return.
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        }
    }

    /** @return the back-off increment in milliseconds (2 seconds) */
    @Override
    public long getBackOffSleepIncrement() {
        return 2000;
    }

    /** @return the maximum back-off sleep in milliseconds (10 seconds) */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    /**
     * Reads the source settings from the agent configuration.
     *
     * @param context configuration of this source
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("defined_prefix", "LOG");
        interval = context.getLong("defined_interval", 1000L);
    }
}
2.2配置、启动source.conf
[cevent@hadoop207 apache-flume-1.7.0]$ ll 总用量 176 drwxr-xr-x. 2 cevent cevent 4096 6月 11 13:35 bin -rw-r--r--. 1 cevent cevent 77387 10月 11 2016 CHANGELOG drwxr-xr-x. 2 cevent cevent 4096 6月 12 12:11 conf -rw-r--r--. 1 cevent cevent 6172 9月 26 2016 DEVNOTES -rw-r--r--. 1 cevent cevent 2873 9月 26 2016 doap_Flume.rdf drwxr-xr-x. 10 cevent cevent 4096 10月 13 2016 docs drwxrwxr-x. 2 cevent cevent 4096 6月 12 16:43 files drwxrwxr-x. 6 cevent cevent 4096 6月 13 22:21 job drwxrwxr-x. 2 cevent cevent 4096 6月 14 13:26 lib -rw-r--r--. 1 cevent cevent 27625 10月 13 2016 LICENSE drwxrwxr-x. 2 cevent cevent 4096 6月 12 11:48 loggers drwxrwxr-x. 2 cevent cevent 4096 6月 11 17:05 logs -rw-r--r--. 1 cevent cevent 249 9月 26 2016 NOTICE -rw-r--r--. 1 cevent cevent 2520 9月 26 2016 README.md -rw-r--r--. 1 cevent cevent 1585 10月 11 2016 RELEASE-NOTES -rw-rw-r--. 1 cevent cevent 177 6月 12 16:57 tail_dir.json drwxrwxr-x. 2 cevent cevent 4096 6月 11 13:35 tools -rw-rw-r--. 1 cevent cevent 16 6月 12 16:45 tutu.txt drwxrwxr-x. 3 cevent cevent 4096 6月 12 14:23 upload [cevent@hadoop207 apache-flume-1.7.0]$ cd job/ [cevent@hadoop207 job]$ ll 总用量 32 -rw-rw-r--. 1 cevent cevent 1542 6月 12 14:22 flume-dir-hdfs.conf -rw-rw-r--. 1 cevent cevent 1641 6月 12 13:36 flume-file-hdfs.conf -rw-rw-r--. 1 cevent cevent 495 6月 11 17:02 flume-netcat-logger.conf -rw-rw-r--. 1 cevent cevent 1522 6月 12 16:40 flume-taildir-hdfs.conf drwxrwxr-x. 2 cevent cevent 4096 6月 13 13:28 group1 drwxrwxr-x. 2 cevent cevent 4096 6月 13 14:56 group2 drwxrwxr-x. 2 cevent cevent 4096 6月 13 20:59 group3 drwxrwxr-x. 
2 cevent cevent 4096 6月 13 22:38 interceptor [cevent@hadoop207 job]$ mkdir source [cevent@hadoop207 job]$ vim source/flume-source-1.conf ### 自定义源数据source # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.cevent.source.CeventSource ##jar自定义属性 a1.sources.r1.defined_interval = 1000 a1.sources.r1.defined_prefix = ceventLOG # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~ "source/flume-source-1.conf" [新] 23L, 589C 已写入 [cevent@hadoop207 job]$ cd ../.. [cevent@hadoop207 module]$ cd apache-flume-1.7.0/ [cevent@hadoop207 apache-flume-1.7.0]$ ll 总用量 176 drwxr-xr-x. 2 cevent cevent 4096 6月 11 13:35 bin -rw-r--r--. 1 cevent cevent 77387 10月 11 2016 CHANGELOG drwxr-xr-x. 2 cevent cevent 4096 6月 12 12:11 conf -rw-r--r--. 1 cevent cevent 6172 9月 26 2016 DEVNOTES -rw-r--r--. 1 cevent cevent 2873 9月 26 2016 doap_Flume.rdf drwxr-xr-x. 10 cevent cevent 4096 10月 13 2016 docs drwxrwxr-x. 2 cevent cevent 4096 6月 12 16:43 files drwxrwxr-x. 7 cevent cevent 4096 6月 14 13:30 job drwxrwxr-x. 2 cevent cevent 4096 6月 14 13:26 lib -rw-r--r--. 1 cevent cevent 27625 10月 13 2016 LICENSE drwxrwxr-x. 2 cevent cevent 4096 6月 12 11:48 loggers drwxrwxr-x. 2 cevent cevent 4096 6月 11 17:05 logs -rw-r--r--. 1 cevent cevent 249 9月 26 2016 NOTICE -rw-r--r--. 1 cevent cevent 2520 9月 26 2016 README.md -rw-r--r--. 1 cevent cevent 1585 10月 11 2016 RELEASE-NOTES -rw-rw-r--. 1 cevent cevent 177 6月 12 16:57 tail_dir.json drwxrwxr-x. 2 cevent cevent 4096 6月 11 13:35 tools -rw-rw-r--. 1 cevent cevent 16 6月 12 16:45 tutu.txt drwxrwxr-x. 
3 cevent cevent 4096 6月 12 14:23 upload [cevent@hadoop207 apache-flume-1.7.0]$ 启动并控制台打印数据 bin/flume-ng agent -n a1 -c conf/ -f job/source/flume-source-1.conf -Dflume.root.logger=INFO,console Info: Sourcing environment configuration script /opt/module/apache-flume-1.7.0/conf/flume-env.sh Info: Including Hadoop libraries found via (/opt/module/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Including Hive libraries found via (/opt/module/hive-1.2.1) for Hive access + exec /opt/module/jdk1.7.0_79/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/apache-flume-1.7.0/conf:/opt/module/apache-flume-1.7.0/lib/*:/opt/module/hadoop-2.7.2/etc/hadoop:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/common/*:/opt/module/hadoop-2.7.2/share/hadoop/hdfs:/opt/module/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/hdfs/*:/opt/module/hadoop-2.7.2/share/hadoop/yarn/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/yarn/*:/opt/module/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/mapreduce/*:/opt/module/hadoop-2.7.2/contrib/capacity-scheduler/*.jar:/opt/module/hive-1.2.1/lib/*' -Djava.library.path=:/opt/module/hadoop-2.7.2/lib/native org.apache.flume.node.Application -n a1 -f job/source/flume-source-1.conf SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/module/apache-flume-1.7.0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
2020-06-14 13:32:12,811 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting 2020-06-14 13:32:12,816 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:job/source/flume-source-1.conf 2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a1 2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1 2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1 2020-06-14 13:32:12,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1] 2020-06-14 13:32:12,841 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels 2020-06-14 13:32:12,856 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory 2020-06-14 13:32:12,860 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1 2020-06-14 13:32:12,861 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type com.cevent.source.CeventSource 2020-06-14 13:32:12,870 (conf-file-poller-0) 
[INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger 2020-06-14 13:32:12,873 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [r1, k1] 2020-06-14 13:32:12,881 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{r1=PollableSourceRunner: { source:com.cevent.source.CeventSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@584c4b3e counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} } 2020-06-14 13:32:12,896 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1 2020-06-14 13:32:12,934 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean. 
2020-06-14 13:32:12,935 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started 2020-06-14 13:32:12,936 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1 2020-06-14 13:32:12,937 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1 2020-06-14 13:32:12,948 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 31 ceventLOG1 } 2020-06-14 13:32:13,947 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 32 ceventLOG2 } 2020-06-14 13:32:14,947 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 33 ceventLOG3 }
2.3source配置
### 自定义源数据source # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.cevent.source.CeventSource ##jar自定义属性 a1.sources.r1.defined_interval = 1000 a1.sources.r1.defined_prefix = ceventLOG # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
2.4自定义源文件配置
// (1) Custom source that reads its events from a file
package com.cevent.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

/**
 * Demo pollable source that turns every line of a configured text file into
 * one event, pausing {@code interval} milliseconds after each line.
 *
 * <p>Configuration keys: {@code defined_file} (required, path of the file to
 * read) and {@code defined_interval} in milliseconds (default {@code 1000}).
 *
 * <p>NOTE(review): the file is reopened and read from the first line on every
 * {@link #process()} call, so the same lines are emitted again on each poll.
 * Confirm this is intended for the demo.
 *
 * @author cevent
 */
public class CeventSourceFileData extends AbstractSource implements Configurable, PollableSource {

    /** Path of the file to read, from {@code defined_file}. */
    private String file;

    /** Pause after each line in milliseconds, from {@code defined_interval}. */
    private long interval;

    /**
     * Reads the configured file line by line and hands one event per line to
     * the channel processor.
     *
     * @return {@link Status#READY} when the whole file was processed,
     *         {@link Status#BACKOFF} when the file could not be read or the
     *         thread was interrupted while pausing
     * @throws EventDeliveryException declared by the interface; not thrown here
     */
    @Override
    public Status process() throws EventDeliveryException {
        ChannelProcessor channelProcessor = getChannelProcessor();

        // try-with-resources closes the reader on every exit path.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                Event event = new SimpleEvent();
                event.setBody(line.getBytes(StandardCharsets.UTF_8));
                event.setHeaders(new HashMap<String, String>());
                channelProcessor.processEvent(event);
                Thread.sleep(interval);
            }
            return Status.READY;
        } catch (IOException e) {
            // No logger is available in this class; keep the original stack-trace
            // output so the failure stays visible on the console.
            e.printStackTrace();
            return Status.BACKOFF;
        } catch (InterruptedException e) {
            // Previously this path fell through and returned null. Restore the
            // interrupt flag and report a well-defined status instead.
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        }
    }

    /** @return the back-off increment in milliseconds (2 seconds) */
    @Override
    public long getBackOffSleepIncrement() {
        return 2000;
    }

    /** @return the maximum back-off sleep in milliseconds (10 seconds) */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    /**
     * Reads the source settings from the agent configuration.
     *
     * @param context configuration of this source
     * @throws IllegalArgumentException if {@code defined_file} is missing or
     *         empty, so the mistake surfaces at configuration time rather than
     *         as a NullPointerException on the first poll
     */
    @Override
    public void configure(Context context) {
        interval = context.getLong("defined_interval", 1000L);
        file = context.getString("defined_file");
        if (file == null || file.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Property 'defined_file' is required for CeventSourceFileData");
        }
    }
}
2.5配置source.conf
### 自定义源数据文件source-file # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.cevent.source.CeventSourceFileData ##jar自定义属性 a1.sources.r1.defined_interval = 1000 a1.sources.r1.defined_file = /opt/module/datas/dept.txt # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
2.6启动source-file
[cevent@hadoop207 module]$ cd apache-flume-1.7.0/ [cevent@hadoop207 apache-flume-1.7.0]$ ll 总用量 176 drwxr-xr-x. 2 cevent cevent 4096 6月 11 13:35 bin -rw-r--r--. 1 cevent cevent 77387 10月 11 2016 CHANGELOG drwxr-xr-x. 2 cevent cevent 4096 6月 12 12:11 conf -rw-r--r--. 1 cevent cevent 6172 9月 26 2016 DEVNOTES -rw-r--r--. 1 cevent cevent 2873 9月 26 2016 doap_Flume.rdf drwxr-xr-x. 10 cevent cevent 4096 10月 13 2016 docs drwxrwxr-x. 2 cevent cevent 4096 6月 12 16:43 files drwxrwxr-x. 7 cevent cevent 4096 6月 14 13:30 job drwxrwxr-x. 2 cevent cevent 4096 6月 14 13:26 lib -rw-r--r--. 1 cevent cevent 27625 10月 13 2016 LICENSE drwxrwxr-x. 2 cevent cevent 4096 6月 12 11:48 loggers drwxrwxr-x. 2 cevent cevent 4096 6月 11 17:05 logs -rw-r--r--. 1 cevent cevent 249 9月 26 2016 NOTICE -rw-r--r--. 1 cevent cevent 2520 9月 26 2016 README.md -rw-r--r--. 1 cevent cevent 1585 10月 11 2016 RELEASE-NOTES -rw-rw-r--. 1 cevent cevent 177 6月 12 16:57 tail_dir.json drwxrwxr-x. 2 cevent cevent 4096 6月 11 13:35 tools -rw-rw-r--. 1 cevent cevent 16 6月 12 16:45 tutu.txt drwxrwxr-x. 
3 cevent cevent 4096 6月 12 14:23 upload [cevent@hadoop207 apache-flume-1.7.0]$ 启动conf bin/flume-ng agent -n a1 -c conf/ -f job/source/flume-source-1.conf -Dflume.root.logger=INFO,console Info: Sourcing environment configuration script /opt/module/apache-flume-1.7.0/conf/flume-env.sh Info: Including Hadoop libraries found via (/opt/module/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Including Hive libraries found via (/opt/module/hive-1.2.1) for Hive access + exec /opt/module/jdk1.7.0_79/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/apache-flume-1.7.0/conf:/opt/module/apache-flume-1.7.0/lib/*:/opt/module/hadoop-2.7.2/etc/hadoop:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/common/*:/opt/module/hadoop-2.7.2/share/hadoop/hdfs:/opt/module/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/hdfs/*:/opt/module/hadoop-2.7.2/share/hadoop/yarn/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/yarn/*:/opt/module/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/mapreduce/*:/opt/module/hadoop-2.7.2/contrib/capacity-scheduler/*.jar:/opt/module/hive-1.2.1/lib/*' -Djava.library.path=:/opt/module/hadoop-2.7.2/lib/native org.apache.flume.node.Application -n a1 -f job/source/flume-source-1.conf SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/module/apache-flume-1.7.0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
2020-06-14 13:32:12,811 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting 2020-06-14 13:32:12,816 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:job/source/flume-source-1.conf 2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a1 2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1 2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1 2020-06-14 13:32:12,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1] 2020-06-14 13:32:12,841 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels 2020-06-14 13:32:12,856 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory 2020-06-14 13:32:12,860 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1 2020-06-14 13:32:12,861 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type com.cevent.source.CeventSource 2020-06-14 13:32:12,870 (conf-file-poller-0) 
[INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger 2020-06-14 13:32:12,873 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [r1, k1] 2020-06-14 13:32:12,881 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{r1=PollableSourceRunner: { source:com.cevent.source.CeventSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@584c4b3e counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} } 2020-06-14 13:32:12,896 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1 2020-06-14 13:32:12,934 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean. 
2020-06-14 13:32:12,935 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started 2020-06-14 13:32:12,936 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1 2020-06-14 13:32:12,937 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1 2020-06-14 13:32:12,948 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 31 ceventLOG1 } 2020-06-14 13:32:13,947 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 32 ceventLOG2 } 2020-06-14 13:32:14,947 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 33 ceventLOG3 } 2020-06-14 13:32:15,947 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 34 ceventLOG4 } 2020-06-14 13:32:16,948 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 35 ceventLOG5 }
3.自定义Sink
3.1jar包配置
// 1. Custom sink packaged as a jar
package com.cevent.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.nio.charset.StandardCharsets;

/**
 * Demo sink that takes one event per {@link #process()} call from its channel
 * inside a transaction and prints {@code prefix + body + suffix} to standard
 * output.
 *
 * <p>Configuration keys: {@code prefix} (default {@code "cevent_prefix"}) and
 * {@code suffix} (default {@code "cevent_suffix"}).
 *
 * @author cevent
 */
public class CeventSink extends AbstractSink implements Configurable {

    /** Text printed before the event body. */
    private String prefix;

    /** Text printed after the event body. */
    private String suffix;

    /**
     * Takes at most one event from the channel and prints it.
     *
     * @return {@link Status#READY} when an event was printed,
     *         {@link Status#BACKOFF} when the channel had no event
     * @throws EventDeliveryException if taking the event or committing the
     *         transaction fails; the transaction is rolled back first
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            Event event = channel.take();
            Status status;
            if (event == null) {
                // Empty channel: report BACKOFF rather than sleeping in a loop
                // while the transaction stays open.
                status = Status.BACKOFF;
            } else {
                String line = new String(event.getBody(), StandardCharsets.UTF_8);
                System.out.println(prefix + line + suffix);
                status = Status.READY;
            }
            transaction.commit();
            return status;
        } catch (RuntimeException e) {
            // Undo the take so the event stays in the channel, then surface the
            // failure to the caller instead of silently swallowing it.
            transaction.rollback();
            throw new EventDeliveryException("Failed to deliver event from channel", e);
        } finally {
            transaction.close();
        }
    }

    /**
     * Reads the sink settings from the agent configuration.
     *
     * @param context configuration of this sink
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix", "cevent_prefix");
        suffix = context.getString("suffix", "cevent_suffix");
    }
}
3.2Sink-conf配置
[cevent@hadoop207 job]$ mkdir sinks [cevent@hadoop207 job]$ ll 总用量 40 -rw-rw-r--. 1 cevent cevent 1542 6月 12 14:22 flume-dir-hdfs.conf -rw-rw-r--. 1 cevent cevent 1641 6月 12 13:36 flume-file-hdfs.conf -rw-rw-r--. 1 cevent cevent 495 6月 11 17:02 flume-netcat-logger.conf -rw-rw-r--. 1 cevent cevent 1522 6月 12 16:40 flume-taildir-hdfs.conf drwxrwxr-x. 2 cevent cevent 4096 6月 13 13:28 group1 drwxrwxr-x. 2 cevent cevent 4096 6月 13 14:56 group2 drwxrwxr-x. 2 cevent cevent 4096 6月 13 20:59 group3 drwxrwxr-x. 2 cevent cevent 4096 6月 13 22:38 interceptor drwxrwxr-x. 2 cevent cevent 4096 6月 14 16:51 sinks drwxrwxr-x. 2 cevent cevent 4096 6月 14 14:07 source [cevent@hadoop207 job]$ vim sinks/flume-sinks-1.conf ### 自定义sinks控制channel输出 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = com.cevent.sink.CeventSink #a1.sinks.k1.prefix = cevent619: a1.sinks.k1.suffix = :cevent619 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~
3.3 启动nc
[cevent@hadoop207 apache-flume-1.7.0]$ nc localhost 44444 cev123 OK rce.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] cevent_prefixcev123:cevent619
3.4 conf配置解析
### 自定义sinks控制channel输出 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = com.cevent.sink.CeventSink #a1.sinks.k1.prefix = cevent619: a1.sinks.k1.suffix = :cevent619 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
4.flume数据流监控ganglia
4.1安装yum-ganglia依赖
### ganglia神经网络==>数据监控 #1.安装httpd服务与php sudo yum -y install httpd php #2.安装 rrdtool 和 apr-devel 依赖 sudo yum -y install rrdtool perl-rrdtool rrdtool-devel sudo yum -y install apr-devel #3.安装ganglia,及3组件(gmetad、web、gmond) sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm sudo yum -y install ganglia-gmetad sudo yum -y install ganglia-web sudo yum -y install ganglia-gmond
4.2配置ganglia前端
[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/httpd/conf.d/ganglia.conf # # Ganglia monitoring system php web frontend # Alias /ganglia /usr/share/ganglia <Location /ganglia> Order deny,allow Deny from all Allow from 127.0.0.1 Allow from ::1 # Allow from .example.com </Location>
解析
[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/httpd/conf.d/ganglia.conf # # Ganglia monitoring system php web frontend # Alias /ganglia /usr/share/ganglia <Location /ganglia> Order deny,allow Deny from all Allow from 127.0.0.1 Allow from ::1 # Allow from .example.com </Location> <Location /ganglia> Order deny,allow ## 拒绝所有连接 Deny from all ## 允许指定IP连接 Allow from 127.0.0.1 ## 允许所有IP连接 Allow from all ## 允许主机连接 Allow from 192.168.1.1 Allow from ::1 # Allow from .example.com </Location>
优化配置
[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/httpd/conf.d/ganglia.conf Allow from 127.0.0.1 # # Ganglia monitoring system php web frontend # Alias /ganglia /usr/share/ganglia <Location /ganglia> Order deny,allow Deny from all Allow from 127.0.0.1 Allow from 192.168.1.1 Allow from ::1 # Allow from .example.com </Location> ~ ~
4.3配置数据库gmetad.conf
[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/ganglia/gmetad.conf [sudo] password for cevent: # This is an example of a Ganglia Meta Daemon configuration file # http://ganglia.sourceforge.net/ # # #------------------------------------------------------------------------------- # Setting the debug_level to 1 will keep daemon in the forground and # show only error messages. Setting this value higher than 1 will make # gmetad output debugging information and stay in the foreground. # default: 0 # debug_level 10 # #------------------------------------------------------------------------------- # What to monitor. The most important section of this file. # # The data_source tag specifies either a cluster or a grid to # monitor. If we detect the source is a cluster, we will maintain a complete # set of RRD databases for it, which can be used to create historical # graphs of the metrics. If the source is a grid (it comes from another gmetad), # we will only maintain summary RRDs for it. # # Format: # data_source "my cluster" [polling interval] address1:port addreses2:port ... # # The keyword 'data_source' must immediately be followed by a unique # string which identifies the source, then an optional polling interval in # seconds. The source will be polled at this interval on average. # If the polling interval is omitted, 15sec is asssumed. # # If you choose to set the polling interval to something other than the default, # note that the web frontend determines a host as down if its TN value is less # than 4 * TMAX (20sec by default). Therefore, if you set the polling interval # to something around or greater than 80sec, this will cause the frontend to # incorrectly display hosts as down even though they are not. # # A list of machines which service the data source follows, in the # format ip:port, or name:port. If a port is not specified then 8649 # (the default gmond port) is assumed. 
# default: There is no default value # # data_source "my cluster" 10 localhost my.machine.edu:8649 1.2.3.5:8655 # data_source "my grid" 50 1.3.4.7:8655 grid.org:8651 grid-backup.org:8651 # data_source "another source" 1.3.4.7:8655 1.3.4.8 data_source "cevent ganglia cluster" hadoop.cevent.com # # Round-Robin Archives # You can specify custom Round-Robin archives here (defaults are listed below) # # Old Default RRA: Keep 1 hour of metrics at 15 second resolution. 1 day at 6 minute # RRAs "RRA:AVERAGE:0.5:1:244" "RRA:AVERAGE:0.5:24:244" "RRA:AVERAGE:0.5:168:244" "RRA:AVERAGE:0.5:672:244" \ # "RRA:AVERAGE:0.5:5760:374" # New Default RRA # Keep 5856 data points at 15 second resolution assuming 15 second (default) polling. That's 1 day # Two weeks of data points at 1 minute resolution (average) #RRAs "RRA:AVERAGE:0.5:1:5856" "RRA:AVERAGE:0.5:4:20160" "RRA:AVERAGE:0.5:40:52704" # #------------------------------------------------------------------------------- # Scalability mode. If on, we summarize over downstream grids, and respect # authority tags. If off, we take on 2.5.0-era behavior: we do not wrap our output # in <GRID></GRID> tags, we ignore all <GRID> tags we see, and always assume # we are the "authority" on data source feeds. This approach does not scale to # large groups of clusters, but is provided for backwards compatibility.
4.4配置监控后端gmond.conf
[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/ganglia/gmond.conf /* This configuration is as close to 2.5.x default behavior as possible The values closely match ./gmond/metric.h definitions in 2.5.x */ globals { daemonize = yes setuid = yes user = ganglia debug_level = 0 max_udp_msg_len = 1472 mute = no deaf = no allow_extra_data = yes host_dmax = 86400 /*secs. Expires (removes from web interface) hosts in 1 day */ host_tmax = 20 /*secs */ cleanup_threshold = 300 /*secs */ gexec = no # By default gmond will use reverse DNS resolution when displaying your hostname # Uncommeting following value will override that value. # override_hostname = "mywebserver.domain.com" # If you are not using multicast this value should be set to something other than 0. # Otherwise if you restart aggregator gmond you will get empty graphs. 60 seconds is reasonable send_metadata_interval = 0 /*secs */ } /* * The cluster attributes specified will be used as part of the <CLUSTER> * tag that will wrap all hosts collected by this instance. */ cluster { name = "cevent ganglia cluster" owner = "unspecified" latlong = "unspecified" url = "unspecified" } /* The host section describes attributes of the host, like the location */ host { location = "unspecified" } /* Feel free to specify as many udp_send_channels as you like. Gmond used to only support having a single channel */ udp_send_channel { #bind_hostname = yes # Highly recommended, soon to be default. # This option tells gmond to use a source address # that resolves to the machine's hostname. Without # this, the metrics may appear to come from any # interface and the DNS names associated with # those IPs will be used to create the RRDs. # mcast_join = 239.2.11.71 host=hadoop207.cevent.com port = 8649 ttl = 1 } /* You can specify as many udp_recv_channels as you like as well. */ udp_recv_channel { # mcast_join = 239.2.11.71 port = 8649 bind = 0.0.0.0 retry_bind = true # Size of the UDP buffer. 
If you are handling lots of metrics you really # should bump it up to e.g. 10MB or even higher. # buffer = 10485760 } /* You can specify as many tcp_accept_channels as you like to share an xml description of the state of the cluster */ tcp_accept_channel { port = 8649 # If you want to gzip XML output gzip_output = no } /* Channel to receive sFlow datagrams */ #udp_recv_channel { # port = 6343 #} /* Optional sFlow settings */ #sflow { # udp_port = 6343 # accept_vm_metrics = yes # accept_jvm_metrics = yes # multiple_jvm_instances = no # accept_http_metrics = yes # multiple_http_instances = no # accept_memcache_metrics = yes # multiple_memcache_instances = no #} /* Each metrics module that is referenced by gmond must be specified and loaded. If the module has been statically linked with gmond, it does not require a load path. However all dynamically loadable modules must include a load path. */ modules { module { name = "core_metrics" } module { name = "cpu_module" path = "modcpu.so" } module { /* This configuration is as close to 2.5.x default behavior as possible The values closely match ./gmond/metric.h definitions in 2.5.x */
4.5配置setenforce
[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/selinux/config 修改为: # This file controls the state of SELinux on the system. # SELINUX= can take one of these three values: # enforcing - SELinux security policy is enforced. # permissive - SELinux prints warnings instead of enforcing. # disabled - No SELinux policy is loaded. SELINUX=disabled # SELINUXTYPE= can take one of these two values: # targeted - Targeted processes are protected, # mls - Multi Level Security protection. SELINUXTYPE=targeted 修改配置文件后需要重启才能生效;如果不想重启,可以执行 setenforce 0 使其临时生效: [cevent@hadoop207 apache-flume-1.7.0]$ sudo setenforce 0
4.6启动ganglia
[cevent@hadoop207 apache-flume-1.7.0]$ sudo service httpd start 正在启动 httpd: [确定] [cevent@hadoop207 apache-flume-1.7.0]$ sudo service gmetad start Starting GANGLIA gmetad: Warning: we failed to resolve data source name hadoop.cevent.com [确定] [cevent@hadoop207 apache-flume-1.7.0]$ sudo service gmond start Starting GANGLIA gmond: [确定] 注:上述 Warning 表示 gmetad 无法解析 gmetad.conf 中 data_source 配置的主机名 hadoop.cevent.com;应将其改为可解析的主机名(本例为 hadoop207.cevent.com),然后重启 gmetad。
启动成功后,通过浏览器访问:http://hadoop207.cevent.com/ganglia/
4.7配置flume-env.sh,测试监控
[cevent@hadoop207 apache-flume-1.7.0]$ vim conf/flume-env.sh # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced # during Flume startup. # Enviroment variables can be set here. #JAVA_HOME export JAVA_HOME=/opt/module/jdk1.7.0_79 export PATH=$PATH:$JAVA_HOME/bin #HADOOP_HOME export HADOOP_HOME=/opt/module/hadoop-2.7.2 export PATH=$PATH:$HADOOP_HOME/bin export PATH=$PATH:$HADOOP_HOME/sbin #HIVE_HOME export HIVE_HOME=/opt/module/hive-1.2.1 export PATH=$PATH:$HIVE_HOME/bin # Give Flume more memory and pre-allocate, enable remote monitoring via JMX # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote" export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=hadoop207.cevent.com:8649 -Xms100m -Xmx200m" # Let Flume write raw event data and configuration information to its log files for debugging # purposes. Enabling these flags is not recommended in production, # as it may result in logging sensitive user information or encryption secrets. # export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true " # Note that the Flume conf directory is always included in the classpath. #FLUME_CLASSPATH=""
4.8启动
[cevent@hadoop207 apache-flume-1.7.0]$ bin/flume-ng agent \ > --conf conf/ \ > --name a1 \ > --conf-file job/flume-netcat-logger.conf \ > -Dflume.root.logger=INFO,console \ > -Dflume.monitoring.type=ganglia \ > -Dflume.monitoring.hosts=hadoop207.cevent.com:8649 Info: Sourcing environment configuration script /opt/module/apache-flume-1.7.0/conf/flume-env.sh Info: Including Hadoop libraries found via (/opt/module/hadoop-2.7.2/bin/hadoop) for HDFS access [cevent@hadoop207 job]$ nc localhost 44444 cevent619 OK echo1226 OK
相关文章推荐
- flume笔记(四)之 自定义Interceptor、Source、Sink
- Flume自定义Source、Sink和Interceptor(简单功能实现)
- flume-ng 自定义sink消费flume source
- flume自定义sink source
- 分析hadoop日志之前传-采用flume进行日志收集
- flume自定义source,sink
- flume自定义interceptor
- 使用Flume+Kafka+SparkStreaming进行实时日志分析
- 微软企业库5.0 学习之路——第九步、使用PolicyInjection模块进行AOP—PART4——建立自定义Call Handler实现用户操作日志记录
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战006-DataStream与MySql自定义sink和source(Scala版)001
- flume的source, channel, sink 列表
- Flume(5)-Ganglia监控
- Hadoop实战-Flume之Source interceptor(十一)(2017-05-16 22:40)
- ubuntu12.04通过Ganglia利用NVML模块进行GPU监控
- Flume学习笔记 --- Flume内置source,channel, sink介绍
- 自定义Spring的Aop切面进行参数校验和日志记录
- Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行
- Flume组件汇总 source、sink、channel
- flume开发--自定义Sink
- flume中几种常见的source、channel、sink