Kafka flume 整合
2015-06-01 13:08
387 查看
前提
前提是要先把flume和kafka独立的部分先搭建好。
下载插件包
下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin把lib目录下的
和package下的
都放到flume的lib目录
修改原有的flume-conf文件
在插件包里有一个flume-conf.properties,把这个文件放到flume的conf文件夹里然后修改以下内容
producer.sources.s.type = exec producer.sources.s.command = tail -f -n+1 ~/tmp/test.log producer.sources.s.channels = c …… producer.sinks.r.custom.topic.name=test …… consumer.sources.s.custom.topic.name=test |
启动zookeeper
zkServer.sh start |
启动kafka broker
bin/kafka-server-start.sh config/server.properties |
创建kafka topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
启动kafka consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning |
启动flume
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console |
测试
echo "this is a test" >> ~/tmp/test.txt此时只要能在consumer里现“this
is a test”就表示成功
升级版本
看到前面我们下载的plugin版本不是最新的,是不是很不爽?不爽就换了他我们用到的plugin的jar有:
像
这个不用换,他只是单纯的实现
我用的kafka版本是:kafka_2.11-0.8.2.1
替换的所换成这个版本的lib下的
kafka_2.11-0.8.2.1.jar
kafka-clients-0.8.2.1.jar
metrics-core-2.2.0.jar
metrics-annotation-2.2.0.jar照常用吧不用替换
scala-library-2.11.5.jar
scala-parser-combinators_2.11-1.0.2.jar
zkclient-0.3.jar
flumeng-kafka-plugin.jar这个要保持,他只是单纯的实现
替换完了再跑之前的测试就行
前提
前提是要先把flume和kafka独立的部分先搭建好。
下载插件包
下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin把lib目录下的
和package下的
都放到flume的lib目录
修改原有的flume-conf文件
在插件包里有一个flume-conf.properties,把这个文件放到flume的conf文件夹里然后修改以下内容
producer.sources.s.type = exec producer.sources.s.command = tail -f -n+1 ~/tmp/test.log producer.sources.s.channels = c …… producer.sinks.r.custom.topic.name=test …… consumer.sources.s.custom.topic.name=test |
启动zookeeper
zkServer.sh start |
启动kafka broker
bin/kafka-server-start.sh config/server.properties |
创建kafka topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
启动kafka consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning |
启动flume
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console |
测试
echo "this is a test" >> ~/tmp/test.txt此时只要能在consumer里现“this
is a test”就表示成功
升级版本
看到前面我们下载的plugin版本不是最新的,是不是很不爽?不爽就换了他我们用到的plugin的jar有:
像
这个不用换,他只是单纯的实现
我用的kafka版本是:kafka_2.11-0.8.2.1
替换的所换成这个版本的lib下的
kafka_2.11-0.8.2.1.jar
kafka-clients-0.8.2.1.jar
metrics-core-2.2.0.jar
metrics-annotation-2.2.0.jar照常用吧不用替换
scala-library-2.11.5.jar
scala-parser-combinators_2.11-1.0.2.jar
zkclient-0.3.jar
flumeng-kafka-plugin.jar这个要保持,他只是单纯的实现
替换完了再跑之前的测试就行
相关文章推荐
- 在T-SQL语句中访问远程数据库(openrowset/opendatasource/openquery)
- Leetcode Problem.172—Factorial Trailing Zeroes
- 对List中的HashMap进行排序。
- 21分钟 MySQL 入门教程
- 关于android的ActionBarActivity过期的问题
- springmvc 表单字段list提交问题
- MD5加密不一致
- 腾讯网UED体验设计之旅
- android 屏幕适配
- Linux引导流程解析(1)
- Java:内存相关认识
- PC2日记——坑爹的第一天2014/08/28
- 可将canvas图像导出为多种格式图片的jQuery插件
- 16进制颜色代码大全
- CentOS安装Apache
- 个体户如何开通微信公众平台服务号
- 如何安装 Google coder for Raspberry Pi
- Syncthing vs BitTorrent Sync
- Python用subprocess的Popen来调用系统命令
- 友元的位置关系