您的位置:首页 > 编程语言 > Java开发

【Java基础】并发 - 多线程 - 阻塞队列

2014-05-04 06:44 726 查看

阻塞队列

阻塞队列与普通队列的区别在于:当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲下来。



线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素。

JDK中的阻塞队列

public interface BlockingQueue<E> extends Queue<E>  (java.util.concurrent)




ArrayBlockingQueue:由数组支持的有界阻塞队列,此队列按FIFO先进先出对元素排序;新元素查到队尾tail,retrieval操作返回对头head的元素;
LinkedBlockingQueue:based on linked nodes。optionally-bounded。
SynchronousQueue:同步队列;每个insert操作必须等待直到有另一个线程调用了remove操作,反之依然;
PriorityBlockingQueue:

阻塞队列是线程安全的

阻塞队列提供四种形式的方法:

不阻塞直接抛出异常;
返回特定值;
阻塞直到操作可继续执行;
阻塞给定的时间;

Throws exceptionSpecial valueBlocksTimes out
Insert
add(e)
offer(e)
put(e)
offer(e,
time, unit)
Remove
remove()
poll()
take()
poll(time,
unit)
Examine
element()
peek()
not applicablenot applicable

生产者-消费者

class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}

class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}

class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}


阻塞队列的一个简单实现

public class BlockingQueue {
private List queue = new LinkedList();
private int  limit = 10;

public BlockingQueue(int limit){
this.limit = limit;
}

public synchronized void enqueue(Object item)
throws InterruptedException  {
while(this.queue.size() == this.limit) {
wait();
}

if(this.queue.size() == 0) {
notifyAll();
}

this.queue.add(item);
}

public synchronized Object dequeue()
throws InterruptedException{
while(this.queue.size() == 0){
wait();
}

if(this.queue.size() == this.limit){
notifyAll();
}

return this.queue.remove(0);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐