使用CountDownLatch来重写BlockingQueue解决线程通信不及时问题
2016-11-26 13:41
363 查看
上文说到了使用wait/notify和synchronized同时使用时出现通信不及时的弊端,如下可以解决
代码如下:
package blockingQueue;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class MyBlocingQueueUsingCountDownLatch {
//volatile和final不能同时使用,volatile表示不同线程可见此变量,意味着可以进行修改,final则指定对象不可变
private final LinkedList<Object> list = new LinkedList<Object>();
//计数器
private AtomicInteger count = new AtomicInteger(0);
//最小长度
private final int minSize = 0;
//最大长度采用构造器注入
private final int maxSize;
//构造器
public MyBlocingQueueUsingCountDownLatch(int size) {
this.maxSize = size;
}
//构建CountDownLanch对象代替锁,构造函数中的数据表示使用过多少次countDown才能唤醒
private final CountDownLatch countDownLatch = new CountDownLatch(1);
public int getSizer(){
return this.count.get();
}
public void put(Object obj){
try {
//如果等于最大的容量则阻塞
while(count.get() == maxSize){
System.out.println("存放操作停止 " + "..." + "当前的栈容量为 " + count);
countDownLatch.await();
}
//放入新的元素
list.add(obj);
//计数器递增
count.incrementAndGet();
//唤醒在take中阻塞的线程
countDownLatch.countDown();
System.out.println("新增的元素为 " + obj + "..." + "当前的栈容量为 " + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Object take(){
Object obj = null;
try {
while(count.get() == minSize){
System.out.println("取出操作停止 " + "..." + "当前的栈容量为 " + count);
countDownLatch.await();
}
//取出元素
obj = list.removeFirst();
//计数器递减
count.decrementAndGet();
//唤醒在put中阻塞的线程
countDownLatch.countDown();
System.out.println("移除的元素为 " + obj + "..." + "当前的栈容量为 " + count);
return obj;
} catch (Exception e) {
e.printStackTrace();
}
return obj;
}
public static void main(String[] args) {
final MyBlockingQueue blockingQueue = new MyBlockingQueue(5);
blockingQueue.put("本田");
blockingQueue.put("奔驰");
blockingQueue.put("别克");
blockingQueue.put("宝马");
blockingQueue.put("三菱");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
int count = 2014;
while(true){
blockingQueue.put("本田" + count +"型");
count++;
if(count == 20001){
throw new RuntimeException("2w辆车后停产");
}
}
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
while(true){
blockingQueue.take();
}
}
});
thread2.start();
}
}
运行结果
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19972型...当前的栈容量为 1
新增的元素为 本田19973型...当前的栈容量为 2
新增的元素为 本田19974型...当前的栈容量为 3
新增的元素为 本田19975型...当前的栈容量为 4
新增的元素为 本田19976型...当前的栈容量为 5
移除的元素为 本田19972型...当前的栈容量为 4
移除的元素为 本田19973型...当前的栈容量为 3
移除的元素为 本田19974型...当前的栈容量为 2
移除的元素为 本田19975型...当前的栈容量为 1
移除的元素为 本田19976型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19977型...当前的栈容量为 1
新增的元素为 本田19978型...当前的栈容量为 2
新增的元素为 本田19979型...当前的栈容量为 3
新增的元素为 本田19980型...当前的栈容量为 4
新增的元素为 本田19981型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19977型...当前的栈容量为 4
移除的元素为 本田19978型...当前的栈容量为 3
移除的元素为 本田19979型...当前的栈容量为 2
移除的元素为 本田19980型...当前的栈容量为 1
移除的元素为 本田19981型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19982型...当前的栈容量为 1
新增的元素为 本田19983型...当前的栈容量为 2
新增的元素为 本田19984型...当前的栈容量为 3
移除的元素为 本田19982型...当前的栈容量为 2
移除的元素为 本田19983型...当前的栈容量为 1
移除的元素为 本田19984型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19985型...当前的栈容量为 1
新增的元素为 本田19986型...当前的栈容量为 2
新增的元素为 本田19987型...当前的栈容量为 3
新增的元素为 本田19988型...当前的栈容量为 4
新增的元素为 本田19989型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19985型...当前的栈容量为 4
移除的元素为 本田19986型...当前的栈容量为 3
移除的元素为 本田19987型...当前的栈容量为 2
移除的元素为 本田19988型...当前的栈容量为 1
移除的元素为 本田19989型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19990型...当前的栈容量为 1
新增的元素为 本田19991型...当前的栈容量为 2
新增的元素为 本田19992型...当前的栈容量为 3
移除的元素为 本田19990型...当前的栈容量为 2
移除的元素为 本田19991型...当前的栈容量为 1
移除的元素为 本田19992型...当前的栈容量为 0
新增的元素为 本田19993型...当前的栈容量为 1
新增的元素为 本田19994型...当前的栈容量为 2
新增的元素为 本田19995型...当前的栈容量为 3
新增的元素为 本田19996型...当前的栈容量为 4
新增的元素为 本田19997型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19993型...当前的栈容量为 4
移除的元素为 本田19994型...当前的栈容量为 3
移除的元素为 本田19995型...当前的栈容量为 2
移除的元素为 本田19996型...当前的栈容量为 1
移除的元素为 本田19997型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19998型...当前的栈容量为 1
新增的元素为 本田19999型...当前的栈容量为 2
新增的元素为 本田20000型...当前的栈容量为 3
移除的元素为 本田19998型...当前的栈容量为 2
移除的元素为 本田19999型...当前的栈容量为 1
移除的元素为 本田20000型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
Exception in thread "Thread-0" java.lang.RuntimeException: 2w辆车后停产
at blockingQueue.MyBlocingQueueUsingCountDownLatch$1.run(MyBlocingQueueUsingCountDownLatch.java:82)
at java.lang.Thread.run(Thread.java:745)
代码如下:
package blockingQueue;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class MyBlocingQueueUsingCountDownLatch {
//volatile和final不能同时使用,volatile表示不同线程可见此变量,意味着可以进行修改,final则指定对象不可变
private final LinkedList<Object> list = new LinkedList<Object>();
//计数器
private AtomicInteger count = new AtomicInteger(0);
//最小长度
private final int minSize = 0;
//最大长度采用构造器注入
private final int maxSize;
//构造器
public MyBlocingQueueUsingCountDownLatch(int size) {
this.maxSize = size;
}
//构建CountDownLanch对象代替锁,构造函数中的数据表示使用过多少次countDown才能唤醒
private final CountDownLatch countDownLatch = new CountDownLatch(1);
public int getSizer(){
return this.count.get();
}
public void put(Object obj){
try {
//如果等于最大的容量则阻塞
while(count.get() == maxSize){
System.out.println("存放操作停止 " + "..." + "当前的栈容量为 " + count);
countDownLatch.await();
}
//放入新的元素
list.add(obj);
//计数器递增
count.incrementAndGet();
//唤醒在take中阻塞的线程
countDownLatch.countDown();
System.out.println("新增的元素为 " + obj + "..." + "当前的栈容量为 " + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Object take(){
Object obj = null;
try {
while(count.get() == minSize){
System.out.println("取出操作停止 " + "..." + "当前的栈容量为 " + count);
countDownLatch.await();
}
//取出元素
obj = list.removeFirst();
//计数器递减
count.decrementAndGet();
//唤醒在put中阻塞的线程
countDownLatch.countDown();
System.out.println("移除的元素为 " + obj + "..." + "当前的栈容量为 " + count);
return obj;
} catch (Exception e) {
e.printStackTrace();
}
return obj;
}
public static void main(String[] args) {
final MyBlockingQueue blockingQueue = new MyBlockingQueue(5);
blockingQueue.put("本田");
blockingQueue.put("奔驰");
blockingQueue.put("别克");
blockingQueue.put("宝马");
blockingQueue.put("三菱");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
int count = 2014;
while(true){
blockingQueue.put("本田" + count +"型");
count++;
if(count == 20001){
throw new RuntimeException("2w辆车后停产");
}
}
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
while(true){
blockingQueue.take();
}
}
});
thread2.start();
}
}
运行结果
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19972型...当前的栈容量为 1
新增的元素为 本田19973型...当前的栈容量为 2
新增的元素为 本田19974型...当前的栈容量为 3
新增的元素为 本田19975型...当前的栈容量为 4
新增的元素为 本田19976型...当前的栈容量为 5
移除的元素为 本田19972型...当前的栈容量为 4
移除的元素为 本田19973型...当前的栈容量为 3
移除的元素为 本田19974型...当前的栈容量为 2
移除的元素为 本田19975型...当前的栈容量为 1
移除的元素为 本田19976型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19977型...当前的栈容量为 1
新增的元素为 本田19978型...当前的栈容量为 2
新增的元素为 本田19979型...当前的栈容量为 3
新增的元素为 本田19980型...当前的栈容量为 4
新增的元素为 本田19981型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19977型...当前的栈容量为 4
移除的元素为 本田19978型...当前的栈容量为 3
移除的元素为 本田19979型...当前的栈容量为 2
移除的元素为 本田19980型...当前的栈容量为 1
移除的元素为 本田19981型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19982型...当前的栈容量为 1
新增的元素为 本田19983型...当前的栈容量为 2
新增的元素为 本田19984型...当前的栈容量为 3
移除的元素为 本田19982型...当前的栈容量为 2
移除的元素为 本田19983型...当前的栈容量为 1
移除的元素为 本田19984型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19985型...当前的栈容量为 1
新增的元素为 本田19986型...当前的栈容量为 2
新增的元素为 本田19987型...当前的栈容量为 3
新增的元素为 本田19988型...当前的栈容量为 4
新增的元素为 本田19989型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19985型...当前的栈容量为 4
移除的元素为 本田19986型...当前的栈容量为 3
移除的元素为 本田19987型...当前的栈容量为 2
移除的元素为 本田19988型...当前的栈容量为 1
移除的元素为 本田19989型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19990型...当前的栈容量为 1
新增的元素为 本田19991型...当前的栈容量为 2
新增的元素为 本田19992型...当前的栈容量为 3
移除的元素为 本田19990型...当前的栈容量为 2
移除的元素为 本田19991型...当前的栈容量为 1
移除的元素为 本田19992型...当前的栈容量为 0
新增的元素为 本田19993型...当前的栈容量为 1
新增的元素为 本田19994型...当前的栈容量为 2
新增的元素为 本田19995型...当前的栈容量为 3
新增的元素为 本田19996型...当前的栈容量为 4
新增的元素为 本田19997型...当前的栈容量为 5
存放操作停止 ...当前的栈容量为 5
移除的元素为 本田19993型...当前的栈容量为 4
移除的元素为 本田19994型...当前的栈容量为 3
移除的元素为 本田19995型...当前的栈容量为 2
移除的元素为 本田19996型...当前的栈容量为 1
移除的元素为 本田19997型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
新增的元素为 本田19998型...当前的栈容量为 1
新增的元素为 本田19999型...当前的栈容量为 2
新增的元素为 本田20000型...当前的栈容量为 3
移除的元素为 本田19998型...当前的栈容量为 2
移除的元素为 本田19999型...当前的栈容量为 1
移除的元素为 本田20000型...当前的栈容量为 0
取出操作停止 ...当前的栈容量为 0
Exception in thread "Thread-0" java.lang.RuntimeException: 2w辆车后停产
at blockingQueue.MyBlocingQueueUsingCountDownLatch$1.run(MyBlocingQueueUsingCountDownLatch.java:82)
at java.lang.Thread.run(Thread.java:745)
相关文章推荐
- 代码详解の使用CountDownLatch解决面试问题:T1和T2线程执行计算,T3线程计算结果的统计
- c#中使用多线程访问winform中控件的若干问题 解决线程间操作无效: 从不是创建控件的线程访问它
- 【C/S通信交互之Http篇】使用Curl与Jetty(Server)实现手机网游Http通信框架&解决curl.h头文件找不到问题
- 线程间通信问题的解决
- 【C/S通信交互之Http篇】Cocos2dx(Client)使用Curl与Jetty(Server)实现手机网游Http通信框架(内含解决curl.h头文件找不到问题)
- 使用CycleBarrier代替CountDownLatch解决阻塞问题
- Java -- 使用阻塞队列(BlockingQueue)控制线程通信
- 学习java多线程的笔记3-使用BlockingQueue阻塞队列来模拟两个线程之间的通信
- 多线程(线程间通信-多生产者多消费者问题-JDK1.5解决办法-范例),停止线程,线程中方法的区别,匿名内部类实现多线程,线程总结
- 嵌入式 解决线程使用sleep或usleep等函数导致整个进程睡眠的问题
- java多线程-线程间通信-示例代码-解决安全问题-等待唤醒机制wait()notify()notifyAll()
- 使用Handler在线程之间传递消息所遇到的问题及解决方法
- 使用复制存储过程执行解决“事务复制中的表大量更新导致无法及时同步”的问题
- 线程通信---使用阻塞队列(BlockingQueue)控制线程通信
- 线程安全问题(解决) -------使用spring 的action-servlet.xml解决struts线程问题
- 关于android 使用Scoket通信中文乱码问题的解决
- iphone线程中使用异步网络的问题,以及如何用NSRunLoop来解决
- JavaSE第一百零四讲:哲学家就餐问题、死锁与使用wait及notify方法实现线程之间的相互通信
- 使用线程加载UIImagePickerController,解决卡屏问题
- 使用spring 的action-servlet.xml解决struts线程问题