您的位置:首页 > 产品设计 > UI/UE

使用CountDownLatch来重写BlockingQueue解决线程通信不及时问题

2016-11-26 13:41 363 查看
上文说到了使用wait/notify和synchronized同时使用时出现通信不及时的弊端,如下可以解决

代码如下:

package blockingQueue;

import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class MyBlocingQueueUsingCountDownLatch {
//volatile和final不能同时使用,volatile表示不同线程可见此变量,意味着可以进行修改,final则指定对象不可变
private final LinkedList<Object> list = new LinkedList<Object>();
//计数器
private AtomicInteger count = new AtomicInteger(0);
//最小长度
private final int minSize = 0;
//最大长度采用构造器注入
private final int maxSize;
//构造器
public MyBlocingQueueUsingCountDownLatch(int size) {
this.maxSize = size;
}
//构建CountDownLanch对象代替锁,构造函数中的数据表示使用过多少次countDown才能唤醒
private final CountDownLatch countDownLatch = new CountDownLatch(1);

public int getSizer(){
return this.count.get();
}

public void put(Object obj){
try {
//如果等于最大的容量则阻塞
while(count.get() == maxSize){
System.out.println("存放操作停止 " + "..." + "当前的栈容量为 " + count);
countDownLatch.await();
}
//放入新的元素
list.add(obj);
//计数器递增
count.incrementAndGet();
//唤醒在take中阻塞的线程
countDownLatch.countDown();
System.out.println("新增的元素为 " + obj + "..." + "当前的栈容量为 " + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public Object take(){
Object obj = null;
try {
while(count.get() == minSize){
System.out.println("取出操作停止 " + "..." + "当前的栈容量为 " + count);
countDownLatch.await();
}
//取出元素
obj = list.removeFirst();
//计数器递减
count.decrementAndGet();
//唤醒在put中阻塞的线程
countDownLatch.countDown();
System.out.println("移除的元素为 " + obj + "..." + "当前的栈容量为 " + count);
return obj;
} catch (Exception e) {
e.printStackTrace();
}
return obj;
}

public static void main(String[] args) {
final MyBlockingQueue blockingQueue = new MyBlockingQueue(5);
blockingQueue.put("本田");
blockingQueue.put("奔驰");
blockingQueue.put("别克");
blockingQueue.put("宝马");
blockingQueue.put("三菱");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
int count = 2014;
while(true){
blockingQueue.put("本田" + count +"型");
count++;
if(count == 20001){
throw new RuntimeException("2w辆车后停产");
}
}
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
while(true){
blockingQueue.take();
}
}
});
thread2.start();
}

}
运行结果
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19972型...当前的栈容量为 1
新增的元素为 本田19973型...当前的栈容量为 2
新增的元素为 本田19974型...当前的栈容量为 3
新增的元素为 本田19975型...当前的栈容量为 4
新增的元素为 本田19976型...当前的栈容量为 5
移除的元素为 本田19972型...当前的栈容量为 4
移除的元素为 本田19973型...当前的栈容量为 3
移除的元素为 本田19974型...当前的栈容量为 2
移除的元素为 本田19975型...当前的栈容量为 1
移除的元素为 本田19976型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19977型...当前的栈容量为 1
新增的元素为 本田19978型...当前的栈容量为 2
新增的元素为 本田19979型...当前的栈容量为 3
新增的元素为 本田19980型...当前的栈容量为 4
新增的元素为 本田19981型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19977型...当前的栈容量为 4
移除的元素为 本田19978型...当前的栈容量为 3
移除的元素为 本田19979型...当前的栈容量为 2
移除的元素为 本田19980型...当前的栈容量为 1
移除的元素为 本田19981型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19982型...当前的栈容量为 1
新增的元素为 本田19983型...当前的栈容量为 2
新增的元素为 本田19984型...当前的栈容量为 3
移除的元素为 本田19982型...当前的栈容量为 2
移除的元素为 本田19983型...当前的栈容量为 1
移除的元素为 本田19984型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19985型...当前的栈容量为 1
新增的元素为 本田19986型...当前的栈容量为 2
新增的元素为 本田19987型...当前的栈容量为 3
新增的元素为 本田19988型...当前的栈容量为 4
新增的元素为 本田19989型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19985型...当前的栈容量为 4
移除的元素为 本田19986型...当前的栈容量为 3
移除的元素为 本田19987型...当前的栈容量为 2
移除的元素为 本田19988型...当前的栈容量为 1
移除的元素为 本田19989型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19990型...当前的栈容量为 1
新增的元素为 本田19991型...当前的栈容量为 2
新增的元素为 本田19992型...当前的栈容量为 3
移除的元素为 本田19990型...当前的栈容量为 2
移除的元素为 本田19991型...当前的栈容量为 1
移除的元素为 本田19992型...当前的栈容量为 0
新增的元素为 本田19993型...当前的栈容量为 1
新增的元素为 本田19994型...当前的栈容量为 2
新增的元素为 本田19995型...当前的栈容量为 3
新增的元素为 本田19996型...当前的栈容量为 4
新增的元素为 本田19997型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19993型...当前的栈容量为 4
移除的元素为 本田19994型...当前的栈容量为 3
移除的元素为 本田19995型...当前的栈容量为 2
移除的元素为 本田19996型...当前的栈容量为 1
移除的元素为 本田19997型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19998型...当前的栈容量为 1
新增的元素为 本田19999型...当前的栈容量为 2
新增的元素为 本田20000型...当前的栈容量为 3
移除的元素为 本田19998型...当前的栈容量为 2
移除的元素为 本田19999型...当前的栈容量为 1
移除的元素为 本田20000型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
Exception in thread "Thread-0" java.lang.RuntimeException: 2w辆车后停产
at blockingQueue.MyBlocingQueueUsingCountDownLatch$1.run(MyBlocingQueueUsingCountDownLatch.java:82)
at java.lang.Thread.run(Thread.java:745)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程 通信
相关文章推荐