您的位置:首页 > 编程语言 > Java开发

Java并发编程 生产者消费者模式

2012-07-20 12:09 477 查看
生产者消费者模式是并发编程的经典模式,大概原理就是生产者和消费者共享数据区,不断的从数据区中put和take类对象,具体原理在此不再介绍,下面为类图和源码:

类图如下

生产者和消费者类依赖于Stack,详见源码及注释:

Stack 共享数据源码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Logger;

public class ServerAlarmQueue {
private static int MAX_SIZE = 1000;
private static final Logger logger = Logger.getLogger(ServerAlarmQueue.class.getSimpleName());
private static ArrayBlockingQueue<Object> alramQueue=null;
public ServerAlarmQueue(int capacity) {
super();
MAX_SIZE = capacity;
alramQueue = new ArrayBlockingQueue<Object>(capacity);
}

public ServerAlarmQueue() {
super();
alramQueue = new ArrayBlockingQueue<Object>(MAX_SIZE);
}

/**
* 从队列终获取一个对象
*
* @return
*/
public synchronized Object consume() {
Object object = null;
while (alramQueue.isEmpty()){
try {
wait();
logger.warning("队列已空,消费等待...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object = alramQueue.poll();
System.out.println(Thread.currentThread().getName()+"消费产品:"+object.toString());
this.notifyAll();
return object;
}

/**
* 向对另种插入一个对象
*
* @param object
* @return
*/
public synchronized boolean produce(Object object) {
boolean flag = false;
while (alramQueue.size()== MAX_SIZE) {
try {
wait();
logger.warning(Thread.currentThread().getName()+" 队列已满,生产等待....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag = alramQueue.offer(object);
System.out.println(Thread.currentThread().getName()+"生产出产品:"+object.toString());
this.notifyAll();
return flag;
}

/**
* 从队列终获取一个对象,这里用到的阻塞队列的put,take 方法;
* 若条件不满足会自动阻塞,但最后需要notifyAll();不然会出现线程饿死
* 这种方法虽然简单,但不如上面的方法记录日志方便
* @return
*/
public synchronized Object consumeSimple(){
Object obj=null;
try {
obj = alramQueue.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"消费产品:"+obj.toString());
this.notifyAll();
return obj;
}
public synchronized void produceSimple(Object obj){
try {
alramQueue.put(obj);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"生产出产品:"+obj.toString());
this.notifyAll(); //这个不要忘了,不然消费者会饿死的
}

public void clear() {
alramQueue.clear();
}

}


消费者源码:

public class Consumer implements Runnable{
private ServerAlarmQueue serverAlarmQueue=null;
private boolean isStart=true;

public Consumer(ServerAlarmQueue serverAlarmQueue){
super();
this.serverAlarmQueue=serverAlarmQueue;
}

@Override
public void run() {
// TODO Auto-generated method stub
while (isStart){
getProducts();

try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

public void getProducts(){
serverAlarmQueue.consume();
//System.out.println(obj);
}

}


生产者源码:

public class Producer implements Runnable{
private ServerAlarmQueue serverAlarmQueue=null;

public Producer(ServerAlarmQueue serverAlarmQueue){
super();
this.serverAlarmQueue=serverAlarmQueue;
}

@Override
public void run() {
// TODO Auto-generated method stub
int val=0;
for (int i=0; i<20; ++i){
val=(int)(Math.random()*100);
produce(val);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" produce done!");
}

public void produce(Object obj){
serverAlarmQueue.produce(obj);
}

}


Client 源码:

import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

public static void main(String[] args) {
// TODO Auto-generated method stub
int val = 0;
System.out.println("please test method :");
System.out.println("1: testThread            2: testByExecutorService");
Scanner scanner = new Scanner(System.in);
val = scanner.nextInt();
switch (val) {
case 1:
testThread();
break;
case 2:
testByExecutorService();
break;
default:
System.out.println("input the error number");
}
System.out.println("The Main Thread Done !");
}

public static void testThread() {
// 1.准备堆栈数据结构
ServerAlarmQueue stack = new ServerAlarmQueue();

// 2.准备生产者线程
Producer producer1 = new Producer(stack);
Thread t1 = new Thread(producer1);
t1.setName("1号生产者");

Producer producer2 = new Producer(stack);
Thread t2 = new Thread(producer2);
t2.setName("2号生产者");

// 3.准备消费者线程
Consumer consumer1 = new Consumer(stack);
Thread t3 = new Thread(consumer1);
t3.setName("1号消费者");

Consumer consumer2 = new Consumer(stack);
Thread t4 = new Thread(consumer2);
t4.setName("2号消费者");

t3.start();
t4.start();

t1.start();
t2.start();
}

public static void testByExecutorService() {
// 1.准备堆栈数据结构
ServerAlarmQueue stack = new ServerAlarmQueue();

// 2.准备生产者线程
Producer producer1 = new Producer(stack);
Thread t1 = new Thread(producer1);
t1.setName("1号生产者");

Producer producer2 = new Producer(stack);
Thread t2 = new Thread(producer2);
t2.setName("2号生产者");
// 3.准备消费者线程
Consumer consumer1 = new Consumer(stack);
Thread t3 = new Thread(consumer1);
t3.setName("1号消费者");

Consumer consumer2 = new Consumer(stack);
Thread t4 = new Thread(consumer2);
t4.setName("2号消费者");

ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.submit(t1);
executorService.submit(t2);
executorService.submit(t3);
executorService.submit(t4);
        //虽然submit的是线程,但是线程名 不一样啦~~ ^_^
        //ExecutorService只是仅仅调用了Runnable接口的run方法
executorService.shutdown();
}
}


Client源码写的有点啰嗦了,呵呵,读者运行一下,看看什么不同(Thread Name),下面在说明自己的一点理解,有不同意见者,欢迎拍砖。

在编写并发编程时,尽量给线程定义一个名字,这样在debug时容易找错,但是jvm会给线程一个默认的Thread Name,所用的方法Thread.setName()也不过是Thread类对Runnable接口实现后的一种“扩展”,但能否给ExecutorService中submit的Runnable自定义个Name呢?是不能还是自己不懂?求解~~

注参考博文:http://blog.csdn.net/caijian521999/article/details/6364595
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息