您的位置:首页 > 编程语言 > Java开发

《Java高并发程序设计》学习 --5.3 并行模式之生产者-消费者模式

2017-03-20 19:33 465 查看
生产者-消费者是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。在生产者-消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲进行通信。 
生产者-消费者模式的核心组件是共享内存缓冲区,它作为生产者消费者间的通信桥梁,避免了两者直接通信,从而将生产者和消费者进行解耦。生产者不需要知道消费者存在,消费者也不需要知道生产者的存在。
同时,由于内存缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论谁快谁慢,都可以通过共享缓冲区得到缓解,确保系统稳定允许。
生产者-消费者模式主要角色如下表所示。

角色作用
生产者用于提交用户请求,提取用户任务,并装入内存缓冲区
消费者在内存缓冲区提取并处理任务
内存缓冲区缓存生产者提交的任务或数据,供消费者使用
任务生产者向内存缓冲区提交的数据结构
Main使用生产者和消费者的客户端
实现一个基于生产者-消费者模式的求整数平方的并行程序。
首先,生产者线程的实现如下,它构建PCData对象,并放入BlockingQueue队列中。
public class Producer implements Runnable{
private volatile boolean isRunning = true;
private BlockingDeque<PCData> queue; //内存缓冲区,通过构造时外部引入,保证和消费者用的是同样的内存缓冲区.
private static AtomicInteger count = new AtomicInteger(); //总数,原子操作.
private static final int SLEEPTIME = 1000;

public Producer(BlockingDeque<PCData> queue) {
this.queue = queue;
}

@Override
public void run() {
PCData data = null;
Random random = new Random();
System.out.println("start producter .."+Thread.currentThread().getId());
try {
while (isRunning){
Thread.sleep(random.nextInt(SLEEPTIME)); //模拟执行过程
data = new PCData(count.incrementAndGet()); //现获取当前值再+1
System.out.println(data + " is put into Queue");
//提交数据到缓冲队列中.设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败
if (!queue.offer(data,2, TimeUnit.SECONDS)){
System.out.println("failed to put data "+data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
//因为BlockingQueue的offer操作上的锁是重入锁中的可以中断的锁,所以如果有异常,就中断,防止死锁.
Thread.currentThread().interrupt();
}
}
public void stop(){
isRunning = false;
}
}

对应的消费者线程的实现如下。它从BlockingQueue队列中取出PCData对象,并进行相应的计算。
public class Consumer implements Runnable {
private BlockingDeque<PCData> queue;
private static final int SLEEPTIME = 1000;
//同理,和Producter共用同一个BlockingQueue,保证存/取都在一个缓冲区
public Consumer(BlockingDeque<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id : "+Thread.currentThread().getId());
Random r = new Random();
try {
while (true){
PCData data = queue.take();
if (null != data){
int re = data.getIntData() * data.getIntData();
System.out.println(MessageFormat.format("{0} * {0} = {1}",data.getIntData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

PCData作为生产者和消费者之间的共享数据模型,定义入下:
public class PCData {
private final int intData;
public PCData(int d) {
intData = d;
}
public PCData(String d){
intData = Integer.parseInt(d);
}
public int getIntData() {
return intData;
}
@Override
public String toString() {
return "PCData{" +
"intData=" + intData +
'}';
}
}

在主函数中,创建三个生产者和消费者,并让它们协作运行。在主函数的实现中,定义LinkedBlockingQueue作为BlockingQueue的实现类。
public class Main {
public static void main(String[] a) throws InterruptedException {
//建立共享缓冲区
BlockingDeque<PCData> queue = new LinkedBlockingDeque<>(10);
//建立生产者
Producer producter1 = new Producer(queue);
Producer producter2 = new Producer(queue);
Producer producter3 = new Producer(queue);
Producer producter4 = new Producer(queue);
Producer producter5 = new Producer(queue);
//建立消费者
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
//建立线程池
ExecutorService es = Executors.newCachedThreadPool();
//运行生产者
es.execute(producter1);
es.execute(producter2);
es.execute(producter3);
es.execute(producter4);
es.execute(producter5);
//运行消费者
es.execute(consumer1);
es.execute(consumer2);
es.execute(consumer3);
//运行时间
Thread.sleep(1000 * 10);
//停止生产者
producter1.stop();
producter2.stop();
producter3.stop();
producter4.stop();
producter5.stop();
//停止生产者后,预留时间给消费者执行
Thread.sleep(1000 * 5);
System.out.println("关闭线程池...");
//关闭线程池
es.shutdown();
}
}


注:本篇博客内容摘自《Java高并发程序设计》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: