您的位置:首页 > 编程语言 > Java开发

使用java Condition构建生产者消费者

2015-11-18 21:49 295 查看

1.Condition介绍

在上一篇文章中曾经提到对象的wait和notify方法,condition类其实就是这种方法的一种替代性产物。

Condition对象是同Lock结合使用,由Lock进行创建,并且condition.await的调用者必须获取lock,在await时该线程自动释放锁,当其被其他线程signal之后再重新获取锁,这同上一篇博客中提到的sychronized的用法类似。

2. 基于Condition的生产者消费者程序

(貌似ArrayBlockingQueue)这里使用notFull和notEmpty表示链表中元素有无的一个条件,具体代码如下:

package concurrency;

/**
* Created by wangxiaoyi on 15/11/18.
*/

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerAndConsumer2 {

private List<Integer> data = new LinkedList<>();
private static final int MAX_DATA_LEN = 10;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();// 非空条件
private Condition notFull = lock.newCondition();// 非满条件

class Producer implements Runnable {

private int pid = 0;

public Producer(int pid){
this.pid = pid;
}

public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {

lock.lock();

while (data.size() >= MAX_DATA_LEN) {
System.out.println("Producer" + pid + " waiting ! size : " + data.size());
notFull.await(); // 等待notFull.signal唤醒
}

try{
data.add(pid);
notEmpty.signal(); // 唤醒notEmpty.await()线程
System.out.println("Producer" + pid + " add " + pid + " size: " + data.size());

}finally {
lock.unlock();
}

Thread.sleep(500);
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}

class Consumer implements Runnable{
private int cid = 0;

public Consumer(int cid){
this.cid = cid;
}

public void run(){
try {
while (!Thread.currentThread().isInterrupted()) {

lock.lock();

while (data.isEmpty()) {
System.out.println("Consumer" + cid + " waiting, data size : " + data.size());
notEmpty.await();
}

try{
int pid = data.remove(0);
notFull.signal();
System.out.println("Consumer" + cid + " consuming data " + pid +" data size : " + data.size());

}finally {
lock.unlock();
}

Thread.sleep(500);
}
}catch (InterruptedException ie){
ie.printStackTrace();
}
}
}

public void start(){

ExecutorService executor = Executors.newCachedThreadPool();

for(int i = 0; i < 5; ++ i){
executor.submit(new Producer(i));
executor.submit(new Consumer(i));
}

try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

executor.shutdownNow();
}

public static void main(String []args){

new ProducerAndConsumer2().start();

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: