学习互联网架构第六课(使用wait/notify模拟Queue)
2017-06-14 15:29
155 查看
BlockingQueue:顾名思义,首先它是一个队列,并且支持阻塞的机制,阻塞的放入和得到数据。我们要实现LinkedBlockingQueue下面两个简单的方法put和take。
put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。
take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。
下面我们便来写个类模拟BlockingQueue,代码如下
package com.internet.queue;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MyQueue {
//1.需要一个盛放元素的集合
private final LinkedList<Object> list = new LinkedList<Object>();
//2.需要一个计数器,AtomicInteger是具有原子性的,也就是最终结果肯定是正确的
private AtomicInteger count = new AtomicInteger(0);
//3.需要指定上限和下限
private int minSize = 0;
private int maxSize;
//4.使用构造方法来给maxSize赋值,因为这样比较灵活
public MyQueue(int maxSize){
this.maxSize = maxSize;
}
//要有空的构造函数
public MyQueue(){}
//5.初始化一个对象,用于加锁
private final Object lock = new Object();
//6.put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,
//则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。put方法中Object代表
//要添加的对象
public void put(Object obj){
synchronized (lock) {
//判断队列是否已经满了,如果已经满了的话,就不允许再添加了,要排队等着
if(count.get() == this.maxSize){
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
//如果队列还没有加满,那么就允许添加元素,并做下记录
list.add(obj);
//加入一个元素计数器就要加1
count.incrementAndGet();
//这时有可能是这种场景:那就是当前队列中没有任何元素了,有个线程想要获取元素,于是乎
//它便阻塞着,一直等着有新元素加到队列中来,因此我们要提醒get方法我这儿添加元素了,
//需要拿的话就来拿吧
lock.notify();
System.out.println("新加入的元素为:"+obj);
}
}
//7.take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态
//直到BlockingQueue有新的数据被加入。
public Object take(){
Object ret = null;
synchronized (lock) {
//如果当前队列中元素数量已经达到最小了,那么该线程要等待
if(count.get() == this.minSize){
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
//如果还没有达到最小数量,那么就移除第一个元素
ret = list.removeFirst();
//移除元素,计数器要减1
count.decrementAndGet();
//有这种场景:当前队列已经满了,但是还有两个线程想往队列中添加元素,于是乎它们便只能等着,
//这时我们移除一个元素,这样队列中便腾出一个地方,我们得告诉put方法,这儿现在有块地儿啊
//想占的赶紧过来占吧,来晚了就没有了。
lock.notify();
}
return ret;
}
//获取计数器当前的值
public int getSize(){
return this.count.get();
}
//下面用main方法进行测试
public static void main(String[] args) {
//设置容器的容量为5
final MyQueue mq = new MyQueue(5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("当前容器的长度:"+mq.getSize());
//新建一个线程,该线程要向队列中插入两个元素
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
mq.put("f");
mq.put("g");
}
},"t1");
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
Object o1 = mq.take();
System.out.println("移除的元素为:"+o1);
Object o2 = mq.take();
System.out.println("移除的元素为:"+o2);
}
},"t2");
try {
//等待两秒钟,这句话跟Thread.sleep(2000);是同样的效果,之所以用TimeUnit是因为它给我们
//定义好了枚举值,我们可以指定毫秒、秒、分钟等等,可读性很强。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//之所以要让线程t1执行2秒后t2才执行,就是为了看t1执行时会不会阻塞(因为队列中元素已经满了,而t1
//仍要向队列中添加两个元素)
t2.start();
}
}
我们运行main方法,结果如下,可以看到确实是要先移除元素后才能再向队列中添加元素(最下面四行的打印顺序可能会不同,但是不必在意,那只是打印的问题而已)。
新加入的元素为:a
新加入的元素为:b
新加入的元素为:c
新加入的元素为:d
新加入的元素为:e
当前容器的长度:5
移除的元素为:a
新加入的元素为:f
移除的元素为:b
新加入的元素为:g
下面我们再测试当队列中没有元素时的情况,我们只需要把main函数替换成如下代码即可。
public static void main(String[] args) {
//设置容器的容量为5
final MyQueue mq = new MyQueue(5);
//线程t1要从容器中移除元素
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
Object o1 = mq.take();
System.out.println("移除的元素为:"+o1);
Object o2 = mq.take();
System.out.println("移除的元素为:"+o2);
}
},"t1");
t1.start();
try {
//等待两秒钟,这句话跟Thread.sleep(2000);是同样的效果,之所以用TimeUnit是因为它给我们
//定义好了枚举值,我们可以指定毫秒、秒、分钟等等,可读性很强。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//线程t2要向队列中插入两个元素
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
mq.put("a");
mq.put("b");
}
},"t2");
//之所以要让线程t1执行2秒后t2才执行,就是为了看t1执行时会不会阻塞(因为队列中元素刚开始是0,而t1
//仍要从队列中移除元素)
t2.start();
} 运行main方法,会看到等了2秒钟才打印信息,说明刚开始线程t1去移除元素时是被阻塞了的,直到线程t2向队列中添加了两个元素才能给移除元素。这说明我们模拟的阻塞队列是没问题的。
新加入的元素为:a
新加入的元素为:b
移除的元素为:a
移除的元素为:b
put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。
take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。
下面我们便来写个类模拟BlockingQueue,代码如下
package com.internet.queue;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MyQueue {
//1.需要一个盛放元素的集合
private final LinkedList<Object> list = new LinkedList<Object>();
//2.需要一个计数器,AtomicInteger是具有原子性的,也就是最终结果肯定是正确的
private AtomicInteger count = new AtomicInteger(0);
//3.需要指定上限和下限
private int minSize = 0;
private int maxSize;
//4.使用构造方法来给maxSize赋值,因为这样比较灵活
public MyQueue(int maxSize){
this.maxSize = maxSize;
}
//要有空的构造函数
public MyQueue(){}
//5.初始化一个对象,用于加锁
private final Object lock = new Object();
//6.put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,
//则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。put方法中Object代表
//要添加的对象
public void put(Object obj){
synchronized (lock) {
//判断队列是否已经满了,如果已经满了的话,就不允许再添加了,要排队等着
if(count.get() == this.maxSize){
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
//如果队列还没有加满,那么就允许添加元素,并做下记录
list.add(obj);
//加入一个元素计数器就要加1
count.incrementAndGet();
//这时有可能是这种场景:那就是当前队列中没有任何元素了,有个线程想要获取元素,于是乎
//它便阻塞着,一直等着有新元素加到队列中来,因此我们要提醒get方法我这儿添加元素了,
//需要拿的话就来拿吧
lock.notify();
System.out.println("新加入的元素为:"+obj);
}
}
//7.take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态
//直到BlockingQueue有新的数据被加入。
public Object take(){
Object ret = null;
synchronized (lock) {
//如果当前队列中元素数量已经达到最小了,那么该线程要等待
if(count.get() == this.minSize){
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
//如果还没有达到最小数量,那么就移除第一个元素
ret = list.removeFirst();
//移除元素,计数器要减1
count.decrementAndGet();
//有这种场景:当前队列已经满了,但是还有两个线程想往队列中添加元素,于是乎它们便只能等着,
//这时我们移除一个元素,这样队列中便腾出一个地方,我们得告诉put方法,这儿现在有块地儿啊
//想占的赶紧过来占吧,来晚了就没有了。
lock.notify();
}
return ret;
}
//获取计数器当前的值
public int getSize(){
return this.count.get();
}
//下面用main方法进行测试
public static void main(String[] args) {
//设置容器的容量为5
final MyQueue mq = new MyQueue(5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("当前容器的长度:"+mq.getSize());
//新建一个线程,该线程要向队列中插入两个元素
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
mq.put("f");
mq.put("g");
}
},"t1");
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
Object o1 = mq.take();
System.out.println("移除的元素为:"+o1);
Object o2 = mq.take();
System.out.println("移除的元素为:"+o2);
}
},"t2");
try {
//等待两秒钟,这句话跟Thread.sleep(2000);是同样的效果,之所以用TimeUnit是因为它给我们
//定义好了枚举值,我们可以指定毫秒、秒、分钟等等,可读性很强。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//之所以要让线程t1执行2秒后t2才执行,就是为了看t1执行时会不会阻塞(因为队列中元素已经满了,而t1
//仍要向队列中添加两个元素)
t2.start();
}
}
我们运行main方法,结果如下,可以看到确实是要先移除元素后才能再向队列中添加元素(最下面四行的打印顺序可能会不同,但是不必在意,那只是打印的问题而已)。
新加入的元素为:a
新加入的元素为:b
新加入的元素为:c
新加入的元素为:d
新加入的元素为:e
当前容器的长度:5
移除的元素为:a
新加入的元素为:f
移除的元素为:b
新加入的元素为:g
下面我们再测试当队列中没有元素时的情况,我们只需要把main函数替换成如下代码即可。
public static void main(String[] args) {
//设置容器的容量为5
final MyQueue mq = new MyQueue(5);
//线程t1要从容器中移除元素
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
Object o1 = mq.take();
System.out.println("移除的元素为:"+o1);
Object o2 = mq.take();
System.out.println("移除的元素为:"+o2);
}
},"t1");
t1.start();
try {
//等待两秒钟,这句话跟Thread.sleep(2000);是同样的效果,之所以用TimeUnit是因为它给我们
//定义好了枚举值,我们可以指定毫秒、秒、分钟等等,可读性很强。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//线程t2要向队列中插入两个元素
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
mq.put("a");
mq.put("b");
}
},"t2");
//之所以要让线程t1执行2秒后t2才执行,就是为了看t1执行时会不会阻塞(因为队列中元素刚开始是0,而t1
//仍要从队列中移除元素)
t2.start();
} 运行main方法,会看到等了2秒钟才打印信息,说明刚开始线程t1去移除元素时是被阻塞了的,直到线程t2向队列中添加了两个元素才能给移除元素。这说明我们模拟的阻塞队列是没问题的。
新加入的元素为:a
新加入的元素为:b
移除的元素为:a
移除的元素为:b
相关文章推荐
- 学习互联网架构第七课(ThreadLocal的使用)
- 关于模式的学习和使用交流(设计模式、架构模式。。)
- 一个小型的网站,比如个人网站,可以使用最简单的html静态页面就实现了,配合一些图片达到美化效果,所有的页面均存放在一个目录下,这样的网站对系统架构、性能的要求都很简单,随着互联网业务的不断丰富,网站
- 分布式架构学习之持续集成:012--Hudson(Jekins)持续集成服务器的安装、配置和使用
- 分布式架构学习之:032--使用Redis3.0集群实现Tomcat集群的Session共享
- 分布式架构学习之:031--FastDFS 集群的安装、配置、使用
- ASP.NET基础知识(本文章来自于互联网,感谢原作者的辛勤劳动,摘抄在此仅作为学习使用!)
- 分布式架构学习之:使用Redis3.0集群实现Tomcat集群的Session共享
- 分布式架构学习之持续集成:009--SVN版本管理系统的安装和使用(CentOS+Subversion+Apache+Jsvnadmin)
- 分布式架构学习之:003--使用Dubbo进行规模服务化前的工程结构优化
- 分布式架构学习之:FastDFS分布式文件系统的Linux安装与使用(单节点)
- 一步步学习SPD2010--第六章节--处理数据源(1)--使用数据源
- angular学习笔记(二十八)-$http(6)-使用ngResource模块构建RESTful架构
- 互联网架构学习相关资料(转)
- 分布式架构学习之持续集成:011--SonarQube代码质量管理平台的安装、配置和使用
- 最好的那些新兴互联网公司所使用的技术架构
- 一步步学习SPD2010--第六章节--处理数据源(8)--使用链接源
- 【ios开发学习 - 第六课】UILabel使用
- 分布式架构学习之:016--Redis的安装与使用(单节点)
- 分布式架构学习之:FastDFS 集群的安装、配置、使用