您的位置:首页 > Web前端

关于KafkaConsumer销毁时发生的错误:KafkaConsumer is not safe for multi-threaded access

2017-12-29 17:35 471 查看
关于KafkaConsumer销毁时发生的错误:KafkaConsumer is not safe for multi-threaded access

问题背景:

1.web工程启动时,使用Timer来周期执行任务:KafkaConsumer.poll()获取消息

2.web工程销毁时,使用KafkaConsumer.close()方法释放资源。

问题原因:

Timer启动任务时会创建一个新的线程,例如:ThreadA,在ThreadA里面周期执行KafkaConsumer.poll()方法。

工程销毁时,执行KafkaConsumer.close()的线程却是主线程:MainThreadB。于是kafka就认为有两个线程在同时操作他,报错:KafkaConsumer is not safe for multi-threaded access

解决办法:

解决思路就是让ThreadA来执行KafkaConsumer.close(),这样就保证了操作KafkaConsumer的都是同一个线程,就不会发生错误了。

开始我想的是在Task的run方法里,通过参数来判断执行:KafkaConsumer.close(),但是这个参数在什么情况下该发生变化,这个不好控制。

比如我在Timer.cancell()之后或之前让他变化,Task会有时执行,有时不执行。这样就有的时候会销毁KafkaConsumer,有的时候销毁不了。

然后,我又改成在Timer执行cancell时,通过让该Timer再schedule一个Task:TaskC,在TaskC里执行KafkaConsumer.close(),这样Timer会使用同一个线程去执行这个TaskC。

如果要保证在Timer调用cancell时,已经完成了TaskC的执行,这就得再加点参数来判断,我是通过重写Timer的cancell方法实现的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐