
Flume-ng Distributed Environment Deployment and Configuration (Part 3)

2013-06-27 10:51
1. Flume Sink Processors Test
#Failover Sink Processor
The Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that as long as one is available, events will be processed (delivered). A sink that fails is given a cool-down period before being retried, capped at processor.maxpenalty milliseconds.
#Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
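
The LoggerSink lines shown below come from the two downstream collectors at 172.25.4.23 and 172.25.4.33. Their configuration is not included in this part; a minimal sketch consistent with both the failover and load-balancing tests (the agent name a2 and the file name avro_collector.conf are assumptions) is an avro source on port 4545 feeding a logger sink:

#Collector agent (hypothetical), identical on both hosts
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545
a2.sources.r1.channels = c1

a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Start the collector (hypothetical config file name)
flume-ng agent -c . -f avro_collector.conf -n a2 -Dflume.root.logger=INFO,console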

#Generate a test log
echo "<37>test1 failover" | nc localhost 5140

#The log appears on sink2; sink1 has the lower priority, so it produces nothing
2013-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }

#Manually stop sink2, then generate another test log
echo "<37>test2 failover" | nc localhost 5140

#Both test1 and test2 now show up on sink1
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72 test2 failover }

#Bring sink2 back up; by priority, the logs flow to sink2 again
echo "<37>test4 failover" | nc localhost 5140
echo "<37>test5 failover" | nc localhost 5140

2013-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72 test4 failover }
2013-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72 test5 failover }

#Load Balancing Sink Processor Test
The Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed; both round_robin and random selection mechanisms are supported.

#Configuration file. Note: with the load_balance processor type, the sinks in the group must be attached to the same channel, or load balancing does not take effect. With backoff = true, a sink that fails is temporarily blacklisted.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
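
Besides round_robin, the processor also supports a random selector (per the Flume user guide); switching is a one-line change to the group above:

a1.sinkgroups.g1.processor.selector = random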

#Generate 4 test logs
[root@cc-staging-loginmgr2 ~]# echo "<37>test2 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test3 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test4 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test5 loadbalance" | nc localhost 5140

#Check the sink outputs to confirm round-robin distribution
Sink1:
2013-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63 test2 loadbalanc }
2013-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63 test4 loadbalanc }

Sink2:
2013-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63 test3 loadbalanc }
2013-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63 test5 loadbalanc }

2. Event Serializers Test
Body Text Serializer
Alias: text. This serializer writes the body of the event to the output stream without any transformation or modification (that is, the event body is written out as plain text).

#Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Generate test logs (the HTTP source's default JSONHandler expects a JSON array of events)
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140
#Check the text content in the file_roll output file
cat /var/log/flume/1370675739270-1
TEST1 BODY TEXT
TEST2 BODY TEXT
TEST3 BODY TEXT

#Avro Event Serializer
Alias: avro_event. This serializer serializes Flume events into an Avro container file (i.e., each Flume event becomes a record inside the Avro container file).
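
A minimal sketch of using it with the file_roll sink from the previous test: only the serializer lines change. syncIntervalBytes and compressionCodec are optional knobs listed in the Flume user guide; whether they apply may depend on the Flume version.

a1.sinks.k1.sink.serializer = avro_event
#optional tuning (assumed available in this version; snappy needs the native libraries)
a1.sinks.k1.sink.serializer.syncIntervalBytes = 2048000
a1.sinks.k1.sink.serializer.compressionCodec = snappy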

3. Flume Interceptors Test
Timestamp Interceptor
This interceptor inserts into the event headers the time, in milliseconds, at which it processes the event, under a header with key timestamp.

Host Interceptor
This interceptor inserts the hostname or IP address of the host the agent is running on, under a header with key host (or another configured key).

#Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#Start the agent (the HDFS sink requires the Hadoop jars on the Flume classpath)
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

#Generate a test log
echo "<37>test dynamic interceptor" | nc localhost 5140

#Check the file created on HDFS: timestamp and hostname have both been inserted into the headers, so directories and file names follow the custom format (the double dot in the file name comes from the filePrefix ending with a period)
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/
Found 1 items
-rw-r--r-- 3 root supergroup 140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118
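
The event body can be read back with the same client; -text decodes the HDFS sink's default SequenceFile format (output omitted here):
./hadoop dfs -text hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118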

Static Interceptor
The Static interceptor allows a user to append a static header with a static value to all events.

#Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#Start the agent
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

#Generate a test log
echo "<37>test1 static interceptor" | nc localhost 5140

#Check the console output
2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }

4. Monitoring Flume with Zabbix
#JVM performance monitoring
Young GC count
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $6}'

Full GC count
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $8}'

JVM total memory usage
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $3}'

JVM total instance count
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $2}'
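
For reference, the awk fields above map onto the JDK 7 jstat -gcutil header:

S0     S1     E      O      P      YGC     YGCT    FGC    FGCT     GCT
#column 6 (YGC) is the young GC count and column 8 (FGC) the full GC count; in jmap -histo, the Total row carries the instance count in column 2 and the byte count in column 3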

#Monitoring Flume application metrics
Start the agent with the JSON reporting parameters; the metrics can then be fetched from http://localhost:34545/metrics
flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

#Generate some data
for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done
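
exec.conf itself is not shown in this part. A minimal version consistent with the test above (an assumption: an exec source tailing /usr/logs/log.10 into a logger sink) might look like this:

#Hypothetical exec.conf, reconstructed for illustration
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/logs/log.10
a1.sources.r1.channels = c1

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100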

#Pretty-print the JSON output with a sed one-liner
[root@cc-staging-loginmgr2 conf]# curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'

CHANNEL.c1:
EventPutSuccessCount:100
ChannelFillPercentage:0.0
Type:CHANNEL
StopTime:0
EventPutAttemptCount:100
ChannelSize:0
StartTime:1371709073310
EventTakeSuccessCount:100
ChannelCapacity:1000
EventTakeAttemptCount:115
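
If Python is available on the host, the same JSON can be pretty-printed without the sed one-liner (json.tool ships with Python 2.6+):
curl -s http://localhost:34545/metrics | python -mjson.tool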

#Script file used to monitor Flume metrics
[root@cc-staging-loginmgr2 conf]# cat /opt/scripts/monitor_flume.sh
curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'|grep $1|awk -F: '{print $2}'
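
For example, querying the counter from the metrics output above returns just the value:
[root@cc-staging-loginmgr2 conf]# /bin/bash /opt/scripts/monitor_flume.sh EventPutSuccessCount
100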

#Deploy the items in the Zabbix agent configuration file
cat /etc/zabbix/zabbix_agentd/zabbix_agentd.userparams.conf
UserParameter=ygc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $6}'
UserParameter=fgc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $8}'
UserParameter=jvm.memory.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $3}'
UserParameter=jvm.instances.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $2}'
UserParameter=flume.monitor[*],/bin/bash /opt/scripts/monitor_flume.sh $1
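
After reloading the Zabbix agent, the items can be verified from the server side (assuming the zabbix_get utility is installed; the brackets need quoting in the shell):
zabbix_get -s 127.0.0.1 -k 'flume.monitor[EventPutSuccessCount]'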

This article originally appeared on the "改变你成就你" blog; please retain this attribution: http://ydt619.blog.51cto.com/316163/1230597