您的位置:首页 > 其它

Flume ng1.6 + kafka 2.11 整合

2015-12-25 00:00 369 查看
摘要: flume 作为生产者收集日志 将 日志发送到kafka , flume配置实例;

第一步

安装Apache Flume 1.6;

第二步
安装kafka;
启动kafka步骤:
<1>启动[b]zookeeper服务[/b]> bin/zookeeper-server-start.sh config/zookeeper.properties &<2>启动kafka> [b]bin/kafka-server-start.sh config/server.properties[/b]<3>启动consumer(--from-beginning:此参数可以不要,表示启动之前接收到的消息也要消费 )> [b]bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning[/b]

第三步

配置flume 配置文件kafka-test.conf :

#Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#Describe/configure the source

a1.sources.r1.type=netcat

a1.sources.r1.bind=localhost

a1.sources.r1.port=3333

#Describe the sink ( kafka sink 配置)

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.topic = test

a1.sinks.k1.brokerList =172.19.16.213:9092

a1.sinks.k1.requiredAcks = 0

a1.sinks.k1.batchSize = 20

#kafka.producer.type=sync

#kafka.partitioner.class=org.apache.flume.plugins.SinglePartition

#Use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

#Bind the source and sink to the channel

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

4.第四步 启动flume

bin/flume-ng agent -c conf -f conf/kafka-test.conf --name a1 -Dflume.root.logger=INFO,console



bin/flume-ng agent -c conf -f conf/kafka-test.conf --name a1 &

5.第五步测试



6.验证是否发送成功

在启动consumer控制台查看接收到的消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: