您的位置:首页 > 其它

并发基础_12_并发_容器_阻塞队列

2017-09-13 21:26 197 查看
阻塞队列接口

阻塞队列BlockingQueue接口是一个支持两个附加操作的队列。

这两个附加操作支持阻塞的插入阻塞的移除方法。

a. 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列有空余位置。

b. 支持阻塞的移除方法:队列为空时,获取元素的线程会等待队列中有元素。

阻塞队列常用于生产者与消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程

(JDK API)

BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:

第一种是抛出一个异常,

第二种是返回一个特殊值(null 或 false,具体取决于操作),

第三种是在操作可以成功前,无限期地阻塞当前线程,

第四种是在放弃前只在给定的最大时间限制内阻塞



BlockingQueue不支持null元素。

BlockingQueue可以限定容量。

Java中的阻塞队列

(JDK7)

ArrayBlockingQueue
由数组结构组成的有界阻塞队列
此队列按照FIFO原则对元素进行排序,默认情况下不保证线程公平的访问队列
LinkedBlockingQueue
由链表结构组成的有界阻塞队列
此队列默认和最大长度为Integter.MAX_VALUE,按照FIFO原则对元素进行排序
PriorityBlockingQueue
支持优先级排序的无界阻塞队列
此队列支持优先级的无界阻塞队列,默认情况下采用自然顺序升序排序
DelayQueue
使用优先级队列实现的无界队列
此队列支持延时获取元素的无界阻塞队列,队列元素必须实现Delayed接口。
在创建元素时,可以指定多久才能从队列中获取当前元素,只有在延迟期满时,才能从队列中获取元素。
SynchronusQueue
不存储元素的阻塞队列
此队列不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。
LinkedTransferQueue
由链表结构组成的无界阻塞队列
LinkedBlockingDeque
由链表结构组成的双向阻塞队列
双向队列指的是可以从队列两端插入和移除元素。
阻塞队列Demo就下面一个栗子,阻塞队列另一个使用场景是线程池中的使用。

LinkedBlockingQueue的Demo

LinkedBlockingQueue的实现是线程安全的,实现了FIFO(先进先出)等特性,是作为生产者、消费者的首选。

主要方法:

put:在队列满时会阻塞,直到有队列成员被消费

take:在队列空时阻塞,直到有队列成员进来。

/**
* 装苹果的篮子
*
* @author CYX
* @time 2017年7月31日上午8:52:25
*/
public class Basket {

// 篮子,能够容纳3个苹果
BlockingQueue<String> basket = new LinkedBlockingQueue<>(3);

/**
* 生产苹果,放入篮子
*
* @throws Exception
*/
public void produce() throws Exception {
basket.put("An apple");
}

/**
* 消费苹果,从篮子中拿走
*
* @return
* @throws Exception
*/
public String consume() throws Exception {
return basket.take();
}
}

/**
* 苹果生产者
*
* @author CYX
* @time 2017年7月31日上午9:08:42
*/
public class Producer implements Runnable {

private String instance;
private Basket basket;

public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}

@Override
public void run() {

try {

while (true) {
// 生产苹果
System.out.println("生产者准备生产苹果:" + instance);
basket.produce();
System.out.println("生产者生产苹果完毕:" + instance);
Thread.sleep(300);
}

} catch (Exception e) {
e.printStackTrace();
}

}

}

/**
* 苹果消费者
*
* @author CYX
* @time 2017年7月31日上午9:24:05
*/
public class Consumer implements Runnable {

private String instance;
private Basket basket;

public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}

@Override
public void run() {

try {

while (true) {
// 消费苹果
System.out.println("消费者准备消费苹果:" + instance);
System.out.println(basket.consume());
System.out.println("消费者消费苹果完毕:" + instance);
// 休眠1000ms
Thread.sleep(1000);
}

} catch (Exception e) {
e.printStackTrace();
}

}

}

/**
* 多线程模拟实现生产者/消费者模型
*
* @author CYX
* @time 2017年7月31日上午8:48:43
*/
public class BlockingQueueTest {

public static void main(String[] args) {

Basket basket = new Basket();

ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer("生产者01", basket);
Producer producer2 = new Producer("生产者02", basket);
Consumer consumer = new Consumer("消费者01", basket);

service.submit(producer);
service.submit(producer2);
service.submit(consumer);

}

}

输出结果:
生产者准备生产苹果:生产者01
生产者生产苹果完毕:生产者01
消费者准备消费苹果:消费者01
An apple
消费者消费苹果完毕:消费者01
生产者准备生产苹果:生产者02
生产者生产苹果完毕:生产者02
生产者准备生产苹果:生产者01
生产者生产苹果完毕:生产者01
生产者准备生产苹果:生产者02
生产者生产苹果完毕:生产者02
生产者准备生产苹果:生产者01
生产者准备生产苹果:生产者02
消费者准备消费苹果:消费者01
An apple
消费者消费苹果完毕:消费者01
生产者生产苹果完毕:生产者01
生产者准备生产苹果:生产者01
消费者准备消费苹果:消费者01
An apple
消费者消费苹果完毕:消费者01
生产者生产苹果完毕:生产者02
生产者准备生产苹果:生产者02
消费者准备消费苹果:消费者01
An apple
消费者消费苹果完毕:消费者01
生产者生产苹果完毕:生产者01
生产者准备生产苹果:生产者01
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: