您的位置:首页 > 运维架构 > Apache

Apache Kafka笔记(五):消息模式、分布式部署等细节

2017-03-06 17:56 274 查看

1.消息模式

At most once模式

含义:一条消息的读和写最多一次

优缺点:可能会有消息丢失,但是绝对不会产生重复投递的情况

实现方式:设置producer的retires(失败重试次数)为0,限制每条消息只发送一次,同时确保Consumer在处理一批消息之前更新offset(这是默认的)

At least once模式

含义:一条消息的读和写做少一次

优缺点:有可能会产生重复投递的情况,但是绝对不会有消息丢失

实现方式:这是kafka的默认实现

Exactly once模式

含义:一条消息的读和写正好一次

优缺点:这其实正是人们最想要的,缺点在于效率相对默认模式会低

实现方式:通过设置Consumer的enable.auto.commit(默认true)为false来获得offset的掌控权,并且通过Consumer.commitSync()或者Consumer.commitAsync()方法来自行掌控offset在什么时候更新。也可以在enable.auto.commit为true的情况下,通过设置auoto.commit.intervals.ms来控制Consumer更新offset的间隔。

注意:这种自己管理offset的方式会比默认效率低一些,更为重要的是,当你在组织一个Consumer Group时,offset的处理将会变得复杂、难以控制。

2.分布式部署及注意事项

在多台机器上搭建zookeeper集群

修改每个节点的配置信息,每个broker的配置信息在config/server.properties配置文件下,指定每个broker的broker.id互不相同(在一台机器上模拟集群时还要指定listeners和log.dir互不相同,否则会因为以上三个属性冲突而导致broker无法启动),另外需要根据服务器地址信息指定host.name属性,同时指定zookeeper.connect

在多台机器上分别启动broker

创建主题,设置相应的partition和replication-factor

在Consumer和Producer代码中,通过设置bootstrap.servers来指定broker

注意:以上配置版本为2.10,老版本的配置不太一样

3.性能优化

网络和io操作线程配置优化

broker处理消息的最大线程数:

num.network.threads=xxx

broker处理磁盘IO的线程数:

num.io.threads=xxx

建议配置:

  用于接收并处理网络请求的线程数,默认为3。其内部实现是采用Selector模型。启动一个线程作为Acceptor来负责建立连接,再配合启动num.network.threads个线程来轮流负责从Sockets里读取请求,一般无需改动,除非上下游并发请求量过大。一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1。

  num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.

producer的log数据文件刷盘策略

  为了大幅度提高producer写入吞吐量,需要定期批量写文件。

  每当producer写入10000条消息时,刷数据到磁盘 log.flush.interval.messages=10000

  每间隔1秒钟时间,刷数据到磁盘 log.flush.interval.ms=1000

日志保留策略配置

  当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。

  保留三天,也可以更短 log.retention.hours=72

  段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多kafka启动时是单线程扫描目录(log.dir)下所有数据文件) log.segment.bytes=1073741824

replica相关配置:

replica.lag.time.max.ms:10000

replica.lag.max.messages:4000

num.replica.fetchers:1

在Replica上会启动若干Fetch线程把对应的数据同步到本地,而num.replica.fetchers这个参数是用来控制Fetch线程的数量。

每个Partition启动的多个Fetcher,通过共享offset既保证了同一时间内Consumer和Partition之间的一对一关系,又允许我们通过增多Fetch线程来提高效率。

default.replication.factor:1

这个参数指新创建一个topic时,默认的Replica数量

Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。

Producer优化

buffer.memory:33554432 (32m)

在Producer端用来存放尚未发送出去的Message的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由block.on.buffer.full的配置来决定。

compression.type:none

默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。

linger.ms:0

Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。

batch.size:16384

Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。

acks:1

这个配置可以设定发送消息后是否需要Broker端返回确认。

0: 不需要进行确认,速度最快。存在丢失数据的风险。

1: 仅需要Leader进行确认,不需要ISR进行确认。是一种效率和安全折中的方式。

all: 需要ISR中所有的Replica给予接收确认,速度最慢,安全性最高,但是由于ISR可能会缩小到仅包含一个Replica,所以设置参数为all并不能一定避免数据丢失。

Consumer优化

num.consumer.fetchers:1

启动Consumer的个数,适当增加可以提高并发度。

fetch.min.bytes:1

每次Fetch Request至少要拿到多少字节的数据才可以返回。

在Fetch Request获取的数据至少达到fetch.min.bytes之前,允许等待的最大时长。对应上面说到的Purgatory中请求的超时时间。

fetch.wait.max.ms:100

Tips

Kafka官方并不建议通过Broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。

可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能。

脏页率超过第一个指标会启动pdflush开始Flush Dirty PageCache。

脏页率超过第二个指标会阻塞所有的写操作来进行Flush。

根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio。

如果topic的数据量较小可以考虑减少log.flush.interval.ms和log.flush.interval.messages来强制刷写数据,减少可能由于缓存数据未写盘带来的不一致。

性能优化参考:

https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html#concept_f3v_hzk_br

4.动态扩展

  在项目中维护一个broker列表和topic对应关系的配置文件,方便通过修改配置文件来修改Producer和Consumer的I/O的broker cluster。

通过bin/kafka-topics.sh –alter 可以修改Topic的partition和replication.factor,新的broker加入之后,在项目的broker list中加入该broker的url,zookeeper会对它进行管理,分配任务。

5.容错处理

设置合理的Partition和replication.factor数量,让数据尽可能的分散在多台服务器上,当某个Broker故障失效时,ZooKeeper服务将通知生产者和消费者,生产者和消费者转而使用其它Broker。  

使用Consumer Group,在Consumer Group中,如果有组员挂了,组长(GroupCoordinator,由组员选举而来)会进行rebanlance,将任务分配给其他组员,保证服务的高可用性。

通过Producer和Consumer的属性设置来提高容错,比如retries(Producer写消息失败重试的次数),ack(消息写入的确认)等等。

6.关于SimpleConsumer

  可以在source代码中的examples目录中看到一个SimpleConsumerDemo.java,通过它可以实现对Consumer的完全掌控,基于FetchRequest对象可以将操作的粒度细化到读取哪个一个offset的消息。但是,在0.9版本以后:

the new consumer combines the capabilities of both the older “simple” and “high-level” consumer clients, providing both group coordination and lower level access to build your own consumption strategy with.

  正如前文Exactly once模式中提到的,新的Consumer中可以通过设置enable.auto.commit为false和Consumer.commitSync()/Consumer.commitAsync()来实现对offset的控制。新的Consumer和Producer中仍然可以使用client.id属性让Consumer消费指定的client.id的Producer生产的消息。

  

7.关于Consumer死掉,消息进度会不会丢失

  如果该Consumer是Consumer Group的一员,当这个Consumer挂掉时,Coordinator会将它的任务重新分配给其他组员,并且被分配任务的组员从被分配到的Prtition的last committed offset开始读取消息。

  如果是一个独立的Consumer可以通过调用KafkaConsumer.committed(TopicPartition partition)方法,来获取Partition最后的last committed offset,从而保证消息的进度能够维持。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka