一行代码引发的"血案"
2017-07-15 17:07
393 查看
昨天在使用pykafka的时候又遇到了之前我遇到过的
一、问题的出现
我在多台机器上面同时开启了多个进程来读写同一个topic,这个topic有5个partition,我想着开启5个进程来读写,这样可以提高速度。在测试过程中我发现会出现
[pykafka.balancedconsumer] [balancedconsumer.py:580] INFO: Unable to acquire partition <pykafka.partition.Partition at 0x7f9a0a5586d0 (id=4)>. Retrying
[pykafka.balancedconsumer] [balancedconsumer.py:580] INFO: Unable to acquire partition <pykafka.partition.Partition at 0x7f3320806710 (id=3)>. Retrying
但是经过
二、问题的排查
此时我想到肯定是其它的原因导致的这个问题,但是我明明记得我对操作pykafka的代码没有做过什么改动啊,唯一的改动就是把
为了验证我的猜测,我还是把
没办法,只能又开始由源码开刀了。
三、问题的真正原因
因为之前对pykafka的代码有过一些了解,所以这次读起来就相对比较简单了,我理解每次当zookeeper上面的znode状态发生变化,kafka都会执行相应的rebalance,如下的代码就是实现这个功能:
代码逻辑比较简单,就是设置三个watcher函数,一旦对应的znode状态发生变化就执行相应的callback,这个也是为什么当一个consumer加入以后会分配到partition的原因,当我们新增加一个consumer的时候就会触发
就是执行
代码读到这里的时候我们能够发现很有可能就是之前开启的consumer进程没有执行rebalance过程,导致后面新加入的consumer进程一直获取不到partition,接着我们到
函数逻辑也比较简单,之前的文章其实也分析过这个调用过程,真正的rebalance是在
grep一遍源码你马上就会发现
函数里面定义了一个超时函数
到这一步就基本解释了前面的疑惑,如果设置
这里也解释了为什么我之前没法遇到这个问题,因为我之前的队列一直都有数据,所以
四、如何解决问题
找到原因要解决就好办了,最简单的方式就是把
但是我觉得这应该算是pykafka的一个bug,我已经在github提了一个issue。
PartitionOwnedError、ConsumerStoppedException异常,关于这个异常我之前写过一篇分析的文章(链接在这里),我自认为之前应该是把这个问题彻底解决了的,但是这次它又幽灵般的出现了,使我百思不得其解。
一、问题的出现
我在多台机器上面同时开启了多个进程来读写同一个topic,这个topic有5个partition,我想着开启5个进程来读写,这样可以提高速度。在测试过程中我发现会出现
PartitionOwnedError、ConsumerStoppedException异常,这个问题之前我记得我通过参数
rebalance_max_retries、rebalance_backoff_ms已经解决过了,而且我确保代码中这两个参数都没有变化过。在日志中我也发现进程确实是会重试
rebalance_max_retries以后才会报出异常,下面是我摘取的部分日志:
[pykafka.balancedconsumer] [balancedconsumer.py:580] INFO: Unable to acquire partition <pykafka.partition.Partition at 0x7f9a0a5586d0 (id=4)>. Retrying
[pykafka.balancedconsumer] [balancedconsumer.py:580] INFO: Unable to acquire partition <pykafka.partition.Partition at 0x7f3320806710 (id=3)>. Retrying
但是经过
rebalance_max_retries的重试以后就会抛出
PartitionOwnedError异常,也就是说我这个consumer没有获取到分配给我的partition。
二、问题的排查
PartitionOwnedError异常抛出的原理性解释大家可以参考前面的文章,不再赘述。
此时我想到肯定是其它的原因导致的这个问题,但是我明明记得我对操作pykafka的代码没有做过什么改动啊,唯一的改动就是把
consumer_timeout_ms这个参数改成了-1(读取永不超时),难道是这个原因导致的吗?但是我明明在之前也测试过多次啊,之前都没有发现这个问题啊,这个就让我很不理解了。
为了验证我的猜测,我还是把
consumer_timeout_ms改成了5000(5s),然后问题就没有再出现了,也就是说确实是这一行代码导致的问题,但是这个还不能完全解答我另外的一个疑惑,就是为什么我之前的多次测试没有发现这个问题,偏偏是这次测试的时候出现了。
没办法,只能又开始由源码开刀了。
三、问题的真正原因
因为之前对pykafka的代码有过一些了解,所以这次读起来就相对比较简单了,我理解每次当zookeeper上面的znode状态发生变化,kafka都会执行相应的rebalance,如下的代码就是实现这个功能:
def _set_watches(self): """Set watches in zookeeper that will trigger rebalances. Rebalances should be triggered whenever a broker, topic, or consumer znode is changed in zookeeper. This ensures that the balance of the consumer group remains up-to-date with the current state of the cluster. """ proxy = weakref.proxy(self) _brokers_changed = self._build_watch_callback(BalancedConsumer._brokers_changed, proxy) _topics_changed = self._build_watch_callback(BalancedConsumer._topics_changed, proxy) _consumers_changed = self._build_watch_callback(BalancedConsumer._consumers_changed, proxy) self._setting_watches = True # Set all our watches and then rebalance broker_path = '/brokers/ids' try: self._broker_watcher = ChildrenWatch( self._zookeeper, broker_path, _brokers_changed ) except NoNodeException: raise Exception( 'The broker_path "%s" does not exist in your ' 'ZooKeeper cluster -- is your Kafka cluster running?' % broker_path) self._topics_watcher = ChildrenWatch( self._zookeeper, '/brokers/topics', _topics_changed ) self._consumer_watcher = ChildrenWatch( self._zookeeper, self._consumer_id_path, _consumers_changed ) self._setting_watches = False
代码逻辑比较简单,就是设置三个watcher函数,一旦对应的znode状态发生变化就执行相应的callback,这个也是为什么当一个consumer加入以后会分配到partition的原因,当我们新增加一个consumer的时候就会触发
_consumers_changed这个函数,这个函数的逻辑也很简单:
@_catch_thread_exception def _consumers_changed(self, consumers): if not self._running: return False # `False` tells ChildrenWatch to disable this watch if self._setting_watches: return log.debug("Rebalance triggered by consumer change ({})".format( self._consumer_id)) self._rebalance()
就是执行
_rebalance()函数,也就是触发了kafka的rebalance过程。
代码读到这里的时候我们能够发现很有可能就是之前开启的consumer进程没有执行rebalance过程,导致后面新加入的consumer进程一直获取不到partition,接着我们到
_rebalance()函数一看究竟:
def _rebalance(self): """Start the rebalancing process for this consumer This method is called whenever a zookeeper watch is triggered. """ if self._consumer is not None: self.commit_offsets() # this is necessary because we can't stop() while the lock is held # (it's not an RLock) with self._rebalancing_lock: if not self._running: raise ConsumerStoppedException log.info('Rebalancing consumer "%s" for topic "%s".' % ( self._consumer_id, self._topic.name)) self._update_member_assignment()
函数逻辑也比较简单,之前的文章其实也分析过这个调用过程,真正的rebalance是在
_update_member_assignment()函数中执行的,但是在这个函数之前有一行
with self._rebalancing_lock,也就是执行rebalance之前要获得
_rebalancing_lock锁,此时我能确认就是这个锁没有获取到导致的问题,也就是说其它地方把这个锁一直acquire了,没有释放,那么接下来就看看还有其它哪些函数会用到这个锁呢。
grep一遍源码你马上就会发现
consume()函数会用到这个锁,代码如下:
def consume(self, block=True): """Get one message from the consumer :param block: Whether to block while waiting for a message :type block: bool """ def consumer_timed_out(): """Indicates whether the consumer has received messages recently""" if self._consumer_timeout_ms == -1: return False disp = (time.time() - self._last_message_time) * 1000.0 return disp > self._consumer_timeout_ms message = None self._last_message_time = time.time() while message is None and not consumer_timed_out(): self._raise_worker_exceptions() try: # acquire the lock to ensure that we don't start trying to consume from # a _consumer that might soon be replaced by an in-progress rebalance with self._rebalancing_lock: message = self._consumer.consume(block=block) except (ConsumerStoppedException, AttributeError): if not self._running: raise ConsumerStoppedException continue if message: self._last_message_time = time.time() if not block: return message return message
函数里面定义了一个超时函数
consumer_timed_out()之前我代码是把
_consumer_timeout_ms设置成了-1,那么这个函数就会返回
False,此时就会进入
while循环中获取到了
_rebalancing_lock锁,接着就开始消费队列,
self._consumer本质是一个
SimpleConsumeror
RdKafkaSimpleConsumer(如果设置了use_rdkafka参数),我们在
BalanceConsumer构造函数中传入的
consumer_timeout_ms也会传给对应的
SimpleConsumer,所以如果我们设置的是-1(永不超时)那么这代码就会一直不返回,除非有消费到数据。
到这一步就基本解释了前面的疑惑,如果设置
consumer_timeout_ms = -1那么
consume()就会一直占有
_rebalancing_lock锁,当新的consumer加入的时候之前的consumer本来应该执行rebalance操作的,但是又因为
_rebalancing_lock锁一直没有获取到,所以就一直阻塞在那里,等到新加入的consumer重试了
rebalance_max_retries次以后就会因为获取不到partition而抛出
PartitionOwnedError异常。
这里也解释了为什么我之前没法遇到这个问题,因为我之前的队列一直都有数据,所以
consume()每次都能及时的返回然后释放
_rebalancing_lock锁。
四、如何解决问题
找到原因要解决就好办了,最简单的方式就是把
consumer_timeout_ms设置成一个非-1的值,如我之前设置的5000ms。
但是我觉得这应该算是pykafka的一个bug,我已经在github提了一个issue。
相关文章推荐
- "Spark Streaming + Kafka direct + checkpoints + 代码改变" 引发的问题(一)
- 一行代码引发的“血案”
- [WCF]缺少一行代码引发的血案
- setTimeout使用之由"作用域"引发的血案
- openstack运维实战系列(十三)之glance更改路径引发的"血案"
- "Spark Streaming + Kafka direct + checkpoints + 代码改变" 引发的问题
- "Table 'mysql.plugin' doesn't exist"引发de血案
- "abc"已经被创建并保存于字符串池中,因此JAVA虚拟机只会在堆中新创建一个String对象,但是它的值(value)是共享前一行代码执行时在栈中创建的三个char型值值'a'、'b'和'c'
- Java集合遍历引发的"血案"
- Java集合遍历引发的"血案"
- 极客无极限 一行HTML5代码引发的创意大爆炸
- Linux权限引发的"血案"
- SQL0668N 由于表 "db2inst1.test" 上的原因代码 "3",所以不允许操作(解因为LOAD引起的LOAD暂挂状态锁)
- 关于"是否需要有代码规范"的个人看法
- 一段多浏览器的"复制到剪贴板"javascript代码
- 发布网站时出现"加载配置文件时出错: 引发类型为“System.OutOfMemoryException”的异常"
- OD提示 "为了执行系统不支持的动作, OllyICE 在这个被调试的程序中注入了一点代码, 但是经过5秒仍未收到响应..." 解决办法
- "Lc.exe已退出 代码为-1 "
- VS2010编译时拿老版本的代码运行,出现"source code is different..."解决办法
- Android中热修复框架Robust原理解析+并将框架代码从"闭源"变成"开源"(上篇)