您的位置:首页 > 编程语言 > Java开发

高并发环境下低级死循环bug

2016-09-19 15:43 295 查看

业务背景

在内存中,对mq消息进行分类计数。

问题描述

生产环境,运行一段时间后,发现消息队列有大量堆积。如果把计数逻辑注释掉,只接收用户访问消息而不进行处理,则mq队列无堆积。mq栈dump信息如下:
ConsumeMessageThread_75 TID: 214 STATE: WAITINGConsumeMessageThread_75 sun.misc.Unsafe.park(Native Method)ConsumeMessageThread_75 java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)ConsumeMessageThread_75 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)ConsumeMessageThread_75 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)ConsumeMessageThread_75 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)ConsumeMessageThread_75 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)ConsumeMessageThread_75 java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)ConsumeMessageThread_75 com.youku.paycenter.acl.service.impl.AsyncAclServiceImpl.getAtomicLong(AsyncAclServiceImpl.java:72)ConsumeMessageThread_75 com.youku.paycenter.acl.service.impl.AsyncAclServiceImpl.count(AsyncAclServiceImpl.java:57)ConsumeMessageThread_75 com.youku.paycenter.acl.mq.consumer.AclCountConsumer.receive(AclCountConsumer.java:70)ConsumeMessageThread_75 com.youku.paycenter.mq.rocketmq.RocketMqPushConsumer.syncHandleMessage(RocketMqPushConsumer.java:207)ConsumeMessageThread_75 com.youku.paycenter.mq.rocketmq.RocketMqPushConsumer.consumeMessage(RocketMqPushConsumer.java:191)ConsumeMessageThread_75 com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)ConsumeMessageThread_75 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)ConsumeMessageThread_75 java.util.concurrent.FutureTask.run(FutureTask.java:262)ConsumeMessageThread_75 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)ConsumeMessageThread_75 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)ConsumeMessageThread_75 java.lang.Thread.run(Thread.java:745)
分析发现有部分消息消费的线程处于等待状态,代码指向AsyncAclServiceImpl.getAtomicLong(AsyncAclServiceImpl.java:72)。

问题代码

privatefinal AtomicLong getAtomicLong(Stringkey)
{
AtomicLong atomicLong = tempData.get(key); // 注释1
while (null== atomicLong) // 注释2
{
try
{
lock.lock(); // 注释3
while (null== tempData.get(key)) // 注释4
{
atomicLong = new AtomicLong(0);
tempData.put(key, atomicLong);
}
} finally
{
lock.unlock();
}
}
return atomicLong;
}

问题分析

看问题代码,该段代码的功能是查询tempData(ConcurrentHashMap)中是否缓存的有计数器,没有的话创建一个计数器,放入缓存,然后返回。有的话,直接返回。假设并发情况下有2个线程同时执行注释1,然后依次经过注释2到达注释3处的代码。假设第一个线程获得了锁,进入了内层while循环,此时缓存依旧为空,因此会进行创建然后放入缓存的动作,然后退出,然后释放锁。之后,第2个线程在注释3处被唤醒,再次执行注释4的时候发现条件已经不成立,于是释放锁,进入外层循环,这时候问题应该很明显了,因为atomicLong在注释1处已经执行,只不过当时拿到的是null,于是在最外层,第2个线程进入了死循环。

优化代码

private final AtomicLong getAtomicLong(String key)
{
AtomicLong atomicLong = tempData.get(key);
while (null == atomicLong)
{
try
{
lock.lock();
if (null == (atomicLong = tempData.get(key))) // 注释1
{
atomicLong = new AtomicLong(0);
tempData.put(key, atomicLong);
}
} finally
{
lock.unlock();
}
}
return atomicLong;
}
改动点非常简单,在注释1处,把内层循环替换为if,最主要的是判断的时候同时对atomicLong进行赋值操作。

心得

编写高并发代码的时候,除了要格外细心,还需要尽可能的模拟真实环境的数据进行并发测试,找有经验的同事进行代码审查也是非常有必要的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 死循环 高并发