您的位置:首页 > 编程语言 > Java开发

多线程设计模式——Producer-Consumer生产者消费者模式

2016-07-08 21:59 369 查看
这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。

模式名称

Producer-Consumer生产者消费者模式

模式面对的问题

有的线程的结果是另外一些线程的原料,也就是说,一些线程是生产者,另外一些线程是消费者,消费者需要生产者生产的东西才能正常运行,协调两者的关系成了一个大的问题。

解决思路

有一个中间的存储位置,用来存储生产者生产出来的东西,称之为通道。

Producer生产者

Product生产者所生产的任务

Channel通道的抽象

BlockingQueueChannel基于阻塞队列的Channel实现

Consumer消费者

Created with Raphaël 2.1.0ClientClientProducterProducterproductproductChannelChannel1service()2creat3put()45

例子代码

某内容管理系统需要支持对文档附件中的文件进行全文检索。改系统中,附件会被上传到专用的文件服务器上,对附件进行全文检索的功能模块也是部署在文件服务器上的。

模式主类

public class AttachmentProcessor {
private final String ATTACHMENT_STORE_BASE_DIR = "/home/viscent/tmp/attachments/";

//模式角色Producer-Consumer.Channer
priuvate final Channer<File> channer = new BlockingQueueChannel<File>(
new ArrayBlockingQueue<File>(200));
//模式角色Producer-Consumer.Consumer
private final AbstractTerminatableThread indexingThread = new
AbstractTerminatableThread(){
@Override
protected void doRun()throws Exception{
File file =null;

file = channel.take();
try{
indexFile(file);
}catch(Exception e){
e.printStackTrace();
}finally{
terminationToken.reservations.decrementAndGet();
}
}

//根据制定文件生成全文搜索所需的索引文件
private void indexFile(File file) throws Exception{
//省略与模式无关的代码

//模拟生成索引文件的时间消耗
Radom rnd =new Random();
try{
Thread.sleep(rnd.nextInt(100));
}catch(InterruptedException e){
;
}
}
};

public void init(){
indexingThread.start();
}

public void shutdown(){
indexingThread.terminate();
}

public void saveAttachment(InputStream in,
String documentId,String originalFileName)throws IOException{
File file = saveAsFile(in,documentId,originalFileName);
try{
channel.put(file);
}catch(InterruptedException e){
;
}
indexingTread.terminationToken.reservations.incrementAndGet();
}

private FTPClient initFTPClient(String ftpServer,String userName,
String password) throws Exception{
FTPClient ftpClient = new FTPClient();

FTOClientConfig config = new FTPClientConfig();
ftpClient.config(config);

int reply;
ftpClient.connect(ftpServer);

System.out.print(ftpClient.getReplyString());

reply = ftpClient.getReplyCode();

if(!dirName.equals(file.getCanoicalFile().getParent())){
throw new SecurityException("Invalid originalFileName:"+originalFileName);
}

BufferedOutputStream bos =null;
BufferedInputStream bis = new BufferedInputStream(in);
byte[]buf = new byte[2048];
int len = -1;
try{
bos =new BufferedOutputStream(new FileOutputSteram(file));
while((len = bis.read(buf) > 0)){
bos.write(buf,0,len);
}
bos.flush();
}finally{
try{
bis.close();
}catch(IOException e){
;
}
try{
if(null != bos){
bos.close();
}
}catch(IOException e){
;
}
}

ftpClient.setFileType(FTP.ASCII_FILE_TYPE);
return ftpClient;
}
}


Channel接口

public interface Channel<P> {
//从通道中取一个"产品"。
P take() throws InterruptedException;

//往通道里面存储一个"产品"。
void put(P product) throws InterruptedException;
}


BlockingQueueChannel类

public class BlockingQueueChannel<P> implements Channel<P> {
private final BlockingQueue<P>queue;
public BlockingQueueChannel(BlockingQueue<P>queue){
this.queue = queue;
}

@Override
public P take() throws InterruptedException{
return queue.take();
}

@Override
public void put(P product) throws InterruptedException{
queue.put(product);
}
}


模式的评价与实现考量

生产这消费者模式是一个经典的线程模式吗,但是它也有一些容易出现的问题:

1. 管道积压:生产者消费者模式中消费者的处理能力往往低于生产这的处理能力,会出现管道挤压的现象。处理这种现象,有集中方法:使用有界阻塞队列,队列到一定数量就不在生产,等待消费;使用有流量控制的无界阻塞队列,在线程的时间分配时对生产者的时间进行限制来平衡。

2. 工作窃取算法:如果是多个消费者从管道中取得产品,会出现线程安全的问题,所以会有一个通道实例对应多个队列实例来处理。

3. 线程的停止:整个模式也可以看做一个线程,这个线程的停止会比一般的线程要复杂一些,需要注意处理。

4. 高性能高可靠性:这里的示例代码是一个比较一般的实现,如果有较高的要求,可以考虑Producer-Consumer模式实现库LMAX Disruptor:https://github.com/LMAX-Exchange/disruptor
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息