Flume: custom log-splitting interceptor, source, and sink, with Ganglia monitoring of the Flume data flow

1. Splitting logs with a custom Interceptor

1.1 Developing the splitting interceptor in IDEA



1.2 Create the custom interceptor jar
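Compiling the class below needs the Flume core API on the compile classpath. A minimal Maven dependency sketch (the provided scope is an assumption; the agent already ships these classes in its lib/ directory):

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
    <scope>provided</scope>
</dependency>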

package com.cevent.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * Created by Cevent on 2020/6/13.
 *
 * @author cevent
 * @description Interceptor: splits the log stream by tagging each event's header
 * @date 2020/6/13 21:40
 */
public class FlumeInterceptor implements Interceptor {
    // 1. Initialization hook (open resources here if needed)
    public void initialize() {

    }

    // 2. Single event: add the custom <key, value> pair to the header
    public Event intercept(Event event) {
        // 2.1 Get the header and body
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();

        // 2.2 Tag the header based on the body: compare the first byte against
        // the character codes of '0'..'9' to decide whether it is a digit
        if (body[0] <= '9' && body[0] >= '0') {
            headers.put("type", "number");
        } else {
            headers.put("type", "not_number");
        }

        return event;
    }

    // 3. Batch of events: delegate to the single-event method
    public List<Event> intercept(List<Event> eventList) {
        for (Event event : eventList) {
            intercept(event);
        }

        return eventList;
    }

    // 4. Shutdown hook (close whatever initialize() opened)
    public void close() {

    }

    // 5. Inner Builder class that Flume uses to instantiate the interceptor
    public static class InterceptorBuilderMe implements Interceptor.Builder {

        public Interceptor build() {
            return new FlumeInterceptor();
        }

        public void configure(Context context) {

        }
    }
}

1.3 Import the jar into the Flume lib directory
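The post does not include the agent file that exercises the interceptor, so the following is only a sketch of how it would typically be wired to a multiplexing channel selector; the component names (a1, r1, c1, c2) and the two channels are hypothetical:

# Attach the interceptor via its Builder inner class
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cevent.interceptor.FlumeInterceptor$InterceptorBuilderMe

# Route on the "type" header the interceptor sets:
# bodies starting with a digit go to c1, everything else to c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.not_number = c2
a1.sources.r1.channels = c1 c2

The jar itself is copied into the agent's lib/ directory before startup, e.g. cp flume-interceptor.jar /opt/module/apache-flume-1.7.0/lib/ (the jar name here is illustrative).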

2. Custom Source

2.1 Source code

Reference: http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#thrift-legacy-source

package com.cevent.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

/**
 * Created by Cevent on 2020/6/14.
 *
 * @author cevent
 * @description Custom source that generates synthetic events
 * @date 2020/6/14 10:32
 */
public class CeventSource extends AbstractSource implements Configurable, PollableSource {

    // Fields populated from the agent file via configure()
    private String prefix;
    private long interval;

    // 1. Source: fetch upstream data, wrap it into events, hand them to the channel
    public Status process() throws EventDeliveryException {
        // 1.1 Simulate an upstream system: every interval, send 5 synthetic events
        // Delivery status
        Status status = null;
        // Get the channel processor
        ChannelProcessor channelProcessor = this.getChannelProcessor();

        // 1.2 Produce and deliver the data
        try {
            for (int i = 1; i <= 5; i++) {
                // 1.3 Wrap the payload into an event
                Event event = new SimpleEvent();
                event.setHeaders(new HashMap<String, String>());
                event.setBody((prefix + i).getBytes());

                // 1.4 Hand the event to the channel processor
                channelProcessor.processEvent(event);

                Thread.sleep(interval);
            }

            // 1.5 Processing succeeded
            status = Status.READY;
        } catch (InterruptedException e) {
            // 1.6 Processing failed
            status = Status.BACKOFF;
        }

        return status;
    }

    // 2. When the channel reports it is full (delivery failed), the runner backs
    // off; grow the back-off sleep by 2 s per retry
    public long getBackOffSleepIncrement() {
        return 2000;
    }

    // 3. Upper bound for the back-off sleep
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    // 4. Configure the source from the agent file
    public void configure(Context context) {
        // 4.1 Custom prefix instead of a hard-coded "LOG"; if the key is absent,
        // fall back to the default value "LOG"
        prefix = context.getString("defined_prefix", "LOG");
        interval = context.getLong("defined_interval", 1000L);
    }
}

2.2 Write and launch source.conf

[cevent@hadoop207 apache-flume-1.7.0]$ ll
total 176
drwxr-xr-x.  2 cevent cevent  4096 Jun 11 13:35 bin
-rw-r--r--.  1 cevent cevent 77387 Oct 11  2016 CHANGELOG
drwxr-xr-x.  2 cevent cevent  4096 Jun 12 12:11 conf
-rw-r--r--.  1 cevent cevent  6172 Sep 26  2016 DEVNOTES
-rw-r--r--.  1 cevent cevent  2873 Sep 26  2016 doap_Flume.rdf
drwxr-xr-x. 10 cevent cevent  4096 Oct 13  2016 docs
drwxrwxr-x.  2 cevent cevent  4096 Jun 12 16:43 files
drwxrwxr-x.  6 cevent cevent  4096 Jun 13 22:21 job
drwxrwxr-x.  2 cevent cevent  4096 Jun 14 13:26 lib
-rw-r--r--.  1 cevent cevent 27625 Oct 13  2016 LICENSE
drwxrwxr-x.  2 cevent cevent  4096 Jun 12 11:48 loggers
drwxrwxr-x.  2 cevent cevent  4096 Jun 11 17:05 logs
-rw-r--r--.  1 cevent cevent   249 Sep 26  2016 NOTICE
-rw-r--r--.  1 cevent cevent  2520 Sep 26  2016 README.md
-rw-r--r--.  1 cevent cevent  1585 Oct 11  2016 RELEASE-NOTES
-rw-rw-r--.  1 cevent cevent   177 Jun 12 16:57 tail_dir.json
drwxrwxr-x.  2 cevent cevent  4096 Jun 11 13:35 tools
-rw-rw-r--.  1 cevent cevent    16 Jun 12 16:45 tutu.txt
drwxrwxr-x.  3 cevent cevent  4096 Jun 12 14:23 upload
[cevent@hadoop207 apache-flume-1.7.0]$ cd job/
[cevent@hadoop207 job]$ ll
total 32
-rw-rw-r--. 1 cevent cevent 1542 Jun 12 14:22 flume-dir-hdfs.conf
-rw-rw-r--. 1 cevent cevent 1641 Jun 12 13:36 flume-file-hdfs.conf
-rw-rw-r--. 1 cevent cevent  495 Jun 11 17:02 flume-netcat-logger.conf
-rw-rw-r--. 1 cevent cevent 1522 Jun 12 16:40 flume-taildir-hdfs.conf
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 13:28 group1
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 14:56 group2
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 20:59 group3
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 22:38 interceptor
[cevent@hadoop207 job]$ mkdir source
[cevent@hadoop207 job]$ vim source/flume-source-1.conf
### Custom source demo
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.cevent.source.CeventSource
## Custom properties defined by the jar
a1.sources.r1.defined_interval = 1000
a1.sources.r1.defined_prefix = ceventLOG

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

~
"source/flume-source-1.conf" [新] 23L, 589C 已写入
[cevent@hadoop207 job]$ cd ../..
[cevent@hadoop207 module]$ cd apache-flume-1.7.0/
Start the agent and print the events to the console:
[cevent@hadoop207 apache-flume-1.7.0]$ bin/flume-ng agent -n a1 -c conf/ \
-f job/source/flume-source-1.conf -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /opt/module/apache-flume-1.7.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop-2.7.2/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/opt/module/hive-1.2.1) for Hive access
+ exec /opt/module/jdk1.7.0_79/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/apache-flume-1.7.0/conf:/opt/module/apache-flume-1.7.0/lib/*:/opt/module/hadoop-2.7.2/etc/hadoop:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/common/*:/opt/module/hadoop-2.7.2/share/hadoop/hdfs:/opt/module/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/hdfs/*:/opt/module/hadoop-2.7.2/share/hadoop/yarn/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/yarn/*:/opt/module/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-2.7.2/share/hadoop/mapreduce/*:/opt/module/hadoop-2.7.2/contrib/capacity-scheduler/*.jar:/opt/module/hive-1.2.1/lib/*' -Djava.library.path=:/opt/module/hadoop-2.7.2/lib/native org.apache.flume.node.Application -n a1 -f job/source/flume-source-1.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/apache-flume-1.7.0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2020-06-14 13:32:12,811 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2020-06-14 13:32:12,816 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:job/source/flume-source-1.conf
2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a1
2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2020-06-14 13:32:12,824 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2020-06-14 13:32:12,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1]
2020-06-14 13:32:12,841 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2020-06-14 13:32:12,856 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2020-06-14 13:32:12,860 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
2020-06-14 13:32:12,861 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type com.cevent.source.CeventSource
2020-06-14 13:32:12,870 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2020-06-14 13:32:12,873 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [r1, k1]
2020-06-14 13:32:12,881 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{r1=PollableSourceRunner: { source:com.cevent.source.CeventSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@584c4b3e counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2020-06-14 13:32:12,896 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2020-06-14 13:32:12,934 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2020-06-14 13:32:12,935 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2020-06-14 13:32:12,936 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2020-06-14 13:32:12,937 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1
2020-06-14 13:32:12,948 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 31                   ceventLOG1 }
2020-06-14 13:32:13,947 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 32                   ceventLOG2 }
2020-06-14 13:32:14,947 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 65 76 65 6E 74 4C 4F 47 33                   ceventLOG3 }


2.3 Custom source that reads a file

package com.cevent.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.io.*;
import java.util.HashMap;

/**
 * Created by Cevent on 2020/6/14.
 *
 * @author cevent
 * @description Custom source that reads events from a local file
 * @date 2020/6/14 10:32
 */
public class CeventSourceFileData extends AbstractSource implements Configurable, PollableSource {

    // Fields populated from the agent file via configure()
    private String file;
    private long interval;

    // 1. Source: read the file line by line, wrap each line into an event, and
    // hand it to the channel. Note that Flume calls process() in a loop, so this
    // demo re-reads the whole file on every call.
    public Status process() throws EventDeliveryException {
        // Delivery status
        Status status = null;
        // Get the channel processor
        ChannelProcessor channelProcessor = this.getChannelProcessor();

        BufferedReader bufferedReader = null;

        // 1.2 Read and deliver the data
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
            String strBuffered;

            while ((strBuffered = bufferedReader.readLine()) != null) {
                // Wrap the line into an event
                Event event = new SimpleEvent();
                event.setBody(strBuffered.getBytes());
                event.setHeaders(new HashMap<String, String>());

                // Deliver the event to the channel
                channelProcessor.processEvent(event);

                Thread.sleep(interval);
            }

            // 1.5 Processing succeeded
            status = Status.READY;
        } catch (IOException e) {
            // 1.6 Processing failed
            status = Status.BACKOFF;
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        return status;
    }

    // 2. When the channel reports it is full (delivery failed), the runner backs
    // off; grow the back-off sleep by 2 s per retry
    public long getBackOffSleepIncrement() {
        return 2000;
    }

    // 3. Upper bound for the back-off sleep
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    // 4. Configure the source from the agent file: the file path and the
    // per-line delivery interval
    public void configure(Context context) {
        interval = context.getLong("defined_interval", 1000L);
        file = context.getString("defined_file");
    }
}
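Because process() is polled repeatedly, the version above re-emits the entire file on every call. A minimal sketch of remembering the read offset between calls, meant to be dropped into the same class; the field name and the RandomAccessFile approach are my own, not from the original post:

// Hypothetical variant: keep a read offset so repeated process() calls
// only emit lines appended since the previous call.
private long position = 0;

public Status process() throws EventDeliveryException {
    try {
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        raf.seek(position);                      // resume where the last call stopped
        String line;
        while ((line = raf.readLine()) != null) {
            Event event = new SimpleEvent();
            event.setHeaders(new HashMap<String, String>());
            event.setBody(line.getBytes());
            getChannelProcessor().processEvent(event);
        }
        position = raf.getFilePointer();         // remember the offset for the next call
        raf.close();
        return Status.READY;
    } catch (IOException e) {
        return Status.BACKOFF;
    }
}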

2.4 Configure source.conf for the file source

### Custom source reading a data file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.cevent.source.CeventSourceFileData
## Custom properties defined by the jar
a1.sources.r1.defined_interval = 1000
a1.sources.r1.defined_file = /opt/module/datas/dept.txt

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.5 Start the file source

[cevent@hadoop207 module]$ cd apache-flume-1.7.0/
Start the agent:
[cevent@hadoop207 apache-flume-1.7.0]$ bin/flume-ng agent -n a1 -c conf/ \
-f job/source/flume-source-1.conf -Dflume.root.logger=INFO,console

3. Custom Sink

3.1 Sink code, packaged as a jar

package com.cevent.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
 * Created by Cevent on 2020/6/14.
 *
 * @author cevent
 * @description Sink: drives the channel transfer inside a transaction
 * @date 2020/6/14 14:17
 */
public class CeventSink extends AbstractSink implements Configurable {

    private String prefix;
    private String suffix;

    /**
     * Polled method that performs the channel-to-sink transfer
     * @return delivery status
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        Status status = null;

        // 1. Get the channel
        Channel channel = this.getChannel();

        // 2. Open a transaction
        Transaction transaction = channel.getTransaction();
        transaction.begin();

        try {
            // 3. Process the data: if take() returns null, wait and retry
            Event event;
            while ((event = channel.take()) == null) {
                Thread.sleep(1000);
            }

            // Got an event
            byte[] body = event.getBody();
            String line = new String(body, "UTF-8");
            System.out.println(prefix + line + suffix);
            status = Status.READY;

            // Commit the transaction on success
            transaction.commit();
        } catch (Exception e) {
            // Roll back on failure
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            // Close the transaction
            transaction.close();
        }

        return status;
    }

    /**
     * Configuration: read prefix/suffix from the agent file,
     * falling back to defaults when the keys are absent
     * @param context
     */
    public void configure(Context context) {
        prefix = context.getString("prefix", "cevent_prefix");
        suffix = context.getString("suffix", "cevent_suffix");
    }
}
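Taking a single event per transaction works for a demo, but real sinks usually drain a batch per transaction to amortize the commit cost. A rough sketch of the same loop with a hypothetical batch size (not from the original post; it replaces the process() body above):

// Hypothetical batched variant of process(): one transaction per batch
public Status process() throws EventDeliveryException {
    Channel channel = this.getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
        int batchSize = 100;                      // assumed batch size
        for (int i = 0; i < batchSize; i++) {
            Event event = channel.take();
            if (event == null) {
                break;                            // channel drained; commit what we have
            }
            System.out.println(prefix + new String(event.getBody(), "UTF-8") + suffix);
        }
        transaction.commit();
        return Status.READY;
    } catch (Exception e) {
        transaction.rollback();
        return Status.BACKOFF;
    } finally {
        transaction.close();
    }
}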

3.2 Sink .conf configuration

[cevent@hadoop207 job]$ mkdir sinks
[cevent@hadoop207 job]$ ll
total 40
-rw-rw-r--. 1 cevent cevent 1542 Jun 12 14:22 flume-dir-hdfs.conf
-rw-rw-r--. 1 cevent cevent 1641 Jun 12 13:36 flume-file-hdfs.conf
-rw-rw-r--. 1 cevent cevent  495 Jun 11 17:02 flume-netcat-logger.conf
-rw-rw-r--. 1 cevent cevent 1522 Jun 12 16:40 flume-taildir-hdfs.conf
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 13:28 group1
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 14:56 group2
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 20:59 group3
drwxrwxr-x. 2 cevent cevent 4096 Jun 13 22:38 interceptor
drwxrwxr-x. 2 cevent cevent 4096 Jun 14 16:51 sinks
drwxrwxr-x. 2 cevent cevent 4096 Jun 14 14:07 source
[cevent@hadoop207 job]$ vim sinks/flume-sinks-1.conf
### Custom sink controlling the channel output
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.cevent.sink.CeventSink
#a1.sinks.k1.prefix = cevent619:
a1.sinks.k1.suffix = :cevent619

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
~

3.3 Test with nc

[cevent@hadoop207 apache-flume-1.7.0]$ nc localhost 44444
cev123
OK

Agent console (the capture starts mid-line):
rce.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
cevent_prefixcev123:cevent619

The last line is the sink's output: the default prefix (the prefix key is commented out in the .conf), the input line, and the configured :cevent619 suffix.


4. Monitoring the Flume data flow with Ganglia

4.1 Install the Ganglia dependencies via yum

### Ganglia ==> data-flow monitoring
#1. Install the httpd service and PHP
sudo yum -y install httpd php
#2. Install the rrdtool and apr-devel dependencies
sudo yum -y install rrdtool perl-rrdtool rrdtool-devel
sudo yum -y install apr-devel
#3. Install ganglia and its three components (gmetad, web, gmond)
sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
sudo yum -y install ganglia-gmetad
sudo yum -y install ganglia-web
sudo yum -y install ganglia-gmond
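A quick way to confirm all three components actually installed (assuming an RPM-based box like this one):

sudo rpm -qa | grep ganglia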

4.2 Configure the Ganglia web frontend

[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  Order deny,allow
  Deny from all
  Allow from 127.0.0.1
  Allow from ::1
  # Allow from .example.com
</Location>

The access rules explained:

<Location /ganglia>
  Order deny,allow
  ## Deny all connections by default
  Deny from all
  ## Allow connections from a specific IP
  Allow from 127.0.0.1
  ## Allow connections from every IP
  Allow from all
  ## Allow connections from a specific host
  Allow from 192.168.1.1
  Allow from ::1
  # Allow from .example.com
</Location>

Optimized configuration (additionally allows the LAN address):

[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  Order deny,allow
  Deny from all
  Allow from 127.0.0.1
  Allow from 192.168.1.1
  Allow from ::1
  # Allow from .example.com
</Location>
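Apache only reads this file on startup, so after editing it the service has to be restarted; with the SysV init scripts used on this CentOS 6 style box that would presumably be:

[cevent@hadoop207 apache-flume-1.7.0]$ sudo service httpd restart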

4.3 Configure gmetad.conf (the RRD database layer)

[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/ganglia/gmetad.conf
[sudo] password for cevent:
# This is an example of a Ganglia Meta Daemon configuration file
#                http://ganglia.sourceforge.net/
#
#-------------------------------------------------------------------------------
# Setting the debug_level to 1 will keep daemon in the forground and
# show only error messages. Setting this value higher than 1 will make
# gmetad output debugging information and stay in the foreground.
# default: 0
# debug_level 10
#
#-------------------------------------------------------------------------------
# What to monitor. The most important section of this file.
#
# The data_source tag specifies either a cluster or a grid to
# monitor. If we detect the source is a cluster, we will maintain a complete
# set of RRD databases for it, which can be used to create historical
# graphs of the metrics. If the source is a grid (it comes from another gmetad),
# we will only maintain summary RRDs for it.
#
# Format:
# data_source "my cluster" [polling interval] address1:port addreses2:port ...
#
# The keyword 'data_source' must immediately be followed by a unique
# string which identifies the source, then an optional polling interval in
# seconds. The source will be polled at this interval on average.
# If the polling interval is omitted, 15sec is asssumed.
#
# If you choose to set the polling interval to something other than the default,
# note that the web frontend determines a host as down if its TN value is less
# than 4 * TMAX (20sec by default).  Therefore, if you set the polling interval
# to something around or greater than 80sec, this will cause the frontend to
# incorrectly display hosts as down even though they are not.
#
# A list of machines which service the data source follows, in the
# format ip:port, or name:port. If a port is not specified then 8649
# (the default gmond port) is assumed.
# default: There is no default value
#
# data_source "my cluster" 10 localhost  my.machine.edu:8649  1.2.3.5:8655
# data_source "my grid" 50 1.3.4.7:8655 grid.org:8651 grid-backup.org:8651
# data_source "another source" 1.3.4.7:8655  1.3.4.8

data_source "cevent ganglia cluster" hadoop.cevent.com

#
# Round-Robin Archives
# You can specify custom Round-Robin archives here (defaults are listed below)
#
# Old Default RRA: Keep 1 hour of metrics at 15 second resolution. 1 day at 6 minute
# RRAs "RRA:AVERAGE:0.5:1:244" "RRA:AVERAGE:0.5:24:244" "RRA:AVERAGE:0.5:168:244" "RRA:AVERAGE:0.5:672:244" \
#      "RRA:AVERAGE:0.5:5760:374"
# New Default RRA
# Keep 5856 data points at 15 second resolution assuming 15 second (default) polling. That's 1 day
# Two weeks of data points at 1 minute resolution (average)
#RRAs "RRA:AVERAGE:0.5:1:5856" "RRA:AVERAGE:0.5:4:20160" "RRA:AVERAGE:0.5:40:52704"
#
#-------------------------------------------------------------------------------
# Scalability mode. If on, we summarize over downstream grids, and respect
# authority tags. If off, we take on 2.5.0-era behavior: we do not wrap our output
# in <GRID></GRID> tags, we ignore all <GRID> tags we see, and always assume
# we are the "authority" on data source feeds. This approach does not scale to
# large groups of clusters, but is provided for backwards compatibility.

4.4 Configure gmond.conf (the monitoring daemon)

[cevent@hadoop207 apache-flume-1.7.0]$ sudo vim /etc/ganglia/gmond.conf
/* This configuration is as close to 2.5.x default behavior as possible
   The values closely match ./gmond/metric.h definitions in 2.5.x */
globals {
  daemonize = yes
  setuid = yes
  user = ganglia
  debug_level = 0
  max_udp_msg_len = 1472
  mute = no
  deaf = no
  allow_extra_data = yes
  host_dmax = 86400 /*secs. Expires (removes from web interface) hosts in 1 day */
  host_tmax = 20 /*secs */
  cleanup_threshold = 300 /*secs */
  gexec = no
  # By default gmond will use reverse DNS resolution when displaying your hostname
  # Uncommeting following value will override that value.
  # override_hostname = "mywebserver.domain.com"
  # If you are not using multicast this value should be set to something other than 0.
  # Otherwise if you restart aggregator gmond you will get empty graphs. 60 seconds is reasonable
  send_metadata_interval = 0 /*secs */
}

/* The cluster attributes specified will be used as part of the <CLUSTER>
 * tag that will wrap all hosts collected by this instance. */
cluster {
  name = "cevent ganglia cluster"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

/* The host section describes attributes of the host, like the location */
host {
  location = "unspecified"
}

/* Feel free to specify as many udp_send_channels as you like.  Gmond
   used to only support having a single channel */
udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  # mcast_join = 239.2.11.71
  host = hadoop207.cevent.com
  port = 8649
  ttl = 1
}

/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  # mcast_join = 239.2.11.71
  port = 8649
  bind = 0.0.0.0
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}

/* You can specify as many tcp_accept_channels as you like to share
   an xml description of the state of the cluster */
tcp_accept_channel {
  port = 8649
  # If you want to gzip XML output
  gzip_output = no
}

/* Channel to receive sFlow datagrams */
#udp_recv_channel {
#  port = 6343
#}

/* Optional sFlow settings */
#sflow {
# udp_port = 6343
# accept_vm_metrics = yes
# accept_jvm_metrics = yes
# multiple_jvm_instances = no
# accept_http_metrics = yes
# multiple_http_instances = no
# accept_memcache_metrics = yes
# multiple_memcache_instances = no
#}

/* Each metrics module that is referenced by gmond must be specified and
   loaded. If the module has been statically linked with gmond, it does
   not require a load path. However all dynamically loadable modules must
   include a load path. */
modules {
  module {
    name = "core_metrics"
  }
  module {
    name = "cpu_module"
    path = "modcpu.so"
  }
  # ... (the remaining default module entries are unchanged; the capture ends here)
}
4.5 Disable SELinux (setenforce)

[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config

Change it to:

# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
#     enforcing - SELinux security policy is enforced.
#     permissive - SELinux prints warnings instead of enforcing.
#     disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
#     targeted - Targeted processes are protected,
#     mls - Multi Level Security protection.
SELINUXTYPE=targeted

This change only takes effect after a reboot. To avoid rebooting, switch SELinux off for the current session with setenforce 0:

[cevent@hadoop207 apache-flume-1.7.0]$ sudo setenforce 0
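getenforce can confirm the switch took effect; after setenforce 0 it should report Permissive:

[cevent@hadoop207 apache-flume-1.7.0]$ getenforce
Permissive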

4.6 Start Ganglia

[cevent@hadoop207 apache-flume-1.7.0]$ sudo service httpd start
Starting httpd:                                            [  OK  ]
[cevent@hadoop207 apache-flume-1.7.0]$ sudo service gmetad start
Starting GANGLIA gmetad: Warning: we failed to resolve data source name hadoop.cevent.com
                                                           [  OK  ]
[cevent@hadoop207 apache-flume-1.7.0]$ sudo service gmond start
Starting GANGLIA gmond:                                    [  OK  ]

The warning means gmetad cannot resolve the data_source host set in gmetad.conf; it is typically fixed with an /etc/hosts entry for that name (or by pointing data_source at the host gmond actually uses, hadoop207.cevent.com).

The web UI is then reachable at: http://hadoop207.cevent.com/ganglia/

4.7 Configure flume-env.sh and test the monitoring

[cevent@hadoop207 apache-flume-1.7.0]$ vim conf/flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.

# Enviroment variables can be set here.

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.7.0_79
export PATH=$PATH:$JAVA_HOME/bin

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-2.7.2
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

#HIVE_HOME
export HIVE_HOME=/opt/module/hive-1.2.1
export PATH=$PATH:$HIVE_HOME/bin

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

export JAVA_OPTS="-Dflume.monitoring.type=ganglia
-Dflume.monitoring.hosts=hadoop207.cevent.com:8649
-Xms100m
-Xmx200m"

# Let Flume write raw event data and configuration information to its log files for debugging
# purposes. Enabling these flags is not recommended in production,
# as it may result in logging sensitive user information or encryption secrets.
# export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "

# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""

4.8 Start the agent with Ganglia monitoring

[cevent@hadoop207 apache-flume-1.7.0]$ bin/flume-ng agent \
> --conf conf/ \
> --name a1 \
> --conf-file job/flume-netcat-logger.conf \
> -Dflume.root.logger=INFO,console \
> -Dflume.monitoring.type=ganglia \
> -Dflume.monitoring.hosts=hadoop207.cevent.com:8649
Info: Sourcing environment configuration script /opt/module/apache-flume-1.7.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop-2.7.2/bin/hadoop) for HDFS access

[cevent@hadoop207 job]$ nc localhost 44444
cevent619
OK
echo1226
OK
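On the Ganglia page, each Flume component reports its counters as separate graphs. The ones usually worth watching for this setup (names from Flume's monitoring counters; verify against your version) are CHANNEL.ChannelFillPercentage (a steadily rising value means the sink cannot drain the channel fast enough), SOURCE.EventReceivedCount, and SINK.EventDrainSuccessCount. If no graphs show up, the same counters can be sanity-checked without Ganglia via Flume's built-in HTTP reporter; the port below is arbitrary:

bin/flume-ng agent -n a1 -c conf/ -f job/flume-netcat-logger.conf \
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
# then, from another shell:
curl http://hadoop207.cevent.com:34545/metrics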