多线程设计模式——Producer-Consumer生产者消费者模式
2016-07-08 21:59
369 查看
这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。
Producer生产者
Product生产者所生产的任务
Channel通道的抽象
BlockingQueueChannel基于阻塞队列的Channel实现
Consumer消费者
Created with Raphaël 2.1.0ClientClientProducterProducterproductproductChannelChannel1service()2creat3put()45
模式主类
Channel接口
BlockingQueueChannel类
1. 管道积压:生产者消费者模式中消费者的处理能力往往低于生产这的处理能力,会出现管道挤压的现象。处理这种现象,有集中方法:使用有界阻塞队列,队列到一定数量就不在生产,等待消费;使用有流量控制的无界阻塞队列,在线程的时间分配时对生产者的时间进行限制来平衡。
2. 工作窃取算法:如果是多个消费者从管道中取得产品,会出现线程安全的问题,所以会有一个通道实例对应多个队列实例来处理。
3. 线程的停止:整个模式也可以看做一个线程,这个线程的停止会比一般的线程要复杂一些,需要注意处理。
4. 高性能高可靠性:这里的示例代码是一个比较一般的实现,如果有较高的要求,可以考虑Producer-Consumer模式实现库LMAX Disruptor:https://github.com/LMAX-Exchange/disruptor
模式名称
Producer-Consumer生产者消费者模式模式面对的问题
有的线程的结果是另外一些线程的原料,也就是说,一些线程是生产者,另外一些线程是消费者,消费者需要生产者生产的东西才能正常运行,协调两者的关系成了一个大的问题。解决思路
有一个中间的存储位置,用来存储生产者生产出来的东西,称之为通道。Producer生产者
Product生产者所生产的任务
Channel通道的抽象
BlockingQueueChannel基于阻塞队列的Channel实现
Consumer消费者
Created with Raphaël 2.1.0ClientClientProducterProducterproductproductChannelChannel1service()2creat3put()45
例子代码
某内容管理系统需要支持对文档附件中的文件进行全文检索。改系统中,附件会被上传到专用的文件服务器上,对附件进行全文检索的功能模块也是部署在文件服务器上的。模式主类
public class AttachmentProcessor { private final String ATTACHMENT_STORE_BASE_DIR = "/home/viscent/tmp/attachments/"; //模式角色Producer-Consumer.Channer priuvate final Channer<File> channer = new BlockingQueueChannel<File>( new ArrayBlockingQueue<File>(200)); //模式角色Producer-Consumer.Consumer private final AbstractTerminatableThread indexingThread = new AbstractTerminatableThread(){ @Override protected void doRun()throws Exception{ File file =null; file = channel.take(); try{ indexFile(file); }catch(Exception e){ e.printStackTrace(); }finally{ terminationToken.reservations.decrementAndGet(); } } //根据制定文件生成全文搜索所需的索引文件 private void indexFile(File file) throws Exception{ //省略与模式无关的代码 //模拟生成索引文件的时间消耗 Radom rnd =new Random(); try{ Thread.sleep(rnd.nextInt(100)); }catch(InterruptedException e){ ; } } }; public void init(){ indexingThread.start(); } public void shutdown(){ indexingThread.terminate(); } public void saveAttachment(InputStream in, String documentId,String originalFileName)throws IOException{ File file = saveAsFile(in,documentId,originalFileName); try{ channel.put(file); }catch(InterruptedException e){ ; } indexingTread.terminationToken.reservations.incrementAndGet(); } private FTPClient initFTPClient(String ftpServer,String userName, String password) throws Exception{ FTPClient ftpClient = new FTPClient(); FTOClientConfig config = new FTPClientConfig(); ftpClient.config(config); int reply; ftpClient.connect(ftpServer); System.out.print(ftpClient.getReplyString()); reply = ftpClient.getReplyCode(); if(!dirName.equals(file.getCanoicalFile().getParent())){ throw new SecurityException("Invalid originalFileName:"+originalFileName); } BufferedOutputStream bos =null; BufferedInputStream bis = new BufferedInputStream(in); byte[]buf = new byte[2048]; int len = -1; try{ bos =new BufferedOutputStream(new FileOutputSteram(file)); while((len = bis.read(buf) > 0)){ bos.write(buf,0,len); } bos.flush(); }finally{ try{ bis.close(); }catch(IOException e){ ; } try{ if(null != bos){ bos.close(); } }catch(IOException e){ ; } } ftpClient.setFileType(FTP.ASCII_FILE_TYPE); return ftpClient; } }
Channel接口
public interface Channel<P> { //从通道中取一个"产品"。 P take() throws InterruptedException; //往通道里面存储一个"产品"。 void put(P product) throws InterruptedException; }
BlockingQueueChannel类
public class BlockingQueueChannel<P> implements Channel<P> { private final BlockingQueue<P>queue; public BlockingQueueChannel(BlockingQueue<P>queue){ this.queue = queue; } @Override public P take() throws InterruptedException{ return queue.take(); } @Override public void put(P product) throws InterruptedException{ queue.put(product); } }
模式的评价与实现考量
生产这消费者模式是一个经典的线程模式吗,但是它也有一些容易出现的问题:1. 管道积压:生产者消费者模式中消费者的处理能力往往低于生产这的处理能力,会出现管道挤压的现象。处理这种现象,有集中方法:使用有界阻塞队列,队列到一定数量就不在生产,等待消费;使用有流量控制的无界阻塞队列,在线程的时间分配时对生产者的时间进行限制来平衡。
2. 工作窃取算法:如果是多个消费者从管道中取得产品,会出现线程安全的问题,所以会有一个通道实例对应多个队列实例来处理。
3. 线程的停止:整个模式也可以看做一个线程,这个线程的停止会比一般的线程要复杂一些,需要注意处理。
4. 高性能高可靠性:这里的示例代码是一个比较一般的实现,如果有较高的要求,可以考虑Producer-Consumer模式实现库LMAX Disruptor:https://github.com/LMAX-Exchange/disruptor
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- Python3写爬虫(四)多线程实现数据爬取
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序