您的位置:首页 > 其它

了解消息组件Kafka

2017-09-18 17:39 429 查看
# Kafka
一:Kafka工作原理:Kafka作为消息组件至少应该包含三个 部分:生产者(Provider):只是进行指定消息的数据发送消息的组件(中间件):负责消息的临时存储,等待消费者进行消息接收。消费者(Consumer):负责通过消息组件取得消息的内容。 

  Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。  作用:有些情况下,处理数据的过程会失败,除非数据被持久化,否则将造成丢失。消息队列将数据进行持久化直到他们已经被完全处理。峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用。如果以能处理峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列可以顶住突发的访问压力,而不会导致因为超负荷而完全崩溃。异步通信:很多时候,用户不想也不需要立即处理消息,消息队列提供了异步处理机制,允许用户将一个消息放入队列,但并不立即处理它,想向队列中放多少数据就放多少,然后在需要的时候再去处理。
为了保证数据的完整性,Kafka将所有的数据都保存在Zookeeper上。Kafka架构:Kafka整体采用显式分布式架构,producer,broker(kafka),和consumer都可以有多个。数据从producer发送到broker,broker承担一个中间缓存和分发的作用,broker分发注册到系统中的consumer。
Kafka核心概念:


Topic:特指Kafka处理的消息源的不同分类。Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列,partition中的每条消息都会被分配一个有序的id(offset)。Broker:缓存代理,Kafka集群中的一台或者多台服务器统称为broker。另外Kafka考虑到了消费者的处理,所有提供的数据格式中包含有“key=value”的格式。为了规避随机写带来的时间消耗,Kafka采用顺序写的方式储存数据,因此效率非常高,这是Kafka高吞吐量的一个很重要的保证。在高负载状态下,为了防止无效率的字节复制,Kafka采用了标准化的二进制消息格式,这样数据块就可以在他们之间自由传输,降低了字节复制的成本开销。另外对于传统的消息组件而言,一般会删除已经被消费的消息,而Kafka集群会默认保留两天所有的信息,可以根据自居的需求在server.properties修改。kafka通过append来实现消息的追加,保证消息都是有序的有先来后到的顺序。kafka的性能不会受到消息数量的影响。总结:Kafka的消息以Topic的形式存在,Topic可以分为多个分区Paritition,Producer发送消息(topic)到broker时,会根据Paritition机制选择将其存储在哪一个Partition,如果Partition机制设置合理,所有的消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKAHOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。为了保证数据的安全性,每个分区又有多个Replica。producer和consumer都只与Replica中的leader交互,每个follower从leader拉取数据进行同步。二:Linux安装KafkaKafka Linux启动命令(均为绝对路径,可自己根据需求修改):1:启动kafka内置zookeepr:/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties > /usr/data/zookeeper.log 2>&1 &2:启动KafkaServer:/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /usr/data/kafka.log 2>&1 &3:创建消息主题:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic qianyue-contract-create4:主题列表:/usr/local/kafka/bin/kafka-topics.sh  --list --zookeeper localhost:21815:设置多个分区主题:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic qianyue-contract-create6:删除主题:/usr/local/kafka/bin/kafka-topics.sh --delete --topic jiashun-js --zookeeper localhost:21817:修改kafka配置文件:vim /usr/local/kafka/config/server.properties8:消费消息:/usr/local/kafka/bin/kafka-console-consumer.sh --from-beginning --zookeeper localhost:2181 --topic jiangzong
三:Java调用Kafka服务1:建立一个Maven项目,引入Kafka依赖包。2:建立服务发送者:配置消息地址:ip+port,配置Topic消息源。使用Properties定义一些环境属性:Properties props = new Properties();Kafka中是以key和value的形式进行消息的处理,为了保证Kafka的性能,专门提供有统一的类型。可以进行数据类型序列化。定义消息的发送者对象,依靠对象进行消息的传递:Producer<String,Integer> producer = new KafkaProducer<String, Integer>(props);producer.send(new ProducerRecord<String, Integer>(TOPIC,key,value));producer.close();3:建立消息的消费者对象:同样定义消息地址和消息主题。消息消费者一定要设置反序列化程序类,与生产者进行对应。定义消费者的处理对象并设置消费者读取的主题名称。消费者需要一直进行数据的读取处理操作。4:但是现在发现即使程序中启用了多个消费端,最终也只能有一个消费者可以接收发送者发送的内容,因为此时设置的分区只有一个,可以通过此条命令创建多个消息分区:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic jiashun-topic。此时所设置的消费者数量可以为三个,同时这三个消费者处于同一个组之中,所以变成了三个消费者处理一个生产者的情况。5:如果此时设置的消费者id不同,那么就表示该消息将变为主题订阅消息,所有的消费者将取得各自的全部内容。
四:Kerberos认证Kafka可以利用kerberos这种机制同时结合jaas配置文件进行更加有效的安全认证处理。Keberos认证需要考虑如下的几个层次:zookeeper的访问安全:kafka_zookeeper_jaas.conf;kafkaServer的访问安全:kafka_server_jaas.conf客户端命令和Java:kafka_client_jaas.conf在Zookeeper和KafkaServer之间进行互相配置的时候,必须有一个相同的用户名和密码。
五:Kafka分布式集群要想真正得到Kafka消息组件的性能处理提升,只依靠单机永远不可能实现,就需要通过集群来进行性能的均衡,多台主机控制完成共同的消息处理。在Kafka里面可以多设置一些分区,但是一台主机的CPU再强悍,可以划分的分区也是有限的,加入了集群之后就可以使用更多的CPU进行更好的性能监控。分区=集群中的CPU的核心数。配置Kafka集群有一点要处理就是Kafka主机应该具备不同的“broker.id”

扩充:常用消息组件对比:RabbitMQ:非常重量级,更适合与大型企业级的应用,同时实现了Broker架构,对路由,负载均衡和数据持久化都有很好的支持;Redis:Redis是一个基于Key-Value对的NOSQL数据库,本身也支持MQ功能,所以完全可以当做一个轻量级的消息队列服务来使用。当数据量较小时,Redis的性能要高于RabbitMQ,而如果数据量大小超过10K,Redis则慢的要死。ActiveMQ:老牌消息组件,性能一般。Kafka:是一个
e23c
高性能跨语言分布式分布、订阅消息队列系统。具备以下特征:快速持久化,高吞吐,完全的分布式系统,Broker,Producer,Consumer都原生自动支持分布式,自动实现负载均衡。并能和Hadoop,Storm等大数据处理系统更好的结合。
Kafka的几个重要的配置总结:
broker配置非负整数,用于唯一标识brokerbroker.id 0kafka持久化数据存储的路径,可以指定多个,以逗号分隔log.dirs /tmp/kafka-logsbroker接收连接请求的端口port 9092指定zk连接字符串,[hostname:port]以逗号分隔zookeeper.connect单条消息最大大小控制,消费端的最大拉取大小需要略大于该值message.max.bytes 1000000接收网络请求的线程数num.network.threads 3用于执行请求的I/O线程数num.io.threads 8用于各种后台处理任务(如文件删除)的线程数background.threads 10待处理请求最大可缓冲的队列大小queued.max.requests 500配置该机器的IP地址host.name默认分区个数num.partitions  1分段文件大小,超过后会轮转log.segment.bytes 1024 1024 1024日志没达到大小,如果达到这个时间也会轮转log.roll.{ms,hours}   168日志保留时间log.retention.{ms,minutes,hours}不存在topic的时候是否自动创建auto.create.topics.enable truepartition默认的备份因子default.replication.factor 1如果这个时间内follower没有发起fetch请求,被认为dead,从ISR移除replica.lag.time.max.ms   10000如果follower相比leader落后这么多以上消息条数,会被从ISR移除replica.lag.max.messages  4000从leader可以拉取的消息最大大小replica.fetch.max.bytes 1024 1024从leader拉取消息的fetch线程数num.replica.fetchers 1zk会话超时时间zookeeper.session.timeout.ms  6000zk连接所用时间zookeeper.connection.timeout.mszk follower落后leader的时间zookeeper.sync.time.ms 2000是否开启topic可以被删除的方式delete.topic.enable false*producer配置参与消息确认的broker数量控制,0代表不需要任何确认 1代表需要leader replica确认 -1代表需要ISR中所有进行确认request.required.acks 0从发送请求到收到ACK确认等待的最长时间(超时时间)request.timeout.ms  10000设置消息发送模式,默认是同步方式, async异步模式下允许消息累计到一定量或一段时间又另外线程批量发送,吞吐量好但丢失数据风险增大producer.type sync消息序列化类实现方式,默认是byte[]数组形式serializer.class kafka.serializer.DefaultEncoderkafka消息分区策略实现方式,默认是对key进行hashpartitioner.class kafka.producer.DefaultPartitioner对发送的消息采取的压缩编码方式,有none|gzip|snappycompression.codec none指定哪些topic的message需要压缩compressed.topics  null消息发送失败的情况下,重试发送的次数 存在消息发送是成功的,只是由于网络导致ACK没收到的重试,会出现消息被重复发送的情况message.send.max.retries 3在开始重新发起metadata更新操作需要等待的时间retry.backoff.ms 100metadata刷新间隔时间,如果负值则失败的时候才会刷新,如果0则每次发送后都刷新,正值则是一种周期行为topic.metadata.refresh.interval.ms 600 1000异步发送模式下,缓存数据的最长时间,之后便会被发送到broker*queue.buffering.max.ms 5000producer端异步模式下最多缓存的消息条数*queue.buffering.max.messages 100000代表队列没满的时候直接入队,满了立即扔弃,-1代表无条件阻塞且不丢弃*queue.enqueue.timeout.ms -1一次批量发送需要达到的消息条数,当然如果queue.buffering.max.ms达到的时候也会被发送batch.num.messages 200
consumer配置指明当前消费进程所属的消费组,一个partition只能被同一个消费组的一个消费者消费group.id针对一个partition的fetch request所能拉取的最大消息字节数,必须大于等于Kafka运行的最大消息fetch.message.max.bytes  1024 1024是否自动周期性提交已经拉取到消费端的消息offset*auto.commit.enable true自动提交offset到zookeeper的时间间隔auto.commit.interval.ms  60 1000消费均衡的重试次数rebalance.max.retries  4消费均衡两次重试之间的时间间隔rebalance.backoff.ms 2000当重新去获取partition的leader前需要等待的时间refresh.leader.backoff.ms   200如果zookeeper上没有offset合理的初始值情况下获取第一条消息开始的策略smallest|largesetauto.offset.reset largest如果其超时,将会可能触发rebalance并认为已经死去zookeeper.session.timeout.ms  6000确认zookeeper连接建立操作客户端能等待的最长时间zookeeper.connection.timeout.ms 6000

作用:有些情况下,处理数据的过程会失败,除非数据被持久化,否则将造成丢失。消息队列将数据进行持久化直到他们已经被完全处理。峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用。如果以能处理峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列可以顶住突发的访问压力,而不会导致因为超负荷而完全崩溃。异步通信:很多时候,用户不想也不需要立即处理消息,消息队列提供了异步处理机制,允许用户将一个消息放入队列,但并不立即处理它,想向队列中放多少数据就放多少,然后在需要的时候再去处理。
为了保证数据的完整性,Kafka将所有的数据都保存在Zookeeper上。Kafka架构:Kafka整体采用显式分布式架构,producer,broker(kafka),和consumer都可以有多个。数据从producer发送到broker,broker承担一个中间缓存和分发的作用,broker分发注册到系统中的consumer。Kafka核心概念:Topic:特指Kafka处理的消息源的不同分类。Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列,partition中的每条消息都会被分配一个有序的id(offset)。Broker:缓存代理,Kafka集群中的一台或者多台服务器统称为broker。另外Kafka考虑到了消费者的处理,所有提供的数据格式中包含有“key=value”的格式。为了规避随机写带来的时间消耗,Kafka采用顺序写的方式储存数据,因此效率非常高,这是Kafka高吞吐量的一个很重要的保证。在高负载状态下,为了防止无效率的字节复制,Kafka采用了标准化的二进制消息格式,这样数据块就可以在他们之间自由传输,降低了字节复制的成本开销。另外对于传统的消息组件而言,一般会删除已经被消费的消息,而Kafka集群会默认保留两天所有的信息,可以根据自居的需求在server.properties修改。Kafka的消息保存在Topic中,Topic可以分为多个分区Paritition,Producer发送消息到broker时,会根据Paritition机制选择将其存储在哪一个Partition,如果Partition机制设置合理,所有的消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKAHOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。为了保证数据的安全性,每个分区又有多个Replica。producer和consumer都只与Replica中的leader交互,每个follower从leader拉取数据进行同步。

二:Linux安装KafkaKafka Linux启动命令:1:启动kafka内置zookeepr:/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties > /usr/data/zookeeper.log 2>&1 &2:启动KafkaServer:/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /usr/data/kafka.log 2>&1 &3:创建消息主题:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiashun-topic4:主题列表:/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:21815:设置多个分区主题:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic jiashun-topic
三:Java调用Kafka服务1:建立一个Maven项目,引入Kafka依赖包。2:建立服务发送者:配置消息地址:ip+port,配置Topic消息源。使用Properties定义一些环境属性:Properties props = new Properties();Kafka中是以key和value的形式进行消息的处理,为了保证Kafka的性能,专门提供有统一的类型。可以进行数据类型序列化。定义消息的发送者对象,依靠对象进行消息的传递:Producer<String,Integer> producer = new KafkaProducer<String, Integer>(props);producer.send(new ProducerRecord<String, Integer>(TOPIC,key,value));producer.close();3:建立消息的消费者对象:同样定义消息地址和消息主题。消息消费者一定要设置反序列化程序类,与生产者进行对应。定义消费者的处理对象并设置消费者读取的主题名称。消费者需要一直进行数据的读取处理操作。4:但是现在发现即使程序中启用了多个消费端,最终也只能有一个消费者可以接收发送者发送的内容,因为此时设置的分区只有一个,可以通过此条命令创建多个消息分区:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic jiashun-topic。此时所设置的消费者数量可以为三个,同时这三个消费者处于同一个组之中,所以变成了三个消费者处理一个生产者的情况。5:如果此时设置的消费者id不同,那么就表示该消息将变为主题订阅消息,所有的消费者将取得各自的全部内容。
四:Kerberos认证Kafka可以利用kerberos这种机制同时结合jaas配置文件进行更加有效的安全认证处理。
Keberos认证需要考虑如下的几个层次:zookeeper的访问安全:kafka_zookeeper_jaas.conf;kafkaServer的访问安全:kafka_server_jaas.conf客户端命令和Java:kafka_client_jaas.conf在Zookeeper和KafkaServer之间进行互相配置的时候,必须有一个相同的用户名和密码。
五:Kafka分布式集群要想真正得到Kafka消息组件的性能处理提升,只依靠单机永远不可能实现,就需要通过集群来进行性能的均衡,多台主机控制完成共同的消息处理。在Kafka里面可以多设置一些分区,但是一台主机的CPU再强悍,可以划分的分区也是有限的,加入了集群之后就可以使用更多的CPU进行更好的性能监控。分区=集群中的CPU的核心数。配置Kafka集群有一点要处理就是Kafka主机应该具备不同的“broker.id”

扩充:常用消息组件对比:RabbitMQ:非常重量级,更适合与大型企业级的应用,同时实现了Broker架构,对路由,负载均衡和数据持久化都有很好的支持;Redis:Redis是一个基于Key-Value对的NOSQL数据库,本身也支持MQ功能,所以完全可以当做一个轻量级的消息队列服务来使用。当数据量较小时,Redis的性能要高于RabbitMQ,而如果数据量大小超过10K,Redis则慢的要死。ActiveMQ:老牌消息组件,性能一般。Kafka:是一个高性能跨语言分布式分布、订阅消息队列系统。具备以下特征:快速持久化,高吞吐,完全的分布式系统,Broker,Producer,Consumer都原生自动支持分布式,自动实现负载均衡。并能和Hadoop,Storm等大数据处理系统更好的结合。
Kafka的几个重要的配置总结:
broker配置非负整数,用于唯一标识brokerbroker.id 0kafka持久化数据存储的路径,可以指定多个,以逗号分隔log.dirs /tmp/kafka-logsbroker接收连接请求的端口port 9092指定zk连接字符串,[hostname:port]以逗号分隔zookeeper.connect单条消息最大大小控制,消费端的最大拉取大小需要略大于该值message.max.bytes 1000000接收网络请求的线程数num.network.threads 3用于执行请求的I/O线程数num.io.threads 8用于各种后台处理任务(如文件删除)的线程数background.threads 10待处理请求最大可缓冲的队列大小queued.max.requests 500配置该机器的IP地址host.name 默认分区个数num.partitions  1分段文件大小,超过后会轮转log.segment.bytes 1024 1024 1024日志没达到大小,如果达到这个时间也会轮转log.roll.{ms,hours}   168日志保留时间log.retention.{ms,minutes,hours}不存在topic的时候是否自动创建auto.create.topics.enable truepartition默认的备份因子default.replication.factor 1如果这个时间内follower没有发起fetch请求,被认为dead,从ISR移除replica.lag.time.max.ms   10000如果follower相比leader落后这么多以上消息条数,会被从ISR移除replica.lag.max.messages  4000从leader可以拉取的消息最大大小replica.fetch.max.bytes 1024 1024从leader拉取消息的fetch线程数num.replica.fetchers 1zk会话超时时间zookeeper.session.timeout.ms  6000zk连接所用时间zookeeper.connection.timeout.mszk follower落后leader的时间zookeeper.sync.time.ms 2000是否开启topic可以被删除的方式delete.topic.enable false*producer配置参与消息确认的broker数量控制,0代表不需要任何确认 1代表需要leader replica确认 -1代表需要ISR中所有进行确认request.required.acks 0从发送请求到收到ACK确认等待的最长时间(超时时间)request.timeout.ms  10000设置消息发送模式,默认是同步方式, async异步模式下允许消息累计到一定量或一段时间又另外线程批量发送,吞吐量好但丢失数据风险增大producer.type sync消息序列化类实现方式,默认是byte[]数组形式serializer.class kafka.serializer.DefaultEncoderkafka消息分区策略实现方式,默认是对key进行hashpartitioner.class kafka.producer.DefaultPartitioner对发送的消息采取的压缩编码方式,有none|gzip|snappycompression.codec none指定哪些topic的message需要压缩compressed.topics  null消息发送失败的情况下,重试发送的次数 存在消息发送是成功的,只是由于网络导致ACK没收到的重试,会出现消息被重复发送的情况message.send.max.retries 3在开始重新发起metadata更新操作需要等待的时间retry.backoff.ms 100metadata刷新间隔时间,如果负值则失败的时候才会刷新,如果0则每次发送后都刷新,正值则是一种周期行为topic.metadata.refresh.interval.ms 600 1000异步发送模式下,缓存数据的最长时间,之后便会被发送到broker*queue.buffering.max.ms 5000producer端异步模式下最多缓存的消息条数*queue.buffering.max.messages 100000代表队列没满的时候直接入队,满了立即扔弃,-1代表无条件阻塞且不丢弃*queue.enqueue.timeout.ms -1一次批量发送需要达到的消息条数,当然如果queue.buffering.max.ms达到的时候也会被发送batch.num.messages 200
consumer配置指明当前消费进程所属的消费组,一个partition只能被同一个消费组的一个消费者消费group.id针对一个partition的fetch request所能拉取的最大消息字节数,必须大于等于Kafka运行的最大消息fetch.message.max.bytes  1024 1024是否自动周期性提交已经拉取到消费端的消息offset*auto.commit.enable true自动提交offset到zookeeper的时间间隔auto.commit.interval.ms  60 1000消费均衡的重试次数rebalance.max.retries  4消费均衡两次重试之间的时间间隔rebalance.backoff.ms 2000当重新去获取partition的leader前需要等待的时间refresh.leader.backoff.ms   200如果zookeeper上没有offset合理的初始值情况下获取第一条消息开始的策略smallest|largesetauto.offset.reset largest如果其超时,将会可能触发rebalance并认为已经死去zookeeper.session.timeout.ms  6000确认zookeeper连接建立操作客户端能等待的最长时间zookeeper.connection.timeout.ms 6000
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  消息组件 Kafka