您的位置:首页 > 编程语言 > Java开发

java多线程 ----生产者消费者模式

2011-04-26 16:20 447 查看
这是java开发时常用到的模式,可以使程序在设计时分开生产者和消费者之间的相互调用。

 

模式中需要建立三个对象:生产者  消费者  仓库

生产者:把生产出来的产品放到仓库总

消费者:从仓库中取出产品进行消费

仓库:采用同步方法,对仓库上锁同步,每次只能有一个对象对其进行操作。

 

 

类图如下:

 


 

 

仓库类代码:使用队列的方式实现

 

package consumer;

import java.util.concurrent.ArrayBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerAlarmQueue {
private static int MAX_SIZE = 1000;
private static 	Logger logger = LoggerFactory.getLogger(ServerAlarmQueue.class);
private static ArrayBlockingQueue<Object> alramQueue=null;
public ServerAlarmQueue(int capacity) {
super();
MAX_SIZE = capacity;
alramQueue = new ArrayBlockingQueue<Object>(capacity);
}

public ServerAlarmQueue() {
super();
alramQueue = new ArrayBlockingQueue<Object>(MAX_SIZE);
}

/**
* 从队列终获取一个对象
*
* @return
*/
public synchronized Object consume() {
Object object = null;
while (getQueueSize() == 0) {
try {
wait();
logger.debug("队列已空,消费等待...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object = alramQueue.poll();
System.out.println(Thread.currentThread().getName()+"消费产品:"+object.toString());
this.notifyAll();
return object;
}

/**
* 向对另种插入一个对象
*
* @param object
* @return
*/
public synchronized boolean produce(Object object) {
boolean flag = false;
while (getQueueSize() == MAX_SIZE) {
try {
wait();
logger.debug("队列已满,生产等待....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag = alramQueue.offer(object);
System.out.println(Thread.currentThread().getName()+"生产出产品:"+object.toString());
this.notifyAll();
return flag;
}

public int getQueueSize() {
return alramQueue.size();
}

public void clear() {
alramQueue.clear();
}

}


 

消费者:

 

package consumer;

public class Consumer implements Runnable {
private ServerAlarmQueue serverAlarmQueue = null;
private boolean isStart = true;

public Consumer(ServerAlarmQueue serverAlarmQueue) {
super();
this.serverAlarmQueue = serverAlarmQueue;
}

@Override
public void run() {
while (isStart) {
getProducts();
try {

// 等待30毫秒
Thread.sleep(30);
} catch (Exception e) {
e.printStackTrace();
}
}
}

private void getProducts() {
Object obj = serverAlarmQueue.consume();
System.out.println(obj);
}

public void stop() {
isStart = false;
}
}


 

生产者:

 

package com.consume;

public class Producer implements Runnable{
private  ServerAlarmQueue serverAlarmQueue=null;

public Producer(ServerAlarmQueue serverAlarmQueue) {
super();
this.serverAlarmQueue = serverAlarmQueue;
}

@Override
public void run() {
char c;

for (int i = 0; i < 200; i++) {

// 随机生成大写字母字符
c = (char) (Math.random() * 26 + 'A');
produce(c);

try {

// 等待30毫秒
Thread.sleep(30);
} catch (Exception e) {
e.printStackTrace();
}
}

}

private void produce(Object obj){
serverAlarmQueue.produce(obj);
}
}


 

 

 

 

测试代码

 

 

package com.consume;

public class ThreadMain {

public static void main(String[] args) {
// 1.准备堆栈数据结构
ServerAlarmQueue stack = new ServerAlarmQueue();

// 2.准备生产者线程
Producer producer1 = new Producer(stack);
Thread t1 = new Thread(producer1);
t1.setName("1号生产者");

Producer producer2 = new Producer(stack);
Thread t2 = new Thread(producer2);
t2.setName("2号生产者");

// 3.准备消费者线程
Consumer consumer1 = new Consumer(stack);
Thread t3 = new Thread(consumer1);
t3.setName("1号消费者");

Consumer consumer2 = new Consumer(stack);
Thread t4 = new Thread(consumer2);
t4.setName("2号消费者");

t3.start();
t4.start();

t1.start();
t2.start();
}
}


 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息