您的位置:首页 > 其它


2013-06-27 10:51 344 查看
1.Flume Sink Processors测试
#Failover Sink Processor
Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that so long as one is available events will be processed (delivered)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname =
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname =
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

echo "<37>test1 failover" | nc localhost 5140

2013-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }

echo "<37>test2 failover" | nc localhost 5140

2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72 test2 failover }

echo "<37>test4 failover" | nc localhost 5140
echo "<37>test5 failover" | nc localhost 5140

2013-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72 test4 failover }
2013-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72 test5 failover }

#Load balancing Sink Processor测试
Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed.

#配置文件,注:load balance type下必须指定同一个channel到不同的sinks,否则不生效
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname =
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname =
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

[root@cc-staging-loginmgr2 ~]# echo "<37>test2 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test3 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test4 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test5 loadbalance" | nc localhost 5140

2013-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63 test2 loadbalanc }
2013-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63 test4 loadbalanc }

2013-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63 test3 loadbalanc }
2013-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63 test5 loadbalanc }

2. Event Serializers测试
Body Text Serializer
Alias: text. This interceptor writes the body of the event to an output stream without any transformation or modification(把body中的内容变成文本内容)

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140 curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140 curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140
#查看file roll 文件中的文本内容
cat /var/log/flume/1370675739270-1

#Avro Event Serializer
Alias: avro_event. This interceptor serializes Flume events into an Avro container file
把flume event变成avro 中包含的文件

1.Flume Interceptors测试
Timestamp Interceptor
This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor inserts a header with key timestamp whose value is the relevant timestamp

Host Interceptor
This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind =
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

echo "<37>test dynamic interceptor" | nc localhost 5140

./hadoop dfs -ls hdfs://
Found 1 items
-rw-r--r-- 3 root supergroup 140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118

Static Interceptor
Static interceptor allows user to append a static header with static value to all events

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

echo "<37>test1 static interceptor" | nc localhost 5140

2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }

2. zabbix监控Flume
Young GC counts
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $6}'

Full GC counts
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $8}'

JVM total memory usage
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $3}'

JVM total instances usage
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $2}'

启动时加上JSON repoting参数,这样就可以通过http://localhost:34545/metrics访问
flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done

[root@cc-staging-loginmgr2 conf]# curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'


[root@cc-staging-loginmgr2 conf]#cat /opt/scripts/monitor_flume.sh
curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'|grep $1|awk -F: '{print $2}'

#在zabbix agent配置文件进行部署
cat /etc/zabbix/zabbix_agentd/zabbix_agentd.userparams.conf
UserParameter=ygc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $6}'
UserParameter=fgc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $8}'
UserParameter=jvm.memory.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $3}'
UserParameter=jvm.instances.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $2}'
UserParameter=flume.monitor[*],/bin/bash /opt/scripts/monitor_flume.sh $1

本文出自 “改变你成就你” 博客,请务必保留此出处http://ydt619.blog.51cto.com/316163/1230597
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息