Flume NG 学习笔记(六)Selector(复用与复制)测试
2014-10-24 17:45
176 查看
学习心得(三)流配置中介绍多路复用流的时候,有说到Flume支持从一个源发送事件到多个通道中,这被称为事件流的复用。这里需要在配置中定义事件流的复制/复用,选择1个或者多个通道进行数据流向。
而关于selector配置前面也讲过:
<Agent>.sources.<Source1>.selector.type= replicating
这个源的选择类型为复制。这个参数不指定一个选择的时候,默认情况下它复制
复用则是麻烦一下,流的事情是被筛选的发生到不同的渠道,需要指定源和扇出通道的规则,感觉与case when 类似。
复用的参数为:
<Agent>.sources.<Source1>.selector.type= multiplexing
首先是作为源发送的代理配置
这里设置了2个channels与2个sinks,那么我们也要设置2个sinks对应的代理配置:
下面是第一个接受复制事件代理配置
下面是第二个接受复制事件代理配置:
#敲命令
首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console
在启动源发送的代理
flume-ng agent -cconf -f conf/replicate_source_case11.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "hello looklook5"| nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
可以看到他的正常启动以及发送数据成功
#在启动源第一个接事件的代理终端查看console输出
可以看到他的正常启动,以及接受到源代理发送的数据
#在启动源第二个接事件的代理终端查看console输出
同样可以可以看到他的正常启动,以及接受到源代理发送的数据
Ok,成功
下面是源代理的配置
这里设置了2个channels与2个sinks 同时判断头部属性,当CZ的时,事件发送到sinks1,US时发送到sink2,其他的都发送到sink2,因此我们还有配置2个sinks对于的代理。这里的2个接受代理我们沿用之前复制的接受代理。
#敲命令
与之前复制的情况一样,首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
flume-ng agent -cconf -f conf/multi_sink1_case12.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/multi_sink2_case12.conf -n a1 -Dflume.root.logger=INFO,console
在启动源发送的代理
flume-ng agent -cconf -f conf/multi_source_case12.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
curl -X POST -d '[{"headers" :{"state" : "CZ"},"body" :"TEST1"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "US"},"body" :"TEST2"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "SH"},"body" :"TEST3"}]' http://192.168.233.128:50000
#在启动源发送的代理终端查看console输出
可以看到他的正常启动以及发送数据成功
#在启动源第一个接事件的代理终端查看console输出
这里可以清楚的看到,这个接事件代理只收到了2个事件,因为第二个事件因为我们设置复用,将头部信息对于的事件分流的关系,发送到另一个接事件代理去了。
#在启动源第二个接事件的代理终端查看console输出
[b]
[/b]
Ok,第二个接事件代理因为复用分流,果然只获得了第二个事件信息。
而关于selector配置前面也讲过:
<Agent>.sources.<Source1>.selector.type= replicating
这个源的选择类型为复制。这个参数不指定一个选择的时候,默认情况下它复制
复用则是麻烦一下,流的事情是被筛选的发生到不同的渠道,需要指定源和扇出通道的规则,感觉与case when 类似。
复用的参数为:
<Agent>.sources.<Source1>.selector.type= multiplexing
一、下面给出复制的测试例子:
这里需要配置1个代理作为源发送与2个代理作为接受复制事件,共3个flume配置首先是作为源发送的代理配置
#配置文件:replicate_source_case11.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = syslogtcp a1.sources.r1.port = 50000 a1.sources.r1.host = 192.168.233.128 a1.sources.r1.selector.type = replicating a1.sources.r1.channels = c1 c2 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 192.168.233.129 a1.sinks.k1.port = 50000 a1.sinks.k2.type = avro a1.sinks.k2.channel = c2 a1.sinks.k2.hostname = 192.168.233.130 a1.sinks.k2.port = 50000 # Use a channel which buffers events inmemory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100
这里设置了2个channels与2个sinks,那么我们也要设置2个sinks对应的代理配置:
下面是第一个接受复制事件代理配置
#配置文件:replicate_sink1_case11.conf # Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.channels = c1 a2.sources.r1.bind = 192.168.233.129 a2.sources.r1.port = 50000 # Describe the sink a2.sinks.k1.type = logger a2.sinks.k1.channel = c1 # Use a channel which buffers events inmemory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100
下面是第二个接受复制事件代理配置:
#配置文件:replicate_sink2_case11.conf # Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.channels = c1 a3.sources.r1.bind = 192.168.233.130 a3.sources.r1.port = 50000 # Describe the sink a3.sinks.k1.type = logger a3.sinks.k1.channel = c1 # Use a channel which buffers events inmemory a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100
#敲命令
首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console
在启动源发送的代理
flume-ng agent -cconf -f conf/replicate_source_case11.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "hello looklook5"| nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
可以看到他的正常启动以及发送数据成功
#在启动源第一个接事件的代理终端查看console输出
可以看到他的正常启动,以及接受到源代理发送的数据
#在启动源第二个接事件的代理终端查看console输出
同样可以可以看到他的正常启动,以及接受到源代理发送的数据
Ok,成功
二、下面给出复用的测试例子:
因为复用的流的事件要声明一个头部,然后我们检查头部对应的值,因为我们这边源类用http source下面是源代理的配置
#配置文件:multi_source_case12.conf a1.sources= r1 a1.sinks= k1 k2 a1.channels= c1 c2 #Describe/configure the source a1.sources.r1.type= org.apache.flume.source.http.HTTPSource a1.sources.r1.port= 50000 a1.sources.r1.host= 192.168.233.128 a1.sources.r1.selector.type= multiplexing a1.sources.r1.channels= c1 c2 a1.sources.r1.selector.header= state a1.sources.r1.selector.mapping.CZ= c1 a1.sources.r1.selector.mapping.US= c2 a1.sources.r1.selector.default= c1 #Describe the sink a1.sinks.k1.type= avro a1.sinks.k1.channel= c1 a1.sinks.k1.hostname= 192.168.233.129 a1.sinks.k1.port= 50000 a1.sinks.k2.type= avro a1.sinks.k2.channel= c2 a1.sinks.k2.hostname= 192.168.233.130 a1.sinks.k2.port= 50000 # Usea channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity= 1000 a1.channels.c1.transactionCapacity= 100 a1.channels.c2.type= memory a1.channels.c2.capacity= 1000 a1.channels.c2.transactionCapacity= 100
这里设置了2个channels与2个sinks 同时判断头部属性,当CZ的时,事件发送到sinks1,US时发送到sink2,其他的都发送到sink2,因此我们还有配置2个sinks对于的代理。这里的2个接受代理我们沿用之前复制的接受代理。
#敲命令
与之前复制的情况一样,首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
flume-ng agent -cconf -f conf/multi_sink1_case12.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/multi_sink2_case12.conf -n a1 -Dflume.root.logger=INFO,console
在启动源发送的代理
flume-ng agent -cconf -f conf/multi_source_case12.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
curl -X POST -d '[{"headers" :{"state" : "CZ"},"body" :"TEST1"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "US"},"body" :"TEST2"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "SH"},"body" :"TEST3"}]' http://192.168.233.128:50000
#在启动源发送的代理终端查看console输出
可以看到他的正常启动以及发送数据成功
#在启动源第一个接事件的代理终端查看console输出
这里可以清楚的看到,这个接事件代理只收到了2个事件,因为第二个事件因为我们设置复用,将头部信息对于的事件分流的关系,发送到另一个接事件代理去了。
#在启动源第二个接事件的代理终端查看console输出
[b]
[/b]
Ok,第二个接事件代理因为复用分流,果然只获得了第二个事件信息。
相关文章推荐
- Flume NG 学习笔记(六)Selector(复用与复制)测试
- Flume NG 学习笔记(七)Sink Processors(故障转移与负载均衡)测试
- Flume NG 学习笔记(八)Interceptors(拦截器)测试
- Redis 学习笔记(三):主从复制、主从配置和性能测试
- Shell学习笔记之条件测试
- 测试驱动开发(tdd) 学习笔记(1)基本思想原则和术语
- Struts 1 学习笔记-4-2(Struts中提交空字段的测试)
- Struts 1 学习笔记-5-1(Struts中I18N的简单测试)
- 测试驱动的开发学习笔记
- Struts 1 学习笔记-4-4(Struts中ActionForward的测试)
- Struts 1 学习笔记-3-3(JSTL格式化标签库测试)
- J2EE学习笔记二:配置一个简单的J2EE测试环境
- 软件测试学习笔记--(关于排错)
- 软件测试与质量保证学习笔记(英)_UNIT1_Concept of Software Quality Assurance(原)
- 测试驱动开发(tdd) 学习笔记(1)基本思想原则和术语
- Struts 1 学习笔记-2-2(Struts标签的测试用例)
- 【学习笔记】Thinking in java (第三版)第六章 Reusing Classes(复用类)
- 学习EJB for JBoss 3.2笔记(2)测试jsp
- Spring学习以及测试程序笔记
- Hibernate 学习笔记-1-3(Hibernate的基本映射以及几种主键生成策略的测试)