您的位置:首页 > 其它

kafka——Producer端如何保证消息不丢失不乱序

2019-01-05 09:21 1781 查看

Producer端

在谈论kafka生产者发送消息是否无序和丢失之前,我们先来了解一下生产者三种发送消息的模式以便于我们更好的理解数据丢失和数据乱序的原因和如何避免消息丢失和乱序。
当参数acks和retries设置不为0时,只要send(record)就会重试,重试的次数和参数reties有关,和同步异步无关。
发送并忘记:producer.send(record):参数acks不设置为0,且参数retries大于0,就会触发重试,重试的次数和retries设置的大小有关。
同步发送消息:producer.send(record).get():调用Future对象的get方法就会一直等待sender线程发送一批消息后的响应。
异步发送消息:producer.send(record , callback):当参数acks和retries设置不为0时,发送失败则会继续重试, callback对象可以处理不可重试的异常。
总结:发送并忘记和异步发送消息的区别在于异步发送消息可以处理不可重试的异常;同步发送消息和异步发送消息的区别在于同步发送消息会等待Kafka响应而异步在发送消息以后可以不用等待响应,能够接着处理其他任务。
了解到这你们可能会疑惑,这些模式和数据丢失和乱序有什么关联,下面的内容将是基于同步和异步模式来展开kafka数据丢失和数据乱序的原因。

消息丢失和乱序的原因

上述的模式都是基于参数acks和retries不为0的情况,那么我们来设想一下以下几种情况:

0.如果acks为0,同步发送消息将不和Kafka集群进行消息接受确认,当网络发生异常等情况时,将会存在消息丢失的可能。(笔者暂未知道这个是否会有消息丢失的可能,欢迎指教)

1.在使用同步发送消息时我们将参数acks设为1(即只保证写入leader成功),如果刚好leader partition挂了,数据就会丢失。

producer.type=sync
request.required.acks=1

2.使用异步模式的时候,当缓存区满了,如果参数queue.enqueue.timeout.ms为0(还没收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),数据就会被立马丢弃掉,这时数据会丢失。

producer.type=async
request.required.acks=1
queue.enqueue.timeout.ms = 0 //0代表队列没满的时候直接入队,满了立即扔弃,-1代表无条件阻塞且不丢弃

3.在使用异步发送消息时,我们有2批次数据准备发送到相同的分区,第二批次的消息写入成功,第一批次的消息写入失败,重试后第一批次如果写入失败,这时数据将会丢失。
4. 在与3相同的前提下, 重试后如果第一批次写入成功,这时数据将会乱序。
从我们设想的这些情况中,我们便能对数据丢失和乱序的原因了然于胸, 也自然会像庖丁解牛一般对如何避免数据丢失和乱序游刃有余,下面我们将谈到在数据生产时避免数据丢失的方法。

在数据生产时避免数据丢失的方法

只要能避免上述的情况,那么就可以保证消息不会被丢失。
1.在同步模式的时候,确认机制设置为all,也就是让消息写入leader和所有的副本。

producer.type=sync
request.required.acks=all

2.可以通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值。注意:如果buffer满了数据还没有发送出去,如果设置的是立即清理模式,风险很大,一定要设置为阻塞模式,这样也能保证消息不丢失。

producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000 //异步发送模式下,缓存数据的最长时间,之后便会被发送到broker
queue.buffering.max.messages=10000 //异步模式下最多缓存的消息条数
queue.enqueue.timeout.ms = -1 //0代表队列没满的时候直接入队,满了立即扔弃,-1代表无条件阻塞且不丢弃
batch.num.messages=200 //一次批量发送需要达到的消息条数,当然如果queue.buffering.max.ms达到的时候也会被发送

3.自定义回调逻辑处理消息发送失败,比如记录在日志中
结论:producer有丢数据的可能,但是可以通过配置保证消息的不丢失
解决完producer丢失数据的问题后,剩下的如何规避数据乱序便摆在了我们面前,下面我们来谈谈这个问题。

如何避免发送之后重发数据乱序

针对异步发送消息产生的数据乱序,我们可以通过设置参数max.in.flight.requests.per.connection = 1来限制客户端在单个连接上能够发送的未响应请求的个数,设置此值是1 表示kafka broker在响应请求之前client不能再向同一个broker发送请求,这样便可以避免消息乱序。

如何避免发送之后数据丢弃和数据乱序

基于之前的如何避免数据丢失、乱序问题,笔者在此特意总结了一份配置列表,个人认为该配置清单应该能够比较好的规避producer端数据丢失和乱序的情况发生:(但说明一下,软件配置的很多决策都是在权衡利弊,下面的配置也不例外:应用了这些配置,你可能会发现你的producer/consumer吞吐量会下降,这是正常的,因为你换取了更高的数据安全性)
acks =all 所有的follower都响应了才认为消息提交成功
retried尽量调得大些
max.in.flight.requests.per.connection = 1
使用KafkaProducer.send(record,callback)而不是send(record)方法,自定义回调逻辑处理消息发送失败

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: