Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署
2016-08-04 21:33
447 查看
基于Logstash跑通Kafka还是需要注意很多东西,最重要的就是理解Kafka的原理。
Logstash工作原理
由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处:1 生产者的负载与消费者的负载解耦
2 消费者按照自己的能力fetch数据
3 消费者可以自定义消费的数量
另外,由于broker采用了主题topic-->分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。
Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader-->followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。
由于上面的架构设计,使得生产者与broker相连;消费者与zookeeper相连。有了这样的对应关系,就容易部署logstash-->kafka-->logstash的方案了。
接下来,按照下面的步骤就可以实现logstash与kafka的对接了。
启动kafka
启动zookeeper:$zookeeper/bin/zkServer.sh start
启动kafka:
$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &
创建主题
创建主题:$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1
查看主题:
$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
测试环境
执行生产者脚本:$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello
执行消费者脚本,查看是否写入:
$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello
输入测试
input{ stdin{} } output{ kafka{ topic_id => "hello" bootstrap_servers => "192.168.0.4:9092" # kafka的地址 batch_size => 5 } stdout{ codec => rubydebug } }
读取测试
logstash配置文件:input{ kafka { codec => "plain" group_id => "logstash1" auto_offset_reset => "smallest" reset_beginning => true topic_id => "hello" #white_list => ["hello"] #black_list => nil zk_connect => "192.168.0.5:2181" # zookeeper的地址 } } output{ stdout{ codec => rubydebug } }
相关文章推荐
- Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署
- Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署
- Kafka与Logstash的数据采集对接
- Kafka与Logstash的数据采集
- Kafka与Logstash的数据采集
- 对接 kafka的 spark程序 程序可以正常运行 就是接受kafka的数据时很慢 ,怎么调优?
- Object-C 运行时机制几种最基本数据类型
- flume + Kafka采集数据 超简单
- MySQL数据库运行状态数据的采集方法
- MySQL数据库运行状态数据采集的脚本
- 运行部署在Weblogic上的Web应用时,过一段时间程序就提示数据连接断了,重起Weblogic就好了。过一段时间又出现同样的问题。
- JVM 运行时数据区及GC机制
- 《转》ceilometer的数据采集机制入门
- ceilometer的数据采集机制
- 数据库表表面上存在索引和防错机制,然而一个简单的查询就会耗费很长时间。Web应用程序或许在开发环境中运行良好,但在产品环境中表现同样糟糕。如果你是个数据库管理员,你很有可能已经在某个阶段遇到上述情况。
- 【google M】android 网络服务状态运行机制(语音、数据注册状态;运营商信息PLMN,SPN;radio技术;信号强度)
- presto的安装与部署(对接kafka)
- Java的运行时数据存储机制
- 看图说话:为什么大数据落地难?
- ceilometer的数据采集机制