您的位置:首页 > 其它

Kafka flume 整合

2015-06-01 13:08 387 查看




前提

前提是要先把flume和kafka独立的部分先搭建好。


下载插件包

下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

把lib目录下的



和package下的



都放到flume的lib目录


修改原有的flume-conf文件

在插件包里有一个flume-conf.properties,把这个文件放到flume的conf文件夹里

然后修改以下内容

producer.sources.s.type = exec

producer.sources.s.command = tail -f -n+1 ~/tmp/test.log

producer.sources.s.channels = c

……

producer.sinks.r.custom.topic.name=test

……

consumer.sources.s.custom.topic.name=test


启动zookeeper

zkServer.sh start


启动kafka broker

bin/kafka-server-start.sh config/server.properties


创建kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


启动kafka consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning


启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console


测试

echo "this is a test" >> ~/tmp/test.txt

此时只要能在consumer里现“this
is a test”就表示成功


升级版本

看到前面我们下载的plugin版本不是最新的,是不是很不爽?不爽就换了他

我们用到的plugin的jar有:





这个不用换,他只是单纯的实现

我用的kafka版本是:kafka_2.11-0.8.2.1

替换的所换成这个版本的lib下的

kafka_2.11-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

metrics-core-2.2.0.jar

metrics-annotation-2.2.0.jar照常用吧不用替换

scala-library-2.11.5.jar

scala-parser-combinators_2.11-1.0.2.jar

zkclient-0.3.jar

flumeng-kafka-plugin.jar这个要保持,他只是单纯的实现

替换完了再跑之前的测试就行
前提

前提是要先把flume和kafka独立的部分先搭建好。


下载插件包

下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

把lib目录下的



和package下的



都放到flume的lib目录


修改原有的flume-conf文件

在插件包里有一个flume-conf.properties,把这个文件放到flume的conf文件夹里

然后修改以下内容

producer.sources.s.type = exec

producer.sources.s.command = tail -f -n+1 ~/tmp/test.log

producer.sources.s.channels = c

……

producer.sinks.r.custom.topic.name=test

……

consumer.sources.s.custom.topic.name=test


启动zookeeper

zkServer.sh start


启动kafka broker

bin/kafka-server-start.sh config/server.properties


创建kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


启动kafka consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning


启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console


测试

echo "this is a test" >> ~/tmp/test.txt

此时只要能在consumer里现“this
is a test”就表示成功


升级版本

看到前面我们下载的plugin版本不是最新的,是不是很不爽?不爽就换了他

我们用到的plugin的jar有:





这个不用换,他只是单纯的实现

我用的kafka版本是:kafka_2.11-0.8.2.1

替换的所换成这个版本的lib下的

kafka_2.11-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

metrics-core-2.2.0.jar

metrics-annotation-2.2.0.jar照常用吧不用替换

scala-library-2.11.5.jar

scala-parser-combinators_2.11-1.0.2.jar

zkclient-0.3.jar

flumeng-kafka-plugin.jar这个要保持,他只是单纯的实现

替换完了再跑之前的测试就行
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: