您的位置:首页 > 其它

Flume-ng分布式环境的部署和配置(二)

2013-06-27 10:48 381 查看
1.flume sink 测试
#测试1 hdfs sink
Using this sink requires hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster
需要安装hadoop

在/usr/local/apache-flume-1.3.1-bin/conf/flume-env.sh加入
export HADOOP_HOME=/usr/local/hadoop

#修改配置文件
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#启动flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console

#测试产生syslog
echo "<37>hello via syslog to hdfs testing one" | nc localhost 5140

#在启动的终端查看console输出,文件生成成功
2013-05-29 00:53:58,078 (hdfs-k1-call-runner-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected//Syslog.1369814037714.tmp
2013-05-29 00:54:28,220 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714.tmp to hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714

#在hadoop上查看文件
./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/Syslog.1369814037714
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^;>Gv$hello via syslog to hdfs testing one

#修改配置文件以时间形式自动生成目录
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = Syslog.%{host}
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#生成JSON 格式的POST request, header的timestamp 参数如果格式不对则无法解析
#需要生成13位的timestamp才能解析出正确的时间,包含MilliSec
#linux生成当前时间10位Unix timestamp
date +%s
#linux生成当前时间13位Unix timestamp
date +%s%N|awk '{print substr($0,1,13)}'

curl -X POST -d '[{ "headers":{"timestamp":"1369818213654","host":"cc-staging-loginmgr2"},"body": "hello via post"}]' http://localhost:5140
#在启动的终端查看console输出,文件生成成功
2013-05-29 02:03:38,646 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp
2013-05-29 02:04:08,714 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp to hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#在hadoop上查看文件
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-05-29/0203
Found 1 items
-rw-r--r-- 3 root supergroup 129 2013-05-29 02:04 /user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#测试2 logger sink
Logs event at INFO level. Typically useful for testing/debugging purpose

#测试3 Avro sink
Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair

#Avro Source配置文件
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4545

#Avro Sink配置文件
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

#先启动Avro的Source,监听端口
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console

#再启动Avro的Sink
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro_sink.conf -n a1 -Dflume.root.logger=INFO,console

#可以看到已经建立连接
2013-06-02 19:23:00,237 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7a0e28bf, /172.25.4.32:14894 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:14894

#在Avro Sink上生成测试log
echo "<37>hello via avro sink" | nc localhost 5140

#在Avro Source上可以看到log已经生成
2013-06-02 19:24:13,740 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 61 76 72 6F 20 73 hello via avro s }

#测试4 File Roll Sink
Stores events on the local filesystem

#修改配置文件
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

#启动file roll 配置文件
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f file_roll.conf -n a1 -Dflume.root.logger=INFO,console

#生成测试log
echo "<37>hello via file roll" | nc localhost 5140
echo "<37>hello via file roll 2" | nc localhost 5140

#查看/var/log/flume下是否生成文件,默认每30秒生成一个新文件
-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1
-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2
-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3

cat 1370227443397-1 1370227443397-3
hello via file roll
hello via file roll 2

2.Flume Channels测试
#Memory Channel
The events are stored in an in-memory queue with a configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure

#flume channel selectors
# Replicating Channel Selector通道复制测试
#2个channel和2个sink的配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

#查看是否都建立了连接
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518

2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545
2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731

#生成测试log
echo "<37>hello via channel selector" | nc localhost 5140

#查看2个sink是否得到数据
2013-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }

2013-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }

#flume channel selectors
# Multiplexing Channel Selector 通道复用测试
#2个channel和2个sink的配置文件
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

#根据配置文件生成测试的header 为state的POST请求
curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' http://localhost:5140
#查看2个sink得到数据是否和配置文件一致
Sink1:
2013-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }
2013-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }

Sink2:
2013-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }

本文出自 “改变你成就你” 博客,请务必保留此出处http://ydt619.blog.51cto.com/316163/1230590
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: