您的位置:首页 > 其它

生产者消费者模型(使用lock.condition进行分组唤醒)

2017-05-24 23:19 441 查看
使用concurrent包完成生产者消费者模型,使用lock.condition进行分组唤醒,摒弃synchonized中的sinalAll(),代码如下:
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Myservice{
Queue<Integer> arrque = new ArrayBlockingQueue<Integer>(100);
Lock lock = new ReentrantLock();
Condition condition_pro = lock.newCondition();
Condition condition_con = lock.newCondition();

// 生产者方法
void produce(){
try {
lock.lock();
while(arrque.size()==100){// 因为使用了分组唤醒,用if while都可以
System.out.println("我是生产者"+Thread.currentThread().getName()+",我被阻塞了");
condition_pro.await();
}
int random = (int)(Math.random()*10000);
System.out.println(Thread.currentThread().getName()+"生产了一条消息"+random);
arrque.offer(random);
if(arrque.size()==1){
condition_con.signalAll();// 唤醒所有消费者
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}

}

// 消费者方法
void consume(){
try {
lock.lock();
while(arrque.isEmpty()){// 因为使用了分组唤醒,用if也都可以
System.out.println("*****我是消费者"+Thread.currentThread().getName()+",我被阻塞了");
condition_con.await();// 消费者自己阻塞自己
}
System.out.println("*****"+Thread.currentThread().getName()+"消费了队头元素"+arrque.peek());
arrque.poll();
if(arrque.size()==100-1){// 并不是每消费一条消息就唤醒一次生产者
condition_pro.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}

// 生产者线程
class produceThread implements Runnable {
Myservice ser;
public produceThread(Myservice ser) {
this.ser = ser;
}

public void run(){
try {
while(true){
Thread.sleep(1000);
ser.produce();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 消费者线程
class consumerThread implements Runnable {
Myservice ser;
public consumerThread(Myservice ser) {
this.ser = ser;
}

public void run(){
try {
while(true){
Thread.sleep(3000);
ser.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 客户端程序
public class Myservice_pro_con{
public static void main(String[] args){
Myservice ser = new Myservice();
produceThread producer = new produceThread(ser);
consumerThread consumer = new consumerThread(ser);

// 如果调用20次生产者方法而不是开启20个生产者线程,那生产者执行完后消费者才能执行,生产者不能和消费者并行
/*for(int i=0;i<20;i++){
ser.produce();
}*/

// 创建proNum个生产者线程
int proNum=20;
int conNum=20;
Thread[] proArr = new Thread[proNum];
for(int i=0;i<proNum;i++){
proArr[i]=new Thread(producer);
}
// 创建conNum个消费者线程
Thread[] conArr = new Thread[conNum];
for(int i=0;i<conNum;i++){
conArr[i]=new Thread(consumer);
}

for(int i=0;i<proNum;i++){
proArr[i].start();
}
for(int i=0;i<conNum;i++){
conArr[i].start();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息